In [1]:
import os, re, time, json, random, itertools, textwrap, warnings, requests, csv
from dataclasses import dataclass
from typing import List, Dict, Any, Tuple

import numpy as np
import torch

from bs4 import BeautifulSoup
from unidecode import unidecode
from rapidfuzz import fuzz

from torch.utils.data import Dataset as TorchDataset
from transformers import DataCollatorForSeq2Seq
from transformers import AutoTokenizer
from transformers import AutoModelForSeq2SeqLM

from peft import LoraConfig, get_peft_model, TaskType
from transformers import Seq2SeqTrainingArguments, Seq2SeqTrainer  

# base_model = AutoModelForSeq2SeqLM.from_pretrained(gen_ckpt).to(DEVICE)

from sentence_transformers import CrossEncoder
# reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2", device=DEVICE)

from sentence_transformers import SentenceTransformer
from sklearn.neighbors import NearestNeighbors
from rank_bm25 import BM25Okapi
from difflib import SequenceMatcher


warnings.filterwarnings("ignore")

tok = AutoTokenizer.from_pretrained("bert-base-uncased")
# embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2", device=DEVICE)


# --- Config ---
COMPANY = "Apple Inc."
CIK = "0000320193"  # Apple
SEED = 42
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED); torch.cuda.manual_seed_all(SEED)
DEVICE


'cpu'

In [2]:
#@title Download SEC companyfacts (small JSON) + (optional) 10-K HTML (tiny)
os.makedirs("data", exist_ok=True)

headers = {
    "User-Agent": "Colab-Student-Assignment/1.0 (email@example.com)"  # replace email if asked
}

facts_url = f"https://data.sec.gov/api/xbrl/companyfacts/CIK{CIK}.json"
facts_path = "data/companyfacts.json"

r = requests.get(facts_url, headers=headers, timeout=60)
r.raise_for_status()
with open(facts_path, "wb") as f:
    f.write(r.content)

# Optional small narrative: Fetch a lightweight 10-K HTML page (Apple FY2024 10-K index page)
# (If this ever fails due to site changes, the pipeline still works from JSON alone.)
opt_html_urls = [
    "https://www.sec.gov/ixviewer/doc?action=display&source=content&source_url=/Archives/edgar/data/320193/000032019324000099/aapl-20240928.htm"
]
raw_html_texts = []
for u in opt_html_urls:
    try:
        h = requests.get(u, headers=headers, timeout=60)
        if h.ok and len(h.content) < 10_000_000:  # keep it light (<10MB)
            raw_html_texts.append(h.text)
    except Exception as e:
        print("Optional HTML fetch skipped:", e)

len(r.content)/1_000_000, len(raw_html_texts)


(3.617318, 0)

In [3]:
os.makedirs("data", exist_ok=True)
headers = {"User-Agent": "Colab-Student-Assignment/1.0 (email@example.com)"}

# Small JSON (a few MB)
facts_url = f"https://data.sec.gov/api/xbrl/companyfacts/CIK{CIK}.json"
facts_path = "data/companyfacts.json"
r = requests.get(facts_url, headers=headers, timeout=60); r.raise_for_status()
with open(facts_path, "wb") as f: f.write(r.content)

# Optional tiny narrative HTML (1–2 pages)
opt_html_urls = [
    "https://www.sec.gov/ixviewer/doc?action=display&source=content&source_url=/Archives/edgar/data/320193/000032019324000099/aapl-20240928.htm"
]
raw_html_texts = []
for u in opt_html_urls:
    try:
        h = requests.get(u, headers=headers, timeout=60)
        if h.ok and len(h.content) < 10_000_000:  # keep small
            raw_html_texts.append(h.text)
    except Exception as e:
        print("Optional HTML fetch skipped:", e)

(len(r.content)/1_000_000, len(raw_html_texts))


(3.617318, 0)

In [4]:
def clean_text(txt: str) -> str:
    txt = unidecode(txt)
    txt = re.sub(r"[ \t]+", " ", txt)
    txt = re.sub(r"\n{2,}", "\n", txt)
    txt = txt.replace("\u00a0", " ")
    return txt.strip()

