# Building and evaluating transformer models and their optimal hyperparameters
### This notebook focuses on building and evaluating transformer-based models for text classification. It includes data preprocessing, tokenization, model training, and evaluation using various performance metrics. Additionally, it leverages Optuna for hyperparameter tuning, optimizing parameters like learning rate, batch size, and number of epochs to achieve the best results.


## This notebook is designed to run on Google Colab in order to leverage its GPU, and proceeds through the following steps:

- Mount Google Drive folder and access the repository files
- Install the necessary packages into the Colab GPU environment
- Load datasets from file path
- Establish model configurations
- Define helper functions for model training and optimisation process 
- Define helper functions for dataset preparation
- Run model


In [None]:
# Mount Google Drive so the notebook can reach the repository files stored there
from google.colab import drive
drive.mount('/content/drive')

# On first-time access, uncomment to clone the repository and save it to Google Drive
# This gives access to the CSV datasets and code
# %cd /content/drive/MyDrive/
# !git clone https://github.com/hannahishimwe/FakeProfileDetection.git

# Uncomment to navigate to the repository folder
# %cd /content/drive/MyDrive/FakeProfileDetection

In [None]:
%%capture
!pip install pandas transformers datasets torch transformers[torch] ray[tune] emoji evaluate optuna

In [None]:
import matplotlib.pyplot as plt
import optuna
import pandas as pd
import seaborn as sns
import torch
import torch.nn.functional as F
from datasets import Dataset, Value
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
from torch.utils.data import DataLoader
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    DistilBertForSequenceClassification,
    DistilBertTokenizer,
    ElectraForSequenceClassification,
    ElectraTokenizer,
    RobertaForSequenceClassification,
    RobertaTokenizer,
    Trainer,
    TrainingArguments,
)

from dev.CleanDatasets import CleanDatasets



In [None]:
# Locations of the pre-split CSV datasets inside the repository folder on Google Drive,
# listed in the order the splits are used: train, validation, test.
PATH_TO_TRAINING_CSV = "/content/drive/MyDrive/FakeProfileDetection/dev/csv/structured_train.csv"
PATH_TO_VALIDATION_CSV = "/content/drive/MyDrive/FakeProfileDetection/dev/csv/structured_validation.csv"
PATH_TO_TEST_CSV = "/content/drive/MyDrive/FakeProfileDetection/dev/csv/structured_test.csv"

In [None]:
# Registry of the supported transformer models, keyed by a short model name.
# Each entry records:
#   model_name     - name of the transformers sequence-classification class for the architecture
#   tokenizer_name - name of the matching fast tokenizer class
#   checkpoint     - pretrained checkpoint identifier handed to from_pretrained
#   output_dir     - Google Drive directory used as the training output directory
# NOTE: only "checkpoint" and "output_dir" are read by the helpers in this notebook;
# the class names are descriptive and must match the checkpoint's architecture.
models_dict = {
    "roberta": {
        # distilroberta-base is a RoBERTa-architecture checkpoint, so it pairs with the RoBERTa classes
        "model_name": "RobertaForSequenceClassification",
        "tokenizer_name": "RobertaTokenizerFast",
        "checkpoint": "distilroberta-base",
        "output_dir": "/content/drive/MyDrive/FakeProfileDetection/roberta_optuna",
    },
    "bert": {
        "model_name": "DistilBertForSequenceClassification",
        "tokenizer_name": "DistilBertTokenizerFast",
        "checkpoint": "distilbert-base-uncased",
        "output_dir": "/content/drive/MyDrive/FakeProfileDetection/distilbert_optuna",
    },
    "electra": {
        "model_name": "ElectraForSequenceClassification",
        "tokenizer_name": "ElectraTokenizerFast",
        "checkpoint": "google/electra-small-discriminator",
        "output_dir": "/content/drive/MyDrive/FakeProfileDetection/electra_optuna",
    },
}


In [None]:
# Cleaning configuration handed to CleanDatasets (presumably: columns to rename,
# columns to drop, and the column to convert to a binary value - confirm against CleanDatasets).
columns_to_rename_dict = {"screen_name": "username", "account.type": "is_human"}
columns_to_drop_list = ["class_type"]
column_to_binary = "account.type"


def clean_dataframe(dataframe):
    """
    Function to clean a DataFrame with CleanDatasets and convert the result to a Dataset

    Params: dataframe: DataFrame - raw data loaded from one of the CSV files
    Returns: Dataset - the cleaned data, built with Dataset.from_pandas
    """
    cleaner = CleanDatasets(
        dataframe,
        columns_to_rename_dict,
        columns_to_drop_list,
        column_to_binary,
    )
    return Dataset.from_pandas(cleaner.clean_df())

def tokenize_function(examples, tokenizer):
    """
    Function to tokenize the "text" entry of a batch of examples

    Params: examples - mapping holding the raw text under the "text" key
            tokenizer - callable tokenizer applied to the text
    Returns: the tokenizer output, padded to max length and truncated
    """
    raw_text = examples["text"]
    return tokenizer(raw_text, padding="max_length", truncation=True)


def process_dataset(dataset, tokenizer):
    """
    Function to tokenize a cleaned dataset and shape its columns for training

    Params: dataset: Dataset - cleaned dataset with "text", "is_human" and "username" columns
            tokenizer - tokenizer applied to the "text" column
    Returns: Dataset - tokenized dataset whose target column is "labels" (int64)
    """
    def _tokenize_batch(batch):
        return tokenize_function(batch, tokenizer)

    # Tokenize in batches; the raw "text" column is left in place by this function
    tokenized = dataset.map(_tokenize_batch, batched=True)

    # Rename the target column, drop the username column, then cast the labels to integers
    relabelled = tokenized.rename_column("is_human", "labels")
    without_username = relabelled.remove_columns(["username"])
    return without_username.cast_column("labels", Value("int64"))

def prepare_datasets(train_dataset, validation_dataset, tokenizer):
    """
    Function to run process_dataset on the training and validation datasets with one tokenizer

    Params: train_dataset: Dataset - cleaned training dataset
            validation_dataset: Dataset - cleaned validation dataset
            tokenizer - tokenizer shared by both datasets
    Returns: tuple of (processed training dataset, processed validation dataset)
    """
    train_ready = process_dataset(train_dataset, tokenizer)
    validation_ready = process_dataset(validation_dataset, tokenizer)
    return train_ready, validation_ready

In [None]:
"""
This script:
- loads each CSV dataset as a Pandas DataFrame
- pairs every DataFrame with its dataset name so they can be cleaned iteratively using the helper function
- builds a dictionary of cleaned datasets keyed by dataset name

"""
full_training_df = pd.read_csv(PATH_TO_TRAINING_CSV)
# As advised by the supervisor, condense the training data to its first 5000 rows for faster training
# NOTE(review): the rows are taken in file order without shuffling - confirm the CSV is not sorted by class
training_df = full_training_df.iloc[:5000].copy()
validation_df = pd.read_csv(PATH_TO_VALIDATION_CSV)
testing_df = pd.read_csv(PATH_TO_TEST_CSV)

df_list = [training_df, validation_df, testing_df]
dataset_list = ["training_dataset", "validation_dataset", "testing_dataset"]
cleaned_datasets = {}

for dataset_name, df in zip(dataset_list, df_list):
    cleaned_datasets[dataset_name] = clean_dataframe(df)

In [None]:
"""
This script defines all helper functions necessary to train model and facilitate hyperparameter tuning
"""

def get_tokenizer_and_model(model_key, num_labels=2):
    """
    Function to get tokenizer and model for a given model name from the previously established dictionary of models

    Params: model_key: str - key to identify model in models_dict
            num_labels: int - number of labels in the dataset
    Returns: tokenizer - tokenizer loaded from the model's checkpoint through AutoTokenizer
             model - model loaded from the same checkpoint through AutoModelForSequenceClassification
    Raises: ValueError - if model_key has no entry in models_dict

    """
    model_info = models_dict.get(model_key)
    if not model_info:
        raise ValueError(f"Model '{model_key}' not found in models_dict")

    # Both objects come from the same checkpoint so the tokenizer matches the model
    checkpoint = model_info['checkpoint']
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    model = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=num_labels)

    return tokenizer, model

def get_model_output_dir(model_key):
    """
    Function to get the output directory from the previously established dictionary of models

    Params: model_key: str - key to identify model in models_dict
    Returns: output_dir: str - output directory for the model
    Raises: ValueError - if model_key has no entry in models_dict

    """
    model_info = models_dict.get(model_key)
    # Fail with the same explicit error as get_tokenizer_and_model instead of
    # a TypeError from subscripting None
    if not model_info:
        raise ValueError(f"Model '{model_key}' not found in models_dict")
    return model_info['output_dir']

