# Classification

In [None]:
%load_ext lab_black
# %load_ext nb_black # for jupyter notebook
%load_ext autoreload
%autoreload 2

In [None]:
import string
import re
from time import time

import numpy as np
import pandas as pd

In [None]:
from tqdm.notebook import tqdm

In [None]:
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer

In [None]:
import matplotlib
from matplotlib.cm import tab10
import matplotlib.pyplot as plt

# Matplotlib renamed its bundled seaborn style sheets to "seaborn-v0_8-*" and
# later removed the old names, so use the new name when this installation
# ships it and fall back to the legacy name on older releases.
plt.style.use(
    "seaborn-v0_8-notebook"
    if "seaborn-v0_8-notebook" in plt.style.available
    else "seaborn-notebook"
)

# Font sizes shared by every figure drawn in this notebook.
SMALL_SIZE = 12
MEDIUM_SIZE = 15
BIGGER_SIZE = 18

plt.rc("font", size=SMALL_SIZE)  # default text size
plt.rc("axes", titlesize=BIGGER_SIZE)  # axes titles
plt.rc("axes", labelsize=MEDIUM_SIZE)  # x / y axis labels
plt.rc("xtick", labelsize=SMALL_SIZE)  # x tick labels
plt.rc("ytick", labelsize=SMALL_SIZE)  # y tick labels
plt.rc("legend", fontsize=SMALL_SIZE)  # legend entries
plt.rc("figure", titlesize=BIGGER_SIZE)  # figure-level title

# Invisible, unfilled 1x1 rectangle patch.
# NOTE(review): presumably a blank legend handle; it is not used in the
# cells visible here - confirm before removing.
r = matplotlib.patches.Rectangle(
    (0, 0), 1, 1, fill=False, edgecolor="none", visible=False
)

In [None]:
def load_datasets(data_dir: str = "../data") -> tuple:
    """Load the four text-classification corpora from CSV files.

    Reads ``agatha_christie.csv``, ``sherlock_holmes.csv``,
    ``jane_austen.csv`` and ``newspaper_articles.csv`` from ``data_dir``.

    Args:
        data_dir: Directory that contains the four CSV files. Defaults to
            ``"../data"``, the location used by this notebook.

    Returns:
        A pair ``(X_list, y_list)`` of two lists of pandas Series, both in
        the order Agatha Christie, Sherlock Holmes, Jane Austen, Newspapers.
        ``X_list`` holds the raw texts and ``y_list`` the human-readable
        class labels.
    """
    # Agatha Christie: labels are already stored as book titles.
    ac_frame = pd.read_csv(f"{data_dir}/agatha_christie.csv")
    X_ac = ac_frame["text"]
    y_ac = ac_frame["book"]

    # Newspaper articles: numeric section codes are mapped to section names.
    # Codes missing from the mapping become NaN (pandas ``Series.map``).
    np_frame = pd.read_csv(f"{data_dir}/newspaper_articles.csv")
    X_np = np_frame["STORY"]
    y_np = np_frame["SECTION"].map(
        {0: "Politics", 1: "Technology", 2: "Entertainment", 3: "Business"}
    )

    # Jane Austen: "snake_case" book ids are turned into "Title Case" names.
    ja_frame = pd.read_csv(f"{data_dir}/jane_austen.csv")
    X_ja = ja_frame["x_text"]
    y_ja = ja_frame["y_book"].apply(
        lambda x: " ".join(y.capitalize() for y in x.split("_"))
    )

    # Sherlock Holmes: numeric labels are mapped to book titles.
    sh_frame = pd.read_csv(f"{data_dir}/sherlock_holmes.csv")
    X_sh = sh_frame["rawtext"]
    y_sh = sh_frame["label"].map(
        {
            0: "The Valley of Fear",
            1: "The Memoirs of Sherlock Holmes",
            2: "The Return of Sherlock Holmes",
            3: "Adventures of Sherlock Holmes",
        }
    )

    return [X_ac, X_sh, X_ja, X_np], [y_ac, y_sh, y_ja, y_np]

In [None]:
# Display names of the corpora, in the same order as the lists returned by
# load_datasets().
names = [
    "Agatha Christie",
    "Sherlock Holmes",
    "Jane Austen",
    "Newspapers",
]

# Raw texts and human-readable labels, one entry per corpus.
(X_datasets, y_datasets) = load_datasets()

In [None]:
# English stop words, kept in a set for constant-time membership tests.
stop_words = set(stopwords.words("english"))

