In [None]:
# 1: Mount Google Drive and move into the project folder
import os

from google.colab import drive

drive.mount('/content/drive')

# The relative data/model/result paths defined later are resolved from here
project_dir = '/content/drive/MyDrive/final-sms-scam-detection'
os.chdir(project_dir)
print(f"Working directory: {os.getcwd()}")

In [None]:
# Install Libraries
!pip install torch transformers datasets evaluate peft optuna matplotlib seaborn scikit-learn pandas numpy captum lime -q

In [None]:
# 3: Import required libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import json
import time
import re
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.utils.class_weight import compute_class_weight
import lime
from lime.lime_text import LimeTextExplainer
from captum.attr import IntegratedGradients
from pathlib import Path
import evaluate

from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    DataCollatorWithPadding
)

from peft import (
    LoraConfig,
    TaskType,
    get_peft_model,
)

from datasets import Dataset, DatasetDict
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    matthews_corrcoef, roc_auc_score, average_precision_score,
    confusion_matrix, classification_report, roc_curve, precision_recall_curve
)
import warnings
import optuna

In [None]:
# Global plotting defaults: seaborn "whitegrid" theme and a 12x8 default figure
sns.set(style="whitegrid")
plt.rcParams.update({"figure.figsize": (12, 8)})

In [None]:
# One seed shared by every random number generator seeded in this cell
RANDOM_SEED = 42

# PyTorch and NumPy generators
torch.manual_seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

# Also seed every CUDA device when a GPU is present
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(RANDOM_SEED)

In [None]:
# Pick the compute device: GPU when CUDA is available, otherwise CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")

# Debug aid: set CUDA_LAUNCH_BLOCKING so CUDA errors come with better messages
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

# Hide FutureWarning messages that originate from the transformers package
warnings.filterwarnings(action="ignore", category=FutureWarning, module="transformers")

In [None]:
# 4: Load data
# Input and output locations, relative to the current working directory
data_dir = "data/processed/"
model_dir = "models/llm/"
results_dir = "results/"

# Make sure every output folder exists before anything is written to it
os.makedirs(model_dir, exist_ok=True)
os.makedirs(os.path.join(results_dir, "metrics"), exist_ok=True)
os.makedirs(os.path.join(results_dir, "visualizations"), exist_ok=True)

# Read the three pre-split CSV files (train.csv, val.csv, test.csv)
train_df, val_df, test_df = (
    pd.read_csv(os.path.join(data_dir, f"{split}.csv"))
    for split in ("train", "val", "test")
)

print(f"Loaded data: Train: {len(train_df)}, Validation: {len(val_df)}, Test: {len(test_df)}")

In [None]:
# Inspect the schema using the training split
print("\nData structure:")
print("Train columns:", list(train_df.columns))
print("Sample row:", train_df.iloc[0])

# Per-split label counts and the majority/minority ratio (used for balancing)
print("\nClass distribution:")
for split_name, df in {"Train": train_df, "Val": val_df, "Test": test_df}.items():
    class_counts = df['label'].value_counts()
    print(f"{split_name}: {class_counts.to_dict()}")
    print(f"  Imbalance ratio: {class_counts.max() / class_counts.min():.2f}")

In [None]:
# Label mapping shared by every model; label2id is the inverse of id2label
id2label = {0: "Legitimate", 1: "Scam"}
label2id = {name: idx for idx, name in id2label.items()}

# Wrap each pandas split as a Hugging Face dataset
train_dataset, val_dataset, test_dataset = (
    Dataset.from_pandas(frame) for frame in (train_df, val_df, test_df)
)

# Group the splits under the keys the rest of the notebook indexes with
dataset_dict = DatasetDict({
    "train": train_dataset,
    "valid": val_dataset,
    "test": test_dataset,
})

print("\nDataset structure:")
print(dataset_dict)

In [None]:
# 5: Define helper functions

# Text preprocessing function
def preprocess_function(examples, tokenizer, max_length=128):
    """Tokenize a batch of examples to a fixed length.

    The original ``"message"`` column is used when the batch contains one;
    otherwise the function falls back to ``"cleaned_text"``.

    Args:
        examples: Mapping of column name to the batch's values.
        tokenizer: Callable tokenizer applied to the selected texts.
        max_length: Length every sequence is padded or truncated to.

    Returns:
        Whatever the tokenizer returns for the selected texts.
    """
    if "message" in examples:
        texts = examples["message"]
    else:
        texts = examples["cleaned_text"]

    tokenizer_options = dict(
        max_length=max_length,
        padding='max_length',
        truncation=True,
    )
    return tokenizer(texts, **tokenizer_options)

# Function to tokenize all datasets
def tokenize_data(dataset_dict, tokenizer, max_length=128):
    """Apply ``preprocess_function`` to every split, in batched mode.

    Args:
        dataset_dict: Object exposing ``map(fn, batched=...)`` (the grouped splits).
        tokenizer: Tokenizer forwarded to ``preprocess_function``.
        max_length: Maximum sequence length forwarded to ``preprocess_function``.

    Returns:
        The result of ``dataset_dict.map``, i.e. the tokenized splits.
    """
    def _tokenize_batch(batch):
        # Bind the tokenizer and length so ``map`` only has to pass the batch
        return preprocess_function(batch, tokenizer, max_length)

    return dataset_dict.map(_tokenize_batch, batched=True)

# Function to compute evaluation metrics
def compute_metrics(eval_pred):
    """Turn raw logits and reference labels into classification metrics.

    Args:
        eval_pred: Pair ``(predictions, labels)`` where ``predictions`` holds
            one row of class logits per example.

    Returns:
        Dictionary with accuracy, precision, recall, f1, mcc, roc_auc and
        pr_auc. The two AUC scores use the softmax probability of class 1;
        the other metrics use the arg-max class.
    """
    logits, references = eval_pred

    # Softmax over the class axis, keeping only the positive-class column
    class_probs = torch.nn.functional.softmax(torch.tensor(logits), dim=-1).numpy()
    positive_probs = class_probs[:, 1]

    # Hard decision: most likely class per example
    hard_predictions = np.argmax(logits, axis=1)

    # Metrics computed from hard predictions
    label_based = {
        "accuracy": accuracy_score,
        "precision": precision_score,
        "recall": recall_score,
        "f1": f1_score,
        "mcc": matthews_corrcoef,
    }
    # Metrics computed from positive-class scores
    score_based = {
        "roc_auc": roc_auc_score,
        "pr_auc": average_precision_score,
    }

    metrics = {name: scorer(references, hard_predictions) for name, scorer in label_based.items()}
    metrics.update({name: scorer(references, positive_probs) for name, scorer in score_based.items()})
    return metrics

# Function to create data collator
def create_data_collator(tokenizer):
    """Build the padding collator used to batch tokenized examples.

    Args:
        tokenizer: Tokenizer handed to ``DataCollatorWithPadding``.

    Returns:
        A ``DataCollatorWithPadding`` instance bound to ``tokenizer``.
    """
    collator = DataCollatorWithPadding(tokenizer=tokenizer)
    return collator

