## DSA4213 Assignment 3

### Load dataset

In [None]:
from datasets import load_dataset
import re
from transformers import AutoTokenizer

# Load IMDb dataset
# `load_dataset("imdb")` is called without a `split`, and the result is bound
# to `dataset`; the statements below clean and tokenize its "text" field.
dataset = load_dataset("imdb")

# Minimal cleaning (replace HTML line-break tags with a space)
# Matches <br>, <br/>, <br /> and upper-case variants, not only the literal
# "<br />" spelling. Compiled once because the function runs once per review.
_BR_TAG = re.compile(r"<br\s*/?>", re.IGNORECASE)


def clean_text(text):
    """Return `text` with HTML line-break tags replaced by a single space.

    Each tag becomes exactly one space (consecutive tags therefore yield
    consecutive spaces), and leading/trailing whitespace of the result is
    stripped. No other markup is touched.
    """
    return _BR_TAG.sub(" ", text).strip()

def _strip_breaks(example):
    """Return the replacement "text" column for one example, cleaned by clean_text."""
    return {"text": clean_text(example["text"])}


# Row-by-row clean-up of the "text" column (map is not batched here).
dataset = dataset.map(_strip_breaks)

# Tokenizer (shared for both BERT & DistilBERT)
MODEL_NAME = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)


def tokenize(batch):
    """Tokenize the "text" entries of `batch`, truncated/padded to 256 tokens."""
    encoding_options = {
        "padding": "max_length",
        "truncation": True,
        "max_length": 256,
    }
    return tokenizer(batch["text"], **encoding_options)


# Batched map: `tokenize` receives many rows per call.
encoded_dataset = dataset.map(tokenize, batched=True)


### Clean dataset

In [None]:
from datasets import load_dataset, Dataset, DatasetDict
from sklearn.model_selection import train_test_split

# 1. Load all IMDB data
# Train and test are concatenated into a single split before de-duplication.
raw_dataset = load_dataset("imdb", split="train+test")
print(f"Total combined samples: {len(raw_dataset)}")

# 2. Remove duplicate texts
# A dict keeps insertion order, so `setdefault` retains the first occurrence
# of every review text together with the label seen on that first occurrence.
unique_map = {}
for review, sentiment in zip(raw_dataset["text"], raw_dataset["label"]):
    unique_map.setdefault(review, sentiment)

texts = list(unique_map)
labels = list(unique_map.values())

print(f"Unique samples after deduplication: {len(texts)}")

# 3. Re-split cleanly (80 / 20 stratified)
train_texts, test_texts, train_labels, test_labels = train_test_split(
    texts,
    labels,
    test_size=0.2,
    random_state=42,
    stratify=labels,
)

_split_columns = {
    "train": (train_texts, train_labels),
    "test": (test_texts, test_labels),
}
dataset_clean = DatasetDict({
    split_name: Dataset.from_dict({"text": split_texts, "label": split_labels})
    for split_name, (split_texts, split_labels) in _split_columns.items()
})

# 4. Verify zero overlap
train_set = set(dataset_clean["train"]["text"])
test_set = set(dataset_clean["test"]["text"])
shared_reviews = train_set & test_set
print("Overlap between train and test:", len(shared_reviews))

# 5. Save clean dataset
dataset_clean.save_to_disk("./clean_imdb_dataset")
print("Clean dataset saved at ./clean_imdb_dataset")


### Simple EDA

In [None]:
import pandas as pd
from collections import Counter
import matplotlib.pyplot as plt
import seaborn as sns
from datasets import load_from_disk, load_dataset

# --------------------------------------------------------
# Load dataset (cleaned version if available)
# --------------------------------------------------------
# Only a missing / non-dataset directory triggers the fallback. A bare
# `except:` would also hide unrelated failures and swallow KeyboardInterrupt.
try:
    dataset = load_from_disk("./clean_imdb_dataset")
except FileNotFoundError:
    dataset = load_dataset("imdb")
    print("Loaded default IMDb dataset.")
else:
    print("Loaded cleaned IMDb dataset from disk.")

# Convert to DataFrame for analysis
train_split, test_split = dataset["train"], dataset["test"]
train_df = pd.DataFrame(train_split)
test_df = pd.DataFrame(test_split)

# --------------------------------------------------------
# 1. Label distribution (balance check)
# --------------------------------------------------------
# One Counter per split, keyed by the integer label.
train_counts = Counter(train_split["label"])
test_counts = Counter(test_split["label"])

print("\nLabel Distribution:")
for split_tag, split_counts in (("Train:", train_counts), ("Test:", test_counts)):
    print(split_tag, split_counts)

# Plot label distribution
# Bars are drawn for label 0 ("Negative") then label 1 ("Positive").
class_names = ["Negative", "Positive"]
class_totals = [train_counts[0], train_counts[1]]

_fig, label_ax = plt.subplots(figsize=(5, 4))
sns.barplot(x=class_names, y=class_totals, palette="coolwarm", ax=label_ax)
label_ax.set_title("Label Distribution in IMDb Train Split")
label_ax.set_ylabel("Count")
plt.show()

# --------------------------------------------------------
# 2. Review length analysis
# --------------------------------------------------------
# Size of every training review in characters and in whitespace-separated words.
review_texts = train_df["text"]
train_df["char_length"] = [len(review) for review in review_texts]
train_df["word_length"] = [len(review.split()) for review in review_texts]

for heading, column in (
    ("\nCharacter length statistics:", "char_length"),
    ("\nWord length statistics:", "word_length"),
):
    print(heading)
    print(train_df[column].describe())

# Histogram of review lengths (in words)
_fig, length_ax = plt.subplots(figsize=(8, 5))
length_ax.hist(train_df["word_length"], bins=60, color="skyblue", edgecolor="black")
length_ax.set_xlabel("Review Length (words)")
length_ax.set_ylabel("Frequency")
length_ax.set_title("IMDb Review Word Count Distribution")
length_ax.set_xlim(0, 1200)  # trim long tail for visibility
plt.show()

# --------------------------------------------------------
# 3. Sample examples for quick inspection
# --------------------------------------------------------
# For each class, show the first 500 characters of the first matching review.
for heading, wanted_label in (
    ("\nExample Positive Review:\n", 1),
    ("\nExample Negative Review:\n", 0),
):
    print(heading)
    first_review = train_df.loc[train_df["label"] == wanted_label, "text"].iloc[0]
    print(first_review[:500])

# --------------------------------------------------------
# 4. Train-test comparison summary
# --------------------------------------------------------
# One row per split; counts come from the Counters built in section 1.
_split_frames = (train_df, test_df)
_split_counters = (train_counts, test_counts)

summary = {
    "Split": ["Train", "Test"],
    "Total Samples": [len(frame) for frame in _split_frames],
    "Positive": [counter[1] for counter in _split_counters],
    "Negative": [counter[0] for counter in _split_counters],
}

summary_df = pd.DataFrame(summary)
print("\nDataset Summary:")
print(summary_df)

# Optional: visualize comparison
# melt() turns the Positive/Negative columns into (variable, value) rows so
# seaborn can draw one bar per class within each split.
per_class_counts = summary_df.melt(id_vars="Split", value_vars=["Positive", "Negative"])