# Snowball stemmer for English, used to reduce words to their stems.
stemmer = SnowballStemmer("english")

# Translation table that deletes ASCII punctuation and the em dash; this is
# the same mapping that str.maketrans("", "", chars) produces.
table = {ord(char): None for char in string.punctuation + "——"}


def remove_stop_words_and_tokenize(text: str) -> str:
    """Normalise a raw document into a space-separated string of stems.

    Each token produced by ``word_tokenize`` is lower-cased and stripped of
    the characters in ``table``. Tokens that are not purely alphabetic, or
    that appear in ``stop_words``, are dropped; the survivors are stemmed
    and joined with single spaces.
    """
    kept = []

    for token in word_tokenize(text):
        cleaned = token.lower().translate(table)

        # Discard empty / non-alphabetic leftovers and stop words.
        if cleaned.isalpha() and cleaned not in stop_words:
            kept.append(stemmer.stem(cleaned))

    return " ".join(kept)

In [None]:
# Pre-processed documents: one list of cleaned strings per corpus, in the
# same order as `names`.
X_prep = []

# The outer bar tracks corpora, the inner bar the documents of one corpus.
# The total is derived from `names` so it stays correct if corpora are
# added or removed.
for name, X in tqdm(zip(names, X_datasets), total=len(names)):
    X_prep.append(
        [remove_stop_words_and_tokenize(x) for x in tqdm(X.values, desc=name)]
    )

## Classification

In [None]:
import warnings

# Install a filter that ignores every warning category from here on.
# NOTE(review): presumably done to keep the nested progress-bar output
# readable; it also hides any warnings raised by the libraries used in the
# cells below, so consider narrowing it to specific categories.
warnings.filterwarnings("ignore")

In [None]:
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score

from sklearn.preprocessing import LabelEncoder

from sklearn.decomposition import PCA, TruncatedSVD

from sklearn.feature_extraction.text import TfidfVectorizer

from umap import UMAP

from sklearn.svm import LinearSVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.naive_bayes import MultinomialNB, ComplementNB
from sklearn.tree import DecisionTreeClassifier

from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier
from xgboost import XGBClassifier

In [None]:
# File-name stems of the corpora, in the same order as `names`.
file_names = [
    "agatha_christie",
    "sherlock_holmes",
    "jane_austen",
    "newspaper_articles",
]

# Embedding identifiers. Every entry except the first ("tfidf") is used to
# build the file name of a pre-computed embedding CSV.
embeddings = [
    "tfidf",
    "average_fasttext",
    "avg_glove",
    "distiluse",
    "roberta",
]

In [None]:
# X_all_emb[e][d] is the pre-computed embedding frame for embedding
# `embeddings[1:][e]` and corpus `file_names[d]`.
X_all_emb = []
for embedding in embeddings[1:]:
    # "tfidf" (index 0) is skipped: it has no pre-computed CSV and is fitted
    # inside the cross-validation loop instead.
    X_emb = [
        pd.read_csv(f"../data/{file_name}-{embedding}.csv", index_col=0)
        for file_name in tqdm(file_names, desc=embedding)
    ]
    X_all_emb.append(X_emb)

In [None]:
# Helpers that shorten str(classifier) to a bare class name; the result
# tables below apply them as rs1.sub("", rs2.sub("", str(clf))).strip().
#
# rs1: matches from the first space or "(" up to the end of the line, e.g.
#      the "(metric='cosine')" part of "KNeighborsClassifier(metric='cosine')".
rs1 = re.compile(r"[\ \(].*")
# rs2: matches from "<" up to the last "." on the line, e.g. the
#      "<package.module." prefix of a default "<package.module.Class object
#      at 0x...>" representation.
rs2 = re.compile(r"[<].*\.")

In [None]:
# Benchmark every classifier on every pre-computed embedding / corpus pair
# with stratified cross-validation, once on the raw embedding and once on
# its 50-component PCA projection. One CSV is written per (corpus, embedding).
kfold = StratifiedKFold(shuffle=True, random_state=42)