sections = []
for html in raw_html_texts:
    # Use html5lib (pure Python) to avoid lxml
    soup = BeautifulSoup(html, "html5lib")
    for bad in soup(["script", "style", "noscript"]):
        bad.extract()
    text = clean_text(soup.get_text(separator="\n"))

    # naive sectioning by headings (ITEM X or ALLCAPS)
    chunks = re.split(r"\n(?=(ITEM\s+\d+|[A-Z][A-Z \-/&]{6,}))", text)
    i = 0
    while i < len(chunks):
        if i+1 < len(chunks) and re.match(r"(ITEM\s+\d+|[A-Z][A-Z \-/&]{6,})", chunks[i+1] or ""):
            heading = chunks[i+1].strip()
            body = chunks[i+2].strip() if i+2 < len(chunks) else ""
            sections.append({"section": heading[:80], "text": body})
            i += 3
        else:
            body = chunks[i].strip()
            if body:
                sections.append({"section": "GENERAL", "text": body})
            i += 1

len(sections)


0

In [5]:
with open("data/companyfacts.json","r") as f:
    facts = json.load(f)

# Prefer 'USD' unit rows; otherwise pick the unit with most entries
def pick_unit_rows(tag_obj):
    units = tag_obj.get("units", {})
    if "USD" in units:
        return "USD", units["USD"]
    if "shares" in units:
        return "shares", units["shares"]
    # fallback: the unit key with most rows
    best_unit, best_rows = None, []
    for u, rows in units.items():
        if len(rows) > len(best_rows):
            best_unit, best_rows = u, rows
    return best_unit, best_rows

def latest_two_by_end(tag: str):
    obj = facts.get("facts", {}).get("us-gaap", {}).get(tag, {})
    if not obj: return None, None, None
    unit, rows = pick_unit_rows(obj)
    if not rows: return None, None, None
    # sort by 'end' date desc
    rows = [r for r in rows if "end" in r]
    rows.sort(key=lambda x: x.get("end",""), reverse=True)
    if len(rows) >= 2:
        return rows[0], rows[1], unit
    return None, None, None

# Common GAAP tags (robust actual tag names)
TAGS = {
    "Revenues": "Revenues",
    "NetIncomeLoss": "Net income (loss)",
    "OperatingIncomeLoss": "Operating income (loss)",
    "GrossProfit": "Gross profit",
    "Assets": "Total assets",
    "Liabilities": "Total liabilities",
    "StockholdersEquity": "Shareholders' equity",
    "CashAndCashEquivalentsAtCarryingValue": "Cash & equivalents",
    "EarningsPerShareDiluted": "EPS (diluted)",
    "EarningsPerShareBasic": "EPS (basic)",
    "NetCashProvidedByUsedInOperatingActivities": "Net cash from operations",
    "PaymentsToAcquirePropertyPlantAndEquipment": "Capital expenditures",
}

def fmt_amt(v):
    try:
        v = float(v)
        if abs(v) >= 1e9: return f"${v/1e9:.2f}B"
        if abs(v) >= 1e6: return f"${v/1e6:.2f}M"
        return f"${v:,.0f}"
    except:
        return str(v)

qa = []
for tag, pretty in TAGS.items():
    a,b,unit = latest_two_by_end(tag)
    if a and b:
        y1, v1 = a.get("end","")[:4], a.get("val")
        y2, v2 = b.get("end","")[:4], b.get("val")
        if v1 is None or v2 is None: continue
        q1 = f"What was {COMPANY}'s {pretty} in {y1}?"
        a1 = f"{COMPANY}'s {pretty} in {y1} was {fmt_amt(v1)}."
        q2 = f"What was {COMPANY}'s {pretty} in {y2}?"
        a2 = f"{COMPANY}'s {pretty} in {y2} was {fmt_amt(v2)}."
        try:
            delta = float(v1) - float(v2)
            sgn = "increase" if delta >= 0 else "decrease"
            q3 = f"How did {pretty} change from {y2} to {y1} for {COMPANY}?"
            a3 = f"{pretty} shows a {sgn} from {fmt_amt(v2)} in {y2} to {fmt_amt(v1)} in {y1}."
            qa.extend([(q1,a1),(q2,a2),(q3,a3)])
        except:
            qa.extend([(q1,a1),(q2,a2)])

# Pad to ≥50 with paraphrases if needed
base_len = len(qa)
i = 0
while len(qa) < 50 and base_len > 0:
    q, a_ = qa[i % base_len]
    qa.append((q.replace("What was","Give me").replace("How did","Describe how"),
               a_.replace("was","stood at")))
    i += 1

len(qa)


50

In [6]:

tok = AutoTokenizer.from_pretrained("bert-base-uncased")

