# 03_eval — Full Test Evaluation (LoRA Adapter)

Goal: evaluate the trained preference-extractor adapter on the **full test split**.

This notebook:
- loads `test.jsonl` from Drive
- loads base model + LoRA adapter
- runs generation for all test rows
- computes JSON validity + key-level metrics
- computes usage confusion matrix
- writes evaluation artifacts to Drive


In [None]:
# --- 0) Drive mount + paths ---
from pathlib import Path

from google.colab import drive

# Mount Google Drive so the project folder below becomes reachable.
drive.mount("/content/drive")

# Everything the notebook reads or writes lives under <project>/colab.
DRIVE_PROJECT_DIR = Path("/content/drive/MyDrive/laprop")
COLAB_DIR = DRIVE_PROJECT_DIR.joinpath("colab")

DATASET_DIR = COLAB_DIR.joinpath("data", "prefs_dataset_v1")
ARTIFACTS_ROOT = COLAB_DIR.joinpath("artifacts", "prefs_extractor")
EVAL_ROOT = COLAB_DIR.joinpath("artifacts", "eval")

# Create the evaluation output root up front; a no-op when it already exists.
EVAL_ROOT.mkdir(parents=True, exist_ok=True)

print("DATASET_DIR:", DATASET_DIR)
print("ARTIFACTS_ROOT:", ARTIFACTS_ROOT)
print("EVAL_ROOT:", EVAL_ROOT)


In [None]:
# --- 1) Clone/pull repo and install deps ---
# Fetches the project repository into /content and installs its Colab
# requirements plus the package itself in editable mode.
from pathlib import Path

REPO_DIR = Path("/content/laprop-recommender")
REPO_URL = "https://github.com/ahmedberatAI/laprop-recommender.git"

# First run: shallow clone (--depth 1). Later runs: pull into the existing checkout.
# NOTE(review): the shell/magic lines below repeat the literal path that REPO_DIR
# also holds; keep them in sync if the location ever changes.
if not REPO_DIR.exists():
    !git clone --depth 1 {REPO_URL} /content/laprop-recommender
else:
    !git -C /content/laprop-recommender pull

# Switch the working directory to the checkout.
%cd /content/laprop-recommender

# Install the Colab requirements file, then the repo as an editable package.
%pip install -q -r /content/laprop-recommender/colab/requirements_colab.txt
%pip install -q -e /content/laprop-recommender


In [None]:
# --- 2) Imports + GPU sanity ---
import json
import math
import random
import re
import time
from collections import Counter
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
import torch
from datasets import load_dataset
from peft import PeftModel
from sklearn.metrics import classification_report, confusion_matrix
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

# Report the runtime and decide whether bf16 compute will be used downstream.
_has_cuda = torch.cuda.is_available()

print("torch:", torch.__version__)
print("cuda available:", _has_cuda)

# Placeholder capability for runtimes without CUDA; overwritten when a GPU exists.
cap = (0, 0)
if _has_cuda:
    print("device:", torch.cuda.get_device_name(0))
    cap = torch.cuda.get_device_capability(0)
    print("capability:", cap)

# bf16 is switched on only for a CUDA device whose capability major version is >= 8.
USE_BF16 = bool(_has_cuda and cap[0] >= 8)
print("USE_BF16:", USE_BF16)


In [None]:
# --- 3) Select adapter dir (auto: latest) ---
# Leave ADAPTER_DIR_STR empty to auto-select, or set it to a specific adapter path.
ADAPTER_DIR_STR = ""  # e.g. "/content/drive/MyDrive/laprop/colab/artifacts/prefs_extractor/.../adapter"

if not ADAPTER_DIR_STR.strip():
    # Auto mode: take the last "<run>/adapter" match in sorted (lexicographic) order.
    cands = sorted(ARTIFACTS_ROOT.glob("*/adapter"))
    if len(cands) == 0:
        raise FileNotFoundError(f"No adapter found under: {ARTIFACTS_ROOT}")
    ADAPTER_DIR = cands[-1]
