<a href="https://colab.research.google.com/github/Qi-He1/stats507-coursework/blob/main/final_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers datasets torch pandas matplotlib seaborn scikit-learn

import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer
from datasets import load_dataset
import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix, roc_curve, auc
import random
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import time
import traceback

def set_seed(seed=42):
    """Seed Python's, NumPy's and PyTorch's random number generators.

    Args:
        seed: Integer seed applied to every generator (default 42).
    """
    # The three CPU-side generators share the same call shape.
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)

    # CUDA generators are only seeded when a GPU is actually present.
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)


set_seed(42)

# 1. Load and explore the dataset

In [None]:
# Fetch the fake-news dataset from the Hugging Face Hub.
dataset = load_dataset("GonzaloA/fake_news")

print(f"Dataset structure: {dataset}")
print(f"Available splits: {list(dataset.keys())}")

# Report the class balance of every split (label 0 is counted as fake, label 1 as real).
print("\n=== Data Distribution ===")
for split_name, split_data in dataset.items():
    labels = split_data['label']
    fake_count = sum(label == 0 for label in labels)
    real_count = sum(label == 1 for label in labels)
    print(f"{split_name} set - Fake news: {fake_count}, Real news: {real_count}, Total: {len(labels)}")

# Plain dict holding exactly the three splits used by the training code.
final_dataset = {
    split_key: dataset[split_key]
    for split_key in ('train', 'validation', 'test')
}

# 2. Define model configurations and global variables

In [None]:
# Short model key -> pretrained checkpoint identifier passed to `from_pretrained`.
MODELS = dict(
    distilbert="distilbert-base-uncased",
    bert="google-bert/bert-base-uncased",
    roberta="FacebookAI/roberta-base",
)

# Accumulates one result record per model key.
all_results = dict()

# 3. Define model training and evaluation function

