# PRA Risk Summaries & Evasiveness Detector — **Lazy-Load + Cache-Warm**

Optimised for Apple Silicon (M3/MPS). Generates PRA risk-aligned summaries and flags banker evasiveness.


## Install (uv) quickstart
```bash
cat > requirements-fast.txt << 'EOF'
--only-binary=:all:
pip>=24
setuptools>=70
wheel>=0.43
torch==2.4.*
transformers==4.44.*
tokenizers==0.19.*
sentence-transformers==3.0.1
accelerate
huggingface_hub[cli]
hf-transfer
numpy
pandas
matplotlib
textstat
EOF

uv pip install -r requirements-fast.txt
export TRANSFORMERS_NO_TF=1
export TRANSFORMERS_NO_FLAX=1
export TOKENIZERS_PARALLELISM=false
export HF_HUB_ENABLE_HF_TRANSFER=1
```


In [None]:

# Light env & system check
import os, sys, platform, warnings, re
from pathlib import Path
import pandas as pd, numpy as np, matplotlib.pyplot as plt
warnings.filterwarnings("ignore")

# Hugging Face / tokenizers switches. Only keys that are not already present in
# the environment are written, so values exported by the shell take precedence.
os.environ.update({
    key: value
    for key, value in {
        "TRANSFORMERS_NO_TF": "1",
        "TRANSFORMERS_NO_FLAX": "1",
        "TOKENIZERS_PARALLELISM": "false",
        "HF_HUB_ENABLE_HF_TRANSFER": "1",
        "HF_HUB_DISABLE_TELEMETRY": "1",
    }.items()
    if key not in os.environ
})

print("Python:", sys.version.split()[0], "| OS:", platform.platform(), "| Arch:", platform.machine())

# Pick the compute device: MPS first, then CUDA, otherwise CPU. Any failure
# while importing or probing torch falls back to CPU.
try:
    import torch
    print("Torch:", torch.__version__, "| MPS built:", torch.backends.mps.is_built(), "| MPS avail:", torch.backends.mps.is_available())
    if torch.backends.mps.is_available() and torch.backends.mps.is_built():
        DEVICE = "mps"
    elif torch.cuda.is_available():
        DEVICE = "cuda"
    else:
        DEVICE = "cpu"
except Exception as e:
    print("Torch not available:", e)
    DEVICE = "cpu"

# Fixed seed for NumPy's global random generator.
SEED = 42
np.random.seed(SEED)


In [None]:

# Config
# Input locations (relative to the notebook's working directory).
DATA_DIR = Path("../data/processed")
JPM_PATH = DATA_DIR.joinpath("jpm", "all_jpm_2023_2025.csv")
HSBC_PATH = DATA_DIR.joinpath("hsbc", "all_hsbc_2023_2025.csv")
# Candidate file names for the PRA category list; the first one that exists is used.
PRA_PATHS = [
    DATA_DIR / file_name
    for file_name in ("PRA Risk Categories.csv", "PRA Risk Categories - Sheet1.csv")
]

# Hugging Face model identifiers.
EMBED_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
SUMM_MODEL_ID = "sshleifer/distilbart-cnn-12-6"

# Evasion-score parameters (see compute_evasion).
SIMILARITY_LOW = 0.38        # Q/A cosine similarity below this adds to the score
HEDGE_MIN_COUNT = 2          # hedge count at which the hedge component saturates
VERBOSITY_RATIO_HIGH = 6.0   # answer/question length ratio at which verbosity saturates
READABILITY_SIMPLE = 8.0     # Flesch-Kincaid grade above which readability adds to the score
EVASION_SCORE_FLAG = 0.65    # scores at or above this set evasive_flag

# Summaries longer than 1.5x this many words are summarised a second time.
SUMMARY_TARGET_WORDS = 120


In [None]:

# Warm model cache
from huggingface_hub import snapshot_download
def warm_cache(model_id: str):
    """Fetch the Hub snapshot for *model_id* and print the local path it resolves to.

    Args:
        model_id: Hugging Face repository id, passed to ``snapshot_download``.
    """
    local_dir = snapshot_download(repo_id=model_id, local_files_only=False)
    print(f"Cached -> {local_dir}")