else:
    ADAPTER_DIR = Path(ADAPTER_DIR_STR)

# meta.json (stored next to the adapter) supplies the base model name and max length.
meta_path = ADAPTER_DIR.joinpath("meta.json")
if not meta_path.exists():
    raise FileNotFoundError(f"meta.json not found: {meta_path}")

with meta_path.open("r", encoding="utf-8") as _meta_file:
    meta = json.load(_meta_file)

# Fall back to defaults when meta.json does not carry the field.
MODEL_NAME = meta.get("model_name", "Qwen/Qwen2.5-1.5B-Instruct")
MAX_LEN = int(meta.get("max_len", 512))

# Each evaluation writes to its own directory: <run id>_<timestamp>.
run_id = ADAPTER_DIR.parent.name
_stamp = time.strftime("%Y%m%d_%H%M%S")
EVAL_DIR = EVAL_ROOT / f"{run_id}_{_stamp}"
EVAL_DIR.mkdir(parents=True, exist_ok=True)

print("ADAPTER_DIR:", ADAPTER_DIR)
print("MODEL_NAME:", MODEL_NAME)
print("MAX_LEN:", MAX_LEN)
print("EVAL_DIR:", EVAL_DIR)


In [None]:
# --- 4) Load test dataset ---
# Only the test split is registered with the JSON loader.
data_files = dict(test=str(DATASET_DIR.joinpath("test.jsonl")))
ds = load_dataset("json", data_files=data_files)
test_ds = ds["test"]

# Quick structural summary, then show the first record as the cell output.
print(test_ds)
print("columns:", test_ds.column_names)
print("num_rows:", len(test_ds))
print("example:")
test_ds[0]


In [None]:
# --- 5) Load base model + LoRA adapter ---
# Compute dtype for the quantized layers follows the USE_BF16 decision made earlier.
if USE_BF16:
    _compute_dtype = torch.bfloat16
else:
    _compute_dtype = torch.float16

# 4-bit NF4 quantization with double quantization enabled.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=_compute_dtype,
)

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True, trust_remote_code=True)
# generate() below is given explicit EOS and PAD ids, so make sure both tokens exist.
if tokenizer.eos_token is None:
    tokenizer.eos_token = "</s>"
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

_base_model_kwargs = dict(
    quantization_config=bnb_config,
    device_map="auto",
    trust_remote_code=True,
)
base_model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, **_base_model_kwargs)

# Attach the trained adapter weights on top of the quantized base and switch to eval mode.
model = PeftModel.from_pretrained(base_model, str(ADAPTER_DIR))
model.eval()

print("pad_token_id:", tokenizer.pad_token_id)
print("eos_token_id:", tokenizer.eos_token_id)


In [None]:
# --- 6) Inference helpers ---
def extract_json_obj(text: str) -> Optional[Dict[str, Any]]:
    """Best-effort extraction of a single JSON object from raw model output.

    Strategies, tried in order:
      1. Parse the whole stripped text. If it is valid JSON but not an object,
         the result is None (no further attempts).
      2. Parse the object inside a ``` / ```json fenced block.
      3. Decode the object that starts at the first "{" and ignore whatever
         follows it (explanations, a second object, stray braces).

    Args:
        text: Raw generated text; None or empty is accepted.

    Returns:
        The parsed dict, or None when no JSON object could be recovered.
    """
    s = (text or "").strip()
    if not s:
        return None

    # 1) direct parse
    try:
        obj = json.loads(s)
    except (ValueError, RecursionError):
        pass
    else:
        return obj if isinstance(obj, dict) else None

    # 2) fenced block parse
    fence = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", s, flags=re.S)
    if fence:
        try:
            obj = json.loads(fence.group(1))
        except (ValueError, RecursionError):
            pass
        else:
            return obj if isinstance(obj, dict) else None

    # 3) first JSON-looking object. raw_decode stops at the end of the first
    # complete value, so trailing text containing "}" no longer breaks parsing
    # (a greedy "{...}" regex would span up to the LAST closing brace).
    start = s.find("{")
    if start == -1:
        return None
    try:
        obj, _end = json.JSONDecoder().raw_decode(s[start:])
    except (ValueError, RecursionError):
        return None
    return obj if isinstance(obj, dict) else None


