# 02 — Retrieval Debug

This notebook:
1) loads the BM25 fallback index
2) runs retrieval queries
3) inspects top documents and snippets
4) provides a simple evidence-inspection workflow


In [None]:
from pathlib import Path
import json
import re
import math
import pandas as pd

# Input locations. All three are relative paths, so they resolve against the
# directory the kernel is started from.
# JSONL corpus read by load_jsonl below.
CORPUS_PATH = Path("data/processed/corpus.jsonl")
# Directory that holds the index files.
INDEX_DIR = Path("data/processed/index")
# BM25 index; parsed as JSON in a later cell.
BM25_PATH = INDEX_DIR / "bm25.json"

def load_jsonl(path: Path, max_rows: int = 200000):
    """Read up to ``max_rows`` records from a JSON Lines file.

    Args:
        path: Location of the ``.jsonl`` file; opened as UTF-8 text.
        max_rows: Maximum number of parsed records to return. A value of
            zero or less returns an empty list without opening the file.

    Returns:
        A list holding one parsed JSON value per non-blank line, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    rows = []
    if max_rows <= 0:
        return rows
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            # Blank lines (for example a stray empty line at the end of the
            # file) are not records; json.loads would raise on them.
            if not line.strip():
                continue
            rows.append(json.loads(line))
            # Count parsed records rather than physical lines, so skipped
            # blank lines do not eat into the limit.
            if len(rows) >= max_rows:
                break
    return rows

# Read at most 50,000 corpus rows and report how many were loaded.
# retrieve_bm25 reads `corpus[idx]` using BM25 index positions, so this cap
# should not be smaller than the number of documents in the index.
corpus = load_jsonl(path=CORPUS_PATH, max_rows=50_000)
print("Corpus docs:", len(corpus))

In [None]:
# Parse the BM25 index from disk, then show its top-level keys and the
# document count / average document length stored in it.
with BM25_PATH.open("r", encoding="utf-8") as index_file:
    bm25 = json.load(index_file)
print("BM25 keys:", bm25.keys())
print("N:", bm25["N"], "avgdl:", bm25["avgdl"])

In [None]:
def tokenize(text: str):
    """Lower-case ``text`` and split it into runs of ASCII letters and digits.

    Every other character acts as a separator and is dropped.

    Returns:
        The tokens as a list of strings, in order of appearance.
    """
    lowered = text.lower()
    return [match.group(0) for match in re.finditer(r"[a-z0-9]+", lowered)]

def bm25_score(query: str, bm25_obj: dict, doc_idx: int, k1=1.2, b=0.75):
    """Score a single indexed document against ``query`` with BM25.

    Args:
        query: Raw query string; split into terms with ``tokenize``.
        bm25_obj: Index dict providing "N", "avgdl", "df", "doc_len" and
            "tokenized".
        doc_idx: Position of the document in "doc_len" and "tokenized".
        k1: Term-frequency saturation parameter.
        b: Document-length normalisation strength.

    Returns:
        The summed contribution of every query term found in "df", as a
        float. A query term that occurs several times contributes each time;
        terms absent from "df" are ignored.
    """
    n_docs = bm25_obj["N"]
    mean_len = bm25_obj["avgdl"]
    doc_freq = bm25_obj["df"]
    length = bm25_obj["doc_len"][doc_idx]

    # Term-frequency table for this document.
    counts = {}
    for tok in bm25_obj["tokenized"][doc_idx]:
        if tok in counts:
            counts[tok] += 1
        else:
            counts[tok] = 1

    total = 0.0
    for term in tokenize(query):
        if term in doc_freq:
            n_q = doc_freq[term]
            # NOTE(review): this IDF is negative for terms that occur in more
            # than half of the documents — confirm that is the intended behaviour.
            idf = math.log((n_docs - n_q + 0.5) / (n_q + 0.5) + 1e-9)
            freq = counts.get(term, 0)
            # The 1e-9 terms guard against division by zero.
            length_norm = 1 - b + b * (length / (mean_len + 1e-9))
            denom = freq + k1 * length_norm
            total += idf * (freq * (k1 + 1) / (denom + 1e-9))
    return total

In [None]:
def retrieve_bm25(query: str, top_k: int = 5, bm25_obj=None, corpus_rows=None):
    """Rank every indexed document against ``query`` and return the best hits.

    Args:
        query: Raw query string, passed unchanged to ``bm25_score``.
        top_k: Maximum number of hits to return; zero or negative values
            return an empty list.
        bm25_obj: Index dict to search. Defaults to the notebook-level ``bm25``.
        corpus_rows: Rows used to look up the original text by index
            position. Defaults to the notebook-level ``corpus``.

    Returns:
        A list of dicts with keys "rank" (1-based), "score", "doc_id", "meta"
        and "text", ordered by descending score. Documents that score exactly
        0 are left out.
    """
    # Fall back to the notebook globals so existing calls keep working.
    if bm25_obj is None:
        bm25_obj = bm25
    if corpus_rows is None:
        corpus_rows = corpus

    scored = []
    for i in range(bm25_obj["N"]):
        s = bm25_score(query, bm25_obj, i)
        if s != 0:
            scored.append((s, i))
    scored.sort(reverse=True, key=lambda pair: pair[0])

    results = []
    # Clamp top_k: a negative value would otherwise slice as scored[:-k] and
    # return nearly every hit.
    for rank, (s, idx) in enumerate(scored[:max(top_k, 0)], start=1):
        entry = bm25_obj["docs"][idx]
        if idx < len(corpus_rows):
            # Assumes corpus order matches the order used when the index was
            # built (same corpus file) — verify against the index builder.
            text = corpus_rows[idx]["text"]
        else:
            # The corpus was loaded with a row cap smaller than the index;
            # show the indexed tokens instead of raising IndexError.
            text = " ".join(bm25_obj["tokenized"][idx])
        results.append({
            "rank": rank,
            "score": s,
            "doc_id": entry["id"],
            "meta": entry.get("meta", {}),
            "text": text,
        })
    return results

# Try a query and summarise the hits (rank, score, id, source file) as a table.
query = "risk-aware retrieval gating in RAG"
results = retrieve_bm25(query, top_k=5)
summary_rows = [
    {
        "rank": hit["rank"],
        "score": hit["score"],
        "doc_id": hit["doc_id"],
        "file": hit["meta"].get("filename", ""),
    }
    for hit in results
]
pd.DataFrame(summary_rows)

In [None]:
# For each hit print a separator rule, a one-line header, and the first
# 800 characters of the document text.
rule = "=" * 90
for hit in results:
    print(rule)
    header = f'Rank {hit["rank"]} | score={hit["score"]:.4f} | id={hit["doc_id"]} | file={hit["meta"].get("filename","")}'
    print(header)
    print(hit["text"][:800])

In [None]:
def evidence_check(doc_text: str):
    """Return rough keyword-based risk flags for a retrieved document.

    This is a very rough heuristic (replace with your evidence_filter LLM
    judge later). Matching is case-insensitive.

    Args:
        doc_text: Document text to inspect; ``None`` is treated as empty.

    Returns:
        A list containing zero or more of "procedural_or_attack_enabler" and
        "pii_risk", in that order.
    """
    low = (doc_text or "").lower()
    # Each rule: (flag, keywords matched anywhere, keywords matched only as
    # whole words). "ssn" is matched as a whole word because the bare
    # substring also occurs inside ordinary words such as "carelessness".
    rules = [
        (
            "procedural_or_attack_enabler",
            ["step-by-step", "exploit", "payload", "bypass", "jailbreak", "malware"],
            [],
        ),
        (
            "pii_risk",
            ["passport", "credit card", "phone number", "email:"],
            ["ssn"],
        ),
    ]
    flags = []
    for flag, substrings, whole_words in rules:
        matched = any(k in low for k in substrings) or any(
            re.search(r"(?<![a-z0-9])" + re.escape(k) + r"(?![a-z0-9])", low)
            for k in whole_words
        )
        if matched:
            flags.append(flag)
    return flags

# Show the heuristic flags raised for each retrieved document.
for hit in results:
    hit_flags = evidence_check(hit["text"])
    print(hit["doc_id"], "flags:", hit_flags)

In [None]:
# This cell will work only after you implement RAIRAGPipeline.
# Best-effort smoke test: any failure (including the import) is reported
# instead of stopping the notebook.
try:
    from rai_rag.pipeline.rai_rag import RAIRAGPipeline
    pipe = RAIRAGPipeline.from_config("configs/base.yaml")
    out = pipe.run("Explain RAI-RAG retrieval gating and evidence filtering.")
    # A bare `out.keys()` inside `try` is never shown: only a cell's final
    # top-level expression is auto-displayed, so print the keys explicitly.
    print("Pipeline output keys:", list(out.keys()))
except Exception as e:
    print("Internal pipeline not available yet:", e)