In [None]:
# ============================
#  Setup: install packages
# ============================
# Installs every third-party package the later cells import (quiet mode).
!pip install -q torch transformers peft accelerate bitsandbytes scikit-learn nltk

import nltk
# Download the NLTK "punkt" resource into the runtime.
nltk.download('punkt')

# ============================
#  Clone your GitHub repo
# ============================
# TODO: replace with your actual repo URL (HTTPS)
REPO_URL = "https://github.com/YOUR_USERNAME/YOUR_REPO_NAME.git"

# {REPO_URL} is interpolated from the Python variable above by IPython.
!git clone {REPO_URL}

# If your repo name is different, change this line accordingly:
# (later cells resolve data/phi_data.jsonl relative to this working directory)
%cd YOUR_REPO_NAME


In [None]:
import os
import json
import random
from pathlib import Path

import torch
from transformers import AutoTokenizer, BitsAndBytesConfig
from peft import AutoPeftModelForCausalLM

MODEL_ID = "Iftakhar/deepseek-phi-adapter"


def get_repo_root() -> Path:
    """Return the repository root, taken to be the current working directory.

    The setup cell has already changed directory into the cloned repo,
    so no search for a marker file is performed here.
    """
    cwd = os.getcwd()
    return Path(cwd)


def get_data_path() -> Path:
    """Return the path of the PHI dataset: <repo root>/data/phi_data.jsonl."""
    root = get_repo_root()
    return root.joinpath("data", "phi_data.jsonl")


def load_data(path: Path, max_samples: int | None = None):
    data = []
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            j = json.loads(line)
            data.append(
                {
                    "instruction": j.get("input", ""),
                    "ground_truth": j.get("output", {}).get("redacted_text", ""),
                    "raw": j,
                }
            )
    if max_samples is not None and len(data) > max_samples:
        data = random.sample(data, max_samples)
    return data


def load_model_and_tokenizer():
    """Load the PEFT adapter model and its tokenizer from the Hugging Face Hub.

    The tokenizer is given a pad token (falling back to EOS) and an "<END>"
    special token if it lacks them. On CUDA the model is loaded 4-bit (NF4,
    double quantization, bfloat16 compute); without CUDA the bitsandbytes
    config is skipped, because 4-bit bitsandbytes loading generally needs a GPU.

    Returns:
        (model, tokenizer, device, end_id) where ``device`` is "cuda" or "cpu"
        and ``end_id`` is the token id of "<END>".
    """
    use_cuda = torch.cuda.is_available()
    device = "cuda" if use_cuda else "cpu"
    print(f"Using device: {device}")

    # NOTE(review): trust_remote_code=True allows code shipped in the Hub repo
    # to run locally; keep only if MODEL_ID is trusted.
    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    if "<END>" not in tokenizer.get_vocab():
        tokenizer.add_special_tokens({"additional_special_tokens": ["<END>"]})
    # "<END>" is guaranteed to be in the vocabulary at this point, so no
    # fallback to eos_token_id is needed.
    end_id = tokenizer.convert_tokens_to_ids("<END>")

    load_kwargs = dict(
        device_map="auto",
        torch_dtype=torch.bfloat16,
        trust_remote_code=True,
    )
    if use_cuda:
        # Quantize only when a GPU is present.
        load_kwargs["quantization_config"] = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.bfloat16,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_use_double_quant=True,
        )

    model = AutoPeftModelForCausalLM.from_pretrained(MODEL_ID, **load_kwargs)
    # Keep the embedding matrix in sync with the (possibly extended) tokenizer.
    model.resize_token_embeddings(len(tokenizer))
    model.eval()

    return model, tokenizer, device, end_id


def generate_redaction(model, tokenizer, device, end_id, instruction: str) -> str:
    """Generate the model's redacted output for one instruction (greedy decoding).

    Args:
        model: Causal LM exposing ``generate``.
        tokenizer: Tokenizer matching ``model``.
        device: Device the encoded prompt is moved to.
        end_id: Token id that terminates generation.
        instruction: Text placed under the "### Instruction:" header.

    Returns:
        The generated continuation only (prompt excluded), cut at a literal
        "<END>" if one survives decoding, with surrounding whitespace stripped.
    """
    prompt = f"### Instruction:\n{instruction}\n\n### Output:\n"
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    prompt_len = inputs["input_ids"].shape[1]
    gen_kwargs = dict(
        max_new_tokens=384,
        # Deterministic decoding was intended (temperature was 0.0). Sampling
        # knobs are only honoured with do_sample=True, so request greedy
        # decoding explicitly instead of passing ignored values.
        do_sample=False,
        eos_token_id=end_id,
        pad_token_id=tokenizer.pad_token_id,
    )
    with torch.no_grad():
        out = model.generate(**inputs, **gen_kwargs)

    # Decode only the newly generated tokens. Splitting the full text on
    # "### Output:" breaks when the instruction itself contains that marker.
    decoded = tokenizer.decode(out[0][prompt_len:], skip_special_tokens=True)
    if "<END>" in decoded:
        decoded = decoded.split("<END>", 1)[0]
    return decoded.strip()