In [None]:
class WeightedTrainer(Trainer):
    """``Trainer`` variant whose loss can be re-weighted per class.

    When ``class_weights`` is provided, the loss is a weighted cross-entropy
    computed from the model's logits; otherwise the loss reported by the model
    itself is used.
    """

    def __init__(self, class_weights=None, **kwargs):
        """Store the optional class-weight tensor; other kwargs go to ``Trainer``."""
        super().__init__(**kwargs)
        self.class_weights = class_weights

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the (optionally class-weighted) loss for one batch.

        Args:
            model: Model to run the batch through.
            inputs: Batch mapping; its ``"labels"`` entry is the target.
            return_outputs: When true, return ``(loss, outputs)`` instead of ``loss``.
            num_items_in_batch: Accepted for signature compatibility; not used here.

        Returns:
            The loss, or ``(loss, outputs)`` when ``return_outputs`` is true.
        """
        targets = inputs.get("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")

        if self.class_weights is None:
            # No weights supplied: fall back to the loss computed by the model
            loss = outputs.loss
        else:
            # Weighted cross-entropy to counter class imbalance
            num_labels = self.model.config.num_labels
            criterion = nn.CrossEntropyLoss(weight=self.class_weights)
            loss = criterion(logits.view(-1, num_labels), targets.view(-1))

        if return_outputs:
            return (loss, outputs)
        return loss

In [None]:
def calculate_class_weights(dataset, device):
    """Compute 'balanced' class weights from the labels of a dataset.

    Args:
        dataset: Iterable of examples, each exposing a ``'label'`` entry.
        device: Device the resulting weight tensor is moved to.

    Returns:
        ``float32`` tensor of class weights, ordered by sorted class value.
    """
    # Collect the label of every example
    labels = []
    for example in dataset:
        labels.append(example['label'])

    present_classes = np.unique(labels)
    class_weights = compute_class_weight(
        class_weight='balanced',
        classes=present_classes,
        y=labels,
    )

    weights_on_device = torch.tensor(class_weights, dtype=torch.float32).to(device)

    # The message assumes two classes (index 0 and index 1)
    print(f"Class weights: Legitimate={class_weights[0]:.3f}, Scam={class_weights[1]:.3f}")
    return weights_on_device

In [None]:
def evaluate_baseline_model(model_name, tokenizer, test_dataset, device):
    """Score a pre-trained checkpoint on a dataset without any fine-tuning.

    The checkpoint is loaded with a two-label classification head, evaluated
    through a throw-away ``Trainer`` and then released.

    Args:
        model_name: Checkpoint name passed to ``from_pretrained``.
        tokenizer: Tokenizer matching the checkpoint.
        test_dataset: Tokenized dataset to evaluate on.
        device: Device the model is moved to.

    Returns:
        The metrics dictionary returned by ``Trainer.evaluate``.
    """
    print(f"\n{'='*50}")
    print(f"Evaluating baseline model: {model_name}")
    print(f"{'='*50}")

    # Pre-trained weights only; no training happens in this function
    baseline_model = AutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=2,
        id2label=id2label,
        label2id=label2id,
    )
    baseline_model = baseline_model.to(device)

    # Only the settings needed for evaluation are specified
    eval_args = TrainingArguments(
        output_dir="./temp_baseline",
        per_device_eval_batch_size=32,
        report_to="none",
    )

    evaluator = Trainer(
        model=baseline_model,
        args=eval_args,
        tokenizer=tokenizer,
        data_collator=create_data_collator(tokenizer),
        compute_metrics=compute_metrics,
    )

    results = evaluator.evaluate(test_dataset)

    print(f"Baseline Results for {model_name}:")
    eval_entries = [(metric, value) for metric, value in results.items() if metric.startswith('eval_')]
    for metric, value in eval_entries:
        print(f"  {metric}: {value:.4f}")

    # Drop the references and release cached GPU memory
    del baseline_model, evaluator
    torch.cuda.empty_cache()

    return results

In [None]:
# Explainability functions
def setup_lime_explainer(model, tokenizer, device, class_names=['Legitimate', 'Scam'], batch_size=32):
    """Build a LIME text explainer and the prediction function it needs.

    LIME hands the prediction function all of its perturbed samples in a single
    call, so the function runs the model in mini-batches instead of one large
    forward pass.

    Args:
        model: Sequence-classification model whose output exposes ``.logits``.
        tokenizer: Tokenizer matching ``model``.
        device: Device the tokenized inputs are moved to.
        class_names: Class names shown in explanations (copied, never mutated).
        batch_size: Maximum number of texts per forward pass (must be >= 1).

    Returns:
        Tuple ``(explainer, predict_proba)`` where ``predict_proba`` maps a
        list of texts to an array of shape ``(len(texts), num_classes)``.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    # Copy so the explainer never shares the mutable default list
    explainer_class_names = list(class_names) if class_names is not None else None
    num_classes = len(explainer_class_names) if explainer_class_names is not None else 0

    def predict_proba(texts):
        """Prediction function for LIME: class probabilities for each text."""
        # Accept a single string as well as any sequence of strings
        texts = [texts] if isinstance(texts, str) else list(texts)
        if not texts:
            return np.empty((0, num_classes))

        model.eval()
        probability_chunks = []
        with torch.no_grad():
            for start in range(0, len(texts), batch_size):
                inputs = tokenizer(
                    texts[start:start + batch_size],
                    max_length=128,
                    padding=True,
                    truncation=True,
                    return_tensors="pt"
                ).to(device)
                logits = model(**inputs).logits
                # Move each chunk to the CPU so device memory holds one batch at a time
                probability_chunks.append(torch.nn.functional.softmax(logits, dim=-1).cpu())

        return torch.cat(probability_chunks, dim=0).numpy()

    # Create LIME explainer
    explainer = LimeTextExplainer(class_names=explainer_class_names)

    return explainer, predict_proba

In [None]:
def explain_prediction(explainer, predict_fn, text, num_features=10):
    """Explain one prediction with the given explainer.

    Args:
        explainer: Object exposing ``explain_instance``.
        predict_fn: Prediction function handed to the explainer.
        text: Text whose prediction is explained.
        num_features: Number of features requested in the explanation.

    Returns:
        The explanation produced by ``explainer.explain_instance`` using
        1000 samples.
    """
    sampling_options = {"num_features": num_features, "num_samples": 1000}
    return explainer.explain_instance(text, predict_fn, **sampling_options)

In [None]:
# Functions to visualize training metrics.
# Every metric is read from whichever log entries contain it, so series logged
# at different frequencies (or missing entirely) are handled independently.
def _metric_series(metrics, key):
    """Return ``(x_values, values)`` for every log entry that recorded ``key``."""
    entries = [entry for entry in metrics if entry.get(key) is not None]
    values = [entry[key] for entry in entries]
    if entries and all('epoch' in entry for entry in entries):
        # Use the logged epoch so differently-spaced series line up on one axis
        x_values = [entry['epoch'] for entry in entries]
    else:
        x_values = list(range(1, len(values) + 1))
    return x_values, values


def _plot_curves(curves, title, ylabel, output_path, figsize):
    """Draw the given ``(x, y, label, color)`` curves in one figure, save and show it."""
    plt.figure(figsize=figsize)
    for x_values, y_values, label, color in curves:
        plt.plot(x_values, y_values, label=label, marker='o', color=color)
    plt.xlabel('Epochs')
    plt.ylabel(ylabel)
    plt.title(title)
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(output_path)
    plt.show()
    # Release the figure so repeated calls do not accumulate open figures
    plt.close()


def visualize_training_metrics(metrics, model_name):
    """Plot loss and validation-metric curves from a training log history.

    Saves ``loss_curve.png``, ``accuracy_curve.png``, ``f1_curve.png`` and
    ``mcc_curve.png`` under ``results/visualizations/<model_name>``; a plot is
    skipped when the log history holds no values for it.

    Args:
        metrics: List of log dictionaries (e.g. ``trainer.state.log_history``).
            Training entries carry ``'loss'``; evaluation entries carry
            ``'eval_loss'`` and optionally ``'eval_accuracy'``, ``'eval_f1'``
            and ``'eval_mcc'``. An ``'epoch'`` entry, when present, is used as
            the x coordinate.
        model_name: Name of the model, used for the output folder.
    """
    # Create directory for plots
    results_dir = Path(f"results/visualizations/{model_name}")
    results_dir.mkdir(parents=True, exist_ok=True)

    # Loss curves: each series keeps its own x values, so nothing is truncated
    loss_curves = []
    train_x, train_loss = _metric_series(metrics, 'loss')
    if train_loss:
        loss_curves.append((train_x, train_loss, 'Training Loss', None))
    eval_x, eval_loss = _metric_series(metrics, 'eval_loss')
    if eval_loss:
        loss_curves.append((eval_x, eval_loss, 'Validation Loss', None))
    if loss_curves:
        _plot_curves(
            loss_curves,
            'Training Loss vs Validation Loss',
            'Loss',
            results_dir / "loss_curve.png",
            (12, 6),
        )

    # Validation metrics: scan the whole history rather than only its first
    # entry, which is normally a training-loss record without eval_* keys
    metric_plots = (
        ('eval_accuracy', 'Accuracy', 'Validation Accuracy', 'blue', "accuracy_curve.png"),
        ('eval_f1', 'F1 Score', 'Validation F1 Score', 'green', "f1_curve.png"),
        ('eval_mcc', 'MCC', 'Validation MCC', 'purple', "mcc_curve.png"),
    )
    for key, label, title, color, filename in metric_plots:
        x_values, values = _metric_series(metrics, key)
        if values:
            _plot_curves(
                [(x_values, values, label, color)],
                title,
                'Score',
                results_dir / filename,
                (10, 6),
            )