def make_chunks(text, toks=100, stride=20):
    tokens = tok.encode(text, add_special_tokens=False)
    out = []
    for i in range(0, len(tokens), max(1, toks - stride)):
        window = tokens[i:i+toks]
        if not window: break
        out.append(tok.decode(window))
    return out

corpus_docs = []
for s in sections:
    for size in (100, 400):
        for ch in make_chunks(s["text"], toks=size, stride=20):
            corpus_docs.append({
                "id": f'{s["section"][:30]}::{hash(ch)}::{size}',
                "section": s["section"],
                "chunk_size": size,
                "text": ch
            })

# Fallback to facts sentences if no HTML sections present
if not corpus_docs:
    for q,a_ in qa:
        corpus_docs.append({"id": f"facts::{hash(q+a_)}::100", "section":"FACTS", "chunk_size":100, "text": f"{q}\n{a_}"})

len(corpus_docs)


50

In [7]:
embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2", device=DEVICE)

texts = [d["text"] for d in corpus_docs]
emb = embedder.encode(texts, convert_to_numpy=True, normalize_embeddings=True,
                      batch_size=64, show_progress_bar=False)

# Dense index (cosine distance => similarity = 1 - dist)
nn = NearestNeighbors(metric="cosine")
nn.fit(emb)

# Sparse BM25
bm25 = BM25Okapi([t.split() for t in texts])

def retrieve(query, topk=5, alpha=0.5):
    # Dense
    qv = embedder.encode([query], normalize_embeddings=True)
    distances, indices = nn.kneighbors(qv, n_neighbors=topk)
    dense = [(int(i), 1.0 - float(distances[0][k])) for k,i in enumerate(indices[0])]

    # Sparse (min-max normalize)
    scores = bm25.get_scores(query.split())
    top_ids = np.argsort(scores)[::-1][:topk]
    smax, smin = scores[top_ids].max() + 1e-9, scores[top_ids].min()
    sparse = [(int(i), float((scores[i]-smin)/(smax-smin+1e-9))) for i in top_ids]

    # Hybrid fusion
    fused = {}
    for i,sc in dense:  fused[i] = fused.get(i,0) + alpha*sc
    for i,sc in sparse: fused[i] = fused.get(i,0) + (1-alpha)*sc
    ranked = sorted(fused.items(), key=lambda x: x[1], reverse=True)[:topk]
    return [(corpus_docs[i], float(s)) for i,s in ranked]


In [8]:
# Advanced RAG: #3 Re-Ranking with Cross-Encoders 
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2", device=DEVICE)

def rerank_with_cross_encoder(query, candidates, topk=5):
    pairs = [(query, c["text"]) for c,_ in candidates]
    scores = reranker.predict(pairs).tolist()
    ranked = sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)[:topk]
    return [(c[0], float(s)) for c,s in ranked]


In [9]:
FIN_KEYWORDS = ["revenue","income","profit","assets","liabilities","eps","cash","equity",
                "operating","balance sheet","income statement","cash flow","fy","2023","2024","2025"]
def is_relevant_query(q: str) -> bool:
    low = q.lower()
    return any(k in low for k in FIN_KEYWORDS) and COMPANY.lower().split()[0] in low

def numeric_verification(answer: str, contexts: List[str]) -> float:
    nums = re.findall(r"\$[-+]?\d[\d,]*(?:\.\d+)?[MB]?", answer)
    if not nums: return 0.75  # neutral if no numeric claim
    ctx = " ".join(contexts)
    hits = 0
    for n in nums:
        n_plain = n.replace("$","").replace(",","")
        if n in ctx or n_plain in ctx: hits += 1
    return 0.25 + 0.75*(hits / max(1,len(nums)))  # 0.25..1.0


In [10]:
gen_ckpt = "google/flan-t5-small"
gen_tok = AutoTokenizer.from_pretrained(gen_ckpt)
gen_model = AutoModelForSeq2SeqLM.from_pretrained(gen_ckpt).to(DEVICE)

def gen_answer(prompt, max_new_tokens=128):
    inpt = gen_tok(prompt, return_tensors="pt").to(DEVICE)
    out = gen_model.generate(**inpt, max_new_tokens=max_new_tokens, do_sample=False)
    return gen_tok.decode(out[0], skip_special_tokens=True)