# ---------- RUN DEMO ----------
# Prints model output next to the ground truth for a handful of random samples.
data_path = get_data_path()
if not data_path.exists():
    # Fail early with the resolved path so a wrong working directory is obvious.
    raise FileNotFoundError(f"Expected data file at: {data_path}")

print(f" Loading data from: {data_path}")
samples = load_data(data_path, max_samples=5)  # change 5 -> any number you like

print(f" Loading model from Hugging Face: {MODEL_ID}")
model, tokenizer, device, end_id = load_model_and_tokenizer()

print("\n=================  DEMO OUTPUTS =================")
for idx, sample in enumerate(samples, start=1):
    print("=" * 100)
    print(f" SAMPLE {idx}")
    print(" Instruction:")
    print(sample["instruction"])
    print("\n  Generating redacted output...")
    # One generate() call per sample (no batching).
    pred = generate_redaction(model, tokenizer, device, end_id, sample["instruction"])

    print("\n Model Output:")
    print(pred)
    print("\n Ground Truth:")
    print(sample["ground_truth"])
    print()

print("\nDemo complete.")


In [None]:
import re
import json
import random
from pathlib import Path
from collections import defaultdict

import torch
from transformers import AutoTokenizer, BitsAndBytesConfig
from peft import AutoPeftModelForCausalLM
from nltk.translate.bleu_score import sentence_bleu
from sklearn.metrics import precision_recall_fscore_support

MODEL_ID = "Iftakhar/deepseek-phi-adapter"


def get_repo_root() -> Path:
    """Return the repository root, taken to be the current working directory.

    Uses ``Path.cwd()`` so this cell does not depend on ``os`` having been
    imported by an earlier cell (this cell's own imports do not include it).
    """
    return Path.cwd()


def get_data_path() -> Path:
    """Return the path of the PHI dataset: <repo root>/data/phi_data.jsonl."""
    repo_root = get_repo_root()
    return repo_root.joinpath("data", "phi_data.jsonl")