# Function to visualize confusion matrix
def plot_confusion_matrix(cm, classes, model_name):
    """Draw a confusion-matrix heatmap, save it and show it.

    The image is written to
    ``results/visualizations/<model_name>/confusion_matrix.png``.

    Args:
        cm: Confusion matrix to plot.
        classes: Class names used for both axes' tick labels.
        model_name: Name of the model, used for the output folder.
    """
    output_dir = Path(f"results/visualizations/{model_name}")
    output_dir.mkdir(parents=True, exist_ok=True)

    heatmap_options = dict(
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=classes,
        yticklabels=classes,
    )

    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, **heatmap_options)
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted Labels")
    plt.ylabel("True Labels")
    plt.tight_layout()
    plt.savefig(output_dir / "confusion_matrix.png")
    plt.show()

# Function to plot ROC curve
def plot_roc_curve(fpr, tpr, roc_auc, model_name):
    """Draw the ROC curve with a diagonal reference line, save it and show it.

    The image is written to
    ``results/visualizations/<model_name>/roc_curve.png``.

    Args:
        fpr: False positive rates (x axis).
        tpr: True positive rates (y axis).
        roc_auc: ROC AUC score shown in the legend.
        model_name: Name of the model, used for the output folder.
    """
    output_dir = Path(f"results/visualizations/{model_name}")
    output_dir.mkdir(parents=True, exist_ok=True)

    curve_label = f'ROC Curve (AUC = {roc_auc:.3f})'
    diagonal = ([0, 1], [0, 1])

    plt.figure(figsize=(10, 8))
    plt.plot(fpr, tpr, color='blue', lw=2, label=curve_label)
    # Dashed diagonal from (0, 0) to (1, 1) for reference
    plt.plot(*diagonal, color='gray', lw=2, linestyle='--')
    plt.title("Receiver Operating Characteristic (ROC) Curve")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.legend(loc="lower right")
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(output_dir / "roc_curve.png")
    plt.show()

# Function to plot PR curve
def plot_pr_curve(precision, recall, pr_auc, model_name):
    """Draw the precision-recall curve as a filled step plot, save and show it.

    The image is written to
    ``results/visualizations/<model_name>/pr_curve.png``.

    Args:
        precision: Precision values (y axis).
        recall: Recall values (x axis).
        pr_auc: PR AUC score shown in the legend.
        model_name: Name of the model, used for the output folder.
    """
    output_dir = Path(f"results/visualizations/{model_name}")
    output_dir.mkdir(parents=True, exist_ok=True)

    curve_label = f'PR Curve (AUC = {pr_auc:.3f})'

    plt.figure(figsize=(10, 8))
    plt.step(recall, precision, color='green', lw=2, where='post', label=curve_label)
    # Shade the area under the step curve
    plt.fill_between(recall, precision, alpha=0.2, color='green', step='post')
    plt.title("Precision-Recall Curve")
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.legend(loc="lower left")
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(output_dir / "pr_curve.png")
    plt.show()

In [None]:
def train_model(model, tokenizer, tokenized_data, training_args, model_name):
    """Train a model with class-weighted loss, then evaluate, save and plot it.

    Outputs written under ``results/<model_name>``: the final model
    (``final_model``) and ``training_metrics.json``. Training curves are
    produced through ``visualize_training_metrics``.

    Args:
        model: Model to train.
        tokenizer: Tokenizer matching the model.
        tokenized_data: Tokenized splits; ``"train"`` and ``"valid"`` are used.
        training_args: ``TrainingArguments`` for the run.
        model_name: Name used in messages and for the output folder.

    Returns:
        The trained ``WeightedTrainer``.
    """
    print(f"\n=== Training {model_name} ===")

    output_root = Path(f"results/{model_name}")
    output_root.mkdir(parents=True, exist_ok=True)

    # Weights that counter class imbalance in the training split
    class_weights = calculate_class_weights(tokenized_data["train"], device)

    # Single pass over the parameters to count trainable vs. total elements
    trainable_params = 0
    total_params = 0
    for parameter in model.parameters():
        element_count = parameter.numel()
        total_params += element_count
        if parameter.requires_grad:
            trainable_params += element_count
    print(f"Trainable parameters: {trainable_params:,} ({trainable_params / total_params:.2%} of total)")

    trainer = WeightedTrainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_data["train"],
        eval_dataset=tokenized_data["valid"],
        tokenizer=tokenizer,
        data_collator=create_data_collator(tokenizer),
        compute_metrics=compute_metrics,
        class_weights=class_weights,
    )

    # Wall-clock time of the training call only
    started_at = time.time()
    train_result = trainer.train()
    training_time = time.time() - started_at
    training_time_minutes = round(training_time / 60, 2)

    print(f"\nTraining completed in {training_time_minutes} minutes")
    print(f"Training Loss: {train_result.training_loss:.4f}")

    # Evaluate on the validation split given to the trainer
    eval_results = trainer.evaluate()
    print("\nValidation Results:")
    for key, value in eval_results.items():
        print(f"{key}: {value:.4f}")

    model_save_path = output_root / "final_model"
    trainer.save_model(str(model_save_path))
    print(f"Model saved to {model_save_path}")

    # Curves are drawn from the trainer's log history
    visualize_training_metrics(trainer.state.log_history, model_name)

    # Persist timing and validation metrics next to the model
    summary = {
        "training_time_seconds": training_time,
        "training_time_minutes": training_time_minutes,
        "final_train_loss": train_result.training_loss,
        "eval_results": eval_results,
    }
    with open(output_root / "training_metrics.json", "w") as f:
        json.dump(summary, f, indent=4)

    return trainer

In [None]:
def evaluate_model(trainer, tokenized_test_data, model_name):
    """Evaluate a trained model on test data and save metrics, report and plots.

    Files written under ``results/<model_name>``: ``test_metrics.csv`` and
    ``classification_report.txt``. The confusion matrix, ROC curve and
    precision-recall curve are drawn through the plotting helpers.

    Args:
        trainer: Trained trainer exposing ``predict``.
        tokenized_test_data: Tokenized test dataset.
        model_name: Name used in messages and for the output folders.

    Returns:
        Dictionary with the scalar metrics plus ``confusion_matrix``,
        ``predicted_classes`` and ``probabilities``.
    """
    print(f"\n=== Evaluating {model_name} on Test Set ===")

    prediction_output = trainer.predict(tokenized_test_data)
    logits = prediction_output.predictions
    labels = prediction_output.label_ids

    # Class probabilities; column 1 is the positive class used by the AUC scores
    probabilities = torch.nn.functional.softmax(torch.tensor(logits), dim=-1).numpy()
    positive_class_probs = probabilities[:, 1]

    # Hard decision: most likely class per example
    predicted_classes = np.argmax(logits, axis=1)

    accuracy = accuracy_score(labels, predicted_classes)
    precision = precision_score(labels, predicted_classes)
    recall = recall_score(labels, predicted_classes)
    f1 = f1_score(labels, predicted_classes)
    mcc = matthews_corrcoef(labels, predicted_classes)
    roc_auc = roc_auc_score(labels, positive_class_probs)
    pr_auc = average_precision_score(labels, positive_class_probs)
    cm = confusion_matrix(labels, predicted_classes)

    print("\nTest Metrics:")
    printed_metrics = (
        ("Accuracy", accuracy),
        ("Precision", precision),
        ("Recall", recall),
        ("F1 Score", f1),
        ("Matthews Correlation Coefficient", mcc),
        ("ROC AUC", roc_auc),
        ("PR AUC", pr_auc),
    )
    for display_name, score in printed_metrics:
        print(f"{display_name}: {score:.4f}")

    # Per-class report using the notebook's label names
    print("\nClassification Report:")
    class_names = list(id2label.values())
    report = classification_report(labels, predicted_classes, target_names=class_names)
    print(report)

    output_root = Path(f"results/{model_name}")
    output_root.mkdir(parents=True, exist_ok=True)

    # One-row CSV so results of different models are easy to compare
    metrics_row = {
        'Model': model_name,
        'Accuracy': accuracy,
        'Precision': precision,
        'Recall': recall,
        'F1': f1,
        'MCC': mcc,
        'ROC_AUC': roc_auc,
        'PR_AUC': pr_auc,
    }
    pd.DataFrame([metrics_row]).to_csv(output_root / "test_metrics.csv", index=False)

    with open(output_root / "classification_report.txt", "w") as f:
        f.write(report)

    # Plots: confusion matrix, ROC curve, precision-recall curve
    plot_confusion_matrix(cm, class_names, model_name)

    fpr, tpr, _ = roc_curve(labels, positive_class_probs)
    plot_roc_curve(fpr, tpr, roc_auc, model_name)

    precision_points, recall_points, _ = precision_recall_curve(labels, positive_class_probs)
    plot_pr_curve(precision_points, recall_points, pr_auc, model_name)

    scalar_metrics = {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "mcc": mcc,
        "roc_auc": roc_auc,
        "pr_auc": pr_auc,
    }
    return {
        **scalar_metrics,
        "confusion_matrix": cm,
        "predicted_classes": predicted_classes,
        "probabilities": probabilities,
    }