def to_float_or_none(x: Any) -> Optional[float]:
    """Convert `x` to a finite float, or return None when that is not possible.

    None, values that `float()` rejects, and non-finite results (NaN, +/-inf)
    all map to None. Non-finite values are rejected because parsed model output
    can contain them (`json.loads` accepts the `NaN` / `Infinity` literals) and
    a single one would turn an averaged absolute error into nan/inf.

    Args:
        x: Any value, typically a number or numeric string from gold/pred dicts.

    Returns:
        The finite float value, or None.
    """
    if x is None:
        return None
    try:
        value = float(x)
    except Exception:
        # Deliberate best-effort: anything float() cannot handle counts as missing.
        return None
    return value if math.isfinite(value) else None


def normalize_value(v: Any) -> Any:
    """Turn integer-valued floats into ints, recursing through lists.

    Every other value (including dicts and non-integral floats) is returned
    unchanged.
    """
    if isinstance(v, float):
        return int(v) if v.is_integer() else v
    if isinstance(v, list):
        return [normalize_value(item) for item in v]
    return v


def generate_completion(prompt: str, max_new_tokens: int = 180) -> str:
    """Generate a completion for `prompt` and return only the newly produced text.

    The prompt is tokenized with truncation to MAX_LEN tokens and moved to the
    model's device. Generation runs without gradient tracking and with sampling
    disabled; the prompt tokens are sliced off before decoding.

    Args:
        prompt: Full prompt text fed to the model.
        max_new_tokens: Upper bound on generated tokens (cast to int).

    Returns:
        The decoded continuation with special tokens skipped and surrounding
        whitespace stripped.
    """
    encoded = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=MAX_LEN)
    encoded = encoded.to(model.device)
    prompt_len = encoded["input_ids"].shape[1]

    generation_kwargs = dict(
        max_new_tokens=int(max_new_tokens),
        do_sample=False,
        eos_token_id=tokenizer.eos_token_id,
        pad_token_id=tokenizer.pad_token_id,
    )
    with torch.no_grad():
        sequences = model.generate(**encoded, **generation_kwargs)

    # The output sequence starts with the prompt tokens; keep only what follows.
    new_tokens = sequences[0][prompt_len:]
    decoded = tokenizer.decode(new_tokens, skip_special_tokens=True)
    return decoded.strip()


In [None]:
# --- 7) Run full test inference ---
# One record per test row: gold labels, raw generation, and the parsed prediction.
rows = []

for i, ex in enumerate(test_ds):
    prompt = ex["prompt"]
    pred_raw = generate_completion(prompt)
    pred_obj = extract_json_obj(pred_raw)

    record = dict(
        idx=i,
        id=ex.get("id"),
        scenario_id=ex.get("scenario_id"),
        input_text=ex.get("input_text"),
        gold=ex.get("target_prefs") or {},
        pred_raw=pred_raw,
        pred=pred_obj,
        # A row counts as valid JSON only when a dict could be recovered.
        json_ok=pred_obj is not None,
    )
    rows.append(record)

print("inference done:", len(rows))
print("json ok:", sum(1 for r in rows if r["json_ok"]), "/", len(rows))


In [None]:
# --- 8) Compute metrics ---
# Preference fields scored individually in the per-key accuracy table.
KEYS = [
    "usage_key",
    "min_budget",
    "max_budget",
    "dev_mode",
    "productivity_profile",
    "screen_max",
    "design_profiles",
    "design_gpu_hint",
    "design_min_ram_hint",
    "gaming_titles",
    "min_gpu_score_required",
    "gaming_min_gpu",
]