for embedding, X_emb in tqdm(zip(embeddings[1:], X_all_emb), total=len(embeddings[1:])):

    for name, file_name, X, y_l in tqdm(
        zip(names, file_names, X_emb, y_datasets), desc=embedding, total=len(names)
    ):
        y = LabelEncoder().fit_transform(y_l)
        X_values = X.values

        results = []

        for i, (train_index, test_index) in tqdm(
            enumerate(kfold.split(X_values, y)), total=kfold.n_splits, desc=name
        ):
            # The fold slices do not depend on the preprocessing step, so
            # they are taken once per split instead of once per `prep`.
            X_train_fold = X_values[train_index]
            X_test_fold = X_values[test_index]
            y_train = y[train_index]
            y_test = y[test_index]

            for prep in tqdm(
                (
                    None,
                    PCA(n_components=50, random_state=42),
                ),
                desc=f"Split: {i}",
            ):
                X_train, X_test = X_train_fold, X_test_fold

                # Time only the preprocessing itself, like the TF-IDF and
                # FastText-IDF benchmarks do, so "prep_time" is comparable
                # across all result files.
                prep_start_time = time()

                if prep is not None:
                    X_train = prep.fit_transform(X_train, y_train)
                    X_test = prep.transform(X_test)

                prep_end_time = time()

                for clf in tqdm(
                    (
                        KNeighborsClassifier(metric="cosine"),
                        LinearSVC(),
                        MLPClassifier(random_state=42),
                        CatBoostClassifier(thread_count=16, random_state=42, verbose=0),
                        LGBMClassifier(n_jobs=6, random_state=42, silent=True),
                        XGBClassifier(n_jobs=6, random_state=42, verbosity=0),
                    ),
                    desc=str(prep),
                ):
                    # "clf_time" covers fitting, prediction and scoring.
                    clf_start_time = time()

                    clf.fit(X_train, y_train)

                    predicted = clf.predict(X_test)

                    acc = accuracy_score(y_true=y_test, y_pred=predicted)

                    clf_end_time = time()

                    results.append(
                        {
                            "embedding": embedding,
                            "name": name,
                            "split": i,
                            "preprocessing": str(prep),
                            # Reduce str(clf) to the bare class name.
                            "classifier": rs1.sub("", rs2.sub("", str(clf))).strip(),
                            "accuracy": acc,
                            "prep_time": prep_end_time - prep_start_time,
                            "clf_time": clf_end_time - clf_start_time,
                        }
                    )

        pd.DataFrame(results).round(4).to_csv(f"../results/{file_name}-{embedding}-results.csv")


In [None]:
from typing import Iterable, Any, Optional

from sklearn.base import TransformerMixin

from gensim.models import KeyedVectors

In [None]:
# Word vectors read from a word2vec text-format file; `limit` caps how many
# vectors are read from it.
w2v = KeyedVectors.load_word2vec_format(
    "wiki-news-300d-1M.vec",
    limit=200_000,
    binary=False,
)


class FastTextIDFTransformer(TransformerMixin):
    """Embed documents as IDF-weighted aggregates of pre-trained word vectors.

    ``fit`` learns one IDF weight per vocabulary word with a
    ``TfidfVectorizer``. ``transform`` tokenises each document, looks every
    token up in the module-level ``w2v`` vectors, scales the vector by the
    word's IDF and aggregates the scaled vectors per document.

    Args:
        func: ``"mean"`` averages the weighted word vectors of a document;
            any other value sums them.
        stop_words: Words excluded from the IDF vocabulary. A plain string
            is forwarded unchanged to ``TfidfVectorizer``.
    """

    # Splits a token on every non-word character and on underscores.
    __splitter = re.compile(r"[\W_]")

    def __init__(self, func: str = "mean", stop_words: Iterable[str] = ()) -> None:
        self.func = func
        self.stop_words = stop_words

    def fit(self, X: Any, y: Any = None) -> "FastTextIDFTransformer":
        """Learn the IDF weight of every word in ``X`` and return ``self``."""
        if isinstance(self.stop_words, str):
            stop_words = self.stop_words
        else:
            # TfidfVectorizer documents a list (or None) here; sets are
            # rejected by newer scikit-learn releases.
            stop_words = sorted(self.stop_words) or None

        tfidf = TfidfVectorizer(stop_words=stop_words)
        tfidf.fit(X)

        self.max_idf = max(tfidf.idf_)

        try:
            vocabulary = tfidf.get_feature_names_out()
        except AttributeError:  # older scikit-learn without the *_out API
            vocabulary = tfidf.get_feature_names()

        self.idfs = dict(zip(vocabulary, tfidf.idf_))

        return self

    def _word_vec(self, word: str) -> Optional[np.ndarray]:
        """Return the IDF-scaled vector of ``word``, or ``None`` if ``w2v`` lacks it."""
        if word not in w2v:
            return None

        # Words missing from the fitted vocabulary fall back to the largest
        # IDF seen during fit.
        # NOTE(review): words removed via `stop_words` are also missing from
        # the vocabulary, so they are weighted with max_idf rather than
        # dropped - confirm this is intended.
        return w2v[word] * self.idfs.get(word, self.max_idf)

    def transform(self, X: Any, y: Any = None) -> np.ndarray:
        """Return one aggregated vector per document, stacked row-wise.

        Documents without any token known to ``w2v`` are represented by a
        zero vector so that every row has the same width.
        """
        aggregate = np.mean if self.func == "mean" else np.sum
        results = []

        for x in X:
            vec = []

            for token in word_tokenize(x):
                w = self._word_vec(token)

                if w is not None:
                    vec.append(w)
                    continue

                # Unknown token: retry with its parts (e.g. the halves of a
                # hyphenated word).
                for sub in self.__splitter.split(token):
                    w = self._word_vec(sub)
                    if w is not None:
                        vec.append(w)

            if vec:
                results.append(aggregate(vec, axis=0))
            else:
                results.append(np.zeros(w2v.vector_size))

        if not results:
            return np.empty((0, w2v.vector_size))

        return np.vstack(results)

    def fit_transform(self, X: Any, y: Any = None) -> np.ndarray:
        """Fit the IDF weights on ``X`` and return its embedding."""
        self.fit(X)

        return self.transform(X)