In [None]:
def predict_examples(model, tokenizer, examples, model_name):
    """Classify example messages one by one and print the outcome.

    Args:
        model: Trained model whose output exposes ``.logits``.
        tokenizer: Tokenizer matching the model.
        examples: List of example messages.
        model_name: Name of the model, used in the printed header.

    Returns:
        DataFrame with ``text``, ``predicted_class`` and ``confidence``
        (softmax probability of the predicted class) columns.
    """
    model.eval()
    rows = []

    for text in examples:
        encoded = tokenizer(
            text,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=128
        ).to(device)

        # Inference only: no gradients needed
        with torch.no_grad():
            logits = model(**encoded).logits

        probs = torch.nn.functional.softmax(logits, dim=-1)
        class_index = torch.argmax(probs, dim=1).item()

        rows.append({
            "text": text,
            "predicted_class": id2label[class_index],
            "confidence": probs[0][class_index].item(),
        })

    results_df = pd.DataFrame(rows)

    print(f"\n=== Example Predictions with {model_name} ===")
    for position, row in enumerate(results_df.itertuples(index=False), start=1):
        print(f"\nExample {position}:")
        print(f"Text: {row.text}")
        print(f"Prediction: {row.predicted_class}")
        print(f"Confidence: {row.confidence:.4f}")

    return results_df

In [None]:
# Fine-tuning techniques to compare: every model family crossed with every approach
fine_tuning_techniques = [
    f"{family_name}_{approach_name}"
    for family_name in ("roberta", "distilbert")
    for approach_name in ("frozen", "full", "lora")
]

# Checkpoint name and tokenizer for each model family
model_families = {
    family_name: {
        "model_name": checkpoint,
        "tokenizer": AutoTokenizer.from_pretrained(checkpoint),
    }
    for family_name, checkpoint in (
        ("roberta", "roberta-base"),
        ("distilbert", "distilbert-base-uncased"),
    )
}

# Each family has its own tokenizer, so the splits are tokenized once per family
print("Tokenizing datasets with original text")
tokenized_datasets = {}
for family, config in model_families.items():
    print(f"\nTokenizing datasets for {family}...")
    tokenized_datasets[family] = tokenize_data(dataset_dict, config["tokenizer"], max_length=128)
    print(f"{family} tokenization complete")

In [None]:
# Baseline evaluation: score each pre-trained checkpoint before any fine-tuning

print(f"\n{'='*60}")
print("EVALUATING BASELINE MODELS (Before Fine-tuning)")
print(f"{'='*60}")

# Collected across the notebook for the later model comparison
all_results = {}

baseline_results = {}
for family, config in model_families.items():
    baseline_results[family] = evaluate_baseline_model(
        config["model_name"],
        config["tokenizer"],
        tokenized_datasets[family]["test"],
        device,
    )

# Re-key the trainer's eval_* metrics to the plain names used for comparison;
# a metric missing from the results defaults to 0
for family, results in baseline_results.items():
    all_results[f"baseline_{family}"] = {
        metric: results.get(f"eval_{metric}", 0)
        for metric in ('accuracy', 'precision', 'recall', 'f1', 'mcc', 'roc_auc', 'pr_auc')
    }

print("\nBaseline results stored:")
for key, value in all_results.items():
    print(f"{key}: MCC={value['mcc']:.4f}, F1={value['f1']:.4f}")

In [None]:
print(f"\n{'='*60}")
print("STARTING FINE-TUNING WITH HYPERPARAMETER OPTIMIZATION")
print(f"{'='*60}")


def _build_model(model_name, family, approach, lora_params=None):
    """Load the pre-trained classifier and prepare it for one fine-tuning approach.

    Shared by the hyperparameter trials and the final run so both always use
    the same freezing / LoRA setup.

    Args:
        model_name: Checkpoint name passed to ``from_pretrained``.
        family: ``"roberta"`` or ``"distilbert"``; selects layer and module names.
        approach: ``"frozen"``, ``"full"`` or ``"lora"``.
        lora_params: Optional mapping with ``"lora_r"``, ``"lora_alpha"`` and
            ``"lora_dropout"`` entries (only read for ``"lora"``); missing
            entries default to 8, 16 and 0.1.

    Returns:
        The prepared model (a PEFT-wrapped model for ``"lora"``).
    """
    model = AutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=2,
        id2label=id2label,
        label2id=label2id
    )

    if approach == "frozen":
        # Only parameters whose name contains one of these markers stay trainable
        trainable_markers = ["classifier"]
        if family == "roberta":
            # Last two encoder layers
            trainable_markers += ["encoder.layer.10", "encoder.layer.11"]
        elif family == "distilbert":
            # Last transformer layer (DistilBERT has fewer layers)
            trainable_markers += ["transformer.layer.5"]

        for name, param in model.named_parameters():
            param.requires_grad = any(marker in name for marker in trainable_markers)

    elif approach == "lora":
        lora_params = lora_params or {}

        # Attention projection module names differ between the two families
        if family == "roberta":
            target_modules = ["query", "key", "value"]
        else:  # distilbert
            target_modules = ["q_lin", "v_lin"]

        peft_config = LoraConfig(
            task_type=TaskType.SEQ_CLS,
            r=lora_params.get("lora_r", 8),
            lora_alpha=lora_params.get("lora_alpha", 16),
            lora_dropout=lora_params.get("lora_dropout", 0.1),
            target_modules=target_modules
        )
        model = get_peft_model(model, peft_config)

    # For the "full" approach the model is left untouched (all parameters trainable)
    return model