_fig, summary_ax = plt.subplots(figsize=(6, 4))
sns.barplot(
    data=per_class_counts,
    x="Split",
    y="value",
    hue="variable",
    palette="viridis",
    ax=summary_ax,
)
summary_ax.set_title("Positive vs Negative Reviews per Split")
summary_ax.set_ylabel("Count")
plt.show()


### Full finetuning

In [None]:
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments
)
from datasets import load_from_disk, load_dataset
import evaluate
import numpy as np

# ------------------------------------------------------------
# 1. Load IMDb Dataset
# ------------------------------------------------------------
# Fall back to the stock dataset only when the cleaned copy is absent; any
# other error (or Ctrl-C) propagates instead of being silently swallowed.
try:
    dataset = load_from_disk("./clean_imdb_dataset")
except FileNotFoundError:
    dataset = load_dataset("imdb")
    print("Loaded default IMDb dataset.")
else:
    print("Loaded cleaned IMDb dataset from disk.")

# ------------------------------------------------------------
# 2. Tokenizer and Encoding
# ------------------------------------------------------------
# The same bert-base-uncased tokenizer output is fed to both models below.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")


def preprocess_function(examples):
    """Tokenize examples["text"], truncating and padding to 256 tokens."""
    tokenizer_kwargs = dict(truncation=True, padding="max_length", max_length=256)
    return tokenizer(examples["text"], **tokenizer_kwargs)


encoded_dataset = dataset.map(preprocess_function, batched=True)

# ------------------------------------------------------------
# 3. Metrics
# ------------------------------------------------------------
accuracy, precision, recall, f1 = (
    evaluate.load(metric_name)
    for metric_name in ("accuracy", "precision", "recall", "f1")
)


def compute_metrics(eval_pred):
    """Score one evaluation pass.

    `eval_pred` unpacks into (logits, labels). The predicted class is the
    argmax over the last axis of the logits. Returns a dict with accuracy
    and macro-averaged precision, recall and f1, in that key order.
    """
    logits, labels = eval_pred
    predicted = np.argmax(logits, axis=-1)

    scores = {
        "accuracy": accuracy.compute(predictions=predicted, references=labels)["accuracy"],
    }
    for key, metric in (("precision", precision), ("recall", recall), ("f1", f1)):
        outcome = metric.compute(predictions=predicted, references=labels, average="macro")
        scores[key] = outcome[key]
    return scores

# ------------------------------------------------------------
# 4. Training Arguments  (simplified for compatibility)
# ------------------------------------------------------------
def _full_ft_training_args(run_name):
    """Return the full fine-tuning TrainingArguments for one model.

    Hyper-parameters are identical for every run; only the checkpoint and
    log directories depend on `run_name`. Separate directories keep the
    second run's checkpoints from overwriting the first run's, and keep
    `save_total_limit` rotation from mixing the two.
    """
    return TrainingArguments(
        output_dir=f"./results_full/{run_name}",
        learning_rate=2e-5,
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
        num_train_epochs=2,
        weight_decay=0.01,
        logging_dir=f"./logs/{run_name}",
        logging_steps=100,
        save_total_limit=2,
    )


def _sample_rows(split, size, seed=42):
    """Return up to `size` rows of `split`, drawn after a seeded shuffle."""
    return split.shuffle(seed=seed).select(range(min(size, len(split))))


# Kept under its original name; these are the arguments used for the BERT run.
training_args = _full_ft_training_args("bert")

# NOTE(review): the stock `imdb` splits used by the fallback loader appear to
# be stored grouped by label, so an unshuffled head slice could contain a
# single class -- confirm. The seeded shuffle keeps the subsets reproducible,
# and both models train and evaluate on exactly the same rows.
train_subset = _sample_rows(encoded_dataset["train"], 5000)
eval_subset = _sample_rows(encoded_dataset["test"], 1000)

# ------------------------------------------------------------
# 5. FULL FINE-TUNING (BERT)
# ------------------------------------------------------------
bert_model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)

trainer_bert = Trainer(
    model=bert_model,
    args=training_args,
    train_dataset=train_subset,
    eval_dataset=eval_subset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

print("\n=== Training BERT Full Fine-Tuning ===")
trainer_bert.train()
print("\n=== Evaluating BERT Full ===")
results_bert = trainer_bert.evaluate()
print(results_bert)
trainer_bert.save_model("./bert_full_finetuned")
print("Saved BERT model to ./bert_full_finetuned")

# ------------------------------------------------------------
# 6. FULL FINE-TUNING (DistilBERT)
# ------------------------------------------------------------
distil_model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=2)

trainer_distil = Trainer(
    model=distil_model,
    args=_full_ft_training_args("distilbert"),
    train_dataset=train_subset,
    eval_dataset=eval_subset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

print("\n=== Training DistilBERT Full Fine-Tuning ===")
trainer_distil.train()
print("\n=== Evaluating DistilBERT Full ===")
results_distil = trainer_distil.evaluate()
print(results_distil)
trainer_distil.save_model("./distilbert_full_finetuned")
print("Saved DistilBERT model to ./distilbert_full_finetuned")

# ------------------------------------------------------------
# 7. Summary
# ------------------------------------------------------------
print("\n=== Summary of Fine-Tuning Results ===")
print("BERT:", results_bert)
print("DistilBERT:", results_distil)


### LoRA


In [None]:
from peft import LoraConfig, get_peft_model
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments
)
from datasets import load_from_disk, load_dataset
import evaluate
import numpy as np
import torch

# ------------------------------------------------------------
# 1. Load IMDb Dataset
# ------------------------------------------------------------
# Catch only the "saved dataset not found" case. A bare `except:` would also
# mask unrelated failures and KeyboardInterrupt.
try:
    dataset = load_from_disk("./clean_imdb_dataset")
except FileNotFoundError:
    dataset = load_dataset("imdb")
    print("Loaded default IMDb dataset.")
else:
    print("Loaded cleaned IMDb dataset from disk.")

# ------------------------------------------------------------
# 2. Tokenizer and Encoding
# ------------------------------------------------------------
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")


def preprocess_function(examples):
    """Encode examples["text"] as fixed-length (256-token) tokenizer output."""
    review_batch = examples["text"]
    return tokenizer(
        review_batch,
        truncation=True,
        padding="max_length",
        max_length=256,
    )


# Batched map: preprocess_function receives many rows per call.
encoded_dataset = dataset.map(preprocess_function, batched=True)

# ------------------------------------------------------------
# 3. Metrics
# ------------------------------------------------------------
accuracy = evaluate.load("accuracy")
precision = evaluate.load("precision")
recall = evaluate.load("recall")
f1 = evaluate.load("f1")


def compute_metrics(eval_pred):
    """Turn (logits, labels) into accuracy plus macro precision/recall/f1.

    The predicted class is the argmax over the last axis of the logits.
    """
    logits, labels = eval_pred
    shared_kwargs = {
        "predictions": np.argmax(logits, axis=-1),
        "references": labels,
    }
    macro_metrics = (("precision", precision), ("recall", recall), ("f1", f1))
    return {
        "accuracy": accuracy.compute(**shared_kwargs)["accuracy"],
        **{
            key: metric.compute(average="macro", **shared_kwargs)[key]
            for key, metric in macro_metrics
        },
    }