# JSON validity over the whole test set (rate is 0.0 when there are no rows).
n = len(rows)
json_ok = sum(1 for r in rows if r["json_ok"])
json_parse_rate = (json_ok / n) if n else 0.0

# "Core exact" requires all of these fields to match on a row.
core_keys = ["usage_key", "min_budget", "max_budget"]
core_exact = 0

# Parallel gold/pred label lists for the usage confusion matrix.
usage_gold, usage_pred = [], []

# Per-key counters: support = rows with a non-null gold value, correct = matches.
per_key = {k: {"support": 0, "correct": 0} for k in KEYS}

# Absolute budget errors, collected for the MAE figures.
min_budget_abs_err, max_budget_abs_err = [], []

# Keys compared as floats in the per-key table.
numeric_keys = {
    "min_budget",
    "max_budget",
    "min_gpu_score_required",
    "gaming_min_gpu",
    "design_min_ram_hint",
}

for r in rows:
    gold = r["gold"] or {}
    pred = r["pred"] or {}  # unparseable output is scored as an empty prediction

    # core exact: every core key must match; numeric gold values compare as floats
    for k in core_keys:
        gv, pv = gold.get(k), pred.get(k)
        if isinstance(gv, (int, float)):
            mismatch = to_float_or_none(pv) is None or float(gv) != float(pv)
        else:
            mismatch = gv != pv
        if mismatch:
            break
    else:
        # Loop finished without a break: all core keys agreed.
        core_exact += 1

    # usage confusion: a missing predicted label is recorded as "<NONE>"
    ug = gold.get("usage_key")
    up = pred.get("usage_key")
    usage_gold.append(ug)
    usage_pred.append("<NONE>" if up is None else up)

    # per-key accuracy over non-null gold labels
    for k in KEYS:
        gv = gold.get(k)
        if gv is None:
            continue

        stats = per_key[k]
        stats["support"] += 1
        pv = pred.get(k)

        if k in numeric_keys:
            gvn, pvn = to_float_or_none(gv), to_float_or_none(pv)
            hit = gvn is not None and pvn is not None and gvn == pvn
        elif isinstance(gv, list):
            # Lists match element-wise, in order, after stringifying each item.
            hit = isinstance(pv, list) and [str(x) for x in pv] == [str(x) for x in gv]
        else:
            hit = normalize_value(pv) == normalize_value(gv)

        if hit:
            stats["correct"] += 1

    # budget MAE: only rows where both gold and prediction are numeric contribute
    for budget_key, abs_errors in (
        ("min_budget", min_budget_abs_err),
        ("max_budget", max_budget_abs_err),
    ):
        gold_amount = to_float_or_none(gold.get(budget_key))
        pred_amount = to_float_or_none(pred.get(budget_key))
        if gold_amount is not None and pred_amount is not None:
            abs_errors.append(abs(gold_amount - pred_amount))

# Flatten the per-key counters into table rows; accuracy is None when a key
# never had a non-null gold value (support == 0).
per_key_rows = []
for k, v in per_key.items():
    supp = v["support"]
    corr = v["correct"]
    acc = (corr / supp) if supp else None
    per_key_rows.append({"key": k, "support": supp, "correct": corr, "accuracy": acc})

# Most frequently labelled keys first, ties broken alphabetically.
per_key_df = pd.DataFrame(per_key_rows).sort_values(["support", "key"], ascending=[False, True])

# Headline numbers; each MAE is None when no row had both values numeric.
metrics = {
    "n_test": n,
    "json_ok": json_ok,
    "json_parse_rate": json_parse_rate,
    "core_exact_count": core_exact,
    "core_exact_rate": (core_exact / n if n else 0.0),
    "min_budget_mae": (float(np.mean(min_budget_abs_err)) if min_budget_abs_err else None),
    "max_budget_mae": (float(np.mean(max_budget_abs_err)) if max_budget_abs_err else None),
}

