# Environment Setup

In [1]:
# Install necessary Python packages
!pip install -q python-docx google-api-python-client faiss-cpu transformers accelerate \
                huggingface_hub sentencepiece rank-bm25 sentence-transformers

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m253.0/253.0 kB[0m [31m17.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.4/31.4 MB[0m [31m61.0 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
# Import necessary libraries

# Standard library
import io, re, math
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List
from datetime import datetime

# Third-party libraries
import numpy as np
import pandas as pd
import faiss, torch
from rank_bm25 import BM25Okapi
from sentence_transformers import SentenceTransformer

# Hugging Face Transformers
from transformers import AutoModelForCausalLM, AutoTokenizer

# Google Colab & Drive API
from google.colab import auth, userdata
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload

# Notebook UI
from IPython.display import display, Markdown
import ipywidgets as widgets

# Source Data Loading

In [3]:
# [Data Structures Initialisation]
# ---------------------------------------------------------------------
# This cell initialises the core data structures used throughout the
# notebook, including the main dataframe ('database_df') and the
# 'Chunk' dataclass, which stores extracted text segments and their
# associated metadata.
# ---------------------------------------------------------------------

# Initialise an empty dataframe to hold processed document data.
# This is only a placeholder: per the loader cell's header comment, the
# notebook executed with '%run' initialises the real 'database_df'.
database_df = pd.DataFrame()

# Initialise chunks
@dataclass
class Chunk:
  """A discrete text segment extracted from a source document.

  Attributes:
    chunk_id (str): Unique identifier for the chunk.
    text (str): The text content of the chunk.
    filename (str): Original filename from which the chunk was extracted.
    doc_id (str): Document-level identifier (earnings call transcript).
    speaker (str): Name or role of the speaker (for transcript data).
    row_index (int): Row number in the source dataframe.
    block_index (int): Sequential index of the chunk within a block.
    metadata (Dict[str, Any]): Additional contextual information. Keys read
      elsewhere in this notebook include "bm25_tokens", "embedding_row",
      "person_type", "sentiment", "topics", "risk_flags", "kpis" and "type".
  """
  chunk_id: str
  text: str
  filename: str
  doc_id: str
  speaker: str
  row_index: int
  block_index: int
  metadata: Dict[str, Any]

# Initialise storage containers
# These are empty placeholders; per the loader cell's header comment, the
# notebook executed with '%run' is what initialises them with real data.
chunks: list[Chunk] = []                     # List of all extracted chunks
dense_embeddings: np.ndarray | None = None   # Placeholder for dense vector embeddings
faiss_index: faiss.Index | None = None       # Placeholder for FAISS vector index

In [4]:
# [Notebook Loading and Google Drive Integration]
# ---------------------------------------------------------------------
# This cell authenticates the current Colab session, locates a specified
# notebook within Google Drive, downloads it locally, and executes it
# to initialise shared objects 'database_df', 'chunks',
# 'faiss_index', and 'dense_embeddings'.
# ---------------------------------------------------------------------
# 1. Authenticate user for Google Drive access
auth.authenticate_user()
drive_service = build('drive', 'v3')

# 2. Configuration parameters
#    These specify the Drive folder and notebook to be fetched.
FOLDER_ID = "1cgcKiFUL8tHVWESI77pxCQhXp7ASCmg4"
FILE_NAME = "01_Load_Database.ipynb"

# 3. Search for the notebook within the given folder
results = drive_service.files().list(
  q=f"'{FOLDER_ID}' in parents and name='{FILE_NAME}'",
  fields="files(id, name)"
).execute()

files = results.get('files', [])
if not files:
  raise FileNotFoundError(f"{FILE_NAME} not found in shared folder.")
file_id = files[0]['id']

# 4. Download the notebook to the Colab runtime
request = drive_service.files().get_media(fileId=file_id)
fh = io.FileIO(FILE_NAME, "wb")
downloader = MediaIoBaseDownload(fh, request)
done = False
while not done:
  status, done = downloader.next_chunk()

# 5. Execute the downloaded notebook
#    '%run' executes the notebook as a script within the same namespace
%run "$FILE_NAME"

In [5]:
# Sanity embeddings check
# Every chunk must have exactly one embedding row, stored in the same order.
assert dense_embeddings is not None, "dense_embeddings was not initialised by the loader notebook"
assert len(chunks) == dense_embeddings.shape[0], "number of chunks does not match number of embedding rows"
# Spot-check the first few chunks; min() avoids an IndexError on corpora with
# fewer than three chunks.
for i in range(min(3, len(chunks))):
  assert chunks[i].metadata.get("embedding_row") == i, f"chunk {i} is not aligned with embedding row {i}"

# Language Model Initialization

In [6]:
# [LLM Loader]
# ---------------------------------------------------------------------
# This cell defines a helper function to load a selected Large Language
# Model (LLM) and its corresponding tokenizer from Hugging Face Hub.
# It retrieves the authentication token from 'google.colab.userdata'
# and automatically configures the device mapping (GPU/CPU).
# ---------------------------------------------------------------------
def load_llm(model_id: str):
  """Fetch a causal LLM and its tokenizer from the Hugging Face Hub.

  The access token is read from the Colab secret "HF_TOKEN" and passed to
  both 'from_pretrained' calls. Weights are requested in bfloat16 when CUDA
  is available and in float32 otherwise; device placement is delegated to
  'device_map="auto"'.

  Args:
    model_id (str): The model identifier on Hugging Face (e.g., "microsoft/phi-4").

  Returns:
    tuple: (model, tokenizer) as returned by 'AutoModelForCausalLM' and
    'AutoTokenizer'.

  Raises:
    RuntimeError: If 'userdata.get("HF_TOKEN")' returns an empty value.
  """
  # Authentication: the token must be present before anything is downloaded.
  hf_token = userdata.get("HF_TOKEN")
  if not hf_token:
    raise RuntimeError("HF_TOKEN secret not found in google.colab.userdata")

  tokenizer = AutoTokenizer.from_pretrained(model_id, token=hf_token)

  # Half precision on GPU, full precision on CPU.
  if torch.cuda.is_available():
    weights_dtype = torch.bfloat16
  else:
    weights_dtype = torch.float32

  model = AutoModelForCausalLM.from_pretrained(
    model_id,
    dtype=weights_dtype,
    device_map="auto",
    token=hf_token,
  )
  return model, tokenizer

In [7]:
# [LLM Loading]
# ---------------------------------------------------------------------
# This cell loads the chosen Large Language Model (LLM)
# along with its tokenizer, using the previously
# defined 'load_llm()' helper function.
# ---------------------------------------------------------------------

# Load LLM model and tokenizer via load_llm(); the results are bound to the
# module-level names 'model' and 'tok', which the chat UI callback reads.
model, tok = load_llm("microsoft/phi-4") # "google/gemma-3-27b-it" or "microsoft/phi-4"

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

added_tokens.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/95.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/802 [00:00<?, ?B/s]

model.safetensors.index.json: 0.00B [00:00, ?B/s]

Fetching 6 files:   0%|          | 0/6 [00:00<?, ?it/s]

model-00005-of-00006.safetensors:   0%|          | 0.00/4.77G [00:00<?, ?B/s]

model-00004-of-00006.safetensors:   0%|          | 0.00/4.77G [00:00<?, ?B/s]

model-00006-of-00006.safetensors:   0%|          | 0.00/4.99G [00:00<?, ?B/s]

model-00003-of-00006.safetensors:   0%|          | 0.00/4.90G [00:00<?, ?B/s]

model-00001-of-00006.safetensors:   0%|          | 0.00/4.93G [00:00<?, ?B/s]

model-00002-of-00006.safetensors:   0%|          | 0.00/4.95G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/6 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/156 [00:00<?, ?B/s]

In [8]:
# [Model Query Function]
# ---------------------------------------------------------------------
# This cell defines a utility function to interact with a loaded LLM.
# It sends a chat-style prompt (system + user messages) to the model and
# returns only the newly generated text as a stripped string.
# ---------------------------------------------------------------------

def ask_model(model, tokenizer, role, task, tokens):
  """Prompt a chat LLM and return only the newly generated text.

  Builds a system + user conversation, renders it with the tokenizer's chat
  template, generates up to 'tokens' new tokens and decodes everything after
  the prompt. Sampling behaviour is whatever the model's generation
  configuration specifies; this function does not override it.

  Args:
    model (torch.nn.Module): The loaded LLM model.
    tokenizer (transformers.PreTrainedTokenizer): Tokenizer corresponding to the model.
    role (str): System message defining the assistant's role or context.
    task (str): User query or instruction for the model to complete.
    tokens (int): Maximum number of new tokens to generate.

  Returns:
    str: The decoded completion with special tokens removed and surrounding
    whitespace stripped.

  Example:
    >>> ask_model(model, tok, "You are a summarizer.", "Summarize this text:", 200)
  """
  # 1. Construct system + user messages in chat format
  messages = [
    {"role": "system", "content": role},
    {"role": "user", "content": task},
  ]
  # 2. Render the chat template to a plain string ending with the
  #    assistant-turn marker (add_generation_prompt=True)
  prompt = tokenizer.apply_chat_template(
    messages,
    tokenize=False,
    add_generation_prompt=True,
  )
  # 3. Tokenize the prompt and move tensors to the model's device.
  #    The rendered template already contains the model's special tokens, so
  #    the tokenizer must not add them a second time (e.g. a duplicated BOS).
  inputs = tokenizer(
    prompt,
    return_tensors="pt",
    add_special_tokens=False,
  ).to(model.device)
  # 4. Generate, then decode only the tokens that follow the prompt
  out = model.generate(**inputs, max_new_tokens=tokens)
  prompt_len = inputs["input_ids"].shape[1]
  return tokenizer.decode(
    out[0, prompt_len:],
    skip_special_tokens=True
  ).strip()

# Evidence Retrieval

In [9]:
# [Chatbot Retrieval Setup]
# ---------------------------------------------------------------------
# This cell prepares the minimal retrieval infrastructure for the chatbot.
# It builds a BM25 index from tokenised text chunks and loads a
# sentence-embedding model for dense retrieval. The FAISS index is
# assumed to be pre-loaded in memory from the external notebook.
# ---------------------------------------------------------------------

# 1. Build BM25 index from the pre-tokenised text stored in chunk metadata
bm25_tokens = [chunk.metadata.get("bm25_tokens") for chunk in chunks]

#    Fail fast with a clear message instead of an opaque error from inside
#    BM25Okapi when the corpus is empty or a chunk has no token list.
if not bm25_tokens:
  raise ValueError("No chunks loaded; cannot build the BM25 index.")
_missing_tokens = sum(tokens is None for tokens in bm25_tokens)
if _missing_tokens:
  raise ValueError(
    f"{_missing_tokens} chunk(s) have no 'bm25_tokens' metadata; cannot build the BM25 index."
  )

bm25_index = BM25Okapi(bm25_tokens)

# 2. Load dense embedding model
embedder = SentenceTransformer("BAAI/bge-large-en-v1.5")

# 3. Confirm readiness
#    Note: 'faiss_index' is expected to be already available in memory.
print("Chatbot retrieval prerequisites ready.")

modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/52.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/779 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.34G [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/366 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/191 [00:00<?, ?B/s]

Chatbot retrieval prerequisites ready.


In [16]:
# [Retrieval Configuration]
# ---------------------------------------------------------------------
# Centralised knobs for retrieval + document metadata lookup.
# ---------------------------------------------------------------------

# Retrieval knobs
HYBRID_DENSE_WEIGHT = 0.55  # tune as needed (0 → BM25 only, 1 → dense only)
BM25_TOP_K = 50             # how many BM25 candidates to consider before merging
DENSE_TOP_K = 50            # how many dense candidates to consider before merging
MAX_CONTEXT_CHUNKS = 8      # final number of chunks handed to the generator
META_WEIGHT = 0.65          # metadata contribution when re-scoring [0..1]

# Lookup from dataframe metadata (assumes 'database_df' exists)
# Maps doc_id -> {"bank": ..., "year": ..., "quarter": ...}.
doc_lookup: Dict[str, Dict[str, Any]] = (
  database_df.set_index("doc_id")[["bank", "year", "quarter"]].to_dict("index")
)

# Pre-compute year range for recency-based scoring.
# NaN is a float, so it must be excluded explicitly: a NaN in the list makes
# min()/max() order-dependent and can turn the whole range into NaN.
_years = [
  v["year"]
  for v in doc_lookup.values()
  if isinstance(v.get("year"), (int, float)) and not math.isnan(v["year"])
]
YEAR_MIN = min(_years) if _years else None
YEAR_MAX = max(_years) if _years else None

In [18]:
# [Metadata Scoring Helpers]
# ---------------------------------------------------------------------
# Scoring utilities to weight chunks by metadata quality.
# Incorporates recency, speaker type, sentiment, numeric evidence,
# topical overlap, and query intent alignment.
# ---------------------------------------------------------------------

# --- Risk topic keyword mapping (used to infer query tags) ---
# Maps a topic tag to the lower-case keywords that signal it in a query.
# NOTE(review): "r ote" (with a space) and "ppe" look like possible typos
# (perhaps "rote" / "ppop") — confirm with the author before changing.
RISK_TOPIC_KEYWORDS = {
  "capital":       ["cet1", "capital", "rwa", "rwas", "mrel", "leverage"],
  "liquidity":     ["liquidity", "lcr", "nsfr", "deposit", "deposits", "outflows", "funding"],
  "asset_quality": ["impairment", "ecl", "expected credit loss", "credit quality", "npl", "stage 3", "charge-offs"],
  "profitability": ["nii", "nim", "net interest", "margin", "rote", "roe", "r ote", "cost-income", "ppe", "pbt"],
  "market":        ["trading", "markets", "volatility", "fx", "rates", "equities", "debt"],
  "costs":         ["cost", "opex", "expenses", "costs", "restructuring"],
  "outlook":       ["outlook", "guidance", "pipeline", "trend", "expect"],
}

# --- Query keywords indicating risk or performance focus ---
# Lower-case terms; a query containing any of them is treated as
# risk-oriented by _risk_intent().
RISK_QUERY_KEYWORDS = [
  "risk", "concern", "deterioration", "impair", "ecl", "expected credit",
  "defaults", "npl", "stage 3", "outflow", "deposit outflow", "guidance",
  "outlook", "pressure", "shortfall", "write-off", "provision", "charge",
  "liquidity", "capital", "stress", "headwind",
]

# ---------------------------------------------------------------------
# Core utility functions
# ---------------------------------------------------------------------

def _normalise01(x: float, lo: float, hi: float) -> float:
  """Min–max normalise x into [0, 1]."""
  if lo is None or hi is None or hi <= lo:
    return 0.0
  return max(0.0, min(1.0, (x - lo) / (hi - lo)))


def _parse_temporal_hints(text: str) -> dict:
  """Extract temporal cues (year, quarter) from a query string."""
  m_q = re.search(r"\bQ([1-4])\b", text, re.I)
  m_y = re.search(r"\b(20\d{2})\b", text)
  return {
    "quarter": int(m_q.group(1)) if m_q else None,
    "year": int(m_y.group(1)) if m_y else None,
  }


def _query_topic_tags(text: str) -> set:
  """Infer topic tags from query text based on keyword mapping."""
  t = text.lower()
  return {tag for tag, kws in RISK_TOPIC_KEYWORDS.items() if any(kw in t for kw in kws)}


def _numeric_intent(text: str) -> bool:
  """True if query likely requests quantitative data."""
  return bool(re.search(r"\d|%|bps|pp|basis\s*points", text, re.I))


def _risk_intent(text: str) -> bool:
  """True if query expresses concern or risk intent."""
  t = text.lower()
  return any(k in t for k in RISK_QUERY_KEYWORDS)

# ---------------------------------------------------------------------
# Speaker / sentiment / recency weighting
# ---------------------------------------------------------------------

def _speaker_weight(chunk: Chunk, query: str) -> float:
  """Score a chunk by who is speaking.

  The role is read from metadata["person_type"]: presenter 0.8, analyst 0.6,
  operator 0.1, anything else 0.5. Analyst chunks gain a further 0.2 (capped
  at 1.0) when the query is risk-oriented according to _risk_intent().
  """
  metadata = chunk.metadata or {}
  role = (metadata.get("person_type") or "").strip().lower()

  role_weights = {"presenter": 0.8, "analyst": 0.6, "operator": 0.1}
  base = role_weights.get(role, 0.5)

  if role == "analyst" and _risk_intent(query):
    base = min(1.0, base + 0.2)
  return base


def _sentiment_weight(meta: Dict[str, Any]) -> float:
  """Weight by sentiment polarity."""
  label = (meta.get("sentiment", {}).get("label") or "").lower()
  return {"negative": 0.7, "neutral": 0.4, "positive": 0.2}.get(label, 0.0)


def _recency_weight(doc_meta: Dict[str, Any]) -> float:
  """Weight newer documents higher based on year."""
  y = doc_meta.get("year")
  if isinstance(y, (int, float)):
    return _normalise01(float(y), globals().get("YEAR_MIN"), globals().get("YEAR_MAX"))
  return 0.0

# ---------------------------------------------------------------------
# Topic / numeric / QA alignment
# ---------------------------------------------------------------------

def _tags_weight(chunk: Chunk, query_tags: set) -> float:
  """Fraction of query tags that also appear among the chunk's tags.

  Chunk tags are the union of metadata["topics"] and the "name" of every dict
  entry in metadata["risk_flags"] and metadata["kpis"], compared
  case-insensitively. Returns 0.0 when the query has no tags.
  """
  meta = chunk.metadata or {}

  def _entry_names(key: str) -> set:
    # Names of the dict-shaped entries stored under 'key'.
    return {str(entry.get("name")) for entry in (meta.get(key) or []) if isinstance(entry, dict)}

  chunk_tags = set(meta.get("topics") or []) | _entry_names("risk_flags") | _entry_names("kpis")
  chunk_tags_lower = {tag.lower() for tag in chunk_tags}

  if not query_tags:
    return 0.0

  wanted = {tag.lower() for tag in query_tags}
  matched = len(chunk_tags_lower & wanted)
  return min(1.0, matched / max(1, len(query_tags)))


def _numeric_presence(chunk: Chunk) -> float:
  """Return 0.5 when the chunk carries numeric evidence, else 0.0.

  Evidence means at least one digit in the chunk text, or a non-empty
  metadata["kpis"] entry.
  """
  if re.search(r"\d", chunk.text) is not None:
    return 0.5
  if (chunk.metadata or {}).get("kpis"):
    return 0.5
  return 0.0


def _qa_section_weight(meta: Dict[str, Any], query: str) -> float:
  """Weight Q&A relevance for exploratory or risk-oriented queries."""
  t = (meta.get("type") or "").lower()
  q = query.lower()
  if any(k in q for k in ("why", "how", "outlook", "guidance")) or _risk_intent(query):
    if t in {"answer", "question"}:
      return 0.3
  return 0.0

# ---------------------------------------------------------------------
# Aggregate metadata score
# ---------------------------------------------------------------------

def _metadata_score(chunk: Chunk, query: str) -> float:
  """Combine all metadata signals for a chunk into one score in [0, 1].

  The score is a weighted sum of recency (0.35), topic overlap (0.25),
  speaker role (0.20), sentiment (0.10) and numeric evidence (0.10, counted
  only when the query shows numeric intent), plus the Q&A bonus and a
  temporal-alignment bonus (+0.3 for a matching year, +0.2 for a matching
  quarter). The total is clamped to [0, 1].
  """
  doc_meta = doc_lookup.get(chunk.doc_id, {})
  chunk_meta = chunk.metadata or {}
  hints = _parse_temporal_hints(query)
  tags = _query_topic_tags(query)

  # Temporal alignment: reward documents from the period named in the query.
  wanted_year, wanted_quarter = hints.get("year"), hints.get("quarter")
  doc_year, doc_quarter = doc_meta.get("year"), doc_meta.get("quarter")
  align = 0.0
  if wanted_year is not None and doc_year is not None and doc_year == wanted_year:
    align += 0.3
  if wanted_quarter is not None and doc_quarter is not None and doc_quarter == wanted_quarter:
    align += 0.2

  # Individual terms, added below in a fixed left-to-right order.
  recency_term = 0.35 * _recency_weight(doc_meta)
  tags_term = 0.25 * _tags_weight(chunk, tags)
  speaker_term = 0.20 * _speaker_weight(chunk, query)
  sentiment_term = 0.10 * _sentiment_weight(chunk_meta)
  numeric_term = 0.10 * (_numeric_intent(query) * _numeric_presence(chunk))
  qa_term = _qa_section_weight(chunk_meta, query)

  score = (
    recency_term
    + tags_term
    + speaker_term
    + sentiment_term
    + numeric_term
    + qa_term
    + align
  )
  return max(0.0, min(1.0, score))

In [19]:
# [Hybrid Retrieval Engine]
# ---------------------------------------------------------------------
# Hybrid retrieval: combines BM25 (sparse) and dense (embeddings/FAISS)
# to rank chunks, then applies a metadata-based re-scoring step.
# ---------------------------------------------------------------------

@dataclass
class RetrievedChunk:
  """Container for a retrieved chunk and its scores.

  Instances are created by hybrid_retrieve(), one per candidate chunk.
  """
  # The retrieved text segment.
  chunk: Chunk
  # Raw BM25 score recorded for the chunk.
  bm25_score: float
  # Raw dense-similarity score recorded for the chunk.
  dense_score: float
  # Blend of the normalised BM25 and dense scores (HYBRID_DENSE_WEIGHT).
  hybrid_score: float
  # Metadata relevance from _metadata_score().
  meta_score: float
  # Blend of hybrid_score and meta_score (META_WEIGHT); used for ranking.
  final_score: float

def tokenise_for_bm25(text: str) -> list[str]:
  """Split text into whitespace-delimited tokens for BM25.

  Newlines count as whitespace; no case folding or punctuation stripping is
  applied.
  NOTE(review): confirm the corpus-side "bm25_tokens" were produced with the
  same rules, otherwise query and document tokens will not match.
  """
  # str.split() with no separator splits on any run of whitespace,
  # newlines included.
  return text.split()

def _normalise(scores: List[float]) -> List[float]:
  """Min–max normalise a list of scores to [0, 1]."""
  if not scores:
    return []
  arr = np.asarray(scores, dtype=float)
  if np.allclose(arr.max(), arr.min()):
    return [1.0 for _ in scores]
  norm = (arr - arr.min()) / (arr.max() - arr.min())
  return norm.tolist()

def _dense_search(query_vec: np.ndarray, top_k: int):
  """Return (indices, scores) of the top_k dense matches.

  Uses FAISS if available; otherwise falls back to NumPy dot-product
  against 'dense_embeddings'.
  """
  if faiss_index is not None:
    scores, indices = faiss_index.search(query_vec.reshape(1, -1), top_k)
    return indices[0].tolist(), scores[0].tolist()
  # Fallback: NumPy dot-product
  sims = np.dot(dense_embeddings, query_vec)
  order = np.argsort(sims)[::-1][:top_k]
  return order.tolist(), sims[order].tolist()

def hybrid_retrieve(query: str, top_chunks: int = MAX_CONTEXT_CHUNKS) -> List[RetrievedChunk]:
  """Run BM25 + dense retrieval, merge scores, and apply metadata re-scoring.

  Steps:
    1) Compute BM25 scores and take the BM25_TOP_K best; chunks with no
       lexical overlap (score <= 0) are not admitted as candidates.
    2) Encode query, compute dense similarities, select DENSE_TOP_K
       candidates; invalid indices (e.g. FAISS padding) are discarded.
    3) Merge candidates by index; blend normalised sparse/dense into hybrid_score.
    4) Compute meta_score via _metadata_score(chunk, query).
    5) Blend into final_score = (1 - META_WEIGHT)*hybrid + META_WEIGHT*meta.
    6) Sort by final_score and return top_chunks items.

  Args:
    query (str): User query text.
    top_chunks (int): Final number of chunks returned.

  Returns:
    List[RetrievedChunk]: Ranked retrieval results with score breakdowns
    (empty when no candidate is found).
  """
  # --- Sparse scores (BM25) ---
  query_tokens = tokenise_for_bm25(query)
  bm25_scores = bm25_index.get_scores(query_tokens)
  bm25_indices = np.argsort(bm25_scores)[::-1][:BM25_TOP_K]
  bm25_norm = _normalise([bm25_scores[i] for i in bm25_indices])

  # --- Dense scores (embeddings/FAISS) ---
  query_vec = embedder.encode([query], normalize_embeddings=True)[0].astype("float32")
  dense_indices, dense_scores = _dense_search(query_vec, DENSE_TOP_K)
  # Keep only indices that address a real chunk. Filtering before the
  # normalisation also keeps padding scores from distorting the scale.
  dense_hits = [
    (int(idx), float(raw))
    for idx, raw in zip(dense_indices, dense_scores)
    if 0 <= idx < len(chunks)
  ]
  dense_norm = _normalise([raw for _, raw in dense_hits])

  # --- Merge normalised scores keyed by chunk index ---
  combined: Dict[int, Dict[str, float]] = {}

  for idx, score in zip(bm25_indices, bm25_norm):
    raw = float(bm25_scores[idx])
    # A zero BM25 score means no query term occurs in the chunk. Such chunks
    # are not real sparse matches: admitting them would let arbitrary chunks
    # rank on metadata alone (and, when nothing matches at all, each would
    # receive a normalised score of 1.0).
    if raw <= 0.0:
      continue
    combined[int(idx)] = {
      "bm25_score": raw,
      "dense_score": 0.0,
      "bm25_norm": score,
      "dense_norm": 0.0,
    }

  for (idx, raw), score in zip(dense_hits, dense_norm):
    entry = combined.setdefault(idx, {
      # Dense-only candidate: report its true raw BM25 score for
      # transparency; its normalised sparse contribution stays 0.
      "bm25_score": float(bm25_scores[idx]),
      "dense_score": raw,
      "bm25_norm": 0.0,
      "dense_norm": 0.0,
    })
    entry["dense_score"] = raw
    entry["dense_norm"] = score

  # --- Build results with hybrid + metadata + final scores ---
  retrieved: List[RetrievedChunk] = []
  for idx, scores in combined.items():
    chunk = chunks[idx]
    hybrid_score = (
      (1 - HYBRID_DENSE_WEIGHT) * scores["bm25_norm"]
      + HYBRID_DENSE_WEIGHT * scores["dense_norm"]
    )
    meta_score = _metadata_score(chunk, query)
    final_score = (1 - META_WEIGHT) * hybrid_score + META_WEIGHT * meta_score

    retrieved.append(
      RetrievedChunk(
        chunk=chunk,
        bm25_score=scores["bm25_score"],
        dense_score=scores["dense_score"],
        hybrid_score=hybrid_score,
        meta_score=meta_score,
        final_score=final_score,
      )
    )

  # --- Rank by final score and truncate ---
  retrieved.sort(key=lambda item: item.final_score, reverse=True)
  return retrieved[:top_chunks]

# Prompt Assembly

In [20]:
# [Prompt Assembly]
# ---------------------------------------------------------------------
# Formats the system role and builds a grounded prompt for ask_model().
# - Includes per-chunk metadata (bank, quarter/year, type, sentiment,
#   number of risk flags) plus a score for transparency.
# ---------------------------------------------------------------------

# System message passed to ask_model() as the assistant's role. The exact
# fallback sentence 'Not found in context.' is what format_response() checks
# for, so keep the two in sync when editing.
SYSTEM_ROLE = (
  "You are a cautious assistant for Bank of England PRA supervisors. "
  "Use only the context provided. Provide concise bullet points with dates and figures. "
  "Cite each fact using [chunk_id]. If no evidence exists, reply exactly 'Not found in context.'"
)

def build_prompt(query: str, retrieved: List[RetrievedChunk]) -> str:
  """Assemble the user prompt: the query followed by a cited context block.

  Each retrieved chunk contributes a one-line header (chunk id, speaker,
  bank, quarter/year, type, sentiment label, number of risk flags, score)
  followed by the chunk text. Missing values are shown as "?".

  Args:
    query (str): The user's question.
    retrieved (List[RetrievedChunk]): Ranked chunks with scores/metadata.

  Returns:
    str: The final prompt string; a fixed "<no evidence>" prompt when
    'retrieved' is empty.
  """
  if not retrieved:
    return f"Query: {query}\n\nContext: <no evidence>\n\nAnswer:"

  def _render(item) -> str:
    # One context entry: metadata header line, then the chunk text.
    chunk = item.chunk
    meta = chunk.metadata or {}
    doc_meta = doc_lookup.get(chunk.doc_id, {})

    doc_quarter = doc_meta.get("quarter")
    doc_year = doc_meta.get("year")
    quarter_label = "?" if doc_quarter is None else f"Q{doc_quarter}"
    year_label = "?" if doc_year is None else doc_year

    sent_label = (meta.get("sentiment") or {}).get("label", "?")
    risk_ct = len(meta.get("risk_flags") or [])

    header = (
      f"[{chunk.chunk_id}] speaker={chunk.speaker or 'unknown'} "
      f"| bank={doc_meta.get('bank', '?')} "
      f"| q={quarter_label}/{year_label} "
      f"| type={meta.get('type','?')} "
      f"| sent={sent_label} "
      f"| risk_flags={risk_ct} "
      f"| score={getattr(item,'final_score', item.hybrid_score):.3f}"
    )
    return f"{header}\n{chunk.text}"

  context_block = "\n\n".join(_render(item) for item in retrieved)
  return (
    f"Query: {query}\n\n"
    f"Context:\n{context_block}\n\n"
    "Answer using bullet points with citations, or state 'Not found in context.'"
  )

def prepare_question(query: str, top_chunks: int = MAX_CONTEXT_CHUNKS):
  """Run retrieval for a query and build the matching grounded prompt.

  Args:
    query (str): The user's question.
    top_chunks (int): Number of chunks to include from retrieval.

  Returns:
    tuple[str, str, List[RetrievedChunk]]: (system_role, prompt, retrieved),
    where system_role is SYSTEM_ROLE and prompt comes from build_prompt().
  """
  evidence = hybrid_retrieve(query, top_chunks=top_chunks)
  return SYSTEM_ROLE, build_prompt(query, evidence), evidence

# Chat Interface

In [22]:
# [Chatbot Widget UI]
# ---------------------------------------------------------------------
# Minimal interactive chat UI for querying the PRA assistant.
# - Scrollable Markdown history panel for conversation.
# - Text input + Send button to submit queries.
# - Uses prepare_question() to build grounded context and ask_model() to answer.
# ---------------------------------------------------------------------

# Configuration (UI + generation)
# MAX_NEW_TOKENS caps the length of each generated answer.
MAX_NEW_TOKENS = 800
# NOTE(review): MAX_CONTEXT_CHUNKS is also defined in the retrieval
# configuration cell. The default arguments of hybrid_retrieve() and
# prepare_question() were bound when those functions were defined, so
# changing the value here only affects calls that pass it explicitly.
MAX_CONTEXT_CHUNKS = 8

# UI elements
# Output panel that accumulates the conversation history.
history_md = widgets.Output(layout={"border": "1px solid #ccc", "padding": "8px"})
# Single-line text field for the user's question.
input_box = widgets.Text(
  value="",
  placeholder="Ask about PRA risk signals…",
  description="You:",
  layout=widgets.Layout(width="100%"),
)
send_button = widgets.Button(description="Send", button_style="primary", icon="paper-plane")

def _pretty_citation(item):
  chunk = item.chunk
  m = re.search(r"item(\d+).*sub(\d+)", chunk.chunk_id)
  para = (int(m.group(1)) + 1) if m else ((getattr(chunk, 'block_index', None) or 0) + 1)
  part = (int(m.group(2)) + 1) if m else 1
  doc_meta = doc_lookup.get(getattr(chunk, 'doc_id', ''), {})
  q = doc_meta.get('quarter', '?')
  y = doc_meta.get('year', '?')
  score = getattr(item, 'final_score', getattr(item, 'hybrid_score', 0.0))
  return f"Paragraph {para} • Part {part} (Q{q} {y}) — score={score:.3f}"

def _escape_markdown(text: str) -> str:
    return text.replace("_", r"\_").replace("*", r"\*")

def format_response(answer_text: str, retrieved):
  """Render assistant reply with a citation list.

  When the model reports that nothing was found, the raw answer is returned
  without citations. The check ignores case, surrounding whitespace, quotes
  or backticks and a trailing period, because models often echo the quoted
  sentence from the instructions.

  Args:
    answer_text (str): Model's raw text answer.
    retrieved (list[RetrievedChunk]): Evidence items used to answer.

  Returns:
    str: Markdown-formatted response with citation list.
  """
  verdict = answer_text.strip().strip("'\"`. ").lower()
  if verdict == "not found in context":
    return f"**Assistant:** {answer_text}"

  citations = "\n".join(
    f"- {_pretty_citation(item)}"
    for item in retrieved
  )

  # Escape Markdown control characters so ids such as a_b are not rendered
  # as emphasis.
  escaped = _escape_markdown(answer_text)
  return f"**Assistant:**\n\n{escaped}\n\n**Citations:**\n{citations}"

def handle_query(_=None):
  """Submit the user's query, retrieve context, invoke model, and display the result.

  Steps:
    1) Read and clear input.
    2) Append user query to history (Markdown).
    3) Build grounded prompt via prepare_question(...).
    4) Query LLM with ask_model(...).
    5) Display assistant response + citations, or an error message if
       retrieval or generation failed.

  Args:
    _: Unused; the widget that triggered the callback (button or text box).
  """
  query = input_box.value.strip()
  if not query:
    return

  # Clear immediately for responsive UX
  input_box.value = ""

  # Show user message
  with history_md:
    display(Markdown(f"**You:** {query}"))

  # Build prompt and query the model. Errors raised inside a widget callback
  # are not shown in the history panel, so report them there explicitly
  # instead of leaving the user without a reply.
  send_button.disabled = True
  try:
    system_role, user_task, retrieved = prepare_question(query, top_chunks=MAX_CONTEXT_CHUNKS)
    answer = ask_model(model, tok, system_role, user_task, MAX_NEW_TOKENS)
    response_md = format_response(answer, retrieved)
  except Exception as exc:
    response_md = f"**Assistant:** Sorry, the request failed ({type(exc).__name__}: {exc})."
  finally:
    # Always re-enable the button, even after a failure.
    send_button.disabled = False

  # Show model reply with citations
  with history_md:
    display(Markdown(response_md))
    display(Markdown("---"))

# Wire up interactions: both the Send button and pressing Enter in the text
# box call handle_query().
send_button.on_click(handle_query)
# NOTE(review): 'Text.on_submit' appears to be deprecated in ipywidgets 8 —
# confirm against the installed version.
input_box.on_submit(handle_query)

# Compose and render the UI: history panel on top, input row underneath.
chat_ui = widgets.VBox([history_md, widgets.HBox([input_box, send_button])])
display(chat_ui)

print("Chatbot ready. Type a question and press Enter or Send.")

VBox(children=(Output(layout=Layout(border='1px solid #ccc', padding='8px')), HBox(children=(Text(value='', de…

Chatbot ready. Type a question and press Enter or Send.