# ------------------------------------------------------------
# 4. Training Arguments (simplified for compatibility)
# ------------------------------------------------------------
def _lora_training_args(run_name):
    """Return the LoRA TrainingArguments for one model.

    Every run shares the same hyper-parameters; the checkpoint and log
    directories are namespaced by `run_name` so the DistilBERT run does not
    overwrite (or rotate away, via `save_total_limit`) the BERT checkpoints.
    """
    return TrainingArguments(
        output_dir=f"./results_lora/{run_name}",
        learning_rate=2e-4,               # higher LR since LoRA trains fewer params
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
        num_train_epochs=3,
        weight_decay=0.01,
        logging_dir=f"./logs_lora/{run_name}",
        logging_steps=100,
        save_total_limit=2,
    )


def _sample_rows(split, size, seed=42):
    """Return up to `size` rows of `split`, drawn after a seeded shuffle."""
    return split.shuffle(seed=seed).select(range(min(size, len(split))))


# Kept under its original name; these are the arguments used for the BERT run.
training_args = _lora_training_args("bert")

# NOTE(review): the stock `imdb` splits used by the fallback loader appear to
# be stored grouped by label, so an unshuffled head slice could be
# single-class -- confirm. The fixed seed keeps the subsets reproducible and
# identical for both models.
train_subset = _sample_rows(encoded_dataset["train"], 5000)
eval_subset = _sample_rows(encoded_dataset["test"], 1000)

# ------------------------------------------------------------
# 5. LoRA Fine-Tuning (BERT)
# ------------------------------------------------------------
lora_config_bert = LoraConfig(
    r=8,
    lora_alpha=32,
    target_modules=["query", "value"],
    lora_dropout=0.1,
    bias="none",
    task_type="SEQ_CLS"
)

bert_model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)
bert_lora = get_peft_model(bert_model, lora_config_bert)

trainer_bert = Trainer(
    model=bert_lora,
    args=training_args,
    train_dataset=train_subset,
    eval_dataset=eval_subset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

print("\n=== Training BERT LoRA Fine-Tuning ===")
trainer_bert.train()
print("\n=== Evaluating BERT LoRA ===")
results_bert = trainer_bert.evaluate()
print(results_bert)
# save_pretrained on the PEFT wrapper is what the later cells detect via
# adapter_config.json in this folder.
bert_lora.save_pretrained("./bert_lora_finetuned")
print("Saved BERT LoRA model to ./bert_lora_finetuned")

# ------------------------------------------------------------
# 6. LoRA Fine-Tuning (DistilBERT)
# ------------------------------------------------------------
lora_config_distil = LoraConfig(
    r=8,
    lora_alpha=32,
    target_modules=["q_lin", "v_lin"],   # correct layer names for DistilBERT
    lora_dropout=0.1,
    bias="none",
    task_type="SEQ_CLS"
)

distil_model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=2)
distil_lora = get_peft_model(distil_model, lora_config_distil)

trainer_distil = Trainer(
    model=distil_lora,
    args=_lora_training_args("distilbert"),
    train_dataset=train_subset,
    eval_dataset=eval_subset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

print("\n=== Training DistilBERT LoRA Fine-Tuning ===")
trainer_distil.train()
print("\n=== Evaluating DistilBERT LoRA ===")
results_distil = trainer_distil.evaluate()
print(results_distil)
distil_lora.save_pretrained("./distilbert_lora_finetuned")
print("Saved DistilBERT LoRA model to ./distilbert_lora_finetuned")

# ------------------------------------------------------------
# 7. Summary
# ------------------------------------------------------------
print("\n=== Summary of LoRA Fine-Tuning Results ===")
print("BERT LoRA:", results_bert)
print("DistilBERT LoRA:", results_distil)


### Prompt Tuning

In [None]:
# ------------------------------------------------------------
# PROMPT TUNING for BERT and DistilBERT (Final Fixed Version)
# ------------------------------------------------------------
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments
)
from peft import PromptTuningConfig, get_peft_model
from datasets import load_from_disk, load_dataset
import evaluate
import numpy as np
import pandas as pd
import torch

# ------------------------------------------------------------
# 1. Load IMDb Dataset
# ------------------------------------------------------------
# Only the missing-dataset case falls back to the stock IMDb data; other
# exceptions (and KeyboardInterrupt) are no longer swallowed by a bare except.
try:
    dataset = load_from_disk("./clean_imdb_dataset")
except FileNotFoundError:
    dataset = load_dataset("imdb")
    print("Loaded default IMDb dataset (may contain overlap).")
else:
    print("Loaded cleaned IMDb dataset from disk.")

# ------------------------------------------------------------
# 2. Tokenizer and Encoding
# ------------------------------------------------------------
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Fixed-length encoding settings applied to every review.
_ENCODING_OPTIONS = {
    "truncation": True,
    "padding": "max_length",
    "max_length": 256,
}


def preprocess_function(examples):
    """Tokenize examples["text"] with the fixed-length encoding options."""
    return tokenizer(examples["text"], **_ENCODING_OPTIONS)


encoded_dataset = dataset.map(preprocess_function, batched=True)

# ------------------------------------------------------------
# 3. Metrics
# ------------------------------------------------------------
accuracy = evaluate.load("accuracy")
precision = evaluate.load("precision")
recall = evaluate.load("recall")
f1 = evaluate.load("f1")


def compute_metrics(eval_pred):
    """Compute accuracy and macro precision/recall/f1 for one evaluation pass.

    `eval_pred` unpacks into (logits, labels); predictions are the argmax of
    the logits over the last axis.
    """
    logits, labels = eval_pred
    predicted = np.argmax(logits, axis=-1)

    def macro(metric, key):
        # Macro-averaged score of `metric`, read from its result under `key`.
        return metric.compute(predictions=predicted, references=labels, average="macro")[key]

    accuracy_score = accuracy.compute(predictions=predicted, references=labels)["accuracy"]
    return {
        "accuracy": accuracy_score,
        "precision": macro(precision, "precision"),
        "recall": macro(recall, "recall"),
        "f1": macro(f1, "f1"),
    }

# ------------------------------------------------------------
# 4. Training Arguments
# ------------------------------------------------------------
def _prompt_training_args(run_name):
    """Return the prompt-tuning TrainingArguments for one model.

    Hyper-parameters are shared; the checkpoint and log directories are
    namespaced by `run_name` so the two runs do not overwrite each other's
    checkpoints or get rotated together by `save_total_limit`.
    """
    return TrainingArguments(
        output_dir=f"./results_prompt/{run_name}",
        learning_rate=5e-4,
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
        num_train_epochs=3,
        weight_decay=0.01,
        logging_dir=f"./logs_prompt/{run_name}",
        logging_steps=100,
        save_total_limit=2,
        report_to="none",
        fp16=torch.cuda.is_available(),  # mixed precision only when a GPU is present
    )


def _sample_rows(split, size, seed=42):
    """Return up to `size` rows of `split`, drawn after a seeded shuffle."""
    return split.shuffle(seed=seed).select(range(min(size, len(split))))


# Kept under its original name; these are the arguments used for the BERT run.
training_args = _prompt_training_args("bert")

# NOTE(review): the stock `imdb` splits used by the fallback loader appear to
# be stored grouped by label, so an unshuffled head slice could be
# single-class -- confirm. The fixed seed keeps the subsets reproducible and
# identical for both models.
train_subset = _sample_rows(encoded_dataset["train"], 5000)
eval_subset = _sample_rows(encoded_dataset["test"], 1000)

