In [None]:
from google.colab import drive
drive.mount('/content/drive')

!git clone https://github.com/atremante26/nlp_final_project.git
%cd nlp_final_project

In [None]:
!pip -q install captum gensim

In [None]:
# IMPORTS
import os, random, sys, re
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
from transformers import AutoTokenizer
import torch
from google.colab import files

In [None]:
# SET SEED
def set_seed(seed=42):
    """
    Seed every random number generator used in this notebook.

    The same ``seed`` is applied, in order, to Python's ``random`` module,
    NumPy's global RNG, and PyTorch (``manual_seed`` and
    ``cuda.manual_seed_all``).
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

# Seed the RNGs before any data or model work happens
set_seed(42)

# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device: ", device)

In [None]:
# PROJECT IMPORTS
# Define project path (the cloned repository root) and put it first on
# sys.path so project modules can be imported by bare name
project_path = '/content/nlp_final_project'
sys.path.insert(0, project_path)

# Import model
from models import MultiTaskRoBERTa, MultiTaskDataset

# Temporarily expose `src/` to import the data-loading helpers
sys.path.insert(0, os.path.join(project_path, 'src'))
from data_preprocessing import load_multi_task_data
import data_preprocessing
# Redirect the loader to the processed CSV by overriding the module-level path.
# NOTE(review): this only has an effect if load_multi_task_data reads
# data_preprocessing.DATA_PATH at call time - confirm.
data_preprocessing.DATA_PATH = '/content/nlp_final_project/data/processed/dices_350_binary.csv'
sys.path.remove(os.path.join(project_path, 'src'))

# Expose `explainability/` for the Integrated Gradients helper.
# NOTE(review): `models` is already cached in sys.modules from the import above,
# so the line below reuses that module instead of searching the newly added
# path - confirm compute_integrated_gradients is reachable from it.
# Unlike `src`, this path entry is never removed again.
sys.path.insert(0, os.path.join(project_path, 'explainability'))
from models import compute_integrated_gradients

In [None]:
# LOAD DATA
# Load the train/val/test splits with balancing switched off
splits = load_multi_task_data(balance=False)

# Give each split a fresh positional index
train_df, val_df, test_df = (
    splits[name].reset_index(drop=True) for name in ("train", "val", "test")
)

print("Train:", len(train_df), "Val:", len(val_df), "Test:", len(test_df))
test_df.head()

In [None]:
# UPLOAD MODEL + INITIALIZE TOKENIZER / MODEL

# Ask for the checkpoint file through the Colab upload widget
uploaded = files.upload()
if not uploaded:
    # An empty result (e.g. the dialog was dismissed) would otherwise surface
    # as an opaque IndexError/StopIteration on the filename lookup below
    raise RuntimeError("No checkpoint file was uploaded; re-run this cell and choose one.")

# The upload result is keyed by filename; use the first uploaded file
CKPT_PATH = next(iter(uploaded))

# Load checkpoint onto the active device.
# NOTE(review): torch.load relies on pickle - only load checkpoints from a trusted source.
ckpt = torch.load(CKPT_PATH, map_location=device)

# Fail early, naming every missing entry, if the file is not one of our checkpoints
missing_keys = [k for k in ("tasks", "model_name", "model_state_dict") if k not in ckpt]
if missing_keys:
    raise KeyError(f"Checkpoint {CKPT_PATH!r} is missing required keys: {missing_keys}")

# Extract metadata stored alongside the weights
TASKS = ckpt["tasks"]
MODEL_NAME = ckpt["model_name"]

print("Model:", MODEL_NAME)
print("Tasks:", TASKS)

# Load the tokenizer that matches the checkpoint's base model
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

# Initialize model with the same base model and task heads as the checkpoint
model = MultiTaskRoBERTa(
    model_name=MODEL_NAME,
    tasks=TASKS
).to(device)

# Load weights and switch to evaluation mode
model.load_state_dict(ckpt["model_state_dict"])
model.eval()

In [None]:
# IG SETUP
# Settings shared by the scoring and Integrated Gradients cells below
TASK = "Q2_harmful"  # key used to pick this task's logits from the model output
LABEL_COL = "Q2_harmful_binary"  # dataframe column read as the true label
MAX_LEN_IG = 128      # max_length for tokenization (scoring and IG)
N_STEPS = 25          # n_steps passed to compute_integrated_gradients
THRESH = 0.5   # probability cutoff: q2_prob >= THRESH gives q2_pred = 1
K_TOP = 10    # tokens requested per direction from top_tokens

In [None]:
# TOKEN FILTERING HELPERS
# Special tokens defined by the tokenizer, excluded from attribution summaries.
# Any slot the tokenizer leaves unset (None) is dropped.
SPECIAL_TOKENS = {
    tokenizer.pad_token,
    tokenizer.cls_token,
    tokenizer.sep_token,
    tokenizer.bos_token,
    tokenizer.eos_token,
    tokenizer.unk_token,
} - {None}

# Matches tokens made up solely of non-word characters (i.e. pure punctuation)
punct_re = re.compile(r"^\W+$")

def keep_token(tok: str) -> bool:
    """
    Decide whether a token should appear in attribution summaries.

    Rejects tokenizer special tokens, tokens that are empty once the
    byte-level BPE whitespace markers are removed, and tokens consisting
    purely of non-word characters (punctuation).

    Args:
        tok: Token string to test.

    Returns:
        True if the token should be kept, False otherwise.
    """
    if tok in SPECIAL_TOKENS:
        return False

    # RoBERTa's byte-level BPE renders whitespace bytes as visible letters:
    # "Ġ" = space, "Ċ" = newline, "ĉ" = tab, "č" = carriage return.
    # The regex engine treats all of them as word characters, so every marker
    # (not only "Ġ") must be removed or a bare newline/tab token would pass
    # the punctuation check and be reported as a word.
    t = tok
    for marker in ("Ġ", "Ċ", "ĉ", "č"):
        t = t.replace(marker, "")
    t = t.strip()

    if t == "":
        return False
    if punct_re.match(t):
        return False

    return True

def top_tokens(tokens, attrs, k=10):
    """
    Return the top-k positive and top-k negative tokens by attribution score.

    Tokens rejected by ``keep_token`` are ignored. Only tokens with a strictly
    positive score can appear in the positive list and only tokens with a
    strictly negative score in the negative list, so either list may hold
    fewer than ``k`` entries (or be empty).

    Args:
        tokens: Sequence of token strings.
        attrs: Attribution scores, one per token, aligned with ``tokens``.
        k: Maximum number of tokens returned per direction.

    Returns:
        ``(top_pos, top_neg)``: tokens ordered from most positive and from most
        negative attribution respectively, with the "Ġ" marker replaced by a space.
    """
    toks = np.array(tokens)
    att = np.array(attrs, dtype=float)

    # Filter out special tokens and punctuation
    mask = np.array([keep_token(t) for t in toks], dtype=bool)
    toks = toks[mask]
    att = att[mask]

    if len(toks) == 0:
        return [], []

    # Stable sorts keep ties in original token order, making output deterministic
    desc = np.argsort(-att, kind="stable")
    asc = np.argsort(att, kind="stable")

    # Restrict each list to its own sign. Without this, an input with fewer
    # than k tokens of one sign would pad the "positive" list with negative
    # tokens (and vice versa), listing the same tokens in both.
    top_pos = toks[desc[att[desc] > 0][:k]]
    top_neg = toks[asc[att[asc] < 0][:k]]

    # Make RoBERTa tokens more readable (restore whitespace marker)
    top_pos = [t.replace("Ġ", " ") for t in top_pos]
    top_neg = [t.replace("Ġ", " ") for t in top_neg]

    return top_pos, top_neg

In [None]:
@torch.no_grad()
def score_task_probs(df, batch_size=32):
    """
    Compute sigmoid probabilities for TASK across a dataframe of texts.

    Relies on the notebook-level ``tokenizer``, ``model``, ``device``,
    ``TASK``, ``MAX_LEN_IG`` and ``THRESH``.

    Args:
        df: DataFrame with a "text" column.
        batch_size: Number of texts tokenized and scored per forward pass.

    Returns:
        A copy of ``df`` with a reset index and two added columns:
        "q2_prob" (sigmoid probability) and "q2_pred"
        (1 where ``q2_prob >= THRESH``, else 0).
    """
    probs = []

    for i in tqdm(range(0, len(df), batch_size), desc=f"Scoring {TASK}"):
        # Process in batches for efficiency
        texts = df["text"].iloc[i:i+batch_size].tolist()

        # Tokenize batch (padded to the longest text, truncated at MAX_LEN_IG)
        enc = tokenizer(
            texts,
            return_tensors="pt",
            truncation=True,
            padding=True,
            max_length=MAX_LEN_IG
        )

        # The model output is indexed by task name; keep only TASK's logits
        logits = model(
            input_ids=enc["input_ids"].to(device),
            attention_mask=enc["attention_mask"].to(device)
        )[TASK]

        # Convert logits to probabilities. reshape(-1) always yields a 1-D
        # tensor; squeeze(-1) would collapse a one-element batch of shape (1,)
        # to a scalar, making the extend() below fail.
        p = torch.sigmoid(logits).reshape(-1).cpu().numpy()
        probs.extend(p.tolist())

    # Add predictions to a copy so the caller's dataframe is left untouched
    out_df = df.copy().reset_index(drop=True)
    out_df["q2_prob"] = probs
    out_df["q2_pred"] = (out_df["q2_prob"] >= THRESH).astype(int)

    return out_df

# Score the test split and preview the true label next to the model's probability and prediction
test_scored = score_task_probs(test_df)
test_scored[[LABEL_COL, "q2_prob", "q2_pred"]].head()

In [None]:
# PICK EXAMPLES (6)
# Three highest-probability rows: the most confident "harmful" predictions
pos_idx = test_scored.sort_values("q2_prob", ascending=False).index[:3].tolist()

# Three lowest-probability rows: the most confident "safe" predictions
neg_idx = test_scored.sort_values("q2_prob", ascending=True).index[:3].tolist()

# Harmful examples first, followed by the safe ones
chosen_idxs = [*pos_idx, *neg_idx]

print("Chosen examples (most confident predictions):")
display(test_scored.loc[chosen_idxs, [LABEL_COL, "q2_prob", "q2_pred"]])

In [None]:
# RUN IG
# Run Integrated Gradients on every chosen example and collect one summary row each
rows = []

for idx in tqdm(chosen_idxs):
    # Extract example info
    text  = test_scored.loc[idx, "text"]
    y_true = int(test_scored.loc[idx, LABEL_COL])
    prob   = float(test_scored.loc[idx, "q2_prob"])
    pred   = int(test_scored.loc[idx, "q2_pred"])

    # Compute IG attributions for TASK using the project helper
    out = compute_integrated_gradients(
        text=text,
        model=model,
        tokenizer=tokenizer,
        task=TASK,
        n_steps=N_STEPS,
        device=str(device),
        max_length=MAX_LEN_IG
    )

    tokens = out["tokens"]

    # Prefer the raw attribution scores when the helper returns them,
    # otherwise fall back to its "attributions" entry
    attrs = out["raw_attributions"] if "raw_attributions" in out else out["attributions"]

    # Convergence delta as reported by the helper; it gauges how well the IG
    # approximation converged (values below 0.05 are the target here)
    delta = float(out["convergence_delta"])

    # Extract top influential tokens
    top_pos, top_neg = top_tokens(tokens, attrs, k=K_TOP)

    # Store results for analysis (text is cut to 300 characters for readability)
    rows.append({
        "idx": idx,
        "Q2_true": y_true,
        "Q2_pred": pred,
        "Q2_prob": prob,
        "convergence_delta": delta,
        "top_pos_tokens": ", ".join(top_pos),
        "top_neg_tokens": ", ".join(top_neg),
        "text_snippet": text[:300] + ("..." if len(text) > 300 else "")
    })

# Order rows from most to least "harmful" probability
ig_df = pd.DataFrame(rows).sort_values("Q2_prob", ascending=False)

# Show the whole table: head() defaults to 5 rows and would hide one of the six examples
ig_df

In [None]:
# Save the IG summary table. Create the output folder first, because
# to_csv does not create missing parent directories.
os.makedirs("results/ig", exist_ok=True)
ig_df.to_csv("results/ig/ig_results_q2_selected.csv", index=False)
print("Saved: ig_results_q2_selected.csv")