In [None]:
def train_and_evaluate_model(model_name, model_path):
    """Fine-tune one pretrained classifier on the fake-news data and score it on the test split.

    The function tokenizes the ``train``/``validation``/``test`` splits of the
    module-level ``final_dataset``, fine-tunes the checkpoint with ``Trainer``
    (evaluating and checkpointing once per epoch, then reloading the checkpoint
    with the best validation accuracy), evaluates on the test split, and saves
    the model and tokenizer under ``./models/{model_name}-final``.

    Args:
        model_name: Short key for the model (e.g. ``"roberta"``). Used in log
            messages and output paths, and to switch on the RoBERTa-specific
            pad-token handling.
        model_path: Checkpoint identifier or path handed to ``from_pretrained``.

    Returns:
        A dict with the keys ``'test_metrics'`` (accuracy, F1, precision,
        recall), ``'training_time'`` (seconds spent inside ``trainer.train()``
        only), ``'confusion_matrix'`` (nested list) and ``'roc_curve'``
        (``fpr``, ``tpr``, ``auc``). Returns ``None`` when loading the
        tokenizer or model, or the training run itself, raises an exception.
    """

    print(f"Starting {model_name.upper()} model")

    # Load tokenizer
    print(f"Loading {model_name} tokenizer...")
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_path)
    except Exception as e:
        print(f"Failed to load tokenizer: {e}")
        return None

    # For RoBERTa, fall back to the EOS token if the tokenizer has no pad token
    if model_name == "roberta" and tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # Define preprocessing function
    def preprocess_function(examples):
        """Tokenize a batch of examples, truncating each text to 512 tokens."""
        return tokenizer(
            examples["text"],
            truncation=True,
            padding=True,
            max_length=512,
            return_tensors=None
        )

    # Tokenize datasets
    print("Tokenizing text...")
    tokenized_datasets = {}
    for split in ['train', 'validation', 'test']:
        tokenized_datasets[split] = final_dataset[split].map(
            preprocess_function,
            batched=True,
            batch_size=1000
        )

        # Rename label column to match Hugging Face format
        if 'label' in tokenized_datasets[split].column_names:
            tokenized_datasets[split] = tokenized_datasets[split].rename_column('label', 'labels')

    # Create model
    print(f"Initializing {model_name} model...")
    model_config = {
        "num_labels": 2,
        "id2label": {0: "fake", 1: "real"},
        "label2id": {"fake": 0, "real": 1}
    }

    # Special handling for RoBERTa: keep the model's pad id in sync with the tokenizer
    if model_name == "roberta":
        model_config["pad_token_id"] = tokenizer.pad_token_id

    try:
        model = AutoModelForSequenceClassification.from_pretrained(
            model_path,
            **model_config
        )
    except Exception as e:
        print(f"Failed to load model: {e}")
        return None

    # Define evaluation metrics calculation function for training validation
    def compute_metrics(eval_pred):
        """Return accuracy, precision, recall and F1 for one evaluation pass."""
        predictions, labels = eval_pred
        pred_labels = np.argmax(predictions, axis=1)

        accuracy = accuracy_score(labels, pred_labels)
        # average='binary' reports the scores of the positive class (label 1, "real")
        precision, recall, f1, _ = precision_recall_fscore_support(
            labels, pred_labels, average='binary'
        )

        return {
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1": f1
        }

    # Set training arguments (use accuracy as the best model metric)
    training_args = TrainingArguments(
        output_dir=f"./results/{model_name}-fake-news",
        overwrite_output_dir=True,
        num_train_epochs=3,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=16,
        warmup_steps=500,
        weight_decay=0.01,
        logging_dir=f"./logs/{model_name}",
        logging_steps=100,
        eval_strategy="epoch",
        save_strategy="epoch",
        load_best_model_at_end=True,
        metric_for_best_model="accuracy",
        greater_is_better=True,
        seed=42,
        # The string "none" disables logging integrations; passing None can
        # fall back to "all" and trigger e.g. a wandb login prompt.
        report_to="none",
    )

    # Create Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_datasets["train"],
        eval_dataset=tokenized_datasets["validation"],
        tokenizer=tokenizer,
        compute_metrics=compute_metrics,
    )

    # Train model. The clock starts here so that the reported training time
    # excludes download, tokenization and model initialisation, which would
    # otherwise distort the per-model comparison.
    print(f"Training {model_name} model...")
    start_time = time.time()
    try:
        trainer.train()
    except Exception as e:
        print(f"Training failed: {e}")
        traceback.print_exc()
        return None

    # Record training time
    training_time = time.time() - start_time
    print(f"{model_name} training time: {training_time:.2f} seconds")

    # Final evaluation on test set
    print(f"Evaluating {model_name} model on test set...")
    test_predictions = trainer.predict(tokenized_datasets["test"])

    # Extract prediction results: raw logits -> class probabilities / hard labels
    predictions = test_predictions.predictions
    true_labels = test_predictions.label_ids
    predictions_proba = torch.softmax(torch.tensor(predictions), dim=-1).numpy()
    pred_labels = np.argmax(predictions, axis=1)

    # Calculate all evaluation metrics
    accuracy = accuracy_score(true_labels, pred_labels)
    precision, recall, f1, _ = precision_recall_fscore_support(
        true_labels, pred_labels, average='binary'
    )

    # The ROC curve is built from the probability assigned to class 1 ("real")
    fpr, tpr, _ = roc_curve(true_labels, predictions_proba[:, 1])
    roc_auc = auc(fpr, tpr)
    cm = confusion_matrix(true_labels, pred_labels)

    print(f"\n=== {model_name.upper()} Test Results ===")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"AUC-ROC: {roc_auc:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")

    # Save results (arrays converted to plain lists)
    model_results = {
        'test_metrics': {
            'test_accuracy': accuracy,
            'test_f1': f1,
            'test_precision': precision,
            'test_recall': recall
        },
        'training_time': training_time,
        'confusion_matrix': cm.tolist(),
        'roc_curve': {
            'fpr': fpr.tolist(),
            'tpr': tpr.tolist(),
            'auc': roc_auc
        }
    }

    # Save model
    trainer.save_model(f"./models/{model_name}-final")
    tokenizer.save_pretrained(f"./models/{model_name}-final")
    print(f"\n{model_name} model saved to './models/{model_name}-final'")

    return model_results


# 4. Train all models

