# Binary Human-Written vs Machine-Generated Text Classification - Custom tokenizer + TF-IDF + ML

### Imports

In [1]:
# Standard Library Imports
import gc
import os

# Data Handling and Processing
import pandas as pd
import numpy as np

# Machine Learning Models
from sklearn.linear_model import SGDClassifier
from sklearn.naive_bayes import MultinomialNB
from sklearn.ensemble import VotingClassifier
from sklearn.svm import LinearSVC

# Model Selection, Model Evaluation and Metrics
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score, f1_score

# Text Processing and Feature Extraction
from sklearn.feature_extraction.text import TfidfVectorizer


# Tokenizer and Preprocessing for NLP
from transformers import PreTrainedTokenizerFast
from tokenizers import (
    models,
    normalizers,
    pre_tokenizers,
    trainers,
    Tokenizer,
)

# Dataset Handling and Progress Bar
from datasets import Dataset
from tqdm.auto import tqdm

# Utils
import string
import time


### Data retrieval

In [2]:
def get_data(train_path, val_path, test_path):
    """Load the three dataset splits from JSON-lines files.

    Each file is read with ``pandas.read_json(..., lines=True)``, i.e. one JSON
    record per line.

    Args:
        train_path (str): path to the training split
        val_path (str): path to the validation split
        test_path (str): path to the test split

    Returns:
        (pandas.DataFrame, pandas.DataFrame, pandas.DataFrame): the train,
        validation and test dataframes, in that order
    """
    # Read the splits in train -> val -> test order and return them as a tuple.
    split_paths = (train_path, val_path, test_path)
    return tuple(pd.read_json(path, lines=True) for path in split_paths)

# Locations of the JSON-lines files for the train / dev (validation) / test splits.
train_path = "./data/subtaskA_train_monolingual.jsonl"
val_path = "./data/subtaskA_dev_monolingual.jsonl"
test_path = "./data/subtaskA_test_monolingual.jsonl"

train, val, test = get_data(train_path, val_path, test_path)

# Build a balanced subset of the training data: the per-group sample size is the
# count of the rarest 'source' among the label-0 rows, and that many rows are
# drawn (with a fixed random_state) from every (label, source) group.
# NOTE(review): this assumes every label-1 group has at least `min_samples` rows;
# otherwise sampling without replacement would fail - confirm for new data.
min_samples = train[train['label'] == 0]['source'].value_counts().min()
train_subset = train.groupby(['label', 'source']).sample(min_samples, random_state=42).reset_index(drop=True)

# Report how much of the training data the balanced subset retains.
print(f"Length of training set: {len(train)}")
print(f"Length of sub-sampled training set: {len(train_subset)}")

print(f"Proportion of subset training set: {round(len(train_subset)/len(train)*100, 2)}%")

Length of training set: 119757
Length of sub-sampled training set: 23570
Proportion of subset training set: 19.68%


In [3]:
def dummy(text):
    """Identity function: return ``text`` unchanged.

    It is passed to TfidfVectorizer as both tokenizer and preprocessor because
    the documents are already lists of tokens by the time they are vectorized.
    """
    return text

### Pipeline to run custom tokenizer & compute TF-IDF features