# Best effort: any failure while warming the cache is reported and ignored.
try:
    warm_cache(EMBED_MODEL_ID)
    warm_cache(SUMM_MODEL_ID)
except Exception as exc:
    print("Cache warm skipped:", exc)


In [None]:

# Helpers
# Lower-case phrases treated as hedging / deflection markers by count_hedges.
HEDGE_PHRASES = [
    "i think","we think","i believe","we believe","we feel","i feel",
    "sort of","kind of","a bit","a little","roughly","approximately","around","more or less","to some extent","somewhat",
    "we don't break out","we do not break out","we don't disclose","we do not disclose",
    "we won't comment","we will not comment","not going to comment",
    "too early to say","too soon to say","too soon to tell",
    "we'll have to see","we will have to see","we'll come back","we will come back",
    "as we've said before","as we said before","as previously mentioned","as mentioned",
    "let me step back","take a step back","the way i would frame","i would frame it",
    "i'm not sure","we're not sure","it's complicated","it's complex","moving parts",
    "as you know","as you can appreciate","that's a great question","good question",
    "let me answer a different","let me start somewhere else"
]

def normalize_text(s: str) -> str:
    """Collapse each whitespace run in *s* to one space and trim both ends.

    ``None`` and the empty string both yield ``""``.
    """
    return re.sub(r"\s+", " ", (s or "")).strip()

def count_hedges(text: str) -> int:
    """Return how many distinct HEDGE_PHRASES occur in *text*.

    Each phrase counts at most once, however often it appears. Matching is
    case-insensitive and delimited by non-word characters, so a phrase next to
    punctuation ("I think, ..." or "... too early to say.") is counted, while a
    phrase embedded in a longer word ("rethink") is not.

    Args:
        text: Text to scan; ``None`` is treated as empty.

    Returns:
        Number of phrases from HEDGE_PHRASES found in the text.
    """
    lowered = (text or "").lower()
    # Fold typographic apostrophes to ASCII so "don\u2019t" matches "don't".
    lowered = lowered.replace("\u2019", "'").replace("\u2018", "'")
    # Collapse whitespace so a phrase broken across a line still matches.
    haystack = re.sub(r"\s+", " ", lowered)
    # Patterns are built per call so later edits to HEDGE_PHRASES are honoured;
    # the re module caches compiled patterns, which keeps this cheap.
    return sum(
        1
        for phrase in HEDGE_PHRASES
        if re.search(rf"(?<!\w){re.escape(phrase)}(?!\w)", haystack)
    )

def fk_grade(text: str) -> float:
    """Return the Flesch-Kincaid grade of *text* as computed by ``textstat``.

    ``None`` is scored as the empty string. If ``textstat`` cannot be imported
    or the computation raises, NaN is returned instead of propagating the error.
    """
    try:
        import textstat
        grade = textstat.flesch_kincaid_grade(text or "")
    except Exception:
        grade = float("nan")
    return grade


In [None]:

# Lazy loaders
import functools
def _best_device():
    """Return "mps", "cuda" or "cpu", in that order of preference.

    "mps" requires the MPS backend to be both available and built. Any error
    while importing or probing torch results in "cpu".
    """
    try:
        import torch
        mps_backend = torch.backends.mps
        if mps_backend.is_available() and mps_backend.is_built():
            return "mps"
        return "cuda" if torch.cuda.is_available() else "cpu"
    except Exception:
        return "cpu"

@functools.lru_cache(maxsize=1)
def get_embedder(model_id=EMBED_MODEL_ID):
    """Build the sentence-embedding model on first use and cache it.

    The import of ``sentence_transformers`` is deferred to the first call.

    Args:
        model_id: Model identifier passed to ``SentenceTransformer``.

    Returns:
        ``(model, device)`` where ``device`` is the string from ``_best_device()``.
    """
    from sentence_transformers import SentenceTransformer
    device = _best_device()
    model = SentenceTransformer(model_id, device=device)
    return model, device

