<a href="https://colab.research.google.com/github/Hillascher5/nlp-tweets-sentiment-analysis/blob/main/Evaluate_saved_models.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Evaluate Saved Best Models

Evaluate all saved models: those fine-tuned with the Trainer API and those fine-tuned with the manual training code.

In [None]:
# # Needed for Google Colab
# !pip install --quiet evaluate transformers optuna datasets nltk scikit-learn
# !pip install numpy==1.26.4

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from datasets import Dataset
from sklearn.metrics import classification_report
from google.colab import drive
drive.mount('/content/drive')

import pandas as pd
import numpy as np
import random
import torch
import nltk
nltk.download('stopwords')

import os
os.environ["WANDB_DISABLED"] = "true"

In [None]:
def set_seed_all(seed=42):
    """Seed the Python, NumPy and PyTorch random number generators.

    Args:
        seed: Value handed to every generator. Defaults to 42.

    The CUDA generators are seeded as well when a CUDA device is available.
    """
    # The host-side generators all take the seed as their single argument.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    # Nothing further to seed on a machine without CUDA.
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)


# Seed everything once, before any data or model work.
set_seed_all(42)

#### Load test set

In [None]:
# Read the test-set CSV into a DataFrame, decoding the file as Latin-1.
df_test = pd.read_csv("data/Corona_NLP_test_to_evaluate.csv", encoding='latin1')

In [None]:
# Give every sentiment string an integer id, numbered in sorted label order.
# NOTE(review): the ids are derived from the labels present in this CSV —
# confirm they match the label ids the saved models were trained with.
sentiments = df_test["Sentiment"]
unique_labels = sorted(sentiments.unique())
label2id = dict(zip(unique_labels, range(len(unique_labels))))
df_test["label"] = sentiments.map(label2id)

In [None]:
# Minimal pre-processing
def light_preprocess(text):
    """Return `text` with leading and trailing whitespace removed.

    Missing values (``None`` or a float NaN, which is how pandas represents
    an empty CSV cell) become an empty string instead of raising
    ``AttributeError``. Any other non-string value is converted with ``str``
    before stripping.

    Args:
        text: The raw tweet text.

    Returns:
        The stripped string.
    """
    # NaN is the only float that does not compare equal to itself.
    if text is None or (isinstance(text, float) and text != text):
        return ""
    return str(text).strip()                        # Remove unnecessary spaces

# Tag naming the preprocessing variant; the ensemble cell later interpolates it
# into the checkpoint paths.
is_preprocessed = "minimal_preprocess"
# Store the stripped tweet text in the column that the tokenizers read.
df_test["clean_text"] = df_test["OriginalTweet"].apply(light_preprocess)

In [None]:
# Convert DataFrame to Hugging Face Dataset, keeping only the text column and
# the integer label column
hf_test = Dataset.from_pandas(df_test[["clean_text", "label"]])

### Load saved models

In [None]:
def _load_model_and_tokenizer(base_dir, checkpoint_name):
    """Load the classification model and the tokenizer saved at base_dir + checkpoint_name."""
    checkpoint = base_dir + checkpoint_name
    model = AutoModelForSequenceClassification.from_pretrained(checkpoint)
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    return model, tokenizer


# --- Checkpoints stored under the HF_Trainer directory ---
# Local alternative to the Drive location: model_path_hf = "models/HF_Trainer/"
model_path_hf = "/content/drive/MyDrive/NLP_tweet_classification_covid19_project/models/HF_Trainer/"

bert_model_name_hf = "bert_best_model_stratify_maxl_256_minimal_preprocess_5000_samples_optuna"
bert_model, bert_tokenizer = _load_model_and_tokenizer(model_path_hf, bert_model_name_hf)

roberta_model_name_hf = "roberta_best_model_stratify_maxl_256_minimal_preprocess_5000_samples_optuna"
roberta_model, roberta_tokenizer = _load_model_and_tokenizer(model_path_hf, roberta_model_name_hf)