# Process each fine-tuning technique
for technique in fine_tuning_techniques:
    print(f"\n{'='*50}")
    print(f"Processing {technique}")
    print(f"{'='*50}")

    # Parse technique to get model family and fine-tuning approach
    family, approach = technique.split("_")

    # Get the appropriate model configuration and tokenized data
    model_name = model_families[family]["model_name"]
    tokenizer = model_families[family]["tokenizer"]
    datasets = tokenized_datasets[family]

    # Class weights depend only on the training split, so they are computed
    # once per technique instead of once per trial
    hpo_class_weights = calculate_class_weights(datasets["train"], device)

    # Define the hyperparameter optimization function
    def objective(trial):
        """Train one trial configuration and return its validation MCC."""
        # Define hyperparameter search space
        learning_rate = trial.suggest_float("learning_rate", 1e-5, 5e-4, log=True)
        batch_size = trial.suggest_categorical("batch_size", [8, 16, 32])
        weight_decay = trial.suggest_float("weight_decay", 0.01, 0.1)
        num_epochs = trial.suggest_int("num_epochs", 3, 8)

        # LoRA adds its own hyperparameters to the search space
        lora_params = None
        if approach == "lora":
            lora_params = {
                "lora_r": trial.suggest_int("lora_r", 4, 16),
                "lora_alpha": trial.suggest_int("lora_alpha", 8, 32),
                "lora_dropout": trial.suggest_float("lora_dropout", 0.05, 0.2),
            }

        model = _build_model(model_name, family, approach, lora_params)

        # Define training arguments
        training_args = TrainingArguments(
            output_dir=f"results/optuna_{technique}_trial_{trial.number}",
            learning_rate=learning_rate,
            per_device_train_batch_size=batch_size,
            per_device_eval_batch_size=batch_size,
            num_train_epochs=num_epochs,
            weight_decay=weight_decay,
            eval_strategy="epoch",
            logging_strategy="epoch",
            save_strategy="no",  # Don't save checkpoints during optimization
            load_best_model_at_end=False,
            report_to="none",
            fp16=torch.cuda.is_available()
        )

        trainer = WeightedTrainer(
            model=model.to(device),
            args=training_args,
            train_dataset=datasets["train"],
            eval_dataset=datasets["valid"],
            tokenizer=tokenizer,
            data_collator=create_data_collator(tokenizer),
            compute_metrics=compute_metrics,
            class_weights=hpo_class_weights
        )

        # Train model
        trainer.train()

        # Evaluate model
        eval_result = trainer.evaluate()

        # Extract MCC score
        mcc_score = eval_result.get("eval_mcc", 0.0)

        # Clean up to save memory
        del model
        del trainer
        torch.cuda.empty_cache()

        return mcc_score

    # Run hyperparameter optimization; the sampler is seeded so the sequence
    # of suggested hyperparameters is repeatable
    print(f"Running hyperparameter optimization for {technique}...")
    study = optuna.create_study(
        direction="maximize",
        study_name=f"{technique}_hpo",
        sampler=optuna.samplers.TPESampler(seed=RANDOM_SEED)
    )
    study.optimize(objective, n_trials=10)  # Adjust number of trials as needed

    # Get best hyperparameters
    best_params = study.best_params
    print(f"\nBest hyperparameters for {technique}:")
    for key, value in best_params.items():
        print(f"  {key}: {value}")

    # Save best hyperparameters
    os.makedirs("results/hyperparameters", exist_ok=True)
    with open(f"results/hyperparameters/{technique}_best_params.json", "w") as f:
        json.dump(best_params, f, indent=4)

    # Build model with best hyperparameters (LoRA settings are read from
    # best_params, with defaults for any that are absent)
    print(f"\nTraining final {technique} model with best hyperparameters...")
    model = _build_model(model_name, family, approach, best_params)

    # Define training arguments
    training_args = TrainingArguments(
        output_dir=f"results/{technique}",
        learning_rate=best_params["learning_rate"],
        per_device_train_batch_size=best_params["batch_size"],
        per_device_eval_batch_size=best_params["batch_size"],
        num_train_epochs=best_params["num_epochs"],
        weight_decay=best_params.get("weight_decay", 0.01),
        eval_strategy="epoch",
        # Log the training loss once per epoch, like the trials, so the loss
        # curve has one training point per evaluation point
        logging_strategy="epoch",
        save_strategy="epoch",
        # Limit the checkpoints kept on disk (the best one is still retained
        # because load_best_model_at_end is enabled)
        save_total_limit=1,
        load_best_model_at_end=True,
        metric_for_best_model="mcc",
        greater_is_better=True,
        fp16=torch.cuda.is_available(),
        report_to="none",
        logging_dir=f"results/{technique}/logs"
    )

    # Train the model
    model = model.to(device)
    trainer = train_model(
        model,
        tokenizer,
        datasets,
        training_args,
        technique
    )

    # Evaluate on test set
    results = evaluate_model(
        trainer,
        datasets["test"],
        technique
    )

    # Store results for later comparison
    all_results[technique] = results

    # Save trainer and model reference for example message testing
    globals()[f"{technique}_trainer"] = trainer

    # Clear GPU memory
    torch.cuda.empty_cache()

print(f"\n{'='*60}")
print("ALL FINE-TUNING COMPLETE!")
print(f"{'='*60}")

In [None]:
# Flatten each model's metric dict into one row per model for the comparison table.
# Maps table column -> key in the per-model results dict; a missing metric becomes 0.
_METRIC_KEYS = {
    'MCC': 'mcc',
    'F1': 'f1',
    'Accuracy': 'accuracy',
    'Precision': 'precision',
    'Recall': 'recall',
    'ROC_AUC': 'roc_auc',
    'PR_AUC': 'pr_auc',
}

comparison_data = []
for technique, results in all_results.items():
    metric_row = {'Model': technique}
    metric_row.update({
        column: results.get(metric_key, 0)
        for column, metric_key in _METRIC_KEYS.items()
    })
    comparison_data.append(metric_row)

In [None]:
# Tabulate the per-model metrics, persist them as CSV, and chart MCC per model.
if comparison_data:
    comparison_df = pd.DataFrame(comparison_data)

    # to_csv/savefig do not create missing folders, so make sure they exist
    # (this also creates the parent "results" folder on a fresh runtime).
    os.makedirs("results/visualizations", exist_ok=True)

    # Save combined results
    comparison_df.to_csv("results/all_llm_models_comparison.csv", index=False)

    # Display results, best MCC first
    print("\nAll LLM Models Comparison:")
    print(comparison_df.sort_values(by='MCC', ascending=False))

    # Plot comparison (bars keep the insertion order of comparison_data)
    plt.figure(figsize=(14, 8))
    sns.barplot(data=comparison_df, x='Model', y='MCC')
    plt.title('Matthews Correlation Coefficient (MCC) Across LLM Models')
    plt.xticks(rotation=45, ha='right')
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.savefig("results/visualizations/llm_models_mcc_comparison.png")
    plt.show()
else:
    print("No results available for comparison.")

In [None]:
# 9: Explainability Analysis on Real Test Data
#
# Picks the fine-tuned model with the highest test MCC, selects up to four
# representative test messages (confident correct spam, confident correct
# legitimate, a misclassification, a low-confidence prediction) and prints the
# top LIME features for each.

print("\n" + "="*60)
print("EXPLAINABILITY ANALYSIS")
print("="*60)

# Select best performing model for explainability
if comparison_data:
    # Find the best model (highest MCC, excluding baselines)
    finetuned_results = [r for r in comparison_data if not r['Model'].startswith('baseline')]
    if finetuned_results:
        best_model_name = max(finetuned_results, key=lambda x: x['MCC'])['Model']
        print(f"\nRunning explainability analysis on best model: {best_model_name}")

        # Model names are "<family>_<approach>"; the training loop stored each
        # trainer in globals() under "<technique>_trainer".
        family = best_model_name.split("_")[0]
        best_trainer = globals()[f"{best_model_name}_trainer"]
        tokenizer = model_families[family]["tokenizer"]

        # Get some real test examples for explanation
        test_dataset = tokenized_datasets[family]["test"]

        # Select a few interesting examples from test set
        print("\nAnalyzing real test examples...")

        # Get predictions on test set to find interesting cases
        test_predictions = best_trainer.predict(test_dataset)
        probabilities = torch.nn.functional.softmax(torch.tensor(test_predictions.predictions), dim=-1).numpy()
        predicted_classes = np.argmax(test_predictions.predictions, axis=1)
        true_labels = test_predictions.label_ids

        # Find examples for explanation:
        # 1. High confidence correct predictions (both classes)
        # 2. Misclassified examples
        # 3. Low confidence predictions

        explanation_indices = []

        # High confidence correct spam detection
        spam_correct = np.where((predicted_classes == 1) & (true_labels == 1) & (probabilities[:, 1] > 0.9))[0]
        if len(spam_correct) > 0:
            explanation_indices.append(spam_correct[0])

        # High confidence correct legitimate detection
        legit_correct = np.where((predicted_classes == 0) & (true_labels == 0) & (probabilities[:, 0] > 0.9))[0]
        if len(legit_correct) > 0:
            explanation_indices.append(legit_correct[0])

        # Misclassified example
        misclassified = np.where(predicted_classes != true_labels)[0]
        if len(misclassified) > 0:
            explanation_indices.append(misclassified[0])

        # Low confidence prediction: the winning class barely beats the other.
        # The threshold must be applied to the TOP probability; the smaller of
        # two softmax probabilities is never above 0.5, so testing the minimum
        # against 0.6 would match every sample.
        top_confidence = np.max(probabilities, axis=1)
        low_conf = np.where(top_confidence < 0.6)[0]
        if len(low_conf) > 0:
            explanation_indices.append(low_conf[0])

        # One message can satisfy several criteria (e.g. misclassified AND low
        # confidence); explain it once, keeping the first-seen order.
        explanation_indices = list(dict.fromkeys(int(i) for i in explanation_indices))

        # Run LIME explanations on selected examples
        explainer, predict_fn = setup_lime_explainer(
            best_trainer.model,
            tokenizer,
            device
        )

        for idx in explanation_indices[:4]:  # Limit to 4 examples
            # Get original text
            # NOTE(review): assumes test_df rows are in the same order as the
            # tokenized test split - confirm where the datasets are built.
            original_text = test_df.iloc[idx]['message']
            true_label = id2label[true_labels[idx]]
            predicted_label = id2label[predicted_classes[idx]]
            confidence = probabilities[idx].max()

            print(f"\n{'-'*80}")
            print(f"Example {idx + 1}:")
            print(f"Text: {original_text}")
            print(f"True Label: {true_label}")
            print(f"Predicted: {predicted_label} (confidence: {confidence:.3f})")

            # Generate explanation
            explanation = explain_prediction(explainer, predict_fn, original_text, num_features=8)

            print("Top contributing features:")
            # NOTE(review): a positive weight is read as pushing towards class 1
            # (spam); presumably explain_prediction explains label 1 - confirm.
            for feature, weight in explanation.as_list()[:5]:
                direction = "→ SPAM" if weight > 0 else "→ LEGITIMATE"
                print(f"  '{feature}': {weight:.3f} {direction}")