@functools.lru_cache(maxsize=1)
def get_summarizer(model_id=SUMM_MODEL_ID):
    """Load the seq2seq summarisation model and tokenizer on first use and cache them.

    The import of ``transformers`` is deferred to the first call.

    Args:
        model_id: Model identifier passed to ``from_pretrained``.

    Returns:
        ``(tokenizer, model, device)``. ``device`` is the device the model was
        actually placed on, so callers can move their inputs to the same place.
    """
    from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
    tok = AutoTokenizer.from_pretrained(model_id)
    mod = AutoModelForSeq2SeqLM.from_pretrained(model_id)
    dev = _best_device()
    if dev != "cpu":
        try:
            mod = mod.to(dev)
        except Exception as exc:
            # Report the CPU fallback in the returned device too; otherwise
            # callers would send inputs to an accelerator the model is not on.
            print(f"Summarizer could not be moved to {dev} ({exc}); using cpu")
            mod = mod.to("cpu")
            dev = "cpu"
    return tok, mod, dev


In [None]:

# LoaderAgent
class LoaderAgent:
    """Load the JPM and HSBC transcript CSVs into one cleaned DataFrame."""

    def __init__(self, jpm_path: Path, hsbc_path: Path):
        """Remember the two CSV locations; nothing is read until load_df()."""
        self.jpm_path = jpm_path
        self.hsbc_path = hsbc_path

    def load_df(self) -> pd.DataFrame:
        """Read, tag, normalise and filter the transcript rows.

        Steps:
          * read whichever of the two CSVs exist and add a ``bank`` column
            ("JPM" / "HSBC");
          * whitespace-normalise the known text columns (missing ``content``
            stays missing; other missing text becomes "");
          * coerce ``year`` to numeric and tidy ``quarter`` labels;
          * when a ``section`` column exists, keep rows whose section mentions
            QUESTION / Q&A or, if an ``answer_number`` column exists, rows with
            a non-null answer number;
          * drop rows flagged ``is_pleasantry`` and rows without content.

        Returns:
            The filtered frame with a fresh 0..n-1 index.

        Raises:
            AssertionError: if neither CSV exists.
        """
        frames = []
        for path, bank in ((self.jpm_path, "JPM"), (self.hsbc_path, "HSBC")):
            if path.exists():
                frame = pd.read_csv(path)
                frame["bank"] = bank
                frames.append(frame)
        # Raised explicitly (same type/message as before) so the check survives `python -O`.
        if not frames:
            raise AssertionError("No input CSVs found.")
        df = pd.concat(frames, ignore_index=True)

        for c in ["content", "speaker_name", "role", "company", "section", "source_pdf"]:
            if c not in df.columns:
                continue
            # astype(str) would turn missing cells into the literal "nan". Keep
            # missing content as None so the final dropna removes it, and use ""
            # for the other text columns so downstream string checks stay safe.
            blank = None if c == "content" else ""
            df[c] = [blank if pd.isna(v) else normalize_text(str(v)) for v in df[c]]

        if "year" in df.columns:
            df["year"] = pd.to_numeric(df["year"], errors="coerce")
        if "quarter" in df.columns:
            df["quarter"] = df["quarter"].astype(str).str.upper().str.replace(" ", "", regex=False)
            df["quarter"] = df["quarter"].str.replace("Q0", "Q4", regex=False)
        if "section" in df.columns:
            keep = df["section"].str.contains("QUESTION|Q&A", case=False, na=False)
            # answer_number is optional; only use it when the CSV provides it.
            if "answer_number" in df.columns:
                keep = keep | df["answer_number"].notna()
            df = df[keep]
        if "is_pleasantry" in df.columns:
            df = df[df["is_pleasantry"] != True]
        return df.dropna(subset=["content"]).reset_index(drop=True)

# Load both banks' transcripts, report the row count and preview the first rows.
loader = LoaderAgent(JPM_PATH, HSBC_PATH)
qa_df = loader.load_df()
print("Loaded rows:", len(qa_df))
qa_df.head(3)