# ------------------------------------------------------------
# 5. Prompt Tuning - BERT
# ------------------------------------------------------------
prompt_config_bert = PromptTuningConfig(
    task_type="SEQ_CLS",
    num_virtual_tokens=20,
    token_dim=768,              # embedding dimension
    num_layers=12,              # BERT has 12 transformer layers
    num_attention_heads=12      # BERT-base has 12 attention heads
)

bert_prompt = AutoModelForSequenceClassification.from_pretrained(
    "bert-base-uncased",
    num_labels=2
)
bert_prompt = get_peft_model(bert_prompt, prompt_config_bert)

trainer_bert = Trainer(
    model=bert_prompt,
    args=training_args,
    train_dataset=train_subset,
    eval_dataset=eval_subset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

print("\nTraining BERT Prompt Tuning")
trainer_bert.train()
print("\nEvaluating BERT Prompt Tuning")
results_bert = trainer_bert.evaluate()
print(results_bert)
bert_prompt.save_pretrained("./bert_prompt_finetuned")
print("Saved BERT Prompt model to ./bert_prompt_finetuned")

# ------------------------------------------------------------
# 6. Prompt Tuning - DistilBERT
# ------------------------------------------------------------
prompt_config_distil = PromptTuningConfig(
    task_type="SEQ_CLS",
    num_virtual_tokens=20,
    token_dim=768,              # embedding dimension
    num_layers=6,               # DistilBERT has 6 layers
    num_attention_heads=12      # DistilBERT also uses 12 heads
)

distil_prompt = AutoModelForSequenceClassification.from_pretrained(
    "distilbert-base-uncased",
    num_labels=2
)
distil_prompt = get_peft_model(distil_prompt, prompt_config_distil)

trainer_distil = Trainer(
    model=distil_prompt,
    args=_prompt_training_args("distilbert"),
    train_dataset=train_subset,
    eval_dataset=eval_subset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

print("\nTraining DistilBERT Prompt Tuning")
trainer_distil.train()
print("\nEvaluating DistilBERT Prompt Tuning")
results_distil = trainer_distil.evaluate()
print(results_distil)
distil_prompt.save_pretrained("./distilbert_prompt_finetuned")
print("Saved DistilBERT Prompt model to ./distilbert_prompt_finetuned")

# ------------------------------------------------------------
# 7. Summary
# ------------------------------------------------------------
# One row per model; the evaluate() result dicts are spread into columns.
summary_df = pd.DataFrame([
    {"Model": "BERT Prompt", **results_bert},
    {"Model": "DistilBERT Prompt", **results_distil}
])
print("\nSummary of Prompt Tuning Results")
print(summary_df)
summary_df.to_csv("prompt_tuning_results.csv", index=False)

# ------------------------------------------------------------
# 8. Trainable Parameters
# ------------------------------------------------------------
print("\nTrainable Parameters for BERT Prompt:")
bert_prompt.print_trainable_parameters()

print("\nTrainable Parameters for DistilBERT Prompt:")
distil_prompt.print_trainable_parameters()


### Evaluation

In [None]:
# ============================================================
# SENTIMENT CLASSIFICATION EVALUATION (BERT & DistilBERT Variants)
# ============================================================

import os
import numpy as np
import pandas as pd
import torch
from datasets import load_from_disk
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer
)
from peft import PeftModel
import evaluate
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

# ------------------------------------------------------------
# 1. Load Clean IMDb Dataset
# ------------------------------------------------------------
# No fallback here: evaluation requires the cleaned dataset saved on disk.
dataset = load_from_disk("./clean_imdb_dataset")
test_dataset = dataset["test"]

# ------------------------------------------------------------
# 2. Define Evaluation Metrics
# ------------------------------------------------------------
accuracy, precision, recall, f1 = (
    evaluate.load(metric_name)
    for metric_name in ("accuracy", "precision", "recall", "f1")
)


def compute_metrics(preds, labels):
    """Return accuracy and macro precision/recall/f1 for raw model outputs.

    `preds` holds per-class scores; the predicted label is the argmax over
    the last axis. `labels` are the reference labels.
    """
    shared = {
        "predictions": np.argmax(preds, axis=-1),
        "references": labels,
    }
    return {
        "accuracy": accuracy.compute(**shared)["accuracy"],
        "precision": precision.compute(average="macro", **shared)["precision"],
        "recall": recall.compute(average="macro", **shared)["recall"],
        "f1": f1.compute(average="macro", **shared)["f1"],
    }

# ------------------------------------------------------------
# 3. Safe Loader (Handles LoRA / Prompt / Full Models)
# ------------------------------------------------------------
def load_model_safely(model_path):
    """Load the tokenizer and classifier stored under `model_path`.

    A folder containing `adapter_config.json` is treated as a PEFT adapter:
    the matching base checkpoint is loaded and the adapter attached to it.
    The base is DistilBERT if "distilbert" occurs in the path, else BERT.
    Any other folder is loaded directly as a full model; its tokenizer falls
    back to the base checkpoint's tokenizer if loading it from the folder
    fails.

    Returns:
        (tokenizer, model)
    """
    adapter_marker = os.path.join(model_path, "adapter_config.json")
    if "distilbert" in model_path.lower():
        base_checkpoint = "distilbert-base-uncased"
    else:
        base_checkpoint = "bert-base-uncased"

    if not os.path.exists(adapter_marker):
        # Full fine-tuned model: prefer the tokenizer saved next to it.
        try:
            tok = AutoTokenizer.from_pretrained(model_path)
        except Exception:
            tok = AutoTokenizer.from_pretrained(base_checkpoint)
        return tok, AutoModelForSequenceClassification.from_pretrained(model_path)

    print(f"Detected adapter at {model_path}. Loading base model...")
    tok = AutoTokenizer.from_pretrained(base_checkpoint)
    backbone = AutoModelForSequenceClassification.from_pretrained(base_checkpoint, num_labels=2)
    return tok, PeftModel.from_pretrained(backbone, model_path)

# ------------------------------------------------------------
# 4. Evaluate Model & Compute Confusion Matrix
# ------------------------------------------------------------
def evaluate_model(model_path, max_samples=2000):
    """Evaluate one saved model on the head of the clean test split.

    Args:
        model_path: folder handed to `load_model_safely`.
        max_samples: upper bound on the number of test rows used (the first
            rows of `test_dataset`).

    Returns:
        (metrics, cm): the dict from `compute_metrics` and a confusion
        matrix normalized over the true labels.
    """
    print(f"\n=== Evaluating {model_path} ===")
    tokenizer, model = load_model_safely(model_path)
    model.to("cuda" if torch.cuda.is_available() else "cpu")

    sample_count = min(max_samples, len(test_dataset))
    subset = test_dataset.select(range(sample_count))

    def encode_batch(batch):
        return tokenizer(batch["text"], truncation=True, padding="max_length", max_length=256)

    # Drop the raw text once it has been tokenized.
    model_inputs = subset.map(encode_batch, batched=True).remove_columns(["text"])
    model_inputs.set_format("torch")

    output = Trainer(model=model, tokenizer=tokenizer).predict(model_inputs)
    logits, gold = output.predictions, output.label_ids

    predicted = np.argmax(logits, axis=-1)
    metrics = compute_metrics(logits, gold)
    cm = confusion_matrix(gold, predicted, normalize="true")

    print(f"Metrics for {model_path}: {metrics}")
    return metrics, cm

# ------------------------------------------------------------
# 5. Evaluate All Models
# ------------------------------------------------------------
# Folder names follow "<architecture>_<method>_finetuned".
model_folders = [
    f"{architecture}_{method}_finetuned"
    for architecture in ("bert", "distilbert")
    for method in ("full", "lora", "prompt")
]

results = []
cms = {}
for path in model_folders:
    if os.path.exists(path):
        # A failure in one model is reported and must not stop the others.
        try:
            metrics, cm = evaluate_model(path)
            results.append({"Model": path, **metrics})
            cms[path] = cm
        except Exception as e:
            print(f"Error evaluating {path}: {e}")
    else:
        print(f"Skipping {path} — folder not found.")

# ------------------------------------------------------------
# 6. Save Quantitative Results
# ------------------------------------------------------------
if not results:
    print("\nNo models evaluated successfully.")
else:
    # One row per evaluated model, rounded to 4 decimals.
    results_df = pd.DataFrame(results).round(4)
    print("\n=== Evaluation Summary ===")
    print(results_df)
    results_df.to_csv("model_evaluation_summary.csv", index=False)
    print("\nSaved results to model_evaluation_summary.csv")

# ------------------------------------------------------------
# 7. Plot Confusion Matrices
# ------------------------------------------------------------
# The grid is sized from the number of matrices actually collected (at most
# three per row), so skipped or failed models do not leave blank framed
# panels. Six models still give the original 2x3 layout at 15x10 inches.
if cms:
    n = len(cms)
    n_cols = min(3, n)
    n_rows = -(-n // n_cols)  # ceiling division
    fig, axes = plt.subplots(
        n_rows,
        n_cols,
        figsize=(5 * n_cols, 5 * n_rows),
        squeeze=False,  # always a 2-D array, even for a single panel
    )
    axes = axes.flatten()
    for i, (name, cm) in enumerate(cms.items()):
        sns.heatmap(cm, annot=True, fmt=".2f", cmap="Blues", xticklabels=["Neg", "Pos"],
                    yticklabels=["Neg", "Pos"], ax=axes[i])
        axes[i].set_title(name.replace("_", "\n"))
        axes[i].set_xlabel("Predicted Label")
        axes[i].set_ylabel("True Label")
    # Hide grid cells left over when n is not a multiple of n_cols.
    for unused_ax in axes[n:]:
        unused_ax.axis("off")
    plt.tight_layout()
    plt.savefig("confusion_matrices_all_models.png")
    plt.show()
    print("\nSaved confusion matrices to confusion_matrices_all_models.png")


### Qualitative analysis examples 

In [None]:
# ==============================================================
# QUALITATIVE ERROR EXAMPLES FROM CONFUSION MATRIX CELLS (ROBUST)
# ==============================================================

import os
import torch
import pandas as pd
import numpy as np
from datasets import load_from_disk
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer
from peft import PeftModel

# --------------------------------------------------------------
# 1. Load Clean IMDb Dataset
# --------------------------------------------------------------
dataset = load_from_disk("./clean_imdb_dataset")
test_dataset = dataset["test"]

# --------------------------------------------------------------
# 2. Safe Model Loader (handles PEFT + fallback)
# --------------------------------------------------------------
def load_model_and_tokenizer(model_path):
    """Load a tokenizer and classifier for `model_path`, never raising on model load.

    The tokenizer always comes from the base checkpoint (DistilBERT if
    "distilbert" occurs in the path, otherwise BERT). A folder with
    `adapter_config.json` is loaded as a PEFT adapter on top of that base;
    any other folder is loaded directly as a full model.

    NOTE(review): when either load fails, the function falls back to the
    plain base checkpoint, so downstream results would then describe a model
    that was not fine-tuned. Watch for the "Failed to load" messages.

    Returns:
        (tokenizer, model)
    """
    is_distil = "distilbert" in model_path.lower()
    base_name = "distilbert-base-uncased" if is_distil else "bert-base-uncased"
    tokenizer = AutoTokenizer.from_pretrained(base_name)

    def fresh_base():
        # Base checkpoint with a 2-label classification head.
        return AutoModelForSequenceClassification.from_pretrained(base_name, num_labels=2)

    has_adapter = os.path.exists(os.path.join(model_path, "adapter_config.json"))

    if not has_adapter:
        try:
            print(f"🔹 Loading full fine-tuned model for {model_path} ...")
            return tokenizer, AutoModelForSequenceClassification.from_pretrained(model_path)
        except Exception as e:
            print(f"Failed to load {model_path} directly: {e}")
            print("Using base pretrained model instead.")
            return tokenizer, fresh_base()

    try:
        print(f"🔹 Loading PEFT adapter for {model_path} ...")
        model = PeftModel.from_pretrained(fresh_base(), model_path)
    except Exception as e:
        print(f" Failed to load PEFT model {model_path}: {e}")
        print("Falling back to standard model load instead.")
        model = fresh_base()

    return tokenizer, model

# --------------------------------------------------------------
# 3. Extract Confusion Examples
# --------------------------------------------------------------
def extract_confusion_examples(model_path, max_samples=1500):
    """Predict on the head of the test split and export the misclassified rows.

    Writes `<model_path>_qualitative_errors.csv` containing every
    misclassified example and prints up to three previews per error type.

    Args:
        model_path: folder handed to `load_model_and_tokenizer`.
        max_samples: upper bound on the number of test rows used.
    """
    print(f"\n==================== Evaluating {model_path} ====================")
    tokenizer, model = load_model_and_tokenizer(model_path)
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)

    def encode_batch(batch):
        return tokenizer(batch["text"], truncation=True, padding="max_length", max_length=256)

    sample_count = min(max_samples, len(test_dataset))
    encoded = test_dataset.select(range(sample_count)).map(encode_batch, batched=True)
    encoded.set_format("torch")

    output = Trainer(model=model, tokenizer=tokenizer).predict(encoded)
    preds = np.argmax(output.predictions, axis=-1)
    labels = output.label_ids

    # Predictions follow the order of the selected rows, so the first
    # len(preds) texts of the test split line up with them.
    df = pd.DataFrame({
        "text": test_dataset["text"][:len(preds)],
        "true_label": labels,
        "pred_label": preds
    })

    sentiment_names = {0: "Negative", 1: "Positive"}
    df["true_sentiment"] = df["true_label"].map(sentiment_names)
    df["pred_sentiment"] = df["pred_label"].map(sentiment_names)

    # (true, pred) -> confusion-matrix cell; every other pair is a false negative.
    cell_names = {
        (1, 1): "True Positive",
        (0, 0): "True Negative",
        (0, 1): "False Positive (Neg→Pos)",
    }
    df["category"] = [
        cell_names.get((actual, guessed), "False Negative (Pos→Neg)")
        for actual, guessed in zip(df["true_label"], df["pred_label"])
    ]

    is_error = df["category"].str.startswith("False")
    errors = df[is_error].reset_index(drop=True)
    out_csv = f"{model_path}_qualitative_errors.csv"
    errors.to_csv(out_csv, index=False)
    print(f"Saved {len(errors)} misclassified examples to {out_csv}")

    # Show sample qualitative errors
    for cat in ("False Positive (Neg→Pos)", "False Negative (Pos→Neg)"):
        print(f"\n{cat}:")
        samples = errors[errors["category"] == cat].head(3)
        for record in samples.itertuples(index=False):
            preview = record.text[:300].replace("\n", " ")
            print(f"- {preview} [...] (true={record.true_sentiment}, pred={record.pred_sentiment})")

# --------------------------------------------------------------
# 4. Evaluate All Models (skip missing / broken)
# --------------------------------------------------------------
# Folder names follow "<architecture>_<method>_finetuned".
model_folders = [
    f"{architecture}_{method}_finetuned"
    for architecture in ("bert", "distilbert")
    for method in ("full", "lora", "prompt")
]

for model_path in model_folders:
    if not os.path.exists(model_path):
        print(f"Model not found: {model_path}")
        continue
    # One broken model must not abort the remaining ones.
    try:
        extract_confusion_examples(model_path)
    except Exception as e:
        print(f"Skipping {model_path} due to error: {e}")

print("\n==================== Evaluation Complete ====================")


### Toxicity analysis

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer
from peft import PeftModel
from datasets import load_from_disk
import numpy as np
import pandas as pd
import torch
import os

# ------------------------------------------------------------
# 1. Load dataset
# ------------------------------------------------------------
dataset = load_from_disk("./clean_imdb_dataset")
test_dataset = dataset["test"]

# ------------------------------------------------------------
# 2. Evaluation + save predictions
# ------------------------------------------------------------
def evaluate_and_save_predictions(model_path):
    """Predict sentiment for the test split with one saved model and export a CSV.

    The folder is first loaded as a complete sequence-classification
    checkpoint. If that raises ``OSError`` it is treated as a PEFT adapter:
    the base model is loaded with two labels and the adapter is applied on
    top of it. The adapter is merged into the base weights when the wrapped
    model offers ``merge_and_unload``; when that attribute is missing the
    adapter model is evaluated as-is instead of aborting the evaluation.

    Args:
        model_path: Folder holding the fine-tuned model or adapter. A name
            containing "distilbert" (case-insensitive) selects
            ``distilbert-base-uncased`` as base model and tokenizer; any
            other name selects ``bert-base-uncased``.

    Returns:
        pandas.DataFrame with the columns ``text``, ``true_label``,
        ``pred_label``, ``true_sentiment`` and ``pred_sentiment``. The same
        table is written to ``{model_path}_predictions.csv``.
    """
    print(f"\nEvaluating {model_path}")

    # Detect base model from the folder name
    base_name = "distilbert-base-uncased" if "distilbert" in model_path.lower() else "bert-base-uncased"

    # Always load tokenizer from base
    tokenizer = AutoTokenizer.from_pretrained(base_name)

    # Try loading the folder as a full model first, then fall back to adapter loading
    try:
        model = AutoModelForSequenceClassification.from_pretrained(model_path)
        print("Loaded full fine-tuned model")
    except OSError:
        print(f"Adapter detected — loading base model '{base_name}' and merging adapter from '{model_path}'")
        base_model = AutoModelForSequenceClassification.from_pretrained(base_name, num_labels=2)
        model = PeftModel.from_pretrained(base_model, model_path)
        # Merging is optional. Adapters without merge_and_unload used to raise
        # here and abort the evaluation; they are now evaluated unmerged,
        # matching the later cells of this notebook.
        try:
            model = model.merge_and_unload()
        except AttributeError:
            print("merge_and_unload skipped (likely Prompt tuning)")

    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)
    model.eval()

    # Tokenize test set
    def preprocess_function(examples):
        return tokenizer(
            examples["text"],
            truncation=True,
            padding="max_length",
            max_length=256
        )

    tokenized_test = test_dataset.map(preprocess_function, batched=True)
    tokenized_test = tokenized_test.remove_columns(["text"])
    tokenized_test.set_format("torch")

    # Run inference
    trainer = Trainer(model=model, tokenizer=tokenizer)
    predictions = trainer.predict(tokenized_test)

    preds = np.argmax(predictions.predictions, axis=-1)
    labels = predictions.label_ids

    # Save to CSV; the raw text comes from the untokenized split because the
    # "text" column was removed from the tokenized copy above.
    sentiment_names = {0: "Negative", 1: "Positive"}
    df = pd.DataFrame({
        "text": test_dataset["text"],
        "true_label": labels,
        "pred_label": preds
    })
    df["true_sentiment"] = df["true_label"].map(sentiment_names)
    df["pred_sentiment"] = df["pred_label"].map(sentiment_names)

    out_path = f"{model_path}_predictions.csv"
    df.to_csv(out_path, index=False)
    print(f"Saved predictions to {out_path} ({len(df)} samples)")
    return df