In [None]:
# Benchmark the classifiers on TF-IDF features, in two passes:
#   "tfidf"         - plain TF-IDF; a shallow decision tree also collects the
#                     most discriminative words of every training fold.
#   "tfidf_wosigni" - TF-IDF with those collected words removed as stop words.
kfold = StratifiedKFold(shuffle=True, random_state=42)

# Corpus name -> set of words picked by the decision trees in the "tfidf"
# pass. Also read by the FastText-IDF benchmark cell.
signi_stop_words = {}

for embedding in tqdm(["tfidf", "tfidf_wosigni"]):

    for name, file_name, X, y_l in tqdm(
        zip(names, file_names, X_prep, y_datasets), desc=embedding, total=len(names)
    ):
        # The "wosigni" variant is not run for the newspaper corpus.
        if file_name.startswith("news") and embedding.endswith("gni"):
            continue

        y = LabelEncoder().fit_transform(y_l)
        X = np.array(X)

        results = []

        for i, (train_index, test_index) in tqdm(
            enumerate(kfold.split(X, y)), total=kfold.n_splits, desc=name
        ):
            y_train = y[train_index]
            y_test = y[test_index]

            if embedding == "tfidf":
                tfidf = TfidfVectorizer()
            else:
                # scikit-learn documents `stop_words` as a list; newer
                # releases reject a set.
                tfidf = TfidfVectorizer(stop_words=sorted(signi_stop_words[name]))

            # The vectorisation does not depend on `prep`, so it is done
            # once per split rather than once per preprocessing step.
            X_train_tfidf = tfidf.fit_transform(X[train_index])
            X_test_tfidf = tfidf.transform(X[test_index])

            if embedding == "tfidf":
                tree = DecisionTreeClassifier(max_depth=3, random_state=42)
                tree.fit(X_train_tfidf, y_train)

                try:
                    features = np.array(tfidf.get_feature_names_out())
                except AttributeError:  # older scikit-learn without *_out
                    features = np.array(tfidf.get_feature_names())

                # Every word the tree actually split on counts as significant.
                signi_stop_words.setdefault(name, set()).update(
                    str(word) for word in features[tree.feature_importances_ > 0]
                )

            for prep in tqdm(
                (
                    None,
                    PCA(n_components=50, random_state=42),
                    TruncatedSVD(n_components=50, random_state=42),
                ),
                desc=f"Split: {i}",
            ):
                X_train, X_test = X_train_tfidf, X_test_tfidf

                prep_start_time = time()

                if prep is not None:
                    if isinstance(prep, PCA):
                        # PCA does not accept scipy sparse matrices on older
                        # scikit-learn releases, so densify first.
                        # NOTE(review): memory grows with documents x
                        # vocabulary size.
                        X_train = X_train.toarray()
                        X_test = X_test.toarray()

                    X_train = prep.fit_transform(X_train, y_train)
                    X_test = prep.transform(X_test)

                prep_end_time = time()

                classifiers = [
                    KNeighborsClassifier(metric="cosine"),
                    LinearSVC(),
                    MLPClassifier(random_state=42),
                ]
                if prep is None:
                    # The naive Bayes models raise on negative feature
                    # values, which PCA / SVD projections contain, so they
                    # are only evaluated on the raw TF-IDF matrix.
                    classifiers += [MultinomialNB(), ComplementNB()]
                classifiers += [
                    LGBMClassifier(n_jobs=6, random_state=42, silent=True),
                    XGBClassifier(n_jobs=6, random_state=42, verbosity=0),
                ]

                for clf in tqdm(classifiers, desc=str(prep)):

                    # "clf_time" covers fitting, prediction and scoring.
                    clf_start_time = time()

                    clf.fit(X_train, y_train)

                    predicted = clf.predict(X_test)

                    acc = accuracy_score(y_true=y_test, y_pred=predicted)

                    clf_end_time = time()

                    results.append(
                        {
                            "embedding": embedding,
                            "name": name,
                            "split": i,
                            "preprocessing": str(prep),
                            # Reduce str(clf) to the bare class name.
                            "classifier": rs1.sub("", rs2.sub("", str(clf))).strip(),
                            "accuracy": acc,
                            "prep_time": prep_end_time - prep_start_time,
                            "clf_time": clf_end_time - clf_start_time,
                        }
                    )

        pd.DataFrame(results).round(4).to_csv(
            f"../results/{file_name}-{embedding}-results.csv"
        )