In [None]:

# PRA categories & mapper
def load_pra_categories(paths):
    """Load the PRA category list from the first path in *paths* that exists.

    Column headers are matched case-insensitively and ignoring surrounding
    whitespace. The category column is the one named "category", or the first
    column when no such header exists. A "description" column is optional.

    Args:
        paths: Iterable of ``Path`` candidates, tried in order.

    Returns:
        DataFrame with exactly the columns ``category`` and ``description``.
        Rows without a category are dropped; a missing description becomes "".

    Raises:
        FileNotFoundError: if none of the paths exists.
    """
    for p in paths:
        if not p.exists():
            continue
        cat = pd.read_csv(p)
        by_key = {str(c).lower().strip(): c for c in cat.columns}
        category_col = by_key.get("category", cat.columns[0])
        renames = {category_col: "category"}
        # Rename the description column as well: a header such as "Description"
        # would otherwise pass the existence check but fail the column selection.
        description_col = by_key.get("description")
        if description_col is not None and description_col != category_col:
            renames[description_col] = "description"
        cat = cat.rename(columns=renames)
        if "description" not in cat.columns:
            cat["description"] = ""
        # A category without a description is still a category: blank the
        # description rather than letting dropna discard the whole row.
        cat["description"] = ["" if pd.isna(v) else str(v) for v in cat["description"]]
        return cat[["category", "description"]].dropna(subset=["category"]).reset_index(drop=True)
    raise FileNotFoundError("PRA categories file not found.")

from sentence_transformers import util as st_util

# Category table plus the shared embedder (and the device it was placed on).
pra_df = load_pra_categories(PRA_PATHS)
embedder, EMBED_DEV = get_embedder()

# One text per category, "<category>. <description>", embedded once up front.
pra_texts = (pra_df["category"] + ". " + pra_df["description"].fillna("")).tolist()
pra_embs = embedder.encode(
    pra_texts,
    convert_to_tensor=True,
    normalize_embeddings=True,
)

# Keyword lists per risk category, used for keyword-based category hits.
KEYWORDS = {
    "Credit risk": [
        "credit", "NPL", "non-performing", "loan loss", "default", "provision", "counterparty",
    ],
    "Market risk": [
        "trading", "VaR", "volatility", "rates", "FX", "equities", "derivatives", "market risk",
    ],
    "Liquidity risk": [
        "liquidity", "LCR", "NSFR", "funding", "deposits", "outflows", "liquidity coverage",
    ],
    "Capital risk": [
        "capital", "CET1", "RWA", "leverage ratio", "buffers", "Pillar 2", "dividends", "buybacks",
    ],
    "Operational risk": [
        "operational", "ops", "cyber", "fraud", "conduct", "model risk", "technology",
    ],
    "IRRBB": [
        "interest rate risk in the banking book", "IRRBB", "ALM", "duration", "asset-liability",
    ],
    "Climate & ESG": [
        "climate", "ESG", "sustainability", "transition risk", "physical risk", "emissions",
    ],
    "Model risk": [
        "model risk", "validation", "challenge", "backtesting", "stress test",
    ],
    "Conduct risk": [
        "conduct", "mis-selling", "complaints", "whistleblowing", "FCA",
    ],
}
# Lower-cased keyword -> category. A keyword listed under several categories
# ("conduct", "model risk") maps to the category that appears last above.
kw2cat = dict(
    (kw.lower(), cat)
    for cat, kws in KEYWORDS.items()
    for kw in kws
)