# ------------------------------------------------------------
# 3. Evaluate all models
# ------------------------------------------------------------
# Each folder is handled on its own so one failing model does not stop
# the remaining evaluations.
model_folders = [
    "bert_lora_finetuned",
    "bert_prompt_finetuned",
    "distilbert_full_finetuned",
    "distilbert_lora_finetuned",
    "distilbert_prompt_finetuned",
]

for path in model_folders:
    # Guard clause: report and move on when the folder is absent.
    if not os.path.exists(path):
        print(f"⚠️ Model not found: {path}")
        continue

    try:
        evaluate_and_save_predictions(path)
    except Exception as e:
        print(f"Error evaluating {path}: {e}")


In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer
from peft import PeftModel
from datasets import load_from_disk
import numpy as np
import pandas as pd
import torch, os, json

# ------------------------------------------------------------
# 1. Load dataset
# ------------------------------------------------------------
# Reload the DatasetDict that an earlier cell saved to ./clean_imdb_dataset
# and keep only its "test" split for the evaluation below.
dataset = load_from_disk("./clean_imdb_dataset")
test_dataset = dataset["test"]

# ------------------------------------------------------------
# 2. Helper: sanitize adapter_config.json (removes ANY invalid keys)
# ------------------------------------------------------------
def sanitize_adapter_config(model_path):
    """Drop non-whitelisted keys from ``adapter_config.json`` inside ``model_path``.

    The file is edited in place and only rewritten when something actually
    changed. If the config has no ``peft_type`` the value is inferred from the
    folder name ("lora" -> ``LORA``, "prompt" -> ``PROMPT_TUNING``); when
    neither substring is present the config is left without one.

    This is a best-effort step: any failure is printed and swallowed so the
    caller can still attempt to load the adapter.

    Args:
        model_path: Folder that may contain ``adapter_config.json``. Nothing
            happens when the file does not exist.

    Returns:
        None.
    """
    config_path = os.path.join(model_path, "adapter_config.json")
    if not os.path.exists(config_path):
        return
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            config = json.load(f)

        valid_keys = {
            "base_model_name_or_path",
            "peft_type",
            "r",
            "lora_alpha",
            "target_modules",
            "lora_dropout",
            "bias",
            "task_type",
            "inference_mode",
            "num_virtual_tokens",
            "prompt_tuning_init",
            "token_dim",
            "num_transformer_submodules",
            "encoder_hidden_size",
        }

        modified = False
        to_remove = [k for k in config if k not in valid_keys]

        if to_remove:
            print(f"🧹 Removing unsupported keys from {config_path}: {to_remove}")
            for k in to_remove:
                config.pop(k, None)
            modified = True

        if "peft_type" not in config:
            folder_name = model_path.lower()
            # Flag a change only when a type was really inferred; otherwise the
            # file would be rewritten (and reported as sanitized) for nothing.
            if "lora" in folder_name:
                config["peft_type"] = "LORA"
                modified = True
            elif "prompt" in folder_name:
                config["peft_type"] = "PROMPT_TUNING"
                modified = True

        if modified:
            # Serialize before opening for write so a serialization error
            # cannot leave a truncated config file behind.
            serialized = json.dumps(config, indent=2)
            with open(config_path, "w", encoding="utf-8") as f:
                f.write(serialized)
            print(f"✅ Sanitized adapter config for {model_path}")

    except Exception as e:
        # Deliberately broad: sanitizing must never stop the evaluation run.
        print(f"Could not sanitize {config_path}: {e}")