print(f"\n{'='*60}")
print("ANALYSIS COMPLETE!")
print(f"{'='*60}")

In [None]:
# 11: Compare all models
print("\n=== Comparing All Models ===")


def extract_fine_tuning_approach(model_name):
    """Return the fine-tuning approach encoded in a "<family>_<approach>" model name.

    Names containing "baseline" map to 'baseline' instead of being split, and a
    name without an underscore yields 'unknown' rather than raising IndexError.
    """
    if 'baseline' in model_name:
        return 'baseline'
    name_parts = model_name.split('_')
    return name_parts[1] if len(name_parts) > 1 else 'unknown'


def plot_grouped_metric_bars(scores_df, group_col, metrics, title, output_path, legend_kwargs=None):
    """Draw one grouped bar chart (metric on x, one hue per group), save it and show it.

    scores_df     : wide DataFrame holding `group_col` plus one column per metric.
    group_col     : column whose values become the bar colours / legend entries.
    metrics       : metric column names to plot.
    title         : figure title.
    output_path   : file the figure is written to (its folder is created if needed).
    legend_kwargs : optional kwargs for plt.legend(); None keeps seaborn's default legend.
    """
    long_scores = scores_df.melt(
        id_vars=[group_col],
        value_vars=metrics,
        var_name='Metric',
        value_name='Score'
    )

    plt.figure(figsize=(14, 8))
    sns.barplot(data=long_scores, x='Metric', y='Score', hue=group_col)
    plt.title(title)
    plt.ylim(0, 1)
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    if legend_kwargs is not None:
        plt.legend(**legend_kwargs)
    plt.tight_layout()

    # savefig does not create missing folders
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    plt.savefig(output_path)
    plt.show()


if 'comparison_df' in locals() and not comparison_df.empty:
    metrics_to_plot = ['Accuracy', 'Precision', 'Recall', 'F1', 'MCC', 'ROC_AUC', 'PR_AUC']

    # Every metric for every model; legend moved outside the axes because
    # there is one entry per model.
    plot_grouped_metric_bars(
        comparison_df, 'Model', metrics_to_plot,
        'Performance Comparison Across LLM Models',
        "results/visualizations/llm_models_all_metrics_comparison.png",
        legend_kwargs={'bbox_to_anchor': (1.05, 1), 'loc': 'upper left'},
    )

    # Compare RoBERTa vs DistilBERT (average across fine-tuning approaches)
    model_family_comparison = comparison_df.copy()
    model_family_comparison['Model_Family'] = model_family_comparison['Model'].apply(
        lambda x: 'RoBERTa' if 'roberta' in x else 'DistilBERT'
    )

    family_avg = model_family_comparison.groupby('Model_Family')[metrics_to_plot].mean().reset_index()

    plot_grouped_metric_bars(
        family_avg, 'Model_Family', metrics_to_plot,
        'Average Performance by Model Family',
        "results/visualizations/model_family_comparison.png",
    )

    # Compare fine-tuning approaches (average across model families).
    # The column is added to comparison_df itself, as before, so anything that
    # reads it afterwards keeps working.
    comparison_df['Fine_Tuning_Approach'] = comparison_df['Model'].apply(extract_fine_tuning_approach)

    approach_avg = comparison_df.groupby('Fine_Tuning_Approach')[metrics_to_plot].mean().reset_index()

    plot_grouped_metric_bars(
        approach_avg, 'Fine_Tuning_Approach', metrics_to_plot,
        'Average Performance by Fine-Tuning Approach',
        "results/visualizations/fine_tuning_approach_comparison.png",
    )
else:
    print("No results available for comparison.")

In [None]:
# 12: Compare with previous models (ML and DL)
#
# Merges this notebook's LLM results with the ML and DL result files saved by
# earlier notebooks (when present), ranks everything by MCC, writes the merged
# table and draws a four-panel summary figure.
print("\n=== Comparing All Models (ML, DL, and LLM) ===")

# Load results from previous models
ml_results_path = "results/metrics/baseline_ml_results_regularized.csv"
dl_results_path = "results/metrics/all_models_comparison.csv"
final_comparison_dir = "results/final_comparison"

# Column used by the LLM comparison table -> column name in the saved ML/DL files
PREVIOUS_RESULT_COLUMNS = {
    'MCC': 'Test MCC',
    'F1': 'Test F1',
    'Accuracy': 'Test Accuracy',
    'Precision': 'Test Precision',
    'Recall': 'Test Recall',
    'ROC_AUC': 'Test ROC AUC',
    'PR_AUC': 'Test PR AUC',
}


def standardize_previous_results(results_df, model_type):
    """Rename a saved ML/DL results table to the column layout of the LLM table.

    results_df : DataFrame with a 'Model' column and, optionally, the
                 'Test ...' metric columns; a missing metric column becomes 0.
    model_type : label stored in 'Model_Type' (e.g. 'ML' or 'DL').

    Returns a new DataFrame that keeps the row index of `results_df` and has
    'Fine_Tuning_Approach' set to 'N/A'.
    """
    standardized = pd.DataFrame()
    standardized['Model'] = results_df['Model']
    for target_column, source_column in PREVIOUS_RESULT_COLUMNS.items():
        standardized[target_column] = results_df.get(source_column, 0)
    standardized['Model_Type'] = model_type
    standardized['Fine_Tuning_Approach'] = 'N/A'
    return standardized


all_model_results = []

# 1. Add LLM results (current notebook)
if 'comparison_df' in locals() and not comparison_df.empty:
    llm_results = comparison_df.copy()
    llm_results['Model_Type'] = 'LLM'

    # Add fine-tuning approach
    llm_results['Fine_Tuning_Approach'] = llm_results['Model'].apply(
        lambda x: x.split('_')[-1] if 'baseline' not in x else 'baseline'
    )

    all_model_results.append(llm_results)
    print(f"Added {len(llm_results)} LLM models")

# 2. Load and process ML results
if Path(ml_results_path).exists():
    ml_df = pd.read_csv(ml_results_path)
    print(f"Found ML results file with columns: {ml_df.columns.tolist()}")

    ml_standardized = standardize_previous_results(ml_df, 'ML')

    all_model_results.append(ml_standardized)
    print(f"Added {len(ml_standardized)} ML models")
else:
    print("ML results file not found")

# 3. Load and process DL results
if Path(dl_results_path).exists():
    dl_combined_df = pd.read_csv(dl_results_path)
    print(f"Found DL results file with columns: {dl_combined_df.columns.tolist()}")

    # Filter to get only DL models (exclude ML models that might be in the file)
    dl_model_names = ['TextCNN', 'BiLSTM', 'CNN', 'LSTM', 'Bi-LSTM']  # Add your DL model names
    dl_only = dl_combined_df[dl_combined_df['Model'].isin(dl_model_names)]

    if len(dl_only) > 0:
        dl_standardized = standardize_previous_results(dl_only, 'DL')

        all_model_results.append(dl_standardized)
        print(f"Added {len(dl_standardized)} DL models")
    else:
        print("No DL models found in the combined results file")
else:
    print("DL results file not found")