def map_to_pra_categories(text: str, top_k: int = 2):
    """Assign PRA categories to *text* from keyword hits and embedding similarity.

    Keyword hits come from ``kw2cat`` (case-insensitive, delimited by non-word
    characters, so "liquidity." and "(CET1)" match). Embedding neighbours are
    the *top_k* rows of ``pra_df`` whose embeddings in ``pra_embs`` are most
    similar to the text.

    Args:
        text: Text to classify; ``None`` is treated as empty.
        top_k: Number of nearest categories to add by similarity.

    Returns:
        ``(categories, best_similarity)``. ``categories`` lists keyword hits
        first (in ``kw2cat`` order) and then the neighbours, without
        duplicates. ``best_similarity`` is the highest cosine similarity to any
        category, or NaN when there are no categories to compare against.
    """
    text_norm = (text or "").lower()
    # Ordered and de-duplicated, not a set: set iteration order for strings
    # varies between interpreter runs, which made the output order unstable.
    hits = list(dict.fromkeys(
        cat
        for kw, cat in kw2cat.items()
        if re.search(rf"(?<!\w){re.escape(kw)}(?!\w)", text_norm)
    ))
    if len(pra_df) == 0:
        return hits, float("nan")
    q_emb = embedder.encode([text or ""], convert_to_tensor=True, normalize_embeddings=True)
    sims = st_util.cos_sim(q_emb, pra_embs).cpu().numpy().ravel()
    nn_idx = sims.argsort()[::-1][:top_k]
    nn_cats = [pra_df.iloc[i]["category"] for i in nn_idx]
    cats = list(dict.fromkeys(hits + nn_cats))
    # max() equals the similarity of the first neighbour and stays valid when top_k is 0.
    return cats, float(sims.max())


In [None]:

# Build Q/A pairs
def build_pairs(df: pd.DataFrame):
    """Group transcript rows into question/answer pairs and tag PRA categories.

    Rows are classified as analyst rows ("analyst" in role or speaker name) or
    banker rows (a management term in the role, or "jpmorgan"/"hsbc" in the
    speaker name). A row is never both: analyst classification wins, so an
    analyst employed by one of the banks is not counted as management.

    Rows are grouped by (bank, year, quarter, question_number). When the frame
    has no ``question_number`` column, one is derived by counting analyst rows
    in order within each (bank, year, quarter), so every analyst turn opens a
    new question and the rows that follow attach to it.

    Args:
        df: Transcript rows with at least ``bank``, ``year``, ``quarter`` and
            ``content``; ``role`` and ``speaker_name`` are used when present.

    Returns:
        DataFrame with one row per pair and the columns bank, year, quarter,
        question_number, question_text, answer_text, pra_categories, pra_sim.
        Pairs with neither question nor answer text are omitted.
    """
    mgmt = ("chief", "ceo", "cfo", "coo", "treasurer", "head", "president", "vice", "managing director")
    pair_columns = ["bank", "year", "quarter", "question_number",
                    "question_text", "answer_text", "pra_categories", "pra_sim"]

    def _lower(value):
        # Missing cells (None / NaN) count as empty text.
        return "" if pd.isna(value) else str(value).lower()

    def is_analyst(role, speaker):
        return ("analyst" in _lower(role)) or ("analyst" in _lower(speaker))

    def is_banker(role, speaker):
        role = _lower(role); speaker = _lower(speaker)
        return any(m in role for m in mgmt) or ("jpmorgan" in speaker) or ("hsbc" in speaker) or ("executive" in role)

    df = df.copy()
    roles = df["role"].tolist() if "role" in df.columns else [""] * len(df)
    speakers = df["speaker_name"].tolist() if "speaker_name" in df.columns else [""] * len(df)
    analyst_flags = [is_analyst(r, s) for r, s in zip(roles, speakers)]
    df["is_analyst_row"] = analyst_flags
    # Analyst rows are excluded so their text is not duplicated into the answer.
    df["is_banker_row"] = [
        (not a) and is_banker(r, s) for a, r, s in zip(analyst_flags, roles, speakers)
    ]

    if "question_number" not in df.columns:
        # A per-row counter would give every row its own number and leave no
        # row to pair with; count analyst turns instead.
        df["_analyst_turn"] = df["is_analyst_row"].astype(int)
        df["question_number"] = (
            df.groupby(["bank", "year", "quarter"], dropna=False)["_analyst_turn"].cumsum()
        )
        df = df.drop(columns="_analyst_turn")

    pairs = []
    gcols = ["bank", "year", "quarter", "question_number"]
    for key, g in df.groupby(gcols, dropna=False):
        g = g.sort_index()
        qtxt = " ".join(g.loc[g["is_analyst_row"], "content"].tolist())
        atxt = " ".join(g.loc[g["is_banker_row"], "content"].tolist())
        if not qtxt and not atxt:
            continue
        cats, cat_sim = map_to_pra_categories((qtxt or "") + " " + (atxt or ""))
        pairs.append({
            "bank": key[0], "year": key[1], "quarter": key[2], "question_number": key[3],
            "question_text": normalize_text(qtxt), "answer_text": normalize_text(atxt),
            "pra_categories": cats, "pra_sim": cat_sim
        })
    # Explicit columns keep the frame well-formed when no pairs were found.
    pairs_df = pd.DataFrame(pairs, columns=pair_columns)
    pairs_df = pairs_df[(pairs_df["question_text"].str.len() > 0) | (pairs_df["answer_text"].str.len() > 0)].reset_index(drop=True)
    print("Pairs built:", len(pairs_df))
    return pairs_df