# ------------------------------------------------------------
# 3. Evaluation function
# ------------------------------------------------------------
def evaluate_and_save_predictions(model_path):
    """Evaluate one PEFT adapter on the test split and write its predictions to CSV.

    The adapter config is sanitized first, then the base model is loaded with
    two labels and the adapter is applied. Merging the adapter is attempted and
    skipped with a message when it raises.

    Args:
        model_path: Adapter folder. A name containing "distilbert"
            (case-insensitive) selects ``distilbert-base-uncased`` as base
            model and tokenizer; any other name selects ``bert-base-uncased``.

    Returns:
        pandas.DataFrame with the columns ``text``, ``true_label``,
        ``pred_label``, ``true_sentiment`` and ``pred_sentiment`` (also saved
        to ``{model_path}_predictions.csv``), or ``None`` when loading the
        base model or adapter fails.
    """
    print(f"\nEvaluating {model_path}")
    sanitize_adapter_config(model_path)

    if "distilbert" in model_path.lower():
        checkpoint = "distilbert-base-uncased"
    else:
        checkpoint = "bert-base-uncased"
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)

    try:
        print("⚙️ Loading base model and applying PEFT adapter...")
        backbone = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=2)
        model = PeftModel.from_pretrained(backbone, model_path)

        # Merge for LoRA, skip for Prompt
        try:
            model = model.merge_and_unload()
        except Exception:
            print("ℹmerge_and_unload skipped (likely Prompt tuning)")

    except Exception as load_error:
        print(f"Failed to load adapter for {model_path}: {load_error}")
        return None

    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"
    model.to(device)
    model.eval()

    def encode_batch(batch):
        return tokenizer(batch["text"], truncation=True, padding="max_length", max_length=256)

    encoded_test = test_dataset.map(encode_batch, batched=True).remove_columns(["text"])
    encoded_test.set_format("torch")

    output = Trainer(model=model, tokenizer=tokenizer).predict(encoded_test)
    pred_ids = np.argmax(output.predictions, axis=-1)
    true_ids = output.label_ids

    # Raw text is taken from the untokenized split; the encoded copy no longer has it.
    sentiment_names = {0: "Negative", 1: "Positive"}
    df = pd.DataFrame({
        "text": test_dataset["text"],
        "true_label": true_ids,
        "pred_label": pred_ids,
    })
    df["true_sentiment"] = df["true_label"].map(sentiment_names)
    df["pred_sentiment"] = df["pred_label"].map(sentiment_names)

    out_path = f"{model_path}_predictions.csv"
    df.to_csv(out_path, index=False)
    print(f"Saved predictions to {out_path} ({len(df)} samples)")
    return df