In [4]:
class TfIdfPipeline:
    """Tokenize a (train, target) dataset pair with a custom BPE tokenizer and compute TF-IDF features.

    Both the BPE tokenizer and the TF-IDF vocabulary are learned from the
    *target* (validation or test) texts; the training texts are then projected
    onto that vocabulary.
    """

    def __init__(self, train, target, split, lowercase=False):
        """Constructor for the TfIdfPipeline class

        Args:
            train (pandas Dataframe): training dataset (must have a 'text' column)
            target (pandas Dataframe): either validation or test dataset (must have a 'text' column)
            split (string): either 'val' or 'test', describes the target dataset
            lowercase (bool, optional): Whether or not to convert all words to lowercase. Defaults to False.

        Raises:
            AssertionError: if `split` is neither 'val' nor 'test'
        """
        self.train = train
        self.target = target

        self.split = split
        assert self.split in ['val', 'test'], "split must be either 'val' or 'test'"

        self.lowercase = lowercase

    def trainingset_num_unique_words(self):
        """Function to get the number of unique words in the training dataset

        Words are obtained by lowercasing each text, splitting on whitespace and
        stripping leading/trailing ASCII punctuation.

        Returns:
            int: the number of unique words in the training dataset
        """
        unique_words = set()
        for text in self.train['text']:
            unique_words.update(text.lower().split())

        # Strip punctuation only after de-duplicating the raw tokens, so each
        # distinct raw token is processed once.
        return len({word.strip(string.punctuation) for word in unique_words})

    def get_tokenizer(self, num_unique_words):
        """Function to get a tokenizer for the dataset

        A byte-level BPE tokenizer is trained on the target (val/test) texts with
        a vocabulary size of half the number of unique training words.

        Args:
            num_unique_words (int): the number of unique words in the training dataset

        Returns:
            PreTrainedTokenizerFast: the tokenizer for the dataset
        """
        vocab_size = num_unique_words // 2

        # Initializing the tokenizer with Byte-Pair Encoding (BPE) model.
        # The [UNK] token is used to represent unknown words during tokenization.
        raw_tokenizer = Tokenizer(models.BPE(unk_token="[UNK]"))

        # Configuring the tokenizer's normalization and pre-tokenization steps.
        # NFC normalization is always applied for consistent character
        # representation; lowercasing is added only when requested.
        # (Built step by step on purpose: `a + b if cond else []` binds as
        # `(a + b) if cond else []`, which would drop NFC when lowercase is False.)
        normalizer_steps = [normalizers.NFC()]
        if self.lowercase:
            normalizer_steps.append(normalizers.Lowercase())
        raw_tokenizer.normalizer = normalizers.Sequence(normalizer_steps)
        raw_tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel()

        # Specifying special tokens for the tokenizer and initializing the BPE trainer.
        # The trainer is configured with the desired vocabulary size and the special tokens.
        special_tokens = ["[UNK]", "[SEP]"]
        trainer = trainers.BpeTrainer(vocab_size=vocab_size, special_tokens=special_tokens)

        # Converting the val/test data to a Huggingface dataset for easier handling.
        target_dataset = Dataset.from_pandas(self.target[['text']])

        # Function to generate batches of text data for training.
        # This approach helps in managing memory usage when dealing with large datasets.
        def target_corpus_iter():
            for i in range(0, len(target_dataset), 1000):
                yield target_dataset[i : i + 1000]["text"]

        # Training the tokenizer on the target dataset using the defined trainer.
        raw_tokenizer.train_from_iterator(target_corpus_iter(), trainer=trainer)

        # Wrapping the trained tokenizer with Huggingface's PreTrainedTokenizerFast for additional functionalities.
        # This step integrates the tokenizer with Huggingface's ecosystem, enabling easy use with their models.
        tokenizer = PreTrainedTokenizerFast(
            tokenizer_object=raw_tokenizer,
            unk_token="[UNK]",
            sep_token="[SEP]",
        )

        return tokenizer

    def tokenize(self, tokenizer):
        """Use the tokenizer to tokenize the text data in the training and target datasets

        Args:
            tokenizer (PreTrainedTokenizerFast): the tokenizer to use

        Returns:
            (list[list[str]], list[list[str]]): the tokenized text data for the training and target datasets
        """

        # Tokenizing the text data in the 'train' DataFrame and storing the results.
        print("Tokenizing training set")
        tokenized_texts_train = [
            tokenizer.tokenize(text) for text in tqdm(self.train['text'].tolist())
        ]

        # Tokenizing the text data in the target DataFrame and storing the results.
        print(f"Tokenizing {self.split} set")
        tokenized_texts_target = [
            tokenizer.tokenize(text) for text in tqdm(self.target['text'].tolist())
        ]

        return tokenized_texts_train, tokenized_texts_target

    def vectorize(self, tokenized_texts_train, tokenized_texts_target):
        """Function to vectorize the tokenized text data using the TfidfVectorizer

        The vocabulary is learned from the target set only; a second vectorizer
        restricted to that vocabulary is then fitted on the training set (to
        learn IDF weights) and used to transform both sets.

        Args:
            tokenized_texts_train (list[list[str]]): the tokenized text data for the training dataset
            tokenized_texts_target (list[list[str]]): the tokenized text data for the target dataset

        Returns:
            (scipy.sparse.csr_matrix, scipy.sparse.csr_matrix): the vectorized text data for the training and target datasets
        """

        # Initialize TfidfVectorizer for the target (val/test) set
        # Parameters:
        # - ngram_range=(3, 5): Use n-grams of 3 to 5 tokens.
        # - sublinear_tf=True: Apply sublinear term frequency scaling.
        # - analyzer='word' with tokenizer/preprocessor='dummy': the documents are
        #   already token lists, so both stages are identity functions.
        # - token_pattern=None: Disable default token pattern.
        # - lowercase / strip_accents: kept for reference, but scikit-learn skips
        #   both stages when a custom preprocessor is supplied; case handling is
        #   done by the BPE tokenizer's normalizer instead.
        vectorizer = TfidfVectorizer(ngram_range=(3, 5), lowercase=self.lowercase, sublinear_tf=True, analyzer='word',
                                    tokenizer=dummy, preprocessor=dummy, token_pattern=None, strip_accents='unicode')

        # Fit vectorizer on target data to learn vocabulary
        print(f"Fitting Tf-Idf vectorizer to {self.split} set...", end=" ")
        start = time.time()
        vectorizer.fit(tokenized_texts_target)
        end = time.time()
        print(f"completed in {round(end - start)} seconds")
        vocab = vectorizer.vocabulary_  # Extract learned vocabulary

        # Reinitialize TfidfVectorizer for training set using target set's vocabulary
        vectorizer = TfidfVectorizer(ngram_range=(3, 5), lowercase=False, sublinear_tf=True, vocabulary=vocab,
                                    analyzer='word', tokenizer=dummy, preprocessor=dummy, token_pattern=None,
                                    strip_accents='unicode')

        # Transform training and target data into TF-IDF vectors
        print("Fit-transforming Tf-Idf vectorizer to training set...", end=" ")
        start = time.time()
        tf_train = vectorizer.fit_transform(tokenized_texts_train)
        end = time.time()
        print(f"completed in {round(end - start)} seconds")

        print(f"Transforming {self.split} set...", end=" ")
        start = time.time()
        tf_target = vectorizer.transform(tokenized_texts_target)
        end = time.time()
        print(f"completed in {round(end - start)} seconds")

        # Cleanup: Free up memory
        del vectorizer
        gc.collect()

        return tf_train, tf_target

    def run(self):
        """Function to run the pipeline

        Steps: count unique training words -> train the BPE tokenizer ->
        tokenize both datasets -> compute TF-IDF matrices.

        Returns:
            (scipy.sparse.csr_matrix, scipy.sparse.csr_matrix): the vectorized text data for the training and target datasets
        """
        num_unique_words = self.trainingset_num_unique_words()
        tokenizer = self.get_tokenizer(num_unique_words)
        tokenized_texts_train, tokenized_texts_target = self.tokenize(tokenizer)
        tf_train, tf_target = self.vectorize(tokenized_texts_train, tokenized_texts_target)
        return tf_train, tf_target