# Pair questions with answers and preview the first rows.
pairs_df = build_pairs(qa_df)
pairs_df.head(3)


In [None]:

# Evasion scoring
def compute_evasion(pairs_df: pd.DataFrame):
    """Add evasion features and a combined evasion score to each Q/A pair.

    Added columns:
      * ``qa_similarity``: cosine similarity of question and answer embeddings;
      * ``hedge_count``: ``count_hedges`` of the answer;
      * ``ans_to_q_len_ratio``: (answer length + 1) / (question length + 1);
      * ``fk_grade_answer``: ``fk_grade`` of the answer;
      * ``evasion_score``: 0.45*similarity + 0.25*hedging + 0.20*verbosity
        + 0.10*readability components, each clipped to [0, 1] and rounded to
        4 decimals; an empty answer scores 1.0;
      * ``evasive_flag``: ``evasion_score >= EVASION_SCORE_FLAG``.

    Args:
        pairs_df: Frame with ``question_text`` and ``answer_text`` columns.

    Returns:
        A copy of *pairs_df* with the columns above; the input is not modified.
    """
    out = pairs_df.copy()
    if out.empty:
        # Nothing to embed; still return the full column set.
        for col in ("qa_similarity", "hedge_count", "ans_to_q_len_ratio",
                    "fk_grade_answer", "evasion_score"):
            out[col] = np.array([], dtype=float)
        out["evasive_flag"] = np.array([], dtype=bool)
        return out

    emb, _ = get_embedder()
    q_embs = emb.encode(out["question_text"].tolist(), convert_to_tensor=True, normalize_embeddings=True)
    a_embs = emb.encode(out["answer_text"].tolist(), convert_to_tensor=True, normalize_embeddings=True)
    # The embeddings are unit length, so the row-wise dot product is the
    # question/answer cosine. This avoids building an N x N matrix for its diagonal.
    cos_sims = (q_embs * a_embs).sum(dim=1).cpu().numpy()

    def clip01(x):
        return min(1.0, max(0.0, x))

    def evasion_score(answer, sim, hedges, ratio, grade):
        if not answer:
            return 1.0
        sim_comp = clip01((SIMILARITY_LOW - sim) / SIMILARITY_LOW)
        hedge_comp = min(1.0, hedges / max(HEDGE_MIN_COUNT, 1))
        ratio_comp = clip01((ratio - 1.5) / (VERBOSITY_RATIO_HIGH - 1.5))
        # fk_grade returns NaN when the grade is unavailable; that contributes nothing.
        grade_comp = 0.0 if np.isnan(grade) else clip01((grade - READABILITY_SIMPLE) / 10.0)
        return float(round(0.45*sim_comp + 0.25*hedge_comp + 0.20*ratio_comp + 0.10*grade_comp, 4))

    # Each feature is computed once and reused for the score.
    out["qa_similarity"] = cos_sims
    out["hedge_count"] = out["answer_text"].map(count_hedges)
    out["ans_to_q_len_ratio"] = (out["answer_text"].str.len()+1)/(out["question_text"].str.len()+1)
    out["fk_grade_answer"] = out["answer_text"].map(fk_grade)
    out["evasion_score"] = [
        evasion_score(a, s, h, r, g)
        for a, s, h, r, g in zip(out["answer_text"], cos_sims, out["hedge_count"],
                                 out["ans_to_q_len_ratio"], out["fk_grade_answer"])
    ]
    out["evasive_flag"] = out["evasion_score"] >= EVASION_SCORE_FLAG
    return out