deberta_model_name_hf = "deberta_best_model_stratify_maxl_128_minimal_preprocess_5000_samples_optuna"
deberta_model, deberta_tokenizer = _load_model_and_tokenizer(model_path_hf, deberta_model_name_hf)

# --- Checkpoints stored under the manual fine-tuning directory ---
# Local alternative: model_path_manual = "models/Manual_finetune_min_preproc_5000_samples_opt/"
model_path_manual = "/content/drive/MyDrive/NLP_tweet_classification_covid19_project/models/Manual_finetune_min_preproc_5000_samples_opt/"

bert_model_name_manual = "manual2hf_bert-base-uncased_5000_samples_opt"
bert_model_man, bert_tokenizer_man = _load_model_and_tokenizer(model_path_manual, bert_model_name_manual)

roberta_model_name_manual = "manual2hf_roberta-base_5000_samples_opt"
roberta_model_man, roberta_tokenizer_man = _load_model_and_tokenizer(model_path_manual, roberta_model_name_manual)

deberta_model_name_manual = "manual2hf_deberta-base_5000_samples_opt"
deberta_model_man, deberta_tokenizer_man = _load_model_and_tokenizer(model_path_manual, deberta_model_name_manual)

In [None]:
def _tokenize_clean_text(tokenizer, examples, max_length):
    """Tokenize the "clean_text" field, truncating and padding each item to max_length."""
    return tokenizer(
        examples["clean_text"],
        truncation=True,
        padding='max_length',
        max_length=max_length,
    )


# One thin wrapper per tokenizer. Each wrapper looks up its tokenizer global at
# call time and fixes the sequence length used for that model.
def tokenize_function_bert(examples):
    """Tokenize `examples` with `bert_tokenizer` at length 256."""
    return _tokenize_clean_text(bert_tokenizer, examples, 256)


def tokenize_function_roberta(examples):
    """Tokenize `examples` with `roberta_tokenizer` at length 256."""
    return _tokenize_clean_text(roberta_tokenizer, examples, 256)


def tokenize_function_deberta(examples):
    """Tokenize `examples` with `deberta_tokenizer` at length 128."""
    return _tokenize_clean_text(deberta_tokenizer, examples, 128)


def tokenize_function_bert_man(examples):
    """Tokenize `examples` with `bert_tokenizer_man` at length 256."""
    return _tokenize_clean_text(bert_tokenizer_man, examples, 256)


def tokenize_function_roberta_man(examples):
    """Tokenize `examples` with `roberta_tokenizer_man` at length 256."""
    return _tokenize_clean_text(roberta_tokenizer_man, examples, 256)


def tokenize_function_deberta_man(examples):
    """Tokenize `examples` with `deberta_tokenizer_man` at length 128."""
    return _tokenize_clean_text(deberta_tokenizer_man, examples, 128)

In [None]:
def _tokenize_for_torch(tokenize_fn):
    """Map tokenize_fn over hf_test in batches and expose ids, mask and label as torch tensors."""
    tokenized = hf_test.map(tokenize_fn, batched=True)
    tokenized.set_format("torch", columns=["input_ids", "attention_mask", "label"])
    return tokenized


# The test set is tokenized once per tokenizer: HF_Trainer checkpoints first,
# then the manual fine-tuning checkpoints.
tokenized_bert_test = _tokenize_for_torch(tokenize_function_bert)
tokenized_roberta_test = _tokenize_for_torch(tokenize_function_roberta)
tokenized_deberta_test = _tokenize_for_torch(tokenize_function_deberta)

tokenized_bert_test_man = _tokenize_for_torch(tokenize_function_bert_man)
tokenized_roberta_test_man = _tokenize_for_torch(tokenize_function_roberta_man)
tokenized_deberta_test_man = _tokenize_for_torch(tokenize_function_deberta_man)

