<a href="https://colab.research.google.com/github/2303A51529/NLP-LAB/blob/main/NLP_F_7_11_2025.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

import os
import random
import math
import sys
import subprocess
import inspect
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# NLP / modeling
import nltk
from nltk.corpus import stopwords

# Ensure stopwords
nltk.download('stopwords', quiet=True)

import torch
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.utils import resample

from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    DataCollatorWithPadding,
)
from transformers import logging as transformers_logging
transformers_logging.set_verbosity_error()

# --- LIME import with auto-install fallback ---
# Only a genuinely missing package triggers the pip fallback; any other failure
# raised while importing lime (e.g. a broken dependency) surfaces unchanged
# instead of being masked by a pointless reinstall attempt.
try:
    from lime.lime_text import LimeTextExplainer
except ImportError:
    print("LIME not found — attempting to install 'lime' package via pip...")
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "lime"], stdout=subprocess.DEVNULL)
        from lime.lime_text import LimeTextExplainer
        print("Installed lime successfully.")
    except Exception:
        # Boundary handler: tell the user what to do, then re-raise the original
        # error with its traceback intact.
        print("Automatic installation of 'lime' failed. Please install manually: pip install lime")
        raise

# --------------------------- Config ---------------------------
# Input CSV; expected columns: 'review' and 'label' (labels: -2,-1,0,1,2).
DATA_PATH = "reviews.csv"
# Rows generated per label when synthetic data has to be created.
SYNTHETIC_SIZE_PER_CLASS = 800
RANDOM_SEED = 42

# Model and optimisation settings.
MODEL_NAME = "distilbert-base-uncased"
NUM_LABELS = 5                   # one class per original label in [-2,-1,0,1,2]
BATCH_SIZE = 16
EPOCHS = 2                       # demo-friendly; increase for real training
LR = 2e-5
OUTPUT_DIR = "hf_sentiment_model"
MAX_LENGTH = 256                 # max_length handed to the tokenizer

# Run on the GPU when torch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
# --------------------------------------------------------------

# Seed all three random number generators used by the script.
torch.manual_seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)

# Original labels -> contiguous class ids 0..4, and the reverse lookup.
label_map = {original: class_id for class_id, original in enumerate((-2, -1, 0, 1, 2))}
inv_label_map = dict(zip(label_map.values(), label_map.keys()))

# Display names, keyed by the ORIGINAL label (not the class id).
human_names = dict(zip(
    label_map,
    ("Very Negative", "Negative", "Neutral", "Positive", "Very Positive"),
))

# ----------------- Lightweight regex tokenizer (no punkt) -----------------
import re

# Token pattern, Unicode-aware:
#   * a run of letters/digits, optionally followed by an apostrophe and another
#     run ("don't", "it's"), or
#   * any single character that is neither whitespace nor a letter/digit.
# ``[^\W_]`` means "word character except underscore", so accented letters stay
# inside their word ("café" is one token) while "_" is still emitted as a
# symbol token. For pure-ASCII text the output matches the former
# ``[A-Za-z0-9]``-based pattern exactly.
_token_re = re.compile(r"[^\W_]+(?:'[^\W_]+)?|[^\s\w]|_")

def regex_tokenize(text):
    """Split *text* into word and symbol tokens without needing NLTK's punkt.

    Args:
        text: The text to tokenize. Non-string values are converted with str().

    Returns:
        A list of token strings in order of appearance; whitespace is dropped.
        An empty or whitespace-only input yields an empty list.
    """
    if not isinstance(text, str):
        text = str(text)
    return _token_re.findall(text)