# Score every pair, then show the five highest evasion scores.
pairs_df = compute_evasion(pairs_df)
pairs_df.sort_values(by="evasion_score", ascending=False).head(5)


In [None]:

# Summaries
def chunk_text(s: str, max_chars=2500):
    """Split *s* into chunks of at most *max_chars* characters.

    Whitespace is collapsed first (the same rule as normalize_text). Text that
    fits is returned as a single chunk. Longer text is split after sentence
    punctuation (., !, ?) and sentences are packed greedily into chunks. A
    single sentence longer than *max_chars* is cut into fixed-size pieces.

    Args:
        s: Text to split; ``None`` is treated as empty.
        max_chars: Maximum chunk length in characters.

    Returns:
        A list of chunks: ``[""]`` for empty input, otherwise only non-empty strings.
    """
    s = re.sub(r"\s+", " ", s or "").strip()
    if len(s) <= max_chars:
        return [s]
    step = max(1, max_chars)  # guards the slicing below against a zero or negative limit
    chunks, buf = [], ""
    for sentence in re.split(r'(?<=[\.!?])\s+', s):
        # An oversized sentence would otherwise become one oversized chunk.
        for piece in (sentence[i:i + step] for i in range(0, len(sentence), step)):
            if buf and len(buf) + len(piece) + 1 >= max_chars:
                chunks.append(buf)
                buf = piece
            else:
                # An empty buffer always takes the piece, so no empty chunk is emitted.
                buf = f"{buf} {piece}" if buf else piece
    if buf:
        chunks.append(buf)
    return chunks

def summarise_text(text: str, max_new_tokens=200):
    """Summarise *text* chunk by chunk and join the partial summaries.

    The text is split with ``chunk_text``; each chunk is tokenised (truncated
    to 1024 tokens) and summarised with deterministic decoding
    (``do_sample=False``).

    Args:
        text: Text to summarise.
        max_new_tokens: Generation budget per chunk.

    Returns:
        The chunk summaries joined by single spaces, or "" when there is no
        non-blank text to summarise.
    """
    # Drop blank chunks: generating from empty input yields text that
    # summarises nothing. Returning early also avoids loading the model.
    chunks = [ch for ch in (chunk_text(text) or []) if ch and ch.strip()]
    if not chunks:
        return ""
    import torch
    tok, mod, dev = get_summarizer()
    outs = []
    for ch in chunks:
        inputs = tok(ch, return_tensors="pt", truncation=True, max_length=1024).to(dev)
        with torch.no_grad():
            out = mod.generate(**inputs, max_new_tokens=max_new_tokens, do_sample=False)
        outs.append(tok.decode(out[0], skip_special_tokens=True).strip())
    return " ".join(outs)