def rag_answer(query: str, topk=5, use_reranker=True):
    t0 = time.time()
    if not is_relevant_query(query):
        return {"answer":"I’m sorry, that looks irrelevant to the financial documents I have.",
                "confidence":0.1, "method":"RAG", "time": time.time()-t0, "contexts":[]}
    cands = retrieve(query, topk=topk)
    if use_reranker: cands = rerank_with_cross_encoder(query, cands, topk=topk)
    ctxs = [c["text"] for c,_ in cands]
    prompt = f"Answer the question using the context.\nQuestion: {query}\n\nContext:\n" + \
             "\n---\n".join(ctxs) + "\n\nAnswer:"
    ans = gen_answer(prompt)
    conf = numeric_verification(ans, ctxs)
    return {"answer": ans.strip(), "confidence": float(conf), "method":"RAG",
            "time": time.time()-t0, "contexts": ctxs}


In [11]:
#10
# Split 40/10 using the QA pairs we built earlier
random.shuffle(qa)
train_pairs = qa[:40]
test_pairs  = qa[40:50]


class QADataset(TorchDataset):
    """Arrow-/pandas-free torch dataset for seq2seq FT on Flan-T5."""
    def __init__(self, pairs, tokenizer):
        self.pairs = pairs
        self.tok = tokenizer
    def __len__(self):
        return len(self.pairs)
    def __getitem__(self, idx):
        q, a = self.pairs[idx]
        model_inputs = self.tok(f"Q: {q}\nA:", truncation=True)
        with self.tok.as_target_tokenizer():
            labels = self.tok(a, truncation=True)
        return {
            "input_ids": torch.tensor(model_inputs["input_ids"], dtype=torch.long),
            "attention_mask": torch.tensor(model_inputs["attention_mask"], dtype=torch.long),
            "labels": torch.tensor(labels["input_ids"], dtype=torch.long),
        }

tok_train = QADataset(train_pairs, gen_tok)
tok_test  = QADataset(test_pairs,  gen_tok)
data_collator = DataCollatorForSeq2Seq(gen_tok, model=gen_model)

# Baseline (pre-FT) on 10 test Qs — returns list[dict], not a DataFrame
def run_baseline(pairs):
    rows=[]
    for q,a_true in pairs:
        t0=time.time()
        out = gen_answer(f"Q: {q}\nA:")
        rows.append({
            "question": q,
            "ground_truth": a_true,
            "pred": out,
            "confidence": float(numeric_verification(out, [a_true])),
            "time": time.time()-t0
        })
    return rows

baseline_rows = run_baseline(test_pairs)
# Quick peek
for r in baseline_rows[:3]: print(r)


{'question': 'How did Net income (loss) change from 2025 to 2025 for Apple Inc.?', 'ground_truth': 'Net income (loss) shows a increase from $23.43B in 2025 to $84.54B in 2025.', 'pred': 'Apple Inc.', 'confidence': 0.75, 'time': 0.42537379264831543}
{'question': "What was Apple Inc.'s Capital expenditures in 2025?", 'ground_truth': "Apple Inc.'s Capital expenditures in 2025 was $6.01B.", 'pred': '$1.2 billion', 'confidence': 0.25, 'time': 0.31282997131347656}
{'question': "What was Apple Inc.'s Operating income (loss) in 2025?", 'ground_truth': "Apple Inc.'s Operating income (loss) in 2025 was $100.62B.", 'pred': 'amounted to a net loss', 'confidence': 0.75, 'time': 0.7023003101348877}


In [13]:
# 11) **Advanced FT technique** = LoRA on Flan-T5-small  (fixed args/trainer)

base_model = AutoModelForSeq2SeqLM.from_pretrained(gen_ckpt).to(DEVICE)

# Advanced FT: LoRA on T5 attention projections
lora_cfg = LoraConfig(
    task_type=TaskType.SEQ_2_SEQ_LM,
    r=16, lora_alpha=32, lora_dropout=0.05,
    target_modules=["q","k","v","o"]
)

ft_model = get_peft_model(base_model, lora_cfg)
ft_model.print_trainable_parameters()

# Smaller batches are friendlier on CPU runtimes
BATCH = 4 if DEVICE == "cpu" else 8

args = Seq2SeqTrainingArguments(
    output_dir="runs/ft_flan_t5_small_lora",
    learning_rate=2e-4,
    per_device_train_batch_size=BATCH,
    per_device_eval_batch_size=BATCH,
    num_train_epochs=3,
    lr_scheduler_type="cosine",
    weight_decay=0.01,
    evaluation_strategy="epoch",
    save_strategy="no",
    logging_steps=50,
    predict_with_generate=True,      # <-- now valid
    generation_max_length=128,
    report_to="none",
    remove_unused_columns=False
)