# --------------------------- Task 1: Data preparation ---------------------------
def load_or_create_reviews(path=DATA_PATH):
    """Return the review data as a DataFrame with 'review' and 'label' columns.

    If *path* exists it is read as CSV, restricted to rows whose label is a key
    of ``label_map``, reduced to the two required columns, and stripped of rows
    with missing values. Otherwise a shuffled synthetic dataset with
    ``SYNTHETIC_SIZE_PER_CLASS`` rows per label is generated.

    Raises:
        ValueError: if the CSV lacks a 'review' or 'label' column.
    """
    if os.path.exists(path):
        loaded = pd.read_csv(path)
        if not {'review', 'label'}.issubset(loaded.columns):
            raise ValueError("CSV must contain 'review' and 'label' columns.")
        known_label = loaded['label'].isin(label_map.keys())
        df = loaded[known_label].copy()
        df = df[['review', 'label']].dropna().reset_index(drop=True)
        print(f"Loaded {len(df)} reviews from {path}")
        return df

    # No file on disk: fall back to a synthetic dataset.
    total = SYNTHETIC_SIZE_PER_CLASS * len(label_map)
    print(f"File {path} not found — generating synthetic dataset with {total} rows.")
    pos_phrases = [
        "I love this product, absolutely fantastic experience.",
        "Works great, exceeded my expectations.",
        "High quality and very satisfied with the purchase.",
        "Excellent value and quick delivery.",
        "Amazing! I will buy again."
    ]
    neg_phrases = [
        "Terrible quality, broke after one use.",
        "Extremely disappointed, waste of money.",
        "Did not work as advertised, very poor.",
        "Horrible customer service, will not buy again.",
        "Product arrived damaged and unusable."
    ]
    neutral_phrases = [
        "The product is okay, nothing special.",
        "Average item, does the job.",
        "Received as described, no surprises.",
        "It works, but there are better alternatives.",
        "Neutral feelings about this purchase."
    ]
    pos_tails = ["Fast shipping.", "Good packaging.", "Expected quality."]
    neg_tails = ["Late delivery.", "Missing parts.", "Poor packaging."]
    neutral_tails = ["Okay for the price.", "Satisfactory.", "No major issues."]

    data = []
    for label in [-2, -1, 0, 1, 2]:
        # NOTE: labels 1 and 2 share the positive pools and -1 and -2 share the
        # negative pools, so the generated text carries polarity but not intensity.
        if label >= 1:
            openers, tails = pos_phrases, pos_tails
        elif label <= -1:
            openers, tails = neg_phrases, neg_tails
        else:
            openers, tails = neutral_phrases, neutral_tails
        for _ in range(SYNTHETIC_SIZE_PER_CLASS):
            # Opener is drawn before the tail so the random sequence is reproducible.
            text = random.choice(openers) + " " + random.choice(tails)
            data.append({'review': text, 'label': label})

    df = pd.DataFrame(data)
    df = df.sample(frac=1, random_state=RANDOM_SEED).reset_index(drop=True)
    print(f"Synthetic dataset created: {len(df)} rows.")
    return df

# get stopwords
nltk.download('stopwords', quiet=True)
from nltk.corpus import stopwords
stop_words = set(stopwords.words("english"))

def preprocess_text(text):
    """
    Normalise a review into a cleaned, space-joined token string.

    The input is stringified and lowercased, split with regex_tokenize (no punkt),
    and every token that is a stopword or contains no alphanumeric character is
    dropped. Returns the remaining tokens joined by single spaces.
    """
    survivors = []
    for tok in regex_tokenize(str(text).lower()):
        has_alnum = any(map(str.isalnum, tok))
        if has_alnum and tok not in stop_words:
            survivors.append(tok)
    return " ".join(survivors)

# Load the reviews and attach a cleaned copy of each text
df = load_or_create_reviews(DATA_PATH)
df['clean'] = df['review'].map(preprocess_text)

# Balance classes: every label is capped at the size of the rarest label
counts = df['label'].value_counts().to_dict()
min_count = min(counts.values())
print("Class counts before balancing:", counts)
balanced = [
    # Only classes larger than the cap are sampled; smaller ones pass through as-is.
    group.sample(min_count, random_state=RANDOM_SEED) if len(group) > min_count else group
    for group in (df[df['label'] == lab] for lab in sorted(label_map))
]
df_bal = pd.concat(balanced)
df_bal = df_bal.sample(frac=1, random_state=RANDOM_SEED)   # shuffle the stacked classes
df_bal = df_bal.reset_index(drop=True)
print("Class counts after balancing:", df_bal['label'].value_counts().to_dict())