In [None]:
def compute_metrics(eval_pred):
    """Compute accuracy and macro-averaged F1, precision and recall.

    Args:
        eval_pred: A ``(logits, labels)`` pair. The predicted class of each
            row is the argmax of ``logits`` over its last axis.

    Returns:
        dict: Scores under the keys ``'accuracy'``, ``'f1_macro'``,
        ``'precision_macro'`` and ``'recall_macro'``.
    """
    logits, labels = eval_pred
    predictions = logits.argmax(axis=-1)

    # Use one zero_division policy for all three macro scores. Previously only
    # precision set it, so F1 and recall emitted warnings while still scoring
    # the undefined case as 0.
    macro_kwargs = {"average": "macro", "zero_division": 0}

    return {
        'accuracy': accuracy_score(labels, predictions),
        'f1_macro': f1_score(labels, predictions, **macro_kwargs),
        'precision_macro': precision_score(labels, predictions, **macro_kwargs),
        'recall_macro': recall_score(labels, predictions, **macro_kwargs),
    }

In [None]:
def _build_eval_trainer(model, tokenizer):
    """Wrap a model/tokenizer pair in a Trainer that scores with compute_metrics."""
    return Trainer(model=model, tokenizer=tokenizer, compute_metrics=compute_metrics)


# One Trainer per checkpoint loaded from the HF_Trainer directory
bert_trainer_hf = _build_eval_trainer(bert_model, bert_tokenizer)
roberta_trainer_hf = _build_eval_trainer(roberta_model, roberta_tokenizer)
deberta_trainer_hf = _build_eval_trainer(deberta_model, deberta_tokenizer)

# One Trainer per checkpoint loaded from the manual fine-tuning directory
bert_trainer_man = _build_eval_trainer(bert_model_man, bert_tokenizer_man)
roberta_trainer_man = _build_eval_trainer(roberta_model_man, roberta_tokenizer_man)
deberta_trainer_man = _build_eval_trainer(deberta_model_man, deberta_tokenizer_man)

In [None]:
def _evaluate_and_show(trainer, dataset):
    """Run trainer.evaluate on dataset, print the resulting metrics dict and return it."""
    metrics = trainer.evaluate(dataset)
    print(metrics)
    return metrics


# Each model is scored on the test set tokenized with its own tokenizer
bert_metrics = _evaluate_and_show(bert_trainer_hf, tokenized_bert_test)
roberta_metrics = _evaluate_and_show(roberta_trainer_hf, tokenized_roberta_test)
deberta_metrics = _evaluate_and_show(deberta_trainer_hf, tokenized_deberta_test)

bert_metrics_man = _evaluate_and_show(bert_trainer_man, tokenized_bert_test_man)
roberta_metrics_man = _evaluate_and_show(roberta_trainer_man, tokenized_roberta_test_man)
deberta_metrics_man = _evaluate_and_show(deberta_trainer_man, tokenized_deberta_test_man)

In [None]:
# Metrics that become the columns of the comparison table
keys_to_include = [
    'eval_loss',
    'eval_accuracy',
    'eval_f1_macro',
    'eval_precision_macro',
    'eval_recall_macro'
]


def _metrics_row(model_label, metrics):
    """Build one table row: the model label followed by the selected metrics."""
    row = {"Model": model_label}
    row.update((key, value) for key, value in metrics.items() if key in keys_to_include)
    return row


# One row per evaluated model, in display order
metrics_table = pd.DataFrame([
    _metrics_row("BERT (HF)", bert_metrics),
    _metrics_row("RoBERTa (HF)", roberta_metrics),
    _metrics_row("DeBERTa (HF)", deberta_metrics),
    _metrics_row("BERT (Manual)", bert_metrics_man),
    _metrics_row("RoBERTa (Manual)", roberta_metrics_man),
    _metrics_row("DeBERTa (Manual)", deberta_metrics_man),
])

# Index by model name, then round to four decimals for readability
metrics_table = metrics_table.set_index("Model").round(4)

from IPython.display import display
display(metrics_table)