trainer = Seq2SeqTrainer(
    model=ft_model,
    args=args,
    train_dataset=tok_train,
    eval_dataset=tok_test,
    tokenizer=gen_tok,
    data_collator=data_collator,
)

trainer.train()
eval_metrics = trainer.evaluate()
print(eval_metrics)


trainable params: 1,376,256 || all params: 78,337,408 || trainable%: 1.7568


Epoch,Training Loss,Validation Loss
1,No log,1.942858
2,No log,1.797937
3,No log,1.768823


{'eval_loss': 1.7688226699829102, 'eval_runtime': 1.3235, 'eval_samples_per_second': 7.556, 'eval_steps_per_second': 2.267, 'epoch': 3.0}


In [14]:
def ft_answer(query: str):
    t0=time.time()
    if not is_relevant_query(query):
        return {"answer":"I’m sorry, that looks irrelevant to the financial Q/A I was trained on.",
                "confidence":0.1, "method":"FT", "time": time.time()-t0}
    inpt = gen_tok(f"Q: {query}\nA:", return_tensors="pt").to(DEVICE)
    out = ft_model.generate(**inpt, max_new_tokens=128, do_sample=False)
    ans = gen_tok.decode(out[0], skip_special_tokens=True)
    conf = numeric_verification(ans, [ans])
    return {"answer": ans.strip(), "confidence": float(conf), "method":"FT", "time": time.time()-t0}


In [15]:
relevant_high = test_pairs[0][0] if test_pairs else "What was Apple Inc.'s Revenues in 2024?"
relevant_low  = "How did Apple’s performance change recently?"
irrelevant    = "What is the capital of France?"

tests = [
    ("Relevant, high-confidence", relevant_high),
    ("Relevant, low-confidence", relevant_low),
    ("Irrelevant", irrelevant),
]

mand_rows=[]
for label, q in tests:
    r = rag_answer(q)
    f = ft_answer(q)
    mand_rows.append({"type":label, "method":r["method"], "question":q, "answer":r["answer"], "confidence":float(r["confidence"]), "time":float(r["time"])})
    mand_rows.append({"type":label, "method":f["method"], "question":q, "answer":f["answer"], "confidence":float(f["confidence"]), "time":float(f["time"])})

# Peek
for r in mand_rows: print(r)


