In [1]:
from typing import Any, Optional
from pathlib import Path
import json
import glob

import numpy as np
import pandas as pd
import spacy
from nltk.corpus import stopwords

import gensim
import gensim.corpora as corpora
from gensim.utils import simple_preprocess
from gensim.models import CoherenceModel, TfidfModel

import pyLDAvis
import pyLDAvis.gensim_models as gensimvis

import warnings

# Silence DeprecationWarning messages for the rest of the session.
warnings.filterwarnings("ignore", category=DeprecationWarning)
# Switch pyLDAvis into notebook mode (presumably so prepared visualizations
# render inline as cell output — confirm against the pyLDAvis docs).
pyLDAvis.enable_notebook()

## Tools

In [4]:
def get_data_path(file: Optional[str] = None) -> Path:
    """Build a path inside the ``data`` directory that sits next to the working directory.

    Args:
        file: Name (or relative path) of a file inside the data directory.
            When omitted, the data directory itself is returned.

    Returns:
        ``<parent of cwd>/data`` when ``file`` is ``None``, otherwise
        ``<parent of cwd>/data/<file>``.
    """
    data_dir = Path.cwd().parent / "data"
    # Joining a path with None raises TypeError, so the declared default
    # needs its own branch.
    return data_dir if file is None else data_dir / file


def read_data(file: str) -> Any:
    """Load a UTF-8 encoded JSON file and return its parsed content.

    Args:
        file: Path of the JSON file; it is passed straight to ``open``.

    Returns:
        Whatever ``json.load`` produces for the file (dict, list, str, ...).
    """
    with open(file, "r", encoding="utf-8") as f:
        return json.load(f)


def export_data(file: str, data: Any) -> None:
    """Serialize ``data`` as JSON (four-space indent) into ``file``, encoded as UTF-8.

    An existing file at that path is overwritten.
    """
    with open(file, mode="w", encoding="utf-8") as handle:
        json.dump(obj=data, fp=handle, indent=4)
        

def pipe(raw_input: Any, *functions, **functions_with_args) -> Any:
    """Thread a value through a chain of transformations and return the result.

    Args:
        raw_input: Starting value.
        *functions: Callables applied first, in order; each receives the
            running value as its only argument.
        **functions_with_args: Applied afterwards, in keyword order. Each
            keyword is the *name* of a callable and its value is an iterable
            of extra positional arguments; the callable is invoked as
            ``callable(running_value, *extra_args)``.

    Returns:
        The value produced by the last stage, or ``raw_input`` when no stage
        is given.
    """
    output = raw_input

    # Iterating over an empty tuple/dict is a no-op, so no emptiness guards
    # are needed around the two loops.
    for function in functions:
        output = function(output)

    for function, args_list in functions_with_args.items():
        # NOTE(review): the keyword name is resolved with eval(), which runs
        # arbitrary expressions — never feed untrusted strings into this.
        output = eval(function)(output, *args_list)

    return output

## Preprocessing Tools

In [5]:
# spaCy "en_core_web_sm" pipeline with the parser and NER components disabled;
# lemmatize() reads token.pos_ and token.lemma_ from the documents it produces.
nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])
# NLTK's English stop-word list, consulted by remove_stopwords().
stop_words = stopwords.words("english")


def lemmatize(texts: list[str], allowed_postags: Optional[list[str]] = None) -> list[str]:
    """Reduce each text to the space-joined lemmas of its selected word classes.

    Args:
        texts: Raw documents.
        allowed_postags: Part-of-speech tags (compared against spaCy's
            ``token.pos_``) whose tokens are kept. Defaults to
            ``["NOUN", "ADJ", "VERB", "ADV"]``.

    Returns:
        One string per input text, in input order, made of the lemmas of the
        kept tokens joined by single spaces.
    """
    if allowed_postags is None:
        allowed_postags = ["NOUN", "ADJ", "VERB", "ADV"]
    kept_tags = frozenset(allowed_postags)

    # nlp.pipe processes the texts as a batch instead of issuing one nlp()
    # call per document; the documents come back in input order.
    return [
        " ".join(token.lemma_ for token in doc if token.pos_ in kept_tags)
        for doc in nlp.pipe(texts)
    ]


def gensim_preprocess(lemmatized_texts: list[str]) -> list[list[str]]:
    """Tokenize every text with gensim's ``simple_preprocess`` (called with ``deacc=True``).

    Returns one token list per input text, in input order.
    """
    tokenized = []
    for document in lemmatized_texts:
        tokenized.append(simple_preprocess(document, deacc=True))
    return tokenized


def remove_stopwords(texts: list[list[str]]) -> list[list[str]]:
    """Drop every token that appears in the module-level ``stop_words`` list.

    Args:
        texts: Tokenized documents.

    Returns:
        A new list with one filtered token list per document; token order is
        preserved and documents may end up empty.
    """
    # stop_words is a list, so each membership test was a linear scan; build
    # the lookup set once per call instead.
    stop_set = frozenset(stop_words)
    return [[word for word in words if word not in stop_set] for words in texts]
    

def generate_ngrams(words: list[list[str]]) -> list[list[str]]:
    """Merge frequent word pairs and triples of each document into single tokens.

    A bigram ``Phrases`` model is trained on ``words`` and a trigram model on
    the bigram-transformed corpus; both are then applied to every document.

    Args:
        words: Tokenized documents. The corpus is iterated several times, so
            it must be a re-iterable sequence rather than a one-shot generator.

    Returns:
        One token list per document, in input order, with detected phrases
        joined into single tokens.
    """
    bigram_phrases = gensim.models.Phrases(words, min_count=5, threshold=50)
    trigram_phrases = gensim.models.Phrases(bigram_phrases[words], threshold=50)

    bigram = gensim.models.phrases.Phraser(bigram_phrases)
    trigram = gensim.models.phrases.Phraser(trigram_phrases)

    # Apply the bigram model exactly once before the trigram model, matching
    # how the trigram model was trained. The previous version ran the bigram
    # pass twice (trigram[bigram[bigram[doc]]]).
    return [trigram[bigram[doc]] for doc in words]