In [5]:
# Random seeds shared by every experiment; one training run is done per seed.
seeds = [42, 91, 184, 333, 647]

### Hyperparameters search on ML Models

In [6]:
def hyperparameters_search(tf_train, y_train, tf_val, y_val):
    """Grid-search each base model per seed and score it (and an ensemble) on the validation set.

    For every seed in the module-level `seeds` list, each base model is tuned
    with 5-fold cross-validated grid search on the training data, refitted with
    its best parameters and evaluated on the validation data. A hard-voting
    ensemble of the three tuned models, weighted by their validation
    accuracies, is then trained and evaluated as well.

    Args:
        tf_train (scipy.sparse.csr_matrix): the vectorized training text data
        y_train (numpy.ndarray): the training labels
        tf_val (scipy.sparse.csr_matrix): the vectorized validation text data
        y_val (numpy.ndarray): the validation labels

    Returns:
        (dict, dict, dict): per-model lists (one entry per seed) of the best
        hyperparameters, the validation accuracies and the validation f1 scores;
        the last two dicts also contain an 'Ensemble' entry
    """
    # Search space of every base model, listed in training order.
    search_spaces = {
        'MultinomialNB': {'alpha': [0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0]},
        'SVM': {'C': [0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0]},
        'SGD': {'alpha': [1e-7, 1e-6, 1e-5, 1e-4, 0.001, 0.01, 0.1]},
    }
    model_names = list(search_spaces)
    scored_names = model_names + ['Ensemble']

    best_hyperparams = {name: [] for name in model_names}
    val_accuracies = {name: [] for name in scored_names}
    val_f1_scores = {name: [] for name in scored_names}

    for seed in seeds:
        print(f"======== Training with seed: {seed} =======")
        base_estimators = {
            'MultinomialNB': MultinomialNB(),
            'SVM': LinearSVC(max_iter=4000, random_state=seed),
            'SGD': SGDClassifier(max_iter=8000, random_state=seed),
        }

        for name, estimator in base_estimators.items():
            search = GridSearchCV(
                estimator=estimator,
                param_grid=search_spaces[name],
                scoring='accuracy',
                cv=5,
                refit=True,
                verbose=True,
            )

            print(f"Training {name}:", end=" ")

            started = time.time()
            search.fit(tf_train, y_train)
            elapsed = time.time() - started

            print(f"Completed in {round(elapsed)} seconds")

            # `refit=True` means predict() uses the best estimator refitted on all training data.
            predictions = search.predict(tf_val)

            best_hyperparams[name].append(search.best_params_)
            val_accuracies[name].append(accuracy_score(y_val, predictions))
            val_f1_scores[name].append(f1_score(y_val, predictions))

        print("Training Ensemble: ")

        # The weights for the ensemble model are the validation accuracies of the individual models
        ensemble_members = [
            ('MultinomialNB', MultinomialNB(**best_hyperparams['MultinomialNB'][-1])),
            ('SVM', LinearSVC(**best_hyperparams['SVM'][-1], max_iter=4000, random_state=seed)),
            ('SGD', SGDClassifier(**best_hyperparams['SGD'][-1], max_iter=8000, random_state=seed)),
        ]
        member_weights = [val_accuracies[name][-1] for name in model_names]
        ensemble = VotingClassifier(
            estimators=ensemble_members,
            weights=member_weights,
            voting='hard',
            verbose=True,
        )

        ensemble.fit(tf_train, y_train)

        gc.collect()

        ensemble_predictions = ensemble.predict(tf_val)
        val_accuracies['Ensemble'].append(accuracy_score(y_val, ensemble_predictions))
        val_f1_scores['Ensemble'].append(f1_score(y_val, ensemble_predictions))

    return best_hyperparams, val_accuracies, val_f1_scores