In [None]:
def _predict_and_report(title, trainer, dataset):
    """Predict on dataset, then print title and the classification report.

    Returns the raw prediction output, the argmax class ids and the true label ids.
    """
    output = trainer.predict(dataset)
    y_pred = output.predictions.argmax(axis=-1)
    y_true = output.label_ids
    print(title)
    print(classification_report(y_true, y_pred, digits=4))
    return output, y_pred, y_true


# Per-class reports for the HF_Trainer checkpoints
bert_preds, bert_y_pred, bert_y_true = _predict_and_report(
    'BERT HF Trainer model:', bert_trainer_hf, tokenized_bert_test)

roberta_preds, roberta_y_pred, roberta_y_true = _predict_and_report(
    'RoBERTa HF Trainer model:', roberta_trainer_hf, tokenized_roberta_test)

deberta_preds, deberta_y_pred, deberta_y_true = _predict_and_report(
    'DeBERTa HF Trainer model:', deberta_trainer_hf, tokenized_deberta_test)

# Per-class reports for the manual fine-tuning checkpoints
bert_preds_man, bert_y_pred_man, bert_y_true_man = _predict_and_report(
    'BERT manual code model:', bert_trainer_man, tokenized_bert_test_man)

roberta_preds_man, roberta_y_pred_man, roberta_y_true_man = _predict_and_report(
    'RoBERTA manual code model:', roberta_trainer_man, tokenized_roberta_test_man)

deberta_preds_man, deberta_y_pred_man, deberta_y_true_man = _predict_and_report(
    'DeBERTA manual code model:', deberta_trainer_man, tokenized_deberta_test_man)


### Ensemble Evaluation

In [None]:
# Tokenize texts, run in eval mode and output softmax probabilities for each class
def model_probs(model, tokenizer, texts, batch_size=32, max_length=256, device=None):
    """Return the softmax class probabilities of `model` for every text.

    Args:
        model: Model called as ``model(input_ids=..., attention_mask=...)``
            whose output exposes ``.logits``. It is switched to eval mode.
        tokenizer: Callable that turns the list of texts into a mapping with
            ``"input_ids"`` and ``"attention_mask"`` tensors.
        texts: Iterable of input strings.
        batch_size: Number of texts per forward pass.
        max_length: Truncation length passed to the tokenizer.
        device: Device the input tensors are moved to. When ``None`` the
            device of the model's first parameter is used (CPU if the model
            has no parameters), so the function no longer depends on a
            global ``device`` variable.

    Returns:
        numpy.ndarray with one row of probabilities per text. An empty input
        returns an array with zero rows.
    """
    # Imported locally: the notebook's import cell does not bring these names
    # into scope, so the original body failed with NameError.
    from torch.utils.data import DataLoader, TensorDataset

    model.eval()
    texts = list(texts)

    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    if not texts:
        # Nothing to tokenize; keep the column count when the model exposes it.
        num_labels = getattr(getattr(model, "config", None), "num_labels", 0)
        return np.empty((0, num_labels), dtype=np.float32)

    inputs = tokenizer(
        texts,
        padding=True,
        truncation=True,
        max_length=max_length,
        return_tensors="pt"
    )
    ds = TensorDataset(inputs["input_ids"], inputs["attention_mask"])
    dl = DataLoader(ds, batch_size=batch_size)

    probs_all = []
    with torch.no_grad():
        for input_ids, attn_mask in dl:
            input_ids = input_ids.to(device)
            attn_mask = attn_mask.to(device)
            logits = model(input_ids=input_ids, attention_mask=attn_mask).logits
            probs = torch.nn.functional.softmax(logits, dim=1).cpu().numpy()
            probs_all.append(probs)
    return np.concatenate(probs_all, axis=0)

In [None]:
# Load best finetuned checkpoints and compute per-model class probabilities
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NOTE(review): no cell in this notebook creates `val_df`. It has to be a
# DataFrame with "clean_text" and "label" columns holding the validation
# split. Fail here, before the checkpoints are loaded, with a clear message.
if "val_df" not in globals():
    raise NameError(
        "val_df is not defined: load the validation split (with 'clean_text' "
        "and 'label' columns) before running the ensemble evaluation."
    )