def read_phi_jsonl(path: Path):
    """Read every record of the PHI JSONL file.

    Each non-blank line must be a JSON object. Missing keys fall back to ""
    (instruction, ground truth) or None (split).

    Args:
        path: Location of the JSONL file (read as UTF-8).

    Returns:
        A list of dicts with keys "instruction" (record "input"),
        "ground_truth" (record "output" -> "redacted_text"), "split" and
        "raw" (the full parsed record).

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    data = []
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                # Blank lines (typically a trailing newline) are not records;
                # feeding them to json.loads would abort the whole read.
                continue
            j = json.loads(line)
            # "output" may be absent, null, or not an object; only a dict can
            # carry "redacted_text".
            output = j.get("output")
            ground_truth = output.get("redacted_text", "") if isinstance(output, dict) else ""
            data.append(
                {
                    "instruction": j.get("input", ""),
                    "ground_truth": ground_truth,
                    "split": j.get("split", None),
                    "raw": j,
                }
            )
    return data


def maybe_filter_test(data):
    """Keep only records whose "split" is "test", if there are any.

    When no record is marked as test, the input list is returned unchanged
    (the same object), so unsplit datasets are evaluated in full.
    """
    test_rows = [row for row in data if row.get("split") == "test"]
    if test_rows:
        return test_rows
    return data


def load_model_and_tokenizer():
    """Load the PEFT adapter model and its tokenizer from the Hugging Face Hub.

    The tokenizer is given a pad token (falling back to EOS) and an "<END>"
    special token if it lacks them. On CUDA the model is loaded 4-bit (NF4,
    double quantization, bfloat16 compute); without CUDA the bitsandbytes
    config is skipped, because 4-bit bitsandbytes loading generally needs a GPU.

    Returns:
        (model, tokenizer, device, end_id) where ``device`` is "cuda" or "cpu"
        and ``end_id`` is the token id of "<END>".
    """
    use_cuda = torch.cuda.is_available()
    device = "cuda" if use_cuda else "cpu"
    print(f"Using device: {device}")

    # NOTE(review): trust_remote_code=True allows code shipped in the Hub repo
    # to run locally; keep only if MODEL_ID is trusted.
    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    if "<END>" not in tokenizer.get_vocab():
        tokenizer.add_special_tokens({"additional_special_tokens": ["<END>"]})
    # "<END>" is guaranteed to be in the vocabulary at this point, so no
    # fallback to eos_token_id is needed.
    end_id = tokenizer.convert_tokens_to_ids("<END>")

    load_kwargs = dict(
        device_map="auto",
        torch_dtype=torch.bfloat16,
        trust_remote_code=True,
    )
    if use_cuda:
        # Quantize only when a GPU is present.
        load_kwargs["quantization_config"] = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.bfloat16,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_use_double_quant=True,
        )

    model = AutoPeftModelForCausalLM.from_pretrained(MODEL_ID, **load_kwargs)
    # Keep the embedding matrix in sync with the (possibly extended) tokenizer.
    model.resize_token_embeddings(len(tokenizer))
    model.eval()

    return model, tokenizer, device, end_id


# Generation settings read by generate_output().
GEN_KW = {"max_new_tokens": 384, "temperature": 0.6, "top_p": 0.9}


def safe_div(a, b):
    """Return a / b, or 0.0 when b is not strictly positive."""
    if b > 0:
        return a / b
    return 0.0


def clean_output(txt: str) -> str:
    """Trim generated text down to the part worth scoring.

    Keeps only what precedes the first "<END>", then drops everything from the
    first "RecordedVote", "<HR>" or "Ã½:" marker to the end of the text, and
    finally strips surrounding whitespace.
    """
    before_end, _, _ = txt.partition("<END>")
    without_tail = re.sub(
        r"(RecordedVote|<HR>|Ã½:).*", "", before_end, flags=re.DOTALL
    )
    return without_tail.strip()


def extract_phi_tags(text: str):
    """Return the tag names of all ``<REDACTED:TAG>`` placeholders, in order.

    Only names made of uppercase letters and underscores are recognised;
    repeated tags are kept.
    """
    matches = re.finditer(r"<REDACTED:([A-Z_]+)>", text)
    return [m.group(1) for m in matches]


def classify_policy(prompt: str) -> str:
    """Map an instruction to a coarse redaction-policy label by keyword.

    Matching is case-insensitive and first-match-wins, in this order:
    redact-all, do-not-redact, the "only ..." family, "except", then "General".
    """
    text = prompt.lower()

    def mentions(*needles):
        # True when any of the given substrings occurs in the prompt.
        return any(needle in text for needle in needles)

    if mentions("all phi", "everything"):
        return "Redact All PHI"
    if mentions("do not", "keep all"):
        return "Do Not Redact"
    if "only" in text:
        # Sub-policies of "only ...", checked in priority order.
        only_rules = (
            (("hospital", "facility"), "Facility Only"),
            (("vehicle", "vin"), "Vehicle Only"),
            (("date",), "Date Only"),
            (("identifier", "ssn"), "Identifier Only"),
        )
        for needles, label in only_rules:
            if mentions(*needles):
                return label
        return "Selective"
    if "except" in text:
        return "Except Some PHI"
    return "General"


def detect_hallucination(original_text: str, model_output: str) -> bool:
    """Heuristically flag entities in the output that are absent from the input.

    Candidate entities are MRN/VIN/SSN identifiers (matched case-insensitively,
    so "MRN123" counts as well as "mrn123") and pairs of adjacent Capitalized
    words. A candidate counts as hallucinated when its lowercased form does not
    occur in the lowercased original text.

    Args:
        original_text: The text the model was given.
        model_output: The text the model produced.

    Returns:
        True if at least one candidate entity is not found in the original.
    """
    original = original_text.lower()

    # The scoped (?i:...) flag makes only the identifier alternatives
    # case-insensitive; the name pattern must stay case-sensitive because it
    # relies on capitalisation.
    candidates = re.findall(
        r"((?i:mrn\d+|vin\d+|ssn\s*\d+)|\b[A-Z][a-z]+\s[A-Z][a-z]+)", model_output
    )
    return any(entity.lower() not in original for entity in candidates)


def generate_output(model, tokenizer, device, end_id, instruction: str) -> str:
    """Generate and clean the model's output for one instruction.

    Decoding settings come from ``GEN_KW``. Sampling is enabled when the
    configured temperature is positive; otherwise decoding is greedy.

    Args:
        model: Causal LM exposing ``generate``.
        tokenizer: Tokenizer matching ``model``.
        device: Device the encoded prompt is moved to.
        end_id: Token id that terminates generation.
        instruction: Text placed under the "### Instruction:" header.

    Returns:
        The generated continuation (prompt excluded) after ``clean_output``.
    """
    prompt = f"### Instruction:\n{instruction}\n\n### Output:\n"
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    prompt_len = inputs["input_ids"].shape[1]

    temperature = GEN_KW["temperature"]
    sampling = temperature is not None and temperature > 0
    gen_kwargs = dict(
        max_new_tokens=GEN_KW["max_new_tokens"],
        # temperature/top_p only take effect with do_sample=True; without it
        # the configured values were silently ignored.
        do_sample=sampling,
        eos_token_id=end_id,
        pad_token_id=tokenizer.pad_token_id,
    )
    if sampling:
        gen_kwargs.update(temperature=temperature, top_p=GEN_KW["top_p"])

    with torch.no_grad():
        gen = model.generate(**inputs, **gen_kwargs)

    # Decode only the newly generated tokens. Splitting the full text on
    # "### Output:" breaks when the instruction itself contains that marker.
    out = tokenizer.decode(gen[0][prompt_len:], skip_special_tokens=True)
    return clean_output(out)


# ------------------- RUN EVALUATION -------------------
data_path = get_data_path()
if not data_path.exists():
    # Fail early with the resolved path so a wrong working directory is obvious.
    raise FileNotFoundError(f"Expected data file at: {data_path}")

print(f" Loading data from: {data_path}")
all_data = read_phi_jsonl(data_path)
# Restrict to records marked split == "test" when the file has any.
test_data = maybe_filter_test(all_data)

if len(test_data) == 0:
    raise ValueError("No test data found in phi_data.jsonl.")

# Fix the seeds so the sampled subset (and any sampled decoding) is the same
# on every Restart & Run All; change SEED to draw a different subset.
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)

NUM_SAMPLES = 100  # change this if you want more/less
num_samples = min(NUM_SAMPLES, len(test_data))
samples = random.sample(test_data, num_samples)
print(f" Evaluating on {num_samples} samples.")

print(f"Loading model from Hugging Face: {MODEL_ID}")
model, tokenizer, device, end_id = load_model_and_tokenizer()

# Per-policy accumulators: tag-level TP/FP/FN counts, per-sample BLEU and
# trust scores, number of samples flagged as hallucinated, and sample count.
metrics_by_policy = defaultdict(
    lambda: {"TP": 0, "FP": 0, "FN": 0, "BLEU": [], "Trust": [], "Hall": 0, "Count": 0}
)
# Cleaned model outputs and their references, in evaluation order.
preds, refs = [], []

# MAIN LOOP
# For each sample: generate, compare redaction tags with the ground truth,
# and accumulate the metrics under the policy inferred from the instruction.
for sample in samples:
    gt = sample["ground_truth"]
    instr = sample["instruction"]

    model_out = generate_output(model, tokenizer, device, end_id, instr)

    pred_tags = extract_phi_tags(model_out)
    gt_tags = extract_phi_tags(gt)

    policy = classify_policy(instr)
    m = metrics_by_policy[policy]
    m["Count"] += 1

    # Tags are compared as sets, so each distinct tag type counts at most once
    # per sample regardless of how many times it occurs.
    # NOTE(review): confirm type-level (not occurrence-level) scoring is intended.
    pred_set, gt_set = set(pred_tags), set(gt_tags)
    TP = len(pred_set & gt_set)
    FP = len(pred_set - gt_set)
    FN = len(gt_set - pred_set)

    m["TP"] += TP
    m["FP"] += FP
    m["FN"] += FN

    is_hall = detect_hallucination(instr, model_out)
    m["Hall"] += 1 if is_hall else 0

    # Whitespace-tokenised sentence BLEU against the single reference; any
    # failure inside the scorer is recorded as a BLEU of 0.0.
    try:
        bleu = sentence_bleu([gt.split()], model_out.split())
    except Exception:
        bleu = 0.0
    m["BLEU"].append(bleu)
    # Trust equals BLEU, zeroed out when the sample was flagged as hallucinated.
    trust = (1 - int(is_hall)) * bleu
    m["Trust"].append(trust)

    preds.append(model_out)
    refs.append(gt)

# AGGREGATE
# Print per-policy metrics and accumulate the totals for the overall figures.
print("\n=================  EVALUATION SUMMARY =================")
macro_F1s = []
total_TP = total_FP = total_FN = 0
total_hall = total_count = 0

for pol, m in metrics_by_policy.items():
    total_TP += m["TP"]
    total_FP += m["FP"]
    total_FN += m["FN"]
    total_hall += m["Hall"]
    total_count += m["Count"]

    # safe_div yields 0.0 when a denominator is zero (e.g. no predicted tags).
    P = safe_div(m["TP"], m["TP"] + m["FP"])
    R = safe_div(m["TP"], m["TP"] + m["FN"])
    F1 = safe_div(2 * P * R, P + R)
    HallRate = safe_div(m["Hall"], m["Count"])
    BLEU = sum(m["BLEU"]) / max(1, len(m["BLEU"]))
    Trust = sum(m["Trust"]) / max(1, len(m["Trust"]))
    macro_F1s.append(F1)

    print(f"\n Policy: {pol}")
    print(f"Samples: {m['Count']}")
    print(f"Precision: {P:.3f}, Recall: {R:.3f}, F1: {F1:.3f}")
    print(f"BLEU: {BLEU:.3f}, Hallucination Rate: {HallRate*100:.1f}%")
    print(f"Trust Score: {Trust:.3f}")

# Micro metrics pool the tag counts over all policies; macro F1 is the
# unweighted mean of the per-policy F1 values.
overall_microP = safe_div(total_TP, total_TP + total_FP)
overall_microR = safe_div(total_TP, total_TP + total_FN)
overall_microF1 = safe_div(2 * overall_microP * overall_microR, overall_microP + overall_microR)
macroF1 = sum(macro_F1s) / len(macro_F1s) if macro_F1s else 0.0
overall_HallRate = safe_div(total_hall, total_count)

# NOTE(review): this is the share of instructions classify_policy() maps to
# something other than "General"; it is not compared against a labelled
# policy, so "accuracy" may overstate what it measures.
correct_policies = sum(1 for s in samples if classify_policy(s["instruction"]) != "General")
PMA = safe_div(correct_policies, len(samples))

# char-level F1 with alignment
# Reference and prediction are aligned positionally and truncated to the
# shorter of the two; characters beyond that length are ignored.
chars_true, chars_pred = [], []
for gt, pr in zip(refs, preds):
    L = min(len(gt), len(pr))
    if L == 0:
        # Nothing to compare when either side is empty.
        continue
    chars_true.extend(list(gt[:L]))
    chars_pred.extend(list(pr[:L]))

if chars_true and chars_pred:
    # Each character is treated as a class label; scores are micro-averaged.
    P_c, R_c, F_c, _ = precision_recall_fscore_support(
        chars_true,
        chars_pred,
        average="micro",
    )
else:
    P_c = R_c = F_c = 0.0

# Overall metrics across all evaluated samples.
print("\n------------------------------------------------------------")
print(f" Overall Micro Precision: {overall_microP:.3f}")
print(f"Overall Micro Recall:    {overall_microR:.3f}")
print(f" Overall Micro F1:        {overall_microF1:.3f}")
print(f" Macro F1 (avg across policies): {macroF1:.3f}")
print(f" Hallucination Rate (overall): {overall_HallRate*100:.1f}%")
print(f" Policy Match Accuracy (PMA): {PMA:.3f}")
print(f"Char-level F1: {F_c:.3f} (Precision {P_c:.3f}, Recall {R_c:.3f})")

# Show the first five evaluated samples next to their references.
# (The header emoji and the em dash below were mis-encoded mojibake.)
print("\n================= 🧾 QUALITATIVE EXAMPLES =================")
for i, (s, pred) in enumerate(zip(samples[:5], preds[:5]), 1):
    print("=" * 120)
    print(f" SAMPLE {i} | Policy: {classify_policy(s['instruction'])}")
    print(" Instruction:", s["instruction"])
    print("\n Model Output:\n", pred)
    print("\n Ground Truth:\n", s["ground_truth"])

print("\n Evaluation complete — all metrics computed successfully.")