# ------------------------------------------------------------
# 4. Evaluate all models
# ------------------------------------------------------------
# Each adapter folder is handled on its own so one failure does not stop
# the remaining evaluations.
model_folders = [
    "bert_lora_finetuned",
    "bert_prompt_finetuned",
    "distilbert_lora_finetuned",
    "distilbert_prompt_finetuned",
]

for path in model_folders:
    # Guard clause: report and move on when the folder is absent.
    if not os.path.exists(path):
        print(f"Model not found: {path}")
        continue

    try:
        evaluate_and_save_predictions(path)
    except Exception as e:
        print(f"Error evaluating {path}: {e}")


In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer
from peft import PeftModel
from datasets import load_from_disk
import numpy as np
import pandas as pd
import torch, os, json

# ------------------------------------------------------------
# 1. Load dataset
# ------------------------------------------------------------
# Reload the DatasetDict that an earlier cell saved to ./clean_imdb_dataset
# and keep only its "test" split for the evaluation below.
dataset = load_from_disk("./clean_imdb_dataset")
test_dataset = dataset["test"]

# ------------------------------------------------------------
# 2. Helper: sanitize adapter_config.json
# ------------------------------------------------------------
def sanitize_adapter_config(model_path):
    """Clean up ``adapter_config.json`` inside ``model_path`` in place.

    Three adjustments are made, and the file is rewritten only when at least
    one of them changed something:

    1. Keys outside the whitelist below are removed.
    2. A missing ``peft_type`` is inferred from the folder name ("lora" ->
       ``LORA``, "prompt" -> ``PROMPT_TUNING``); when neither substring is
       present the config is left without one.
    3. For folders whose name contains both "distilbert" and "prompt",
       missing ``num_layers`` / ``num_attention_heads`` are filled in with
       6 and 12.

    This is a best-effort step: any failure is printed and swallowed so the
    caller can still attempt to load the adapter.

    Args:
        model_path: Folder that may contain ``adapter_config.json``. Nothing
            happens when the file does not exist.

    Returns:
        None.
    """
    config_path = os.path.join(model_path, "adapter_config.json")
    if not os.path.exists(config_path):
        return
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            config = json.load(f)

        valid_keys = {
            "base_model_name_or_path",
            "peft_type",
            "r",
            "lora_alpha",
            "target_modules",
            "lora_dropout",
            "bias",
            "task_type",
            "inference_mode",
            "num_virtual_tokens",
            "prompt_tuning_init",
            "token_dim",
            "num_transformer_submodules",
            "encoder_hidden_size",
            "num_layers",
            "num_attention_heads",
        }

        folder_name = model_path.lower()
        modified = False
        to_remove = [k for k in config if k not in valid_keys]
        if to_remove:
            print(f"🧹 Removing unsupported keys from {config_path}: {to_remove}")
            for k in to_remove:
                config.pop(k, None)
            modified = True

        if "peft_type" not in config:
            # Flag a change only when a type was really inferred; otherwise the
            # file would be rewritten (and reported as sanitized) for nothing.
            if "lora" in folder_name:
                config["peft_type"] = "LORA"
                modified = True
            elif "prompt" in folder_name:
                config["peft_type"] = "PROMPT_TUNING"
                modified = True

        # --- Auto-patch DistilBERT prompt tuning missing keys ---
        if "distilbert" in folder_name and "prompt" in folder_name:
            if "num_layers" not in config:
                config["num_layers"] = 6        # DistilBERT = 6 layers
                print(f"Added num_layers=6 for DistilBERT prompt tuning in {model_path}")
                modified = True
            if "num_attention_heads" not in config:
                config["num_attention_heads"] = 12   # DistilBERT = 12 heads
                print(f"Added num_attention_heads=12 for DistilBERT prompt tuning in {model_path}")
                modified = True

        if modified:
            # Serialize before opening for write so a serialization error
            # cannot leave a truncated config file behind.
            serialized = json.dumps(config, indent=2)
            with open(config_path, "w", encoding="utf-8") as f:
                f.write(serialized)
            print(f"Sanitized adapter config for {model_path}")

    except Exception as e:
        # Deliberately broad: sanitizing must never stop the evaluation run.
        print(f"Could not sanitize {config_path}: {e}")