# This name was never defined in the notebook. 5000 is the sample count
# embedded in the checkpoint names used in the model-loading cell.
num_train_samples = 5000

# Build the paths from model_path_hf so this cell reads the same checkpoint
# directory as the model-loading cell.
best_bert_path = f"{model_path_hf}bert_best_model_stratify_maxl_256_{is_preprocessed}_{num_train_samples}_samples_optuna"
bert_model = AutoModelForSequenceClassification.from_pretrained(best_bert_path).to(device)
bert_tokenizer = AutoTokenizer.from_pretrained(best_bert_path)

best_roberta_path = f"{model_path_hf}roberta_best_model_stratify_maxl_256_{is_preprocessed}_{num_train_samples}_samples_optuna"
roberta_model = AutoModelForSequenceClassification.from_pretrained(best_roberta_path).to(device)
roberta_tokenizer = AutoTokenizer.from_pretrained(best_roberta_path)

best_deberta_path = f"{model_path_hf}deberta_best_model_stratify_maxl_128_{is_preprocessed}_{num_train_samples}_samples_optuna"
deberta_model = AutoModelForSequenceClassification.from_pretrained(best_deberta_path).to(device)
deberta_tokenizer = AutoTokenizer.from_pretrained(best_deberta_path)

val_texts = list(val_df["clean_text"])
val_labels = val_df["label"].values

# The test frame loaded in this notebook is named df_test (`test_df` was never
# defined here).
test_texts = list(df_test["clean_text"])
test_labels = df_test["label"].values

# Generate class probability predictions on validation and test
bert_val_probs = model_probs(bert_model, bert_tokenizer, val_texts, batch_size=32, max_length=256)
roberta_val_probs = model_probs(roberta_model, roberta_tokenizer, val_texts, batch_size=32, max_length=256)
deberta_val_probs = model_probs(deberta_model, deberta_tokenizer, val_texts, batch_size=32, max_length=128)

bert_test_probs = model_probs(bert_model, bert_tokenizer, test_texts, batch_size=32, max_length=256)
roberta_test_probs = model_probs(roberta_model, roberta_tokenizer, test_texts, batch_size=32, max_length=256)
deberta_test_probs = model_probs(deberta_model, deberta_tokenizer, test_texts, batch_size=32, max_length=128)

In [None]:
def ensemble_probs(probs_list, weights):
    """Return the weighted average of several probability arrays.

    The weights are normalised to sum to 1 before they are applied.

    Args:
        probs_list: Sequence of equally shaped arrays, one per model.
        weights: One weight per array in `probs_list`.

    Returns:
        numpy.ndarray: float32 array shaped like ``probs_list[0]``.

    Raises:
        ValueError: If `probs_list` is empty, if the number of weights differs
            from the number of arrays, or if the weights do not have a finite,
            non-zero sum.
    """
    if len(probs_list) == 0:
        raise ValueError("probs_list must contain at least one array.")
    # zip() would otherwise silently drop the surplus arrays or weights.
    if len(weights) != len(probs_list):
        raise ValueError(
            f"Expected {len(probs_list)} weights, got {len(weights)}."
        )

    w = np.asarray(weights, dtype=np.float32)
    total = w.sum()
    # A zero or non-finite sum would turn every output value into NaN/inf.
    if not np.isfinite(total) or total == 0:
        raise ValueError("weights must have a finite, non-zero sum.")
    w = w / total

    out = np.zeros_like(probs_list[0], dtype=np.float32)
    for wi, pi in zip(w, probs_list):
        out += wi * pi
    return out

def metrics_from_probs(probs, y_true):
    """Score the row-wise argmax of `probs` against `y_true`.

    Args:
        probs: Array of per-class scores, one row per sample.
        y_true: True class ids.

    Returns:
        dict: ``"accuracy"`` and ``"macro_f1"`` of the hard predictions.
    """
    predicted = probs.argmax(axis=1)
    accuracy = accuracy_score(y_true, predicted)
    macro_f1 = f1_score(y_true, predicted, average="macro")
    return {"accuracy": accuracy, "macro_f1": macro_f1}

