In [None]:
import re
import typing
import os

In [None]:
import pandas as pd
import nltk

# Fetch the NLTK "stopwords" resource; stopwords.words() is called later in this notebook.
nltk.download("stopwords")

from nltk.corpus import stopwords
# from nltk.stem import SnowballStemmer
from pymorphy3 import MorphAnalyzer
# from pymorphy2 import MorphAnalyzer

In [None]:
# Resolved path of the raw labelled dataset; passed to load_data() below.
DATASET_RAW_PATH = os.path.realpath("./assets/data/raw-data/labeled.csv")
# Name of the column holding the text documents that get lemmatized.
LABEL_X = "comment"
# NOTE(review): presumably the target/label column -- it is not referenced in the visible cells.
LABEL_Y = "toxic"
# Language name handed to nltk's stopwords.words().
DATASET_LANG = "russian"

# When truthy, load_data() prints the loaded frame's shape and head.
LOG = True

In [None]:
def load_data(data_path: str, extract_method: typing.Callable = pd.read_csv, **pandas_kwargs: typing.Any) -> pd.DataFrame:
    """Load a dataset from ``data_path`` using ``extract_method``.

    Args:
        data_path: Location of the data, passed as the first positional
            argument to ``extract_method``.
        extract_method: Reader callable; defaults to ``pd.read_csv``.
        **pandas_kwargs: Extra keyword arguments forwarded unchanged to
            ``extract_method``.

    Returns:
        Whatever ``extract_method`` returns. The logging branch calls
        ``.shape`` and ``.head()`` on it, so a DataFrame-like object is
        expected.
    """
    df = extract_method(data_path, **pandas_kwargs)
    # LOG is the notebook-level verbosity switch.
    if LOG:
        print("Data loaded!   Shape is:", df.shape)
        print(df.head(), "\n\n")
    return df

In [None]:
# Read the raw dataset with load_data's default reader (pd.read_csv).
text_documents = load_data(DATASET_RAW_PATH)

In [None]:
def lemmatize_corpus(documents: typing.Iterable[str], lemmatize: typing.Callable[[str], str], stopwords_list: typing.Iterable[str]) -> list[list[str]]:
    """Tokenize, stopword-filter and lemmatize every document.

    Each document is lower-cased, split into runs of Cyrillic/Latin letters,
    stripped of tokens found in ``stopwords_list`` and then mapped through
    ``lemmatize``. Stopwords are matched against the lower-cased surface
    token, i.e. *before* lemmatization.

    Args:
        documents: Iterable of text documents (e.g. a pandas Series of str).
        lemmatize: Callable turning a single token into its lemma.
        stopwords_list: Tokens to drop. Any iterable is accepted, including
            one-shot iterators.

    Returns:
        One list of lemmas per input document, in input order.
    """
    word_pattern = re.compile(r"[А-Яа-яЁёA-Za-z]+")
    # Materialise once: O(1) membership tests instead of a list scan per token,
    # and a one-shot iterator is not exhausted by the first lookup.
    stopword_set = frozenset(stopwords_list)

    ret = []
    for doc in documents:
        tokens = word_pattern.findall(doc.lower())
        ret.append([lemmatize(token) for token in tokens if token not in stopword_set])

    return ret

In [None]:
morph_analyzer = MorphAnalyzer()


def lemmatize(token: str) -> str:
    """Return the first normal form the morphological analyzer reports for ``token``."""
    return morph_analyzer.normal_forms(token)[0]


lemmatized_docs = lemmatize_corpus(text_documents[LABEL_X], lemmatize=lemmatize, stopwords_list=stopwords.words(DATASET_LANG))
# Preview a few documents only; displaying the whole corpus floods the cell output.
lemmatized_docs[:5]

In [None]:
from collections import Counter
import heapq

In [None]:
def t_score_fn(trig: tuple[str, ...], trig_freq: int, token_freqs: dict[str, int], n: int) -> float:
    """Student's t-score of an n-gram against the independence hypothesis.

    For an n-gram ``w1..wk`` observed ``trig_freq`` times in a corpus of ``n``
    tokens the score is::

        (trig_freq - prod(freq(wi)) / n**(k - 1)) / sqrt(trig_freq)

    For trigrams (k == 3) the normaliser is ``n**2``.

    Args:
        trig: The n-gram as a tuple of tokens (must be non-empty).
        trig_freq: Observed frequency of the n-gram (must be > 0).
        token_freqs: Mapping from token to its corpus frequency.
        n: Total number of tokens in the corpus.

    Returns:
        The t-score as a float.
    """
    unig_prod = token_freqs[trig[0]]
    for token in trig[1:]:
        unig_prod *= token_freqs[token]

    # Expected count under independence: n * prod(freq(wi) / n) = prod(freq(wi)) / n**(k - 1).
    expected = unig_prod / (n ** (len(trig) - 1))
    return (trig_freq - expected) / (trig_freq ** .5)

In [None]:
def get_token_freqs(docs: list[list[str]]) -> dict[str, int]:
    """Count how many times each token occurs across all documents."""
    return Counter(token for tokens in docs for token in tokens)

In [None]:
def get_trig_freqs(docs: list[list[str]]) -> dict[tuple[str], int]:
    """Count every run of three consecutive tokens inside each document.

    Trigrams never span a document boundary; documents with fewer than
    three tokens contribute nothing.
    """
    return Counter(
        window
        for tokens in docs
        for window in zip(tokens, tokens[1:], tokens[2:])
    )

In [None]:
def make_trigrams_tsocres(docs: list[list[str]]) -> dict[tuple[str], float]:
    """Compute the t-score of every trigram found in ``docs``.

    Trigram and token frequencies come from ``get_trig_freqs`` and
    ``get_token_freqs``; the corpus size is the total token count over all
    documents. Returns a mapping from trigram to its ``t_score_fn`` value.
    """
    trigram_counts = get_trig_freqs(docs)
    unigram_counts = get_token_freqs(docs)
    corpus_size = sum(len(tokens) for tokens in docs)

    return {
        trigram: t_score_fn(trigram, count, unigram_counts, corpus_size)
        for trigram, count in trigram_counts.items()
    }

In [None]:
def get_top_k(scores: dict[tuple[str, ...], float], k: int = 10) -> list[tuple[tuple[str, ...], float]]:
    """Return the ``k`` highest-scoring ``(ngram, score)`` pairs, best first.

    Uses a bounded heap instead of sorting every entry. Ties keep the
    mapping's insertion order, the same as a stable descending sort. A
    non-positive ``k`` yields an empty list.

    Args:
        scores: Mapping from n-gram to its score.
        k: Maximum number of pairs to return.
    """
    return heapq.nlargest(k, scores.items(), key=lambda item: item[1])

In [None]:
# Score every trigram of the lemmatized corpus with the hand-written t-score.
scores = make_trigrams_tsocres(lemmatized_docs)

In [None]:
# Show the 30 best-scoring trigrams (same cut-off as the NLTK nbest call further down).
get_top_k(scores, 30)

In [None]:
import nltk
from nltk.collocations import TrigramAssocMeasures, TrigramCollocationFinder


# Rank the same lemmatized corpus with NLTK's built-in Student's t measure.
# NOTE(review): presumably a cross-check of the hand-written t-scores above -- confirm.
trigram_measures = TrigramAssocMeasures()
finder_thr = TrigramCollocationFinder.from_documents(lemmatized_docs)
finder_thr.nbest(trigram_measures.student_t, 30)