# ------------------------------------------------------------
# 3. Evaluation function
# ------------------------------------------------------------
def evaluate_and_save_predictions(model_path):
    """Evaluate one PEFT adapter on the test split and write its predictions to CSV.

    The adapter config is sanitized first, then the base model is loaded with
    two labels and the adapter is applied. Merging the adapter is attempted and
    skipped with a message when it raises.

    Args:
        model_path: Adapter folder. A name containing "distilbert"
            (case-insensitive) selects ``distilbert-base-uncased`` as base
            model and tokenizer; any other name selects ``bert-base-uncased``.

    Returns:
        pandas.DataFrame with the columns ``text``, ``true_label``,
        ``pred_label``, ``true_sentiment`` and ``pred_sentiment`` (also saved
        to ``{model_path}_predictions.csv``), or ``None`` when loading the
        base model or adapter fails.
    """
    print(f"\nEvaluating {model_path}")
    sanitize_adapter_config(model_path)

    if "distilbert" in model_path.lower():
        checkpoint = "distilbert-base-uncased"
    else:
        checkpoint = "bert-base-uncased"
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)

    try:
        print("Loading base model and applying PEFT adapter...")
        backbone = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=2)
        model = PeftModel.from_pretrained(backbone, model_path)

        # Merging is optional; a failure here keeps the unmerged adapter model.
        try:
            model = model.merge_and_unload()
        except Exception:
            print("merge_and_unload skipped (likely Prompt tuning)")

    except Exception as load_error:
        print(f"Failed to load adapter for {model_path}: {load_error}")
        return None

    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"
    model.to(device)
    model.eval()

    def encode_batch(batch):
        return tokenizer(batch["text"], truncation=True, padding="max_length", max_length=256)

    encoded_test = test_dataset.map(encode_batch, batched=True).remove_columns(["text"])
    encoded_test.set_format("torch")

    output = Trainer(model=model, tokenizer=tokenizer).predict(encoded_test)
    pred_ids = np.argmax(output.predictions, axis=-1)
    true_ids = output.label_ids

    # Raw text is taken from the untokenized split; the encoded copy no longer has it.
    sentiment_names = {0: "Negative", 1: "Positive"}
    df = pd.DataFrame({
        "text": test_dataset["text"],
        "true_label": true_ids,
        "pred_label": pred_ids,
    })
    df["true_sentiment"] = df["true_label"].map(sentiment_names)
    df["pred_sentiment"] = df["pred_label"].map(sentiment_names)

    out_path = f"{model_path}_predictions.csv"
    df.to_csv(out_path, index=False)
    print(f"Saved predictions to {out_path} ({len(df)} samples)")
    return df

# ------------------------------------------------------------
# 4. Evaluate all models
# ------------------------------------------------------------
# Only the DistilBERT prompt-tuning adapter is evaluated in this cell.
model_folders = [
    "distilbert_prompt_finetuned",
]

for path in model_folders:
    # Guard clause: report and move on when the folder is absent.
    if not os.path.exists(path):
        print(f"⚠️ Model not found: {path}")
        continue

    try:
        evaluate_and_save_predictions(path)
    except Exception as e:
        print(f"Error evaluating {path}: {e}")


In [None]:
# ------------------------------------------------------------
# Toxicity Analysis for Multiple Models (Clean Version)
# ------------------------------------------------------------
from transformers import pipeline
from tqdm import tqdm
import pandas as pd
import numpy as np
import os

# ------------------------------------------------------------
# 1. Setup
# ------------------------------------------------------------
# Candidate prediction CSVs, reduced in one step to those that exist on disk.
MODEL_FILES = list(filter(os.path.exists, [
    "bert_full_finetuned_predictions.csv",
    "distilbert_full_finetuned_predictions.csv",
    "bert_lora_finetuned_predictions.csv",
    "distilbert_lora_finetuned_predictions.csv",
    "bert_prompt_finetuned_predictions.csv",
    "distilbert_prompt_finetuned_predictions.csv",
]))
# Stop early when there is nothing to analyze.
assert MODEL_FILES, "No prediction CSVs found in directory."

print(f"Found {len(MODEL_FILES)} prediction files to analyze.")

# ------------------------------------------------------------
# 2. Load Toxicity Model
# ------------------------------------------------------------
print("\nLoading Toxicity Classifier (unitary/toxic-bert)...")
# Build one text-classification pipeline around the unitary/toxic-bert
# checkpoint; it is reused for every text scored below.
# NOTE(review): truncation/max_length are presumably forwarded to the
# tokenizer so inputs longer than 512 tokens are cut — confirm against the
# pipeline documentation.
toxicity_pipe = pipeline(
    "text-classification",
    model="unitary/toxic-bert",
    truncation=True,
    max_length=512
)

def compute_toxicity(text):
    """Return the ``score`` of the first result the toxicity pipeline gives for ``text``.

    Any exception raised while scoring is swallowed and NaN is returned
    instead, so a single bad row does not stop a column-wide apply.
    """
    try:
        first_result = toxicity_pipe(text)[0]
        score = first_result["score"]
    except Exception:
        score = np.nan
    return score

# ------------------------------------------------------------
# 3. Run Toxicity Analysis for Each Model
# ------------------------------------------------------------
# One summary row per prediction file; each file also gets a copy with a
# "toxicity_score" column added.
summary = []

for file in MODEL_FILES:
    print(f"\nAnalyzing {file}")
    df = pd.read_csv(file)
    if "text" not in df:
        print(f"Skipping {file} — missing 'text' column")
        continue

    # Score every text, with a progress bar labelled by file name.
    tqdm.pandas(desc=f"Toxicity for {file}")
    df["toxicity_score"] = df["text"].progress_apply(compute_toxicity)

    # Overall mean plus the means restricted to each predicted sentiment.
    scores = df["toxicity_score"]
    predicted = df["pred_sentiment"]
    avg_toxicity = scores.mean()
    pos_tox = scores[predicted == "Positive"].mean()
    neg_tox = scores[predicted == "Negative"].mean()

    row = {
        "Model": file.replace("_predictions.csv", ""),
        "Avg_Toxicity": round(avg_toxicity, 4),
        "Positive_Toxicity": round(pos_tox, 4),
        "Negative_Toxicity": round(neg_tox, 4),
    }
    summary.append(row)

    # Save individual results
    out_file = file.replace(".csv", "_with_toxicity.csv")
    df.to_csv(out_file, index=False)
    print(f"Saved {out_file}")

# ------------------------------------------------------------
# 4. Create Summary Table
# ------------------------------------------------------------
# The columns are named explicitly so the frame still has "Avg_Toxicity" when
# every file was skipped and `summary` is empty; without them sort_values
# raises KeyError on the column-less frame.
summary_columns = ["Model", "Avg_Toxicity", "Positive_Toxicity", "Negative_Toxicity"]
summary_df = pd.DataFrame(summary, columns=summary_columns).sort_values("Avg_Toxicity", ascending=False)
summary_df.to_csv("toxicity_comparison_summary.csv", index=False)

print("\nToxicity Summary Across Models:")
print(summary_df)

print("\nSaved: toxicity_comparison_summary.csv")