def build_summaries(pairs: pd.DataFrame, by_cols=("bank","year","quarter")):
    """Summarise banker answers per group and PRA category.

    For every combination of *by_cols* and every category in
    ``pra_categories``, the answers are concatenated and summarised. A summary
    longer than 1.5 x SUMMARY_TARGET_WORDS words is summarised once more with a
    smaller token budget.

    Args:
        pairs: Scored pairs with ``pra_categories`` (list per row),
            ``answer_text`` and ``evasion_score`` columns.
        by_cols: Grouping columns.

    Returns:
        DataFrame sorted by *by_cols* and ``pra_category`` with the columns
        ``*by_cols, pra_category, summary, n_pairs, median_evasion``. It is
        empty, with those columns, when there is nothing to summarise.
    """
    by_cols = list(by_cols)
    out_cols = by_cols + ["pra_category", "summary", "n_pairs", "median_evasion"]
    if pairs.empty:
        return pd.DataFrame(columns=out_cols)

    rows = []
    # dropna=False keeps pairs whose year/quarter is missing, matching how the
    # pairs themselves were grouped.
    for key, g in pairs.groupby(by_cols, dropna=False):
        # Depending on the pandas version, a single grouping column yields a
        # scalar key; normalise so the zip below pairs column with value.
        if not isinstance(key, tuple):
            key = (key,)
        gg = g.explode("pra_categories").dropna(subset=["pra_categories"])
        for cat, gcat in gg.groupby("pra_categories"):
            text = " ".join(gcat["answer_text"].fillna("").astype(str).tolist())
            summ = summarise_text(text, max_new_tokens=200) if text.strip() else ""
            if len(summ.split()) > SUMMARY_TARGET_WORDS*1.5:
                summ = summarise_text(summ, max_new_tokens=120)
            rows.append({**dict(zip(by_cols, key)),
                         "pra_category": cat, "summary": summ,
                         "n_pairs": len(gcat),
                         "median_evasion": float(np.median(gcat["evasion_score"]))})
    # Explicit columns keep sort_values valid when no rows were produced.
    result = pd.DataFrame(rows, columns=out_cols)
    return result.sort_values(by_cols + ["pra_category"]).reset_index(drop=True)

# One summary per bank / year / quarter / PRA category; preview the first rows.
summary_keys = ("bank", "year", "quarter")
summ_df = build_summaries(pairs_df, by_cols=summary_keys)
summ_df.head(3)


In [None]:

# Save & peek
from datetime import datetime
# Output folder, created on demand.
OUT_DIR = Path("./outputs")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Minute-resolution timestamp shared by both output files.
ts = datetime.now().strftime("%Y%m%d-%H%M")
pairs_path = OUT_DIR.joinpath(f"qa_pairs_with_evasion_{ts}.csv")
summ_path = OUT_DIR.joinpath(f"pra_category_summaries_{ts}.csv")
pairs_df.to_csv(pairs_path, index=False)
summ_df.to_csv(summ_path, index=False)
print("Saved:", pairs_path, "|", summ_path)

# Ten highest-scoring pairs, restricted to the review columns.
cols = [
    "bank", "year", "quarter", "question_number", "pra_categories", "qa_similarity",
    "hedge_count", "ans_to_q_len_ratio", "fk_grade_answer", "evasion_score", "evasive_flag",
    "question_text", "answer_text",
]
pairs_df.sort_values("evasion_score", ascending=False)[cols].head(10)


In [None]:

# Visuals
# Chart 1: median evasion score per PRA category (one row per pair/category).
gg = pairs_df.explode("pra_categories").dropna(subset=["pra_categories"])
if not gg.empty:
    med = (
        gg.groupby("pra_categories")["evasion_score"]
        .median()
        .sort_values(ascending=False)
    )
    plt.figure()
    med.plot(kind="barh")
    plt.title("Median Evasion Score by PRA Category")
    plt.tight_layout()
    plt.show()

# Chart 2: one line chart per bank of the median evasion score by quarter.
qmed = pairs_df.groupby(["bank","year","quarter"])["evasion_score"].median().reset_index()
if not qmed.empty:
    qmed["q_label"] = qmed["year"].astype(str) + " " + qmed["quarter"].astype(str)
    for bank, g in qmed.groupby("bank"):
        plt.figure()
        plt.plot(g["q_label"], g["evasion_score"], marker="o")
        plt.title(f"Median Evasion Score by Quarter — {bank}")
        plt.xticks(rotation=45, ha="right")
        plt.tight_layout()
        plt.show()


In [None]:

# (Optional) import timing diagnostics
import importlib, time
# Time each heavy import separately. A module that is already loaded reports ~0s.
t0 = time.perf_counter()
importlib.import_module("transformers")
print("transformers import:", f"{time.perf_counter()-t0:.2f}s")

t0 = time.perf_counter()
importlib.import_module("sentence_transformers")
print("sentence_transformers import:", f"{time.perf_counter()-t0:.2f}s")