In [None]:
# Train every configured model in turn; a failed run is reported and skipped.
successful_models = []
for model_name, model_path in MODELS.items():
    print(f"\nProcessing model: {model_name.upper()}")
    result = train_and_evaluate_model(model_name, model_path)

    # A None result means loading or training failed for this model.
    if result is None:
        print(f" {model_name.upper()} model training failed")
        continue

    all_results[model_name] = result
    successful_models.append(model_name)
    print(f" {model_name.upper()} model training completed")

print(f"\nSuccessfully trained models: {[m.upper() for m in successful_models]}")

# 5. Result comparison and visualization

In [None]:
# Create comparison table: one row of pre-formatted strings per trained model
comparison_data = [
    {
        'Model': name.upper(),
        'Accuracy': f"{record['test_metrics'].get('test_accuracy', 0):.4f}",
        'F1-Score': f"{record['test_metrics'].get('test_f1', 0):.4f}",
        'AUC-ROC': f"{record['roc_curve']['auc']:.4f}",
        'Precision': f"{record['test_metrics'].get('test_precision', 0):.4f}",
        'Recall': f"{record['test_metrics'].get('test_recall', 0):.4f}",
        'Training_Time(s)': f"{record['training_time']:.2f}",
    }
    for name, record in all_results.items()
]

comparison_df = pd.DataFrame(comparison_data)
print("\nDetailed Performance Comparison:")
print(comparison_df.to_string(index=False))

# Visualization results
# 1. Main performance metrics comparison plot (grouped bars: one group per model)
plt.figure(figsize=(10, 6))
metrics_to_plot = ['Accuracy', 'F1-Score', 'AUC-ROC']

# Each plotted metric maps to a function that reads it from one model's result record.
metric_getters = {
    'Accuracy': lambda record: record['test_metrics'].get('test_accuracy', 0),
    'F1-Score': lambda record: record['test_metrics'].get('test_f1', 0),
    'AUC-ROC': lambda record: record['roc_curve']['auc'],
}
metric_values = {
    metric: [float(metric_getters[metric](record)) for record in all_results.values()]
    for metric in metrics_to_plot
}

x = np.arange(len(all_results))
width = 0.25

for i, metric in enumerate(metrics_to_plot):
    plt.bar(x + i*width, metric_values[metric], width, label=metric, alpha=0.8)

plt.xlabel('Models')
plt.ylabel('Score')
plt.title('Main Performance Metrics Comparison')
# With three bars per group, the middle bar sits at x + width.
plt.xticks(x + width, [model.upper() for model in all_results])
plt.legend()

# Zoom in on the 0.9-1.0 band when every score fits in it; otherwise lower the
# axis floor so that a score below 0.9 is not drawn as an invisible bar.
lowest_score = min(
    (score for scores in metric_values.values() for score in scores),
    default=0.9,
)
y_floor = 0.9 if lowest_score >= 0.9 else max(0.0, lowest_score - 0.05)
plt.ylim(y_floor, 1)
plt.show()

# 2. Training time and efficiency comparison
plt.figure(figsize=(8, 6))
training_times_list = [record['training_time'] for record in all_results.values()]
bar_labels = [name.upper() for name in all_results]
bar_colors = ['skyblue', 'lightcoral', 'lightgreen'][:len(all_results)]
plt.bar(bar_labels, training_times_list, color=bar_colors)
plt.title('Training Time Comparison (seconds)')
plt.ylabel('Time (seconds)')

# Place each value label slightly above its bar (1% of the tallest bar).
label_offset = max(training_times_list, default=0) * 0.01
for position, seconds in enumerate(training_times_list):
    plt.text(position, seconds + label_offset, f'{seconds:.1f}s', ha='center', va='bottom')
plt.show()

# 3. Confusion matrix heatmap (one figure per trained model)
model_names = list(all_results)
for model_name in model_names:
    fig, ax = plt.subplots(figsize=(6, 5))
    cm = np.array(all_results[model_name]['confusion_matrix'])

    # Rows are the true classes, columns the predicted classes.
    sns.heatmap(
        cm,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=['Predicted Fake', 'Predicted Real'],
        yticklabels=['Actual Fake', 'Actual Real'],
        ax=ax,
    )
    ax.set_title(f'{model_name.upper()} Confusion Matrix')
    ax.set_xlabel('Predicted Label')
    ax.set_ylabel('True Label')
    plt.show()