# 4. Combine all results
if all_model_results:
    # Combine all dataframes
    final_comparison_df = pd.concat(all_model_results, ignore_index=True)

    # Remove any duplicate rows (same model name and type)
    final_comparison_df = final_comparison_df.drop_duplicates(subset=['Model', 'Model_Type'], keep='first')

    # Sort by MCC (descending)
    final_comparison_df = final_comparison_df.sort_values('MCC', ascending=False).reset_index(drop=True)

    # to_csv/savefig do not create missing folders
    os.makedirs(final_comparison_dir, exist_ok=True)

    # Save the clean combined results
    final_comparison_df.to_csv("results/final_comparison/all_models_clean_comparison.csv", index=False)

    # Display results
    print(f"\n{'='*80}")
    print("CLEAN FINAL COMPARISON - ALL MODELS")
    print(f"{'='*80}")

    display_columns = ['Model', 'Model_Type', 'MCC', 'F1', 'Accuracy', 'Precision', 'Recall', 'ROC_AUC', 'PR_AUC']
    print(final_comparison_df[display_columns].round(4))

    # Summary by model type
    print(f"\n{'='*50}")
    print("SUMMARY BY MODEL TYPE")
    print(f"{'='*50}")

    summary_stats = final_comparison_df.groupby('Model_Type')[['MCC', 'F1', 'ROC_AUC', 'PR_AUC']].agg(['mean', 'max', 'count']).round(4)
    print(summary_stats)

    # Best model overall (row 0 after the descending MCC sort)
    best_model = final_comparison_df.iloc[0]
    print(f"\nBEST OVERALL MODEL:")
    print(f"   Model: {best_model['Model']} ({best_model['Model_Type']})")
    print(f"   MCC: {best_model['MCC']:.4f}")
    print(f"   F1: {best_model['F1']:.4f}")
    print(f"   ROC AUC: {best_model['ROC_AUC']:.4f}")

    # Create visualizations
    plt.figure(figsize=(16, 10))

    # Plot 1: Top 10 models by MCC
    plt.subplot(2, 2, 1)
    top_10 = final_comparison_df.head(10)
    colors = {'ML': 'skyblue', 'DL': 'lightgreen', 'LLM': 'lightcoral'}
    bar_colors = [colors.get(model_type, 'gray') for model_type in top_10['Model_Type']]

    plt.bar(range(len(top_10)), top_10['MCC'], color=bar_colors)
    plt.title('Top 10 Models by MCC')
    plt.xlabel('Models')
    plt.ylabel('MCC Score')
    plt.xticks(range(len(top_10)), top_10['Model'], rotation=45, ha='right')
    plt.grid(axis='y', alpha=0.3)

    # Add legend (manual patches: the bars are coloured per model type, not per series)
    import matplotlib.patches as mpatches
    patches = [mpatches.Patch(color=color, label=label) for label, color in colors.items()]
    plt.legend(handles=patches, loc='upper right')

    # Plot 2: Model type comparison (average metrics)
    plt.subplot(2, 2, 2)
    type_avg = final_comparison_df.groupby('Model_Type')[['MCC', 'F1', 'ROC_AUC']].mean()
    type_avg.plot(kind='bar', ax=plt.gca())
    plt.title('Average Performance by Model Type')
    plt.ylabel('Score')
    plt.legend()
    plt.xticks(rotation=0)
    plt.grid(axis='y', alpha=0.3)

    # Plot 3: MCC vs F1 scatter
    plt.subplot(2, 2, 3)
    for model_type, color in colors.items():
        subset = final_comparison_df[final_comparison_df['Model_Type'] == model_type]
        plt.scatter(subset['MCC'], subset['F1'], c=color, label=model_type, alpha=0.7, s=60)

    plt.xlabel('MCC Score')
    plt.ylabel('F1 Score')
    plt.title('MCC vs F1 Score')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # Plot 4: Count by model type (same gray fallback as the bar chart above)
    plt.subplot(2, 2, 4)
    type_counts = final_comparison_df['Model_Type'].value_counts()
    plt.pie(type_counts.values, labels=type_counts.index, autopct='%1.0f%%',
            colors=[colors.get(t, 'gray') for t in type_counts.index])
    plt.title('Number of Models by Type')

    plt.tight_layout()
    plt.savefig("results/final_comparison/comprehensive_model_comparison.png", dpi=300, bbox_inches='tight')
    plt.show()

else:
    print("No model results found for comparison")

print(f"\n{'='*80}")
print("COMPARISON COMPLETE!")
print(f"{'='*80}")

In [None]:
# Training curve visualization
def plot_training_curves_comparison():
    """
    Plot validation loss and validation F1 per evaluation for every fine-tuned model.

    Draws a 2x3 grid with one panel per model: validation loss on the left
    axis, validation F1 on a twin right axis, read from each trainer's
    `state.log_history`. Trainers are looked up in the notebook globals under
    "<model_name>_trainer"; a model whose trainer is missing gets a blank,
    labelled panel instead of aborting the whole figure.

    The figure is saved to results/visualizations/training_curves_comparison.png
    and then shown. Returns None.
    """
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    axes = axes.flatten()

    model_names = [
        'roberta_frozen',
        'roberta_full',
        'roberta_lora',
        'distilbert_frozen',
        'distilbert_full',
        'distilbert_lora',
    ]

    # zip stops at the shorter sequence, so a name can never index past the grid
    for ax, model_name in zip(axes, model_names):
        trainer = globals().get(f"{model_name}_trainer")
        if trainer is None:
            ax.set_title(f'{model_name} (trainer not available)')
            ax.axis('off')
            continue

        logs = trainer.state.log_history

        # Keep only entries that carry both values so the two curves always
        # have the same length and share one x-axis.
        eval_logs = [log for log in logs if 'eval_loss' in log and 'eval_f1' in log]
        val_losses = [log['eval_loss'] for log in eval_logs]
        val_f1 = [log['eval_f1'] for log in eval_logs]

        # x is the evaluation index; it is labelled "Epoch" on the axis
        # NOTE(review): assumes one evaluation per epoch - confirm against the training arguments
        epochs = range(1, len(val_losses) + 1)

        # Plot validation loss and F1
        ax2 = ax.twinx()
        ax.plot(epochs, val_losses, 'b-', label='Val Loss', marker='o')
        ax2.plot(epochs, val_f1, 'r-', label='Val F1', marker='s')

        ax.set_xlabel('Epoch')
        ax.set_ylabel('Validation Loss', color='b')
        ax2.set_ylabel('Validation F1', color='r')
        ax.set_title(f'{model_name}')

        # Add overfitting indicator: mark the evaluation with the best F1
        # (only when there are enough points for the marker to be meaningful)
        if len(val_losses) > 3:
            best_epoch = np.argmax(val_f1) + 1
            ax2.axvline(x=best_epoch, color='g', linestyle='--', alpha=0.7, label=f'Best F1 (Epoch {best_epoch})')

        ax.grid(True, alpha=0.3)

        # Combine legends of both y-axes into a single box
        lines1, labels1 = ax.get_legend_handles_labels()
        lines2, labels2 = ax2.get_legend_handles_labels()
        ax.legend(lines1 + lines2, labels1 + labels2, loc='upper right')

    plt.tight_layout()
    # savefig does not create missing folders
    os.makedirs('results/visualizations', exist_ok=True)
    plt.savefig('results/visualizations/training_curves_comparison.png', dpi=300, bbox_inches='tight')
    plt.show()

plot_training_curves_comparison()

