In [None]:
import numpy as np
from datasets import load_dataset

# Victorian-era sentences labelled by author.
# Fetched from the Hugging Face Hub, where it is stored in parquet format.
data = load_dataset(path='contemmcm/victorian_authorship')

In [None]:
from sklearn.linear_model import RidgeClassifierCV, LogisticRegressionCV
from sklearn.decomposition import PCA
from sklearn.pipeline import make_pipeline
from sklearn.neighbors import KNeighborsClassifier, RadiusNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from copy import deepcopy
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, top_k_accuracy_score
from tqdm.auto import tqdm
from sklearn.ensemble import RandomForestClassifier
import pandas as pd


# Computes basic metrics for multi-class classification. Returns them as a dict so a DataFrame can be built later.
def perform_metric(y_true, y_pred):
    """Score predictions with accuracy plus macro-averaged precision, recall and F1.

    Args:
        y_true: ground-truth labels.
        y_pred: predicted labels, aligned with ``y_true``.

    Returns:
        dict with the keys ``acc``, ``precision``, ``recall`` and ``f1`` (in
        that order), ready to be used as one row of a results DataFrame.
    """
    # Options shared by the three per-class metrics: macro averaging, and
    # zero_division=np.nan so an undefined score is reported as NaN.
    macro_options = {'zero_division': np.nan, 'average': 'macro'}
    per_class_scorers = (
        ('precision', precision_score),
        ('recall', recall_score),
        ('f1', f1_score),
    )

    scores = {"acc": accuracy_score(y_true, y_pred)}
    for key, scorer in per_class_scorers:
        scores[key] = scorer(y_true, y_pred, **macro_options)
    return scores


# Builds a human-readable name for a model. For KNN the number of neighbours is appended to the name.
def get_name(model):
    """Return a human-readable label for an estimator.

    * Pipeline-like objects (anything exposing a non-empty ``steps`` list of
      ``(name, estimator)`` pairs) are labelled by joining the labels of their
      steps with ``+``, e.g. ``StandardScaler+LogisticRegressionCV``, instead
      of the uninformative ``Pipeline``.
    * ``KNeighborsClassifier`` gets its number of neighbours appended, e.g.
      ``KNeighborsClassifier(50)``.
    * Everything else is labelled with its class name.

    Args:
        model: estimator (or pipeline of estimators) to label.

    Returns:
        str: the label.
    """
    # Duck-typed on ``steps`` so no extra import is needed for the Pipeline class.
    steps = getattr(model, 'steps', None)
    if steps:
        # Skip disabled steps (None or the 'passthrough' marker string).
        parts = [
            get_name(step)
            for _, step in steps
            if step is not None and not isinstance(step, str)
        ]
        if parts:
            return '+'.join(parts)

    if isinstance(model, KNeighborsClassifier):
        return f'{type(model).__name__}({model.n_neighbors})'
    return type(model).__name__


# Runs the train/evaluate pipeline over the whole model zoo.
def train_model(X_train, y_train, X_test, y_test, random_state=None):
    """Fit every model of the zoo and score it on the train and test data.

    The zoo is the cross product of a fixed list of classifiers with three
    dimensionality settings: no PCA, PCA with 50 components and PCA with 400
    components.

    Args:
        X_train: training feature matrix.
        y_train: training labels.
        X_test: test feature matrix.
        y_test: test labels.
        random_state: optional seed forwarded to the stochastic estimators
            built here (PCA and the random forest). The default ``None``
            leaves them unseeded, as before.

    Returns:
        tuple ``(train_df, test_df)``: two DataFrames with one row per model
        and the columns produced by ``perform_metric``. The index holds a
        unique label per model; PCA variants are labelled
        ``PCA(<n_components>)+<name>``.
    """
    # Building zoo. Trying to reduce dimension by PCA with different number of components.
    components_zoo = [None, 50, 400]
    classifiers_zoo = [
        # Ridge classifier with cross-validation. Replaces classification task with regression one.
        RidgeClassifierCV(),
        # Logistic regression. Works well only with scaled data.
        make_pipeline(StandardScaler(), LogisticRegressionCV()),
        KNeighborsClassifier(50),  # KNN
        KNeighborsClassifier(100),  # KNN
        RandomForestClassifier(n_estimators=50, random_state=random_state),
        RadiusNeighborsClassifier(metric='cosine'),  # Non-cosine works badly
    ]

    zoo = []
    handles = []  # Index labels for the resulting DataFrames.
    for component in components_zoo:
        for model in classifiers_zoo:
            # Every zoo entry gets its own unfitted copy, so no estimator
            # instance is shared between the different PCA settings.
            estimator = deepcopy(model)
            name = get_name(model)
            if component is None:  # No PCA at all.
                zoo.append(estimator)
                handles.append(name)
            else:
                zoo.append(make_pipeline(PCA(component, random_state=random_state), estimator))
                # The component count keeps PCA(50) and PCA(400) rows distinguishable.
                handles.append(f'PCA({component})+{name}')

    train_results = []
    test_results = []
    for model in tqdm(zoo):
        model.fit(X_train, y_train)  # fit + predict: not every classifier supports fit_predict.
        train_results.append(perform_metric(y_train, model.predict(X_train)))
        test_results.append(perform_metric(y_test, model.predict(X_test)))

    train_df = pd.DataFrame(train_results, index=handles)
    test_df = pd.DataFrame(test_results, index=handles)

    return train_df, test_df

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer


# TF-IDF (term frequencies weighted by inverse document frequency) is used as the main feature.

def prepare_data(train_split, test_split):
    """Turn two text splits into dense TF-IDF matrices plus their author labels.

    The vocabulary is learned on ``train_split`` only and then reused to
    transform ``test_split``.

    Args:
        train_split: split indexable by ``'text'`` and ``'author'``; used for fitting.
        test_split: split indexable by ``'text'`` and ``'author'``; only transformed.

    Returns:
        tuple ``(X_train, y_train, X_test, y_test)`` where the ``X`` values are
        the dense arrays produced by ``.toarray()`` and the ``y`` values are
        the ``'author'`` columns as returned by the splits.
    """
    # Vocabulary capped at 1000 terms for speed; accents are folded to ASCII to
    # smooth over encoding differences; English stop words are dropped to
    # potentially make the features more useful.
    tfidf = TfidfVectorizer(
        stop_words='english',
        strip_accents='ascii',
        max_features=1000,
    )

    X_train = tfidf.fit_transform(train_split['text']).toarray()
    y_train = train_split['author']

    # Reuse the vocabulary fitted on the training texts.
    X_test = tfidf.transform(test_split['text']).toarray()
    y_test = test_split['author']

    # Both matrices must have the same number of feature columns.
    assert X_test.shape[1] == X_train.shape[1]
    return X_train, y_train, X_test, y_test

In [None]:
# The full splits are too big, so a random 10,000-row subset of each is used.
# The shuffles are seeded so the same subset (and therefore the same metrics
# input) is drawn on every run of the notebook.
X_train, y_train, X_test, y_test = prepare_data(
    data['train'].shuffle(seed=42).select(range(10000)),
    data['test'].shuffle(seed=42).select(range(10000)),
)
victorian_train, victorian_test = train_model(X_train, y_train, X_test, y_test)

In [None]:
# Metrics on the training subset, one row per model of the zoo.
victorian_train

In [None]:
# Metrics on the test subset, one row per model of the zoo.
victorian_test

In [None]:
# Highest macro F1 reached by any model on the Victorian test subset.
print('Best test F1:', victorian_test['f1'].max())

In [None]:
# Blogs or comments; expected to be a harder task.
# The label column is renamed to 'author' for consistency with the first dataset.
blogs = load_dataset('night12/authorTextIdentification').rename_column('author_id', 'author')

In [None]:
# Random subsets again (10,000 training rows, 6,000 validation rows). The
# shuffles are seeded so the same rows are drawn on every run of the notebook.
X_train, y_train, X_test, y_test = prepare_data(
    blogs['train'].shuffle(seed=42).select(range(10000)),
    blogs['validation'].shuffle(seed=42).select(range(6000)),
)

blogs_train, blogs_test = train_model(X_train, y_train, X_test, y_test)

In [None]:
# Metrics on the blogs training subset, one row per model of the zoo.
blogs_train

In [None]:
# Metrics on the blogs validation subset, one row per model of the zoo.
blogs_test

In [None]:
# Highest macro F1 reached by any model on the blogs validation subset.
print('Best test F1:', blogs_test['f1'].max())