print(json.dumps(metrics, indent=2, ensure_ascii=False))
# The leading newline must be the "\n" escape: a raw line break inside the
# quotes is an unterminated string literal.
print("\nper-key accuracy:")
display(per_key_df)


In [None]:
# --- 9) Usage confusion matrix + report ---
# Coerce every label to a string first. A gold label of None, or a predicted
# usage_key that is not a string (number, list, dict), would otherwise make
# set()/sorted() raise (unhashable or mixed-type comparison). Labels that are
# already strings are unchanged.
usage_gold_labels = ["<NONE>" if x is None else str(x) for x in usage_gold]
usage_pred_labels = ["<NONE>" if x is None else str(x) for x in usage_pred]

# Union of gold and predicted labels, so unexpected predictions get their own column.
labels = sorted(set(usage_gold_labels) | set(usage_pred_labels))
cm = confusion_matrix(usage_gold_labels, usage_pred_labels, labels=labels)
cm_df = pd.DataFrame(cm, index=[f"gold:{x}" for x in labels], columns=[f"pred:{x}" for x in labels])

print("labels:", labels)
print("\nconfusion matrix:")
display(cm_df)

print("\nclassification report (usage_key):")
print(classification_report(usage_gold_labels, usage_pred_labels, labels=labels, zero_division=0))


In [None]:
# --- 10) Save artifacts to Drive ---
# All outputs go into this run's EVAL_DIR.
preds_path = EVAL_DIR / "predictions.jsonl"
metrics_path = EVAL_DIR / "metrics.json"
per_key_path = EVAL_DIR / "per_key_accuracy.csv"
cm_path = EVAL_DIR / "usage_confusion.csv"

# One JSON document per line (JSON Lines), with a fixed field order.
with preds_path.open("w", encoding="utf-8") as f:
    for r in rows:
        row = {
            "idx": r["idx"],
            "id": r["id"],
            "scenario_id": r["scenario_id"],
            "input_text": r["input_text"],
            "gold": r["gold"],
            "pred": r["pred"],
            "pred_raw": r["pred_raw"],
            "json_ok": r["json_ok"],
        }
        # The record separator must be the "\n" escape: a raw line break inside
        # the quotes is an unterminated string literal.
        f.write(json.dumps(row, ensure_ascii=False) + "\n")

metrics_path.write_text(json.dumps(metrics, ensure_ascii=False, indent=2), encoding="utf-8")
per_key_df.to_csv(per_key_path, index=False)
# The index is kept here because it carries the "gold:<label>" row names.
cm_df.to_csv(cm_path)

print("wrote:")
print("-", preds_path)
print("-", metrics_path)
print("-", per_key_path)
print("-", cm_path)


In [None]:
# --- 11) Show failure samples ---
# A row fails when its output was not valid JSON or any core field disagrees
# with gold (usage_key, min_budget, max_budget).
fail = []
for r in rows:
    gold = r["gold"] or {}
    pred = r["pred"] or {}

    bad = not r["json_ok"]
    if not bad:
        for k in ("usage_key", "min_budget", "max_budget"):
            gv, pv = gold.get(k), pred.get(k)
            if isinstance(gv, (int, float)):
                # Numeric gold: the prediction must be numeric and equal as a float.
                mismatch = to_float_or_none(pv) is None or float(gv) != float(pv)
            else:
                mismatch = gv != pv
            if mismatch:
                bad = True
                break

    if bad:
        fail.append(r)

print("fail_count:", len(fail))
# Print at most ten failures, truncating the raw generation to 300 characters.
for r in fail[:10]:
    print("---")
    print("id:", r["id"], "scenario:", r["scenario_id"])
    print("input:", r["input_text"])
    print("gold:", r["gold"])
    print("pred:", r["pred"])
    print("pred_raw:", r["pred_raw"][:300])