In [None]:
# Computational requirements analysis
def analyze_computational_requirements(
    training_times=None,
    parameter_counts=None,
    performance_results=None,
    output_path='results/metrics/computational_efficiency_analysis.csv',
):
    """
    Analyze computational requirements for different fine-tuning approaches

    Combines training time, parameter counts and test metrics into one table
    with an efficiency score (test MCC divided by the trainable-parameter
    fraction).

    Parameters
    ----------
    training_times : dict[str, float], optional
        Model name -> training time in minutes.
    parameter_counts : dict[str, dict], optional
        Model name -> {'total', 'trainable', 'trainable_pct'}.
    performance_results : dict[str, dict], optional
        Model name -> {'accuracy', 'f1', 'mcc'}.
    output_path : str or None, optional
        CSV file to write (its folder is created if needed); None skips saving.

    Any argument left as None falls back to the values recorded from an
    earlier run of this notebook, so calling with no arguments behaves as
    before. Pass fresh measurements after re-training, otherwise the table
    will not reflect the current results.

    Returns
    -------
    pandas.DataFrame
        One row per model in `training_times`, in that order.

    Raises
    ------
    KeyError
        If a model in `training_times` is missing from the other two mappings.
    """
    if training_times is None:
        # Training times in minutes, copied from a previous run's output
        training_times = {
            'roberta_frozen': 3.96,
            'roberta_full': 2.65,
            'roberta_lora': 1.93,
            'distilbert_frozen': 5.01,
            'distilbert_full': 5.51,
            'distilbert_lora': 1.89,
        }

    if parameter_counts is None:
        # Parameter counts, copied from a previous run's model summaries
        parameter_counts = {
            'roberta_frozen': {'total': 124647170, 'trainable': 14767874, 'trainable_pct': 11.85},
            'roberta_full': {'total': 124647170, 'trainable': 124647170, 'trainable_pct': 100.00},
            'roberta_lora': {'total': 124647170, 'trainable': 979202, 'trainable_pct': 0.78},
            'distilbert_frozen': {'total': 66955010, 'trainable': 7680002, 'trainable_pct': 11.47},
            'distilbert_full': {'total': 66955010, 'trainable': 66955010, 'trainable_pct': 100.00},
            'distilbert_lora': {'total': 66955010, 'trainable': 684290, 'trainable_pct': 1.01},
        }

    if performance_results is None:
        # Test metrics, copied from a previous run's comparison table
        performance_results = {
            'roberta_frozen': {'accuracy': 0.9533, 'f1': 0.8772, 'mcc': 0.8503},
            'roberta_full': {'accuracy': 0.9400, 'f1': 0.8525, 'mcc': 0.8150},
            'roberta_lora': {'accuracy': 0.9333, 'f1': 0.8276, 'mcc': 0.7870},
            'distilbert_frozen': {'accuracy': 0.9333, 'f1': 0.8333, 'mcc': 0.7917},
            'distilbert_full': {'accuracy': 0.9600, 'f1': 0.9032, 'mcc': 0.8788},
            'distilbert_lora': {'accuracy': 0.9467, 'f1': 0.8667, 'mcc': 0.8333},
        }

    # Fail early with a readable message instead of a bare KeyError mid-loop
    incomplete = [
        model for model in training_times
        if model not in parameter_counts or model not in performance_results
    ]
    if incomplete:
        raise KeyError(f"Missing parameter counts or performance results for: {incomplete}")

    # Create comprehensive analysis DataFrame
    analysis_data = []
    for model, minutes in training_times.items():
        counts = parameter_counts[model]
        scores = performance_results[model]
        trainable_fraction = counts['trainable_pct'] / 100
        analysis_data.append({
            'Model': model,
            'Training_Time_Min': minutes,
            'Total_Parameters': counts['total'],
            'Trainable_Parameters': counts['trainable'],
            'Trainable_Percentage': counts['trainable_pct'],
            'Test_Accuracy': scores['accuracy'],
            'Test_F1': scores['f1'],
            'Test_MCC': scores['mcc'],
            # MCC per unit of trainable-parameter fraction; undefined (NaN) when nothing is trainable
            'Efficiency_Score': scores['mcc'] / trainable_fraction if trainable_fraction else float('nan'),
        })

    efficiency_df = pd.DataFrame(analysis_data)

    # Add model family and approach columns (names are "<family>_<approach>")
    efficiency_df['Model_Family'] = efficiency_df['Model'].apply(lambda x: 'RoBERTa' if 'roberta' in x else 'DistilBERT')
    efficiency_df['Fine_Tuning_Approach'] = efficiency_df['Model'].apply(lambda x: x.split('_')[1])

    # Save results
    if output_path is not None:
        output_dir = os.path.dirname(output_path)
        if output_dir:
            # to_csv does not create missing folders
            os.makedirs(output_dir, exist_ok=True)
        efficiency_df.to_csv(output_path, index=False)

    return efficiency_df

# Build the efficiency table and print it rounded to four decimals
efficiency_results = analyze_computational_requirements()
print("Computational Efficiency Analysis:", efficiency_results.round(4), sep="\n")

In [None]:
# Efficiency visualization
def plot_efficiency_analysis(efficiency_df):
    """
    Create comprehensive efficiency plots

    Draws a 2x2 figure from the efficiency table:
      1. test MCC vs. trainable-parameter percentage, coloured by training time
      2. mean efficiency score per fine-tuning approach
      3. test MCC vs. training time, coloured by approach
      4. test MCC per approach, RoBERTa and DistilBERT side by side

    efficiency_df must provide the columns 'Model', 'Trainable_Percentage',
    'Test_MCC', 'Training_Time_Min', 'Efficiency_Score', 'Model_Family' and
    'Fine_Tuning_Approach'.

    The figure is saved to results/visualizations/efficiency_analysis.png and
    then shown. Returns None.
    """
    fig, axes = plt.subplots(2, 2, figsize=(16, 12))

    # Plot 1: Performance vs Trainable Parameters
    ax1 = axes[0, 0]
    scatter = ax1.scatter(efficiency_df['Trainable_Percentage'], efficiency_df['Test_MCC'],
                          c=efficiency_df['Training_Time_Min'], s=100, alpha=0.7, cmap='viridis')

    # Label each point with its model name
    for _, row in efficiency_df.iterrows():
        ax1.annotate(row['Model'], (row['Trainable_Percentage'], row['Test_MCC']),
                     xytext=(5, 5), textcoords='offset points', fontsize=8)

    ax1.set_xlabel('Trainable Parameters (%)')
    ax1.set_ylabel('Test MCC')
    ax1.set_title('Performance vs Parameter Efficiency')
    ax1.grid(True, alpha=0.3)
    plt.colorbar(scatter, ax=ax1, label='Training Time (min)')

    # Plot 2: Efficiency Score by Approach
    ax2 = axes[0, 1]
    approach_efficiency = efficiency_df.groupby('Fine_Tuning_Approach')['Efficiency_Score'].mean().reset_index()
    bars = ax2.bar(approach_efficiency['Fine_Tuning_Approach'], approach_efficiency['Efficiency_Score'])
    ax2.set_ylabel('Efficiency Score (MCC per % trainable params)')
    ax2.set_title('Efficiency by Fine-tuning Approach')
    ax2.grid(True, alpha=0.3)

    # Add value labels on bars
    for bar in bars:
        height = bar.get_height()
        ax2.text(bar.get_x() + bar.get_width()/2., height + 0.01,
                 f'{height:.2f}', ha='center', va='bottom')

    # Plot 3: Training Time vs Performance
    ax3 = axes[1, 0]
    colors = {'lora': 'red', 'frozen': 'blue', 'full': 'green'}
    for approach in efficiency_df['Fine_Tuning_Approach'].unique():
        subset = efficiency_df[efficiency_df['Fine_Tuning_Approach'] == approach]
        ax3.scatter(subset['Training_Time_Min'], subset['Test_MCC'],
                    label=approach, color=colors.get(approach, 'gray'), s=100, alpha=0.7)

    ax3.set_xlabel('Training Time (minutes)')
    ax3.set_ylabel('Test MCC')
    ax3.set_title('Training Time vs Performance')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    # Plot 4: Model Family Comparison
    ax4 = axes[1, 1]
    # Pivot so both families are aligned on the same approach axis; a family
    # lacking an approach gets NaN (no bar) instead of a length mismatch.
    mcc_by_approach = efficiency_df.pivot_table(
        index='Fine_Tuning_Approach',
        columns='Model_Family',
        values='Test_MCC',
        aggfunc='mean'
    )
    approaches = list(mcc_by_approach.index)

    x = np.arange(len(approaches))
    width = 0.35

    # RoBERTa left of each tick, DistilBERT right; a family absent from the data is skipped
    for offset, family_name in ((-width/2, 'RoBERTa'), (width/2, 'DistilBERT')):
        if family_name in mcc_by_approach.columns:
            ax4.bar(x + offset, mcc_by_approach[family_name].to_numpy(), width,
                    label=family_name, alpha=0.7)

    ax4.set_xlabel('Fine-tuning Approach')
    ax4.set_ylabel('Test MCC')
    ax4.set_title('Model Family Performance Comparison')
    ax4.set_xticks(x)
    ax4.set_xticklabels(approaches)
    ax4.legend()
    ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    # savefig does not create missing folders
    os.makedirs('results/visualizations', exist_ok=True)
    plt.savefig('results/visualizations/efficiency_analysis.png', dpi=300, bbox_inches='tight')
    plt.show()

plot_efficiency_analysis(efficiency_results)