In [None]:
# Benchmark the classifiers on IDF-weighted FastText document vectors, with
# ("fasttextidf") and without ("fasttextidf_wosigni") the significant words
# collected in `signi_stop_words` by the TF-IDF benchmark cell.
kfold = StratifiedKFold(shuffle=True, random_state=42)

for embedding in tqdm(["fasttextidf", "fasttextidf_wosigni"]):

    for name, file_name, X, y_l in tqdm(
        zip(names, file_names, X_prep, y_datasets), desc=embedding, total=len(names)
    ):
        # The "wosigni" variant is not run for the newspaper corpus.
        if file_name.startswith("news") and embedding.endswith("gni"):
            continue

        y = LabelEncoder().fit_transform(y_l)
        X = np.array(X)

        results = []

        for i, (train_index, test_index) in tqdm(
            enumerate(kfold.split(X, y)), total=kfold.n_splits, desc=name
        ):
            y_train = y[train_index]
            y_test = y[test_index]

            if embedding == "fasttextidf":
                embedder = FastTextIDFTransformer()
            else:
                # Pass a list: the stop words end up in a TfidfVectorizer,
                # whose `stop_words` is documented as a list.
                embedder = FastTextIDFTransformer(
                    stop_words=sorted(signi_stop_words[name])
                )

            # Embedding the documents is independent of `prep`, so it is
            # done once per split rather than once per preprocessing step.
            X_train_emb = embedder.fit_transform(X[train_index])
            X_test_emb = embedder.transform(X[test_index])

            for prep in tqdm(
                (
                    None,
                    PCA(n_components=50, random_state=42),
                ),
                desc=f"Split: {i}",
            ):
                X_train, X_test = X_train_emb, X_test_emb

                prep_start_time = time()

                if prep is not None:
                    X_train = prep.fit_transform(X_train, y_train)
                    X_test = prep.transform(X_test)

                prep_end_time = time()

                for clf in tqdm(
                    (
                        KNeighborsClassifier(metric="cosine"),
                        LinearSVC(),
                        MLPClassifier(random_state=42),
                        CatBoostClassifier(thread_count=16, random_state=42, verbose=0),
                        LGBMClassifier(n_jobs=6, random_state=42, silent=True),
                        XGBClassifier(n_jobs=6, random_state=42, verbosity=0),
                    ),
                    desc=str(prep),
                ):

                    # "clf_time" covers fitting, prediction and scoring.
                    clf_start_time = time()

                    clf.fit(X_train, y_train)

                    predicted = clf.predict(X_test)

                    acc = accuracy_score(y_true=y_test, y_pred=predicted)

                    clf_end_time = time()

                    results.append(
                        {
                            "embedding": embedding,
                            "name": name,
                            "split": i,
                            "preprocessing": str(prep),
                            # Reduce str(clf) to the bare class name.
                            "classifier": rs1.sub("", rs2.sub("", str(clf))).strip(),
                            "accuracy": acc,
                            "prep_time": prep_end_time - prep_start_time,
                            "clf_time": clf_end_time - clf_start_time,
                        }
                    )

        pd.DataFrame(results).round(4).to_csv(
            f"../results/{file_name}-{embedding}-results.csv"
        )