{'type': 'Relevant, high-confidence', 'method': 'RAG', 'question': 'How did Net income (loss) change from 2025 to 2025 for Apple Inc.?', 'answer': 'Net income (loss) shows a increase from $23.43B in 2025 to $84.54B in 2025. ---', 'confidence': 1.0, 'time': 2.402811288833618}
{'type': 'Relevant, high-confidence', 'method': 'FT', 'question': 'How did Net income (loss) change from 2025 to 2025 for Apple Inc.?', 'answer': "Apple Inc.'s net income (loss) was reduced from $20 million to $17 million in 2025.", 'confidence': 1.0, 'time': 2.4034883975982666}
{'type': 'Relevant, low-confidence', 'method': 'RAG', 'question': 'How did Apple’s performance change recently?', 'answer': 'I’m sorry, that looks irrelevant to the financial documents I have.', 'confidence': 0.1, 'time': 0.0}
{'type': 'Relevant, low-confidence', 'method': 'FT', 'question': 'How did Apple’s performance change recently?', 'answer': 'I’m sorry, that looks irrelevant to the financial Q/A I was trained on.', 'confidence': 0.1, 

In [16]:
def exact_or_fuzzy_match(pred, gold):
    pred_l, gold_l = pred.lower(), gold.lower()
    if fuzz.token_set_ratio(pred_l, gold_l) >= 85: return True
    nums = re.findall(r"\$[-+]?\d[\d,]*(?:\.\d+)?[MB]?", gold)
    ok = all(n in pred or n.replace("$","") in pred for n in nums) if nums else False
    return ok

eval_qs = qa[:10] if len(qa)>=10 else qa

res_rows=[]
for q, a_true in eval_qs:
    r = rag_answer(q)
    f = ft_answer(q)
    res_rows.append({
        "method":"RAG","question":q,"ground_truth":a_true,
        "answer":r["answer"],"confidence":float(r["confidence"]),
        "time":float(r["time"]), "correct": int(exact_or_fuzzy_match(r["answer"], a_true))
    })
    res_rows.append({
        "method":"FT","question":q,"ground_truth":a_true,
        "answer":f["answer"],"confidence":float(f["confidence"]),
        "time":float(f["time"]), "correct": int(exact_or_fuzzy_match(f["answer"], a_true))
    })

# Summary stats by method
from statistics import mean
summary_rows=[]
by_method = {"RAG": [], "FT": []}
for row in res_rows:
    by_method.setdefault(row["method"], []).append(row)

for method, group in by_method.items():
    if not group: continue
    summary_rows.append({
        "method": method,
        "avg_accuracy": round(mean(g["correct"] for g in group), 4),
        "avg_confidence": round(mean(g["confidence"] for g in group), 4),
        "avg_time_s": round(mean(g["time"] for g in group), 4),
        "n": len(group)
    })

# Peek
print("Summary:", summary_rows)


Summary: [{'method': 'RAG', 'avg_accuracy': 0.6, 'avg_confidence': 0.975, 'avg_time_s': 1.4361, 'n': 10}, {'method': 'FT', 'avg_accuracy': 0, 'avg_confidence': 0.875, 'avg_time_s': 1.2334, 'n': 10}]


In [20]:

# ------------ Sizes ------------
COMPACT_QUESTION_HEIGHT = "110px"   # ~4–5 lines
COMPACT_OUTPUT_HEIGHT   = COMPACT_QUESTION_HEIGHT
EXPANDED_OUTPUT_HEIGHT  = "340px"

# ------------ Helpers ------------
def token_set_ratio(a: str, b: str) -> int:
    def norm(s: str):
        toks = re.findall(r"\w+", (s or "").lower())
        return set(toks), " ".join(sorted(toks))
    set_a, str_a = norm(a); set_b, str_b = norm(b)
    common = " ".join(sorted(set_a & set_b))
    def ratio(x, y): return int(round(100 * SequenceMatcher(None, x, y).ratio()))
    return max(ratio(common, str_a), ratio(common, str_b), ratio(str_a, str_b))

gt_map = {q: a for (q, a) in (globals().get("test_pairs") or [])}

def is_correct(question: str, answer: str, confidence: float) -> bool:
    ql = (question or "").strip().lower()
    al = (answer or "").strip().lower()
    if question in gt_map:
        gold = gt_map[question]
        if token_set_ratio(answer, gold) >= 85:
            return True
        nums = re.findall(r"\$?[-+]?\d[\d,]*(?:\.\d+)?(?:[MB])?", gold)
        if nums and all(n in (answer or "") or n.replace("$","") in (answer or "") for n in nums):
            return True
        return False
    if "irrelevant" in globals() and ql == (globals()["irrelevant"] or "").strip().lower():
        guardrail = any(p in al for p in [
            "not in scope","not applicable","i’m sorry","i am sorry","cannot answer","no supporting context"
        ])
        return guardrail or float(confidence) <= 0.5
    if "relevant_low" in globals() and ql == (globals()["relevant_low"] or "").strip().lower():
        has_num = bool(re.search(r"\$|\d", answer or ""))
        return (not has_num) or float(confidence) < 0.7
    return True

def pretty_method(m):
    return "Fine-Tune" if (m or "").upper() == "FT" else m

# ------------ Shim to your pipeline ------------
EMBEDDER     = globals().get("embedder")
RERANKER     = globals().get("reranker")
GEN_TOK      = globals().get("gen_tok")
GEN_MODEL    = globals().get("gen_model")
CORPUS_TEXTS = globals().get("texts")
NN           = globals().get("nn")
BM25         = globals().get("bm25")

def _need(*names):
    missing = [n for n in names if globals().get(n) is None]
    if missing:
        raise RuntimeError("Missing objects: " + ", ".join(missing) +
                           ". Re-run earlier steps to build them.")

def rag_ui(query: str, topk: int = 5, use_reranker: bool = True):
    ra = globals().get("rag_answer")
    if ra is None:
        raise RuntimeError("rag_answer(...) is not defined. Run your RAG setup cell.")
    try:
        return ra(query, topk, use_reranker)  # short signature
    except TypeError:
        _need("EMBEDDER","RERANKER","GEN_TOK","GEN_MODEL","CORPUS_TEXTS","NN","BM25")
        return ra(query, topk, use_reranker, EMBEDDER, RERANKER, GEN_TOK, GEN_MODEL, CORPUS_TEXTS, NN, BM25)

def ft_ui(query: str):
    fa = globals().get("ft_answer")
    if fa is None:
        raise RuntimeError("ft_answer(...) is not defined. Run your FT setup cell.")
    try:
        return fa(query)  # short signature
    except TypeError:
        _need("GEN_TOK","GEN_MODEL")
        return fa(query, GEN_TOK, GEN_MODEL)

In [21]:
# ==== Random 12-Q Evaluation (auto-extend test_pairs if needed) ====
import random, time, re
from html import escape
from IPython.display import HTML, display

# ---------- 0) Preconditions ----------
if not callable(globals().get("rag_ui")) or not callable(globals().get("ft_ui")):
    raise RuntimeError("rag_ui / ft_ui are not defined. Run the UI shim cell (or your UI cell) first.")

if "test_pairs" not in globals() or not isinstance(test_pairs, list) or not test_pairs:
    raise RuntimeError("test_pairs not found. Define test_pairs=[(question, answer), ...] first.")

# ---------- 1) Ensure at least N distinct questions by paraphrasing existing ones ----------
def _paraphrase_variants(q: str):
    q = q.strip()
    # keep punctuation sane
    base = q if q.endswith("?") else (q + "?")
    # lightweight templates to keep meaning (answer stays valid)
    return [
        base,
        f"Please answer: {base}",
        f"In brief, {base[0].lower() + base[1:]}",
        f"Quick check — {base[0].lower() + base[1:]}",
        f"Could you tell me: {base[0].lower() + base[1:]}",
        f"Kindly answer: {base[0].lower() + base[1:]}",
        f"What about this: {base[0].lower() + base[1:]}",
        f"I need to know: {base[0].lower() + base[1:]}"
    ]

def expand_test_pairs_inplace(min_n=12, seed=42):
    global test_pairs
    rng = random.Random(seed)
    uniq = []
    seen = set()
    for q,a in test_pairs:
        qn = q.strip()
        if qn and qn not in seen:
            uniq.append((qn,a))
            seen.add(qn)
    if len(uniq) >= min_n:
        test_pairs = uniq
        return

    # add paraphrases of existing Qs, keeping answers
    added = 0
    out = uniq[:]
    i = 0
    while len(out) < min_n:
        q, a = uniq[i % len(uniq)]
        cands = _paraphrase_variants(q)
        rng.shuffle(cands)
        for cand in cands:
            if cand not in seen:
                out.append((cand, a))
                seen.add(cand)
                added += 1
                break
        i += 1
        if i > 200:  # safety
            break
    test_pairs = out


expand_test_pairs_inplace(min_n=12, seed=42)

# ---------- 2) Pick 12 random, distinct questions ----------
def pick_questions(n=12, seed=42):
    pool = [q for q,_ in test_pairs]
    uniq = sorted(set([q.strip() for q in pool if q and q.strip()]))
    if len(uniq) < n:
        raise ValueError(f"Still < {n} questions after expansion (found {len(uniq)}).")
    rng = random.Random(seed)
    return rng.sample(uniq, n)

# ---------- 3) Evaluate both methods ----------
def evaluate_random(n=12, seed=42, topk=5, use_reranker=True):
    rows = []
    questions = pick_questions(n=n, seed=seed)
    for q in questions:
        # RAG
        t0 = time.time()
        r = rag_ui(q, topk=topk, use_reranker=use_reranker)
        dt = time.time() - t0
        rows.append({
            "Question": q,
            "Method": "RAG",
            "Answer": str(r.get("answer","")),
            "Confidence": float(r.get("confidence",0.0)),
            "Time (s)": float(dt),
            "Correct (Y/N)": "Y" if is_correct(q, str(r.get("answer","")), float(r.get("confidence",0.0))) else "N"
        })
        # FT
        t0 = time.time()
        f = ft_ui(q)
        dt = time.time() - t0
        rows.append({
            "Question": q,
            "Method": "Fine-Tune",
            "Answer": str(f.get("answer","")),
            "Confidence": float(f.get("confidence",0.0)),
            "Time (s)": float(dt),
            "Correct (Y/N)": "Y" if is_correct(q, str(f.get("answer","")), float(f.get("confidence",0.0))) else "N"
        })
        # also append to your interactive session log if present
        if "session_rows" in globals():
            session_rows.extend([rows[-2], rows[-1]])
    return rows

# ---------- 4) Render wrapped HTML table ----------
def render_eval_table(rows, title="Random Evaluation (RAG/FT)"):
    cols = ["Question","Method","Answer","Confidence","Time (s)","Correct (Y/N)"]
    css = """
    <style>
      .table-wrapfull {
        border-collapse: collapse; width: 100% !important; table-layout: auto !important;
        font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Arial; font-size:14px;
      }
      .table-wrapfull th, .table-wrapfull td {
        border: 1px solid #e5e7eb; padding: 8px 10px; vertical-align: top;
        white-space: pre-wrap !important; overflow-wrap: anywhere !important; word-break: break-word !important;
        text-overflow: clip !important; max-width: none !important;
      }
      .table-wrapfull th { background: #f8fafc; text-align: left; }
      .table-wrapfull td:nth-child(4), .table-wrapfull td:nth-child(5) { text-align: right; }
      .good { color:#065f46; font-weight:600; } .bad { color:#b91c1c; font-weight:600; }
    </style>
    """
    html = [css, f"<h4>{escape(title)}</h4>", '<table class="table-wrapfull"><thead><tr>']
    for c in cols: html.append(f"<th>{escape(c)}</th>")
    html.append("</tr></thead><tbody>")
    for r in rows:
        html.append("<tr>")
        for c in cols:
            v = r.get(c, "")
            if c == "Correct (Y/N)":
                cls = "good" if v == "Y" else "bad"
                html.append(f'<td><span class="{cls}">{escape(str(v))}</span></td>')
            elif c in ("Confidence","Time (s)"):
                try:
                    num = float(v); html.append(f"<td>{num:.2f}</td>")
                except Exception:
                    html.append(f"<td>{escape(str(v))}</td>")
            else:
                cell = escape(str(v)).replace("\n","<br>")
                html.append(f"<td>{cell}</td>")
        html.append("</tr>")
    html.append("</tbody></table>")
    display(HTML("".join(html)))

# ---------- 5) Summary ----------
def summarize(rows):
    by = {"RAG": [], "Fine-Tune": []}
    for r in rows: by.setdefault(r["Method"], []).append(r)
    lines = []
    for m, lst in by.items():
        if not lst: continue
        acc = sum(1 for r in lst if r["Correct (Y/N)"] == "Y") / len(lst)
        avg_t = sum(r["Time (s)"] for r in lst) / len(lst)
        avg_c = sum(r["Confidence"] for r in lst) / len(lst)
        lines.append(f"{m:9s} — N={len(lst):2d} | Acc={acc:.2%} | Avg conf={avg_c:.2f} | Avg time={avg_t:.2f}s")
    display(HTML("<pre style='font-size:13px'>" + "<br>".join(lines) + "</pre>"))

# ---------- 6) Run ----------
ROWS = evaluate_random(n=12, seed=42, topk=5, use_reranker=True)
render_eval_table(ROWS, title="Random Evaluation (RAG & FT)")
summarize(ROWS)


Question,Method,Answer,Confidence,Time (s),Correct (Y/N)
What was Apple Inc.'s Revenues in 2018?,RAG,revenue,0.75,0.55,N
What was Apple Inc.'s Revenues in 2018?,Fine-Tune,$1.5 billion,1.0,0.54,N
How did Net income (loss) change from 2025 to 2025 for Apple Inc.?,RAG,Net income (loss) shows a increase from $23.43B in 2025 to $84.54B in 2025. ---,1.0,2.31,Y
How did Net income (loss) change from 2025 to 2025 for Apple Inc.?,Fine-Tune,Apple Inc.'s net income (loss) was reduced from $20 million to $17 million in 2025.,1.0,1.95,N
Give me Apple Inc.'s Net income (loss) in 2025?,RAG,Apple Inc.'s Net income (loss) in 2025 stood at $23.43B. ---,1.0,2.09,Y
Give me Apple Inc.'s Net income (loss) in 2025?,Fine-Tune,Apple Inc.'s net income (loss) in 2025 is $1.25 billion.,1.0,2.2,N
How did Total liabilities change from 2025 to 2025 for Apple Inc.?,RAG,---,0.75,0.78,Y
How did Total liabilities change from 2025 to 2025 for Apple Inc.?,Fine-Tune,Apple Inc.'s total liabilities were reduced from $20 million to $1 billion.,1.0,1.48,N
How did Total assets change from 2025 to 2025 for Apple Inc.?,RAG,a increase from $331.23B in 2025 to $331.50B in 2025,1.0,2.13,Y
How did Total assets change from 2025 to 2025 for Apple Inc.?,Fine-Tune,Apple Inc.'s total assets were changed from 2025 to 2025.,0.75,1.79,N