In [7]:
def add_random_and_majority_baseline(y_train, y_target, accuracies, f1_scores):
    """Function to add the random and majority classifiers to the accuracies and f1_scores dictionaries

    Both dictionaries are updated in place with the keys 'Random' and
    'Majority', each mapped to a list with one entry per seed in the
    module-level `seeds` list.

    The random baseline draws one set of uniform binary predictions per seed
    from a generator seeded with that seed, so results are reproducible across
    runs and the per-seed spread is meaningful. The majority baseline is
    deterministic, so its single score is repeated for every seed.

    Args:
        y_train (numpy.ndarray): the training labels
        y_target (numpy.ndarray): the target labels
        accuracies (dict): the accuracies of the other models so far
        f1_scores (dict): the f1 scores of the other models so far
    """
    # Random classifier: one seeded draw of 0/1 predictions per seed
    random_accuracies = []
    random_f1_scores = []
    for seed in seeds:
        rng = np.random.default_rng(seed)
        y_preds = rng.integers(2, size=len(y_target))
        random_accuracies.append(accuracy_score(y_target, y_preds))
        random_f1_scores.append(f1_score(y_target, y_preds))
    accuracies['Random'] = random_accuracies
    f1_scores['Random'] = random_f1_scores

    # Majority classifier: always predict the most frequent training label
    most_common = np.bincount(y_train).argmax()
    y_preds = most_common * np.ones_like(y_target)
    acc = accuracy_score(y_target, y_preds)
    f1 = f1_score(y_target, y_preds)
    accuracies['Majority'] = [acc for _ in seeds]
    f1_scores['Majority'] = [f1 for _ in seeds]

In [8]:
def create_results_table(accuracies, f1_scores):
    """Build a table of per-seed metrics with trailing 'Mean' and 'Std' rows.

    The table has a 'Seed' column (the module-level `seeds` followed by 'Mean'
    and 'Std') and, for each of Random, Majority, MultinomialNB, SVM, SGD and
    Ensemble, an '<model> Accuracy' and an '<model> F1' column.

    Args:
        accuracies (dict): the validation or test accuracies
        f1_scores (dict): the validation or test f1 scores

    Returns:
        pandas.DataFrame: the results table
    """
    def with_summary(values):
        # Per-seed values followed by their mean and standard deviation.
        return values + [np.mean(values), np.std(values)]

    columns = {'Seed': seeds + ['Mean', 'Std']}
    for model in ('Random', 'Majority', 'MultinomialNB', 'SVM', 'SGD', 'Ensemble'):
        columns[f'{model} Accuracy'] = with_summary(accuracies[model])
        columns[f'{model} F1'] = with_summary(f1_scores[model])

    return pd.DataFrame(columns)

### Run using the whole training set

In [9]:
# Build TF-IDF features for the full training set and the validation set
# (case is preserved: lowercase=False).
tf_train, tf_val = TfIdfPipeline(train, val, 'val', lowercase=False).run()
y_train, y_val = train['label'].values, val['label'].values

# Tune every model for each seed and collect the validation metrics.
best_hyperparams, val_accuracies, val_f1_scores = hyperparameters_search(tf_train, y_train, tf_val, y_val)

# Add random classifier and majority classifier to the results
add_random_and_majority_baseline(y_train, y_val, val_accuracies, val_f1_scores)


Tokenizing training set


  0%|          | 0/119757 [00:00<?, ?it/s]

Tokenizing val set


  0%|          | 0/5000 [00:00<?, ?it/s]

Fitting Tf-Idf vectorizer to val set... completed in 64 seconds
Fit-transforming Tf-Idf vectorizer to training set... Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "d:\Users\edo\envs\nlp\Lib\site-packages\IPython\core\interactiveshell.py", line 3548, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "C:\Users\merli\AppData\Local\Temp\ipykernel_11720\225727777.py", line 1, in <module>
    tf_train, tf_val = TfIdfPipeline(train, val, 'val', lowercase=False).run()
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\merli\AppData\Local\Temp\ipykernel_11720\3121380320.py", line 167, in run
    tf_train, tf_target = self.vectorize(tokenized_texts_train, tokenized_texts_target)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\merli\AppData\Local\Temp\ipykernel_11720\3121380320.py", line 142, in vectorize
    tf_train = vectorizer.fit_transform(tokenized_texts_train)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "d:\Users\edo\envs\nlp\Lib\site-packages\sklearn\feature_

: 

In [None]:
# Assemble the per-seed validation metrics (plus Mean/Std rows) into one table.
val_results_table = create_results_table(val_accuracies, val_f1_scores)

In [None]:
# Display the validation results table as the cell output.
val_results_table

In [3]:
# Save the results to a csv file
results_dir = './predictions'

# exist_ok=True creates the directory only when it is missing, without a
# separate check-then-create step.
os.makedirs(results_dir, exist_ok=True)

val_results_table.to_csv(f'{results_dir}/tfidf_val_results.csv')

In [None]:
def test_models(best_hyperparameters, tf_train, y_train, tf_test, y_test, val_accuracies):
    """Retrain each tuned model per seed and evaluate it (and an ensemble) on the test set.

    For the i-th seed in the module-level `seeds` list, every base model is
    configured with its i-th set of best hyperparameters, fitted on the
    training data and scored on the test data. A hard-voting ensemble of the
    three models, weighted by their i-th validation accuracies, is then fitted
    and scored as well.

    Args:
        best_hyperparameters (dict): the best hyperparameters for the models (one entry per seed)
        tf_train (scipy.sparse.csr_matrix): the vectorized training text data
        y_train (numpy.ndarray): the training labels
        tf_test (scipy.sparse.csr_matrix): the vectorized test text data
        y_test (numpy.ndarray): the test labels
        val_accuracies (dict): the validation accuracies of the models, used to weight the ensemble model

    Returns:
        (dict, dict): the test accuracies and test f1 scores for the models
    """
    model_names = ['MultinomialNB', 'SVM', 'SGD']
    scored_names = model_names + ['Ensemble']

    test_accuracies = {name: [] for name in scored_names}
    test_f1_scores = {name: [] for name in scored_names}

    for run_idx, seed in enumerate(seeds):
        print(f"======== Training on test vocabulary with seed: {seed} =======")

        base_estimators = {
            'MultinomialNB': MultinomialNB(),
            'SVM': LinearSVC(max_iter=4000, random_state=seed),
            'SGD': SGDClassifier(max_iter=8000, random_state=seed),
        }
        for name, estimator in base_estimators.items():
            # Apply the hyperparameters selected for this seed during validation
            estimator.set_params(**best_hyperparameters[name][run_idx])

            # Fit on the training data
            print(f"Training {name}...", end=" ")

            started = time.time()
            estimator.fit(tf_train, y_train)
            elapsed = time.time() - started

            print(f"completed in {round(elapsed)} seconds")

            # Score on the test data
            predictions = estimator.predict(tf_test)
            test_accuracies[name].append(accuracy_score(y_test, predictions))
            test_f1_scores[name].append(f1_score(y_test, predictions))

        # Train the ensemble model on the training data, using the validation accuracies as weights
        print("Training Ensemble: ")

        ensemble_members = [
            ('MultinomialNB', MultinomialNB(**best_hyperparameters['MultinomialNB'][run_idx])),
            ('SVM', LinearSVC(**best_hyperparameters['SVM'][run_idx], max_iter=4000, random_state=seed)),
            ('SGD', SGDClassifier(**best_hyperparameters['SGD'][run_idx], max_iter=8000, random_state=seed)),
        ]
        member_weights = [val_accuracies[name][run_idx] for name in model_names]
        ensemble = VotingClassifier(
            estimators=ensemble_members,
            weights=member_weights,
            voting='hard',
            verbose=True,
        )

        ensemble.fit(tf_train, y_train)

        gc.collect()

        # Test the ensemble model on the test data
        ensemble_predictions = ensemble.predict(tf_test)
        test_accuracies['Ensemble'].append(accuracy_score(y_test, ensemble_predictions))
        test_f1_scores['Ensemble'].append(f1_score(y_test, ensemble_predictions))

    return test_accuracies, test_f1_scores

In [None]:
# Rebuild the TF-IDF features, this time with the test set as the target.
tf_train, tf_test = TfIdfPipeline(train, test, 'test', lowercase=False).run()
y_train, y_test = train['label'].values, test['label'].values

# Retrain with the hyperparameters found on validation and evaluate on the test set;
# the validation accuracies are passed along to weight the ensemble.
test_accuracies, test_f1_scores = test_models(best_hyperparams, tf_train, y_train, tf_test, y_test, val_accuracies)

# Add the random and majority baselines to the test metrics.
add_random_and_majority_baseline(y_train, y_test, test_accuracies, test_f1_scores)

# Assemble the per-seed test metrics (plus Mean/Std rows) into one table.
test_results_table = create_results_table(test_accuracies, test_f1_scores)

In [None]:
# Save the full-training-set test results to a csv file inside `results_dir`.
test_results_table.to_csv(f'{results_dir}/tfidf_test_results_full.csv')

In [None]:
# Display the test results table as the cell output.
test_results_table

### Run using a balanced subset of the training set

In [None]:
# Same validation experiment as before, but trained on the balanced subset.
tf_train_subset, tf_val = TfIdfPipeline(train_subset, val, 'val', lowercase=False).run()
y_train_subset, y_val = train_subset['label'].values, val['label'].values

# NOTE: this overwrites best_hyperparams / val_accuracies / val_f1_scores from the full-training-set run.
best_hyperparams, val_accuracies, val_f1_scores = hyperparameters_search(tf_train_subset, y_train_subset, tf_val, y_val)

# Add random classifier and majority classifier to the results
add_random_and_majority_baseline(y_train_subset, y_val, val_accuracies, val_f1_scores)

val_results_table = create_results_table(val_accuracies, val_f1_scores)

# Save the subset validation results to a csv file inside `results_dir`.
val_results_table.to_csv(f'{results_dir}/tfidf_val_results_subset.csv')

In [None]:
# Same test experiment as before, but trained on the balanced subset.
tf_train_subset, tf_test = TfIdfPipeline(train_subset, test, 'test', lowercase=False).run()
y_train_subset, y_test = train_subset['label'].values, test['label'].values

# Uses the hyperparameters and validation accuracies currently held in
# best_hyperparams / val_accuracies.
test_accuracies, test_f1_scores = test_models(best_hyperparams, tf_train_subset, y_train_subset, tf_test, y_test, val_accuracies)

# Add the random and majority baselines to the test metrics.
add_random_and_majority_baseline(y_train_subset, y_test, test_accuracies, test_f1_scores)

test_results_table = create_results_table(test_accuracies, test_f1_scores)

# Save the subset test results to a csv file inside `results_dir`.
test_results_table.to_csv(f'{results_dir}/tfidf_test_results_subset.csv')