def compute_metrics(p):
    """
    Function to compute evaluation metrics (accuracy, precision, recall, F1-score) through the trainer API
    in the training process

    Params: p: EvalPrediction - prediction object from the trainer API
    Returns: dictionary with the keys "accuracy", "precision", "recall" and "f1"

    """
    # The predicted class is the index of the highest value along the last axis
    predicted = p.predictions.argmax(axis=-1)
    expected = p.label_ids

    scores = {"accuracy": accuracy_score(expected, predicted)}
    binary_metrics = (
        ("precision", precision_score),
        ("recall", recall_score),
        ("f1", f1_score),
    )
    for metric_name, metric_fn in binary_metrics:
        scores[metric_name] = metric_fn(expected, predicted, average='binary')
    return scores

def objective(trial, model_key, train_tokenized, test_tokenized):
    """
    Function which defines the optimization objective for hyperparameter tuning using Optuna,
    this guides the trials to find the best hyperparameters for the model

    Params: trial: Trial - trial object from Optuna
            model_key: str - key to identify model in models_dict
            train_tokenized: Dataset - tokenized training dataset
            test_tokenized: Dataset - tokenized dataset the model is evaluated on
                                      (run_optuna passes the validation split here)
    Returns: F1-score of the model on the evaluation dataset
    Raises: ValueError - if model_key has no entry in models_dict

    """
    # suggest_float(..., log=True) is the replacement for the deprecated suggest_loguniform
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 5e-5, log=True)
    batch_size = trial.suggest_categorical('batch_size', [8, 16, 32])
    num_epochs = trial.suggest_int('num_epochs', 3, 5)

    # A freshly loaded model is trained in every trial; the tokenizer is not needed
    # because the datasets arrive already tokenized
    _, model = get_tokenizer_and_model(model_key)
    output_dir = get_model_output_dir(model_key)

    # Newer transformers releases name this argument `eval_strategy`, older ones
    # `evaluation_strategy`; use whichever the installed TrainingArguments accepts
    accepted_args = getattr(TrainingArguments, "__dataclass_fields__", {})
    strategy_arg = "eval_strategy" if "eval_strategy" in accepted_args else "evaluation_strategy"

    training_args = TrainingArguments(
        output_dir=output_dir,
        learning_rate=learning_rate,
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        num_train_epochs=num_epochs,
        weight_decay=0.01,
        **{strategy_arg: "epoch"},
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_tokenized,
        eval_dataset=test_tokenized,
        compute_metrics=compute_metrics,
    )

    trainer.train()
    # The "f1" value returned by compute_metrics is reported under the key "eval_f1"
    eval_results = trainer.evaluate()
    return eval_results['eval_f1']

def run_optuna(model_key, train_dataset, test_dataset, n_trials=4):
    """
    Function to run hyperparameter tuning using Optuna, optimising model performance to maximise F1-score over multiple trials.

    Params: model_key: str - key to identify model in models_dict
            train_dataset: Dataset - cleaned (not yet tokenized) training dataset
            test_dataset: Dataset - cleaned (not yet tokenized) dataset each trial is evaluated on
            n_trials: int - number of trials to run
    Returns: None (the best hyperparameters are printed)
    Raises: ValueError - if model_key has no entry in models_dict

    """
    # Only the tokenizer is needed here; each trial loads its own model inside objective
    tokenizer, _ = get_tokenizer_and_model(model_key)

    # Tokenize the datasets that were passed in, rather than reading the global cleaned_datasets
    train_tokenized, validation_tokenized = prepare_datasets(train_dataset, test_dataset, tokenizer)

    study = optuna.create_study(direction='maximize')
    study.optimize(
        lambda trial: objective(trial, model_key, train_tokenized, validation_tokenized),
        n_trials=n_trials,
    )

    print("Best hyperparameters:", study.best_params)

In [None]:
"""
Run hyperparameter tuning for a particular model,
training on the training split and evaluating each trial on the validation split
"""
run_optuna(
    "electra",
    train_dataset=cleaned_datasets["training_dataset"],
    test_dataset=cleaned_datasets["validation_dataset"],
)

In [None]:
"""
Uncomment to run hyperparameter tuning for all models
"""

# model_keys = list(models_dict.keys())
# for key in model_keys:
#   run_optuna(key, cleaned_datasets["training_dataset"], cleaned_datasets["validation_dataset"])