def make_corpus(words: list[list[str]]) -> tuple[list[list[tuple[int, int]]], corpora.dictionary.Dictionary]:
    """Build a gensim dictionary from the documents and encode each one as a bag of words.

    Args:
        words: Tokenized documents.

    Returns:
        ``(corpus, id2word)`` where ``corpus`` holds one ``doc2bow`` result
        per document (a list of ``(token_id, count)`` pairs) and ``id2word``
        is the ``Dictionary`` built from ``words``.
    """
    id2word = corpora.Dictionary(words)
    corpus = [id2word.doc2bow(text) for text in words]
    return corpus, id2word


def apply_tfidf(
    corpus: list[list[tuple[int, int]]],
    id2word: corpora.dictionary.Dictionary,
    low_value: float = 0.03,
) -> tuple[list[list[tuple[int, int]]], corpora.dictionary.Dictionary]:
    """Remove low-scoring terms from every bag-of-words document.

    A ``TfidfModel`` is fitted on ``corpus``. For each document, an entry is
    kept only if the model returns a weight for its term id and that weight
    is not below ``low_value``; terms the model omits from its output are
    dropped as well.

    Args:
        corpus: Bag-of-words documents (lists of ``(token_id, count)`` pairs).
        id2word: Dictionary the corpus was built with; returned unchanged.
        low_value: Weights strictly below this threshold are discarded.

    Returns:
        ``(filtered_corpus, id2word)``. ``filtered_corpus`` is a new list; the
        input ``corpus`` is left untouched, so re-running on the same input
        gives the same result.
    """
    tfidf = TfidfModel(corpus, id2word=id2word)

    filtered_corpus = []
    for bow in corpus:
        # Score the document once and remember which term ids survive; a set
        # keeps the membership test below constant-time.
        kept_ids = {
            term_id
            for term_id, weight in tfidf[bow]
            if not weight < low_value
        }
        filtered_corpus.append([entry for entry in bow if entry[0] in kept_ids])

    return filtered_corpus, id2word


def preprocess_text(texts: list[str]) -> Any:
    """Run the full preprocessing chain on raw texts.

    The stages run in this order: lemmatize, gensim_preprocess,
    remove_stopwords, generate_ngrams, make_corpus. The return value is
    whatever the last stage (make_corpus) returns.
    """
    stages = (
        lemmatize,
        gensim_preprocess,
        remove_stopwords,
        generate_ngrams,
        make_corpus,
    )
    return pipe(texts, *stages)

## Data

In [6]:
# Load the "texts" entry of ushmm_dn.json from the data directory.
data = read_data(get_data_path("ushmm_dn.json"))["texts"]

In [9]:
# Take the first two entries of `data` as a small input for a trial run.
sample = data[:2]

In [13]:
# Rebind `data` to the "content" column of the Keychron K2 reviews CSV, as a plain list.
# NOTE(review): this replaces the JSON texts loaded earlier under the same name,
# and the trailing [:-1] drops the last row — confirm both are intentional.
data = pd.read_csv(get_data_path("keychron_K2_reviews.csv"))["content"].values.tolist()[:-1]

### Sample

In [None]:
# Take the first entry of `data` (rebinds `sample`).
sample = data[0]

In [None]:
# Midpoint index of the sample, rounded down.
half_length = len(sample) // 2

In [None]:
# Split the sample at the midpoint into a two-element list.
# NOTE(review): this rebinds `sample`, so running the cell twice splits the
# already-split list again.
sample = [sample[:half_length], sample[half_length:]]

## Preprocessing

In [10]:
# Run the preprocessing pipeline on the sample.
corpus, id2word = preprocess_text(sample)

In [14]:
# Run the preprocessing pipeline on the full `data` list.
# NOTE(review): rebinds `corpus` and `id2word`, replacing the result of the sample run.
corpus, id2word = preprocess_text(data)

### TF-IDF

In [15]:
# Filter low TF-IDF terms out of every bag-of-words document.
# NOTE(review): the result is assigned back to `corpus`, so re-running this
# cell filters the already-filtered corpus again.
corpus, id2word = apply_tfidf(corpus, id2word)

## Model

In [16]:
# Fit a 10-topic LDA model on the filtered corpus.
# random_state is fixed (presumably for reproducible runs) and alpha="auto"
# presumably lets gensim learn the document-topic prior — see the LdaModel docs.
lda_model = gensim.models.ldamodel.LdaModel(
    corpus=corpus,
    id2word=id2word,
    num_topics=10,
    random_state=100,
    update_every=1,
    chunksize=100,
    passes=10,
    alpha="auto"
)

## Visualization

In [19]:
# Build the pyLDAvis data for the fitted model from the same corpus and dictionary.
# mds="mmds" and R=10 are forwarded to pyLDAvis (R presumably caps the number
# of terms listed per topic — confirm against the pyLDAvis prepare() docs).
vis = gensimvis.prepare(
    lda_model, 
    corpus, 
    id2word, 
    mds="mmds",
    R=10
)

  default_term_info = default_term_info.sort_values(
  from imp import reload
  from imp import reload
  from imp import reload
  from imp import reload


In [20]:
# Show the prepared visualization as this cell's output.
vis

## Export figure as HTML

In [None]:
# Write the visualization to lda_keychron_reviews.html in the data directory.
pyLDAvis.save_html(vis, str(get_data_path("lda_keychron_reviews.html")))