# Original labels -> class ids 0..4
df_bal['label_id'] = df_bal['label'].map(label_map)

# Stratified 80/20 train/test split on the class id
train_df, test_df = train_test_split(
    df_bal,
    test_size=0.2,
    random_state=RANDOM_SEED,
    stratify=df_bal['label_id'],
)
print(f"Train size: {len(train_df)}, Test size: {len(test_df)}")

# --------------------------- Task 2: Transformer fine-tune ---------------------------
# Load the tokenizer and the sequence-classification model for the checkpoint
# named by MODEL_NAME; the model is requested with NUM_LABELS output classes.
print("Loading tokenizer and model:", MODEL_NAME)
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=NUM_LABELS)

class HFReviewsDataset(torch.utils.data.Dataset):
    """Map-style dataset that tokenizes one review per item on access.

    Each item is a dict holding the tokenizer's outputs as tensors plus a
    'labels' tensor with the integer class id. Every text is truncated and
    padded to ``max_length`` tokens.
    """

    def __init__(self, texts, labels, tokenizer, max_length=MAX_LENGTH):
        # texts and labels are parallel, index-addressable sequences.
        self.texts, self.labels = texts, labels
        self.tokenizer, self.max_length = tokenizer, max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        encoded = self.tokenizer(
            str(self.texts[idx]),
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
        )
        item = {}
        for field, values in encoded.items():
            item[field] = torch.tensor(values)
        item['labels'] = torch.tensor(int(self.labels[idx]))
        return item

# Both splits feed the raw 'review' text (not the 'clean' column) to the tokenizer.
train_dataset, test_dataset = (
    HFReviewsDataset(split['review'].tolist(), split['label_id'].tolist(), tokenizer)
    for split in (train_df, test_df)
)

data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

def compute_metrics(pred):
    """Compute accuracy for a prediction object.

    Args:
        pred: Object exposing ``predictions`` (per-class scores, one row per
            sample) and ``label_ids`` (the true class ids).

    Returns:
        ``{"accuracy": value}``, where value is the fraction of rows whose
        highest-scoring class equals the true class id.
    """
    predicted_ids = np.argmax(pred.predictions, axis=1)
    hit_rate = (predicted_ids == pred.label_ids).mean()
    return {"accuracy": hit_rate}

# Build TrainingArguments robustly across transformers releases: request the
# full configuration, adapt renamed kwargs, drop whatever this release does not
# accept, and keep eval/save strategies aligned for best-model loading.
requested_args = dict(
    output_dir=OUTPUT_DIR,
    num_train_epochs=EPOCHS,
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=LR,
    logging_dir=os.path.join(OUTPUT_DIR, "logs"),
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    seed=RANDOM_SEED,
    fp16=torch.cuda.is_available(),
    # Disable third-party logging integrations. Left at its default, an
    # installed wandb package stalls the run on an interactive API-key prompt.
    report_to="none",
)

# Inspect TrainingArguments signature to learn which kwargs exist here
ta_init_params = inspect.signature(TrainingArguments.__init__).parameters
allowed_keys = set(ta_init_params.keys())
allowed_keys.discard('self')

# `evaluation_strategy` was renamed to `eval_strategy` in newer releases. Use the
# new name whenever it exists; otherwise the key would be filtered out below and
# per-epoch evaluation (and with it best-model loading) would silently vanish.
if 'eval_strategy' in allowed_keys:
    requested_args['eval_strategy'] = requested_args.pop('evaluation_strategy')

filtered_args = {k: v for k, v in requested_args.items() if k in allowed_keys}