def grid_search_weights_val(probs_list, y_true, step=0.05):
    """Grid-search ensemble weights that maximise macro-F1 on `y_true`.

    Every combination of grid values for the first ``n - 1`` models whose sum
    does not exceed 1 is tried; the last model receives the remainder. The
    first combination reaching the highest macro-F1 is kept. Candidates are
    visited in the same order as the former hand-written 2- and 3-model
    loops, and any number of models (>= 1) is now supported. The number of
    candidates grows as ``len(grid) ** (n - 1)``.

    Args:
        probs_list: Sequence of per-model probability arrays.
        y_true: True class ids used for scoring.
        step: Spacing of the weight grid on [0, 1]. Must be positive.

    Returns:
        dict: ``"weights"`` (list of Python floats), ``"accuracy"`` and
        ``"macro_f1"`` of the best combination.

    Raises:
        ValueError: If `probs_list` is empty or `step` is not positive.
    """
    from itertools import product

    n = len(probs_list)
    if n == 0:
        raise ValueError("probs_list must contain at least one model.")
    if not step > 0:
        raise ValueError("step must be a positive number.")

    best = {"weights": None, "macro_f1": -1.0, "accuracy": 0.0}
    # Plain Python floats keep the reported weights free of NumPy scalar reprs.
    grid = [float(g) for g in np.arange(0.0, 1.0 + 1e-9, step)]

    for head in product(grid, repeat=n - 1):
        s = sum(head)
        if s > 1.0 + 1e-9:
            continue
        # Clamp so float rounding cannot produce a tiny negative last weight.
        w = [*head, max(0.0, 1.0 - s)]
        scores = metrics_from_probs(ensemble_probs(probs_list, w), y_true)
        if scores["macro_f1"] > best["macro_f1"]:
            best = {"weights": w, **scores}
    return best

def _score_on_test(test_probs_list, weights):
    """Blend the test probabilities with `weights` and score them against test_labels."""
    blended = ensemble_probs(test_probs_list, weights)
    return blended, metrics_from_probs(blended, test_labels)


# Three-model ensemble: BERT + RoBERTa + DeBERTa
val_probs_three_list = [bert_val_probs, roberta_val_probs, deberta_val_probs]
test_probs_three_list = [bert_test_probs, roberta_test_probs, deberta_test_probs]

# Two-model ensemble: BERT + DeBERTa
val_probs_two_list = [bert_val_probs, deberta_val_probs]
test_probs_two_list = [bert_test_probs, deberta_test_probs]

# --- 3-model ensemble: pick the weights on validation, then score on test ---
best_three = grid_search_weights_val(val_probs_three_list, val_labels, step=0.05)
print("3-Ensemble Best weights on validation:", best_three)

ens_test_probs_three, test_scores_three = _score_on_test(
    test_probs_three_list, best_three["weights"])

print(f" Test (weighted 3-ensemble) Accuracy: {test_scores_three['accuracy']:.4f}")
print(f" Test (weighted 3-ensemble) Macro-F1: {test_scores_three['macro_f1']:.4f}")

# --- 2-model ensemble: pick the weights on validation, then score on test ---
best_two = grid_search_weights_val(val_probs_two_list, val_labels, step=0.05)
print("2-Ensemble Best weights on validation:", best_two)

ens_test_probs_two, test_scores_two = _score_on_test(
    test_probs_two_list, best_two["weights"])

print(f" Test (weighted 2-ensemble) Accuracy: {test_scores_two['accuracy']:.4f}")
print(f" Test (weighted 2-ensemble) Macro-F1: {test_scores_two['macro_f1']:.4f}")

In [None]:
# from IPython.display import Javascript

# def disconnect_runtime():
#     display(Javascript('google.colab.kernel.disconnect()'))

# disconnect_runtime()