# load_best_model_at_end requires matching evaluation and save strategies.
# Both were requested as 'epoch'; if either kwarg did not survive the filter,
# the match cannot be guaranteed, so best-model loading is switched off.
if filtered_args.get('load_best_model_at_end', False):
    eval_supported = 'eval_strategy' in filtered_args or 'evaluation_strategy' in filtered_args
    save_supported = 'save_strategy' in filtered_args
    if not (eval_supported and save_supported):
        print("Warning: training arguments in this transformers version do not support both evaluation/save strategies.")
        print("Disabling load_best_model_at_end to maintain compatibility.")
        filtered_args['load_best_model_at_end'] = False
        # metric_for_best_model is meaningless without best-model loading
        filtered_args.pop('metric_for_best_model', None)

# Final TrainingArguments
training_args = TrainingArguments(**filtered_args)

trainer_kwargs = dict(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)
# Newer Trainer versions take the tokenizer as `processing_class` and deprecate
# the `tokenizer` kwarg; pass it under whichever name this release understands.
if 'processing_class' in inspect.signature(Trainer.__init__).parameters:
    trainer_kwargs['processing_class'] = tokenizer
else:
    trainer_kwargs['tokenizer'] = tokenizer

trainer = Trainer(**trainer_kwargs)

print("Starting training (this may take a while)...")
trainer.train()
print("Training finished. Running evaluation on test set...")
eval_out = trainer.predict(test_dataset)
y_true = test_df['label_id'].to_numpy()
y_pred = np.argmax(eval_out.predictions, axis=1)

# Fix the class list explicitly. Without `labels=`, scikit-learn infers the
# classes from the data and rejects `target_names` whenever a class is missing
# from both y_true and y_pred (possible with a small or skewed CSV).
class_ids = list(range(NUM_LABELS))
class_names = [human_names[inv_label_map[i]] for i in class_ids]

print("Classification report (test):")
print(classification_report(y_true, y_pred, labels=class_ids, target_names=class_names, zero_division=0))

# Confusion matrix (rows: true class, columns: predicted class)
cm = confusion_matrix(y_true, y_pred, labels=class_ids)
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names,
            yticklabels=class_names)
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Confusion Matrix")
plt.tight_layout()
plt.savefig("confusion_matrix.png", dpi=200)
plt.show()

# Which sentiment class is most often misclassified? (lowest recall)
report = classification_report(y_true, y_pred, labels=class_ids, output_dict=True, zero_division=0)
# Per-class entries are keyed by the class id as a string; the aggregate rows
# ('accuracy', 'macro avg', ...) are skipped by the isdigit() test. Classes with
# no true samples are excluded: their recall of 0 says nothing about errors.
per_class_recall = {
    int(k): v['recall']
    for k, v in report.items()
    if k.isdigit() and v['support'] > 0
}
worst_class_id = min(per_class_recall.items(), key=lambda x: x[1])[0]
print(f"Most often misclassified class (lowest recall): id={worst_class_id}, label={human_names[inv_label_map[worst_class_id]]}, recall={per_class_recall[worst_class_id]:.3f}")

# --------------------------- Task 3: Explainable AI with LIME ---------------------------
label_names = [human_names[inv_label_map[i]] for i in range(NUM_LABELS)]

def predict_proba_for_lime(texts, batch_size=64):
    """Return class probabilities for *texts* in the layout LIME expects.

    LIME's text explainer hands its classifier function the whole set of
    perturbed samples in one call (``num_samples``, 5000 by default — see the
    LIME documentation). Running them through the model as a single batch can
    exhaust memory on long reviews, so the texts are processed in mini-batches
    and the results stitched back together.

    Args:
        texts: A sequence of strings (list, tuple, array) or a single string.
        batch_size: Number of texts per forward pass; must be positive.

    Returns:
        A NumPy array of shape (len(texts), number of classes) whose rows are
        softmax probabilities; an array with zero rows for empty input.
    """
    if isinstance(texts, str):
        texts = [texts]
    # The tokenizer needs a plain list of str; LIME may pass other sequence types.
    texts = [str(t) for t in texts]
    if not texts:
        return np.empty((0, NUM_LABELS), dtype=np.float32)

    model.to(DEVICE)
    model.eval()
    chunks = []
    with torch.no_grad():
        for start in range(0, len(texts), batch_size):
            enc = tokenizer(
                texts[start:start + batch_size],
                truncation=True,
                padding=True,
                max_length=MAX_LENGTH,
                return_tensors='pt',
            )
            enc = {k: v.to(DEVICE) for k, v in enc.items()}
            logits = model(**enc).logits
            chunks.append(torch.softmax(logits, dim=-1).cpu().numpy())
    return np.concatenate(chunks, axis=0)

explainer = LimeTextExplainer(class_names=label_names)

# Choose examples from test set: up to two misclassified rows, then the first
# correctly classified row.
matches = [bool(t == p) for t, p in zip(y_true, y_pred)]
mismatch_idxs = [pos for pos, ok in enumerate(matches) if not ok]
selected = list(mismatch_idxs[:2])
first_correct = next(
    (pos for pos, ok in enumerate(matches) if ok and pos not in selected),
    None,
)
if first_correct is not None:
    selected.append(first_correct)
selected = [pos for pos in selected if pos < len(test_df)]
print("LIME will explain these test indices (in test_df):", selected)

explanations = {}
for si in selected:
    row = test_df.iloc[si]
    raw_text = row['review']
    true_label_idx = int(row['label_id'])
    pred_label_idx = int(y_pred[si])

    print("\n--- Review (index {}):".format(si))
    print(raw_text)
    print("True label:", human_names[inv_label_map[true_label_idx]])
    print("Model predicted:", human_names[inv_label_map[pred_label_idx]])

    # Explain all five classes so both the predicted and the true label can be inspected.
    exp = explainer.explain_instance(raw_text, predict_proba_for_lime, num_features=8, labels=[0,1,2,3,4])

    sections = (
        ("\nTop contributing features for predicted label ({}):", pred_label_idx),
        ("\nTop contributing features for true label ({}):", true_label_idx),
    )
    for header, class_idx in sections:
        print(header.format(human_names[inv_label_map[class_idx]]))
        for feat, weight in exp.as_list(label=class_idx)[:8]:
            print(f"  {feat:30s} -> {weight:+.4f}")

    explanations[si] = exp

# Save explanations
import pickle
with open("lime_explanations.pkl", "wb") as sink:
    pickle.dump(explanations, sink)

# Discussion
# Static write-up of how the LIME output can be used, followed by a summary of
# the artefacts this script writes to disk.
print("\n--- Discussion: How LIME explanations help businesses ---")
print("""
1) Word-level insight: LIME highlights which words/phrases positively or negatively contributed to a sentiment prediction.
2) Root cause analysis: Clusters of negative contributions around shipping, packaging, or a specific feature point to operational issues.
3) Product improvement: If certain desirable properties (e.g., 'durable') are consistently associated with positive predictions,
   they can be emphasized in marketing or design.
4) Trust & auditing: Explainers provide transparency to stakeholders and let product teams validate model predictions.
5) Actionable monitoring: Create dashboards showing top negative contributing tokens over time to detect emerging problems early.
""")

print("\nScript finished. Outputs saved:")
print(" - confusion_matrix.png")
print(" - lime_explanations.pkl")
print(" - trained HF model in:", OUTPUT_DIR)


File reviews.csv not found — generating synthetic dataset with 4000 rows.
Synthetic dataset created: 4000 rows.
Class counts before balancing: {-2: 800, 2: 800, 1: 800, 0: 800, -1: 800}
Class counts after balancing: {-2: 800, 2: 800, 1: 800, 0: 800, -1: 800}
Train size: 3200, Test size: 800
Loading tokenizer and model: distilbert-base-uncased
Disabling load_best_model_at_end to maintain compatibility.
Starting training (this may take a while)...


  trainer = Trainer(
wandb: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
wandb: You can find your API key in your browser here: https://wandb.ai/authorize?ref=models
wandb: Paste an API key from your profile and hit enter:

 ··········


wandb: Paste an API key from your profile and hit enter:

 ··········


wandb: Paste an API key from your profile and hit enter:

 ··········


wandb: Paste an API key from your profile and hit enter: