# JVAI Financial Policy Chatbot

## Setup & Dependencies

In [1]:
!pip -q install pdfplumber faiss-cpu sentence-transformers nltk rich

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.8/42.8 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m48.5/48.5 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.0/60.0 kB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.6/5.6 MB[0m [31m46.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.4/31.4 MB[0m [31m22.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.8/2.8 MB[0m [31m70.6 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
import os, re, math, json, random
import pdfplumber
from typing import List, Dict
import nltk; nltk.download('punkt'); nltk.download('averaged_perceptron_tagger')
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
from rich import print

def set_seed(s=42):
    """Seed Python's `random` module and NumPy's global RNG with the same value `s`."""
    for seeder in (random.seed, np.random.seed):
        seeder(s)


# Seed once at import time with the default value.
set_seed()

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.


## Upload Financial Policy PDF


In [3]:
from google.colab import files

# Ask the user for the policy PDF through Colab's upload widget.
print("Please upload the financial policy PDF (e.g., 'For Task - Policy file.pdf').")
uploaded = files.upload()  # prompts a file chooser
assert len(uploaded) > 0, "No file uploaded."
# Only the first uploaded entry is used; its key doubles as the path opened later.
PDF_PATH = list(uploaded.keys())[0]
print("Uploaded:", PDF_PATH)

Saving For Task - Policy file.pdf to For Task - Policy file.pdf


## Extract Text

In [4]:
def extract_pages(pdf_path):
    """Read a PDF and return its non-empty pages as text.

    Args:
        pdf_path: Path handed to `pdfplumber.open`.

    Returns:
        A list of {"page": <1-based page number>, "text": <page text>} dicts.
        Runs of spaces/tabs are collapsed to a single space (newlines are kept),
        and pages whose text is empty after stripping are omitted.
    """
    collected = []
    with pdfplumber.open(pdf_path) as pdf:
        for number, page in enumerate(pdf.pages, start=1):
            raw = page.extract_text()
            # extract_text() may yield nothing for a page; treat that as empty text.
            cleaned = re.sub(r'[ \t]+', ' ', raw if raw else "").strip()
            if not cleaned:
                continue
            collected.append({"page": number, "text": cleaned})
    return collected

# Extract the uploaded PDF; pages without text are dropped, so len(pages) can be
# smaller than the PDF's page count. The cell displays the count and a 400-char preview.
pages = extract_pages(PDF_PATH)
len(pages), pages[0]["text"][:400]


(6,
 '1.2 FINANCIAL POLICY OBJECTIVES AND STRATEGIES\nSTATEMENT\nThe presentation and preparation of the Territory’s Budget is provided for in sections 11 and\n11A of the Financial Management Act 1996 (the Act).\nThe purpose of the financial policy objectives and strategies statement is to make transparent\nthe Government’s financial strategies and to establish a benchmark for evaluating the\nGovernment’s con')

## Chunking

In [5]:
import itertools

# Heading heuristic, anchored to the whole string. Accepts one of:
#   - "Table <n>.<n>.<n>"
#   - an upper-case letter followed by 6+ upper-case letters / spaces / hyphens
#   - anything starting with "<digits>.<digits>" (e.g. "1.2 ...")
# No re.MULTILINE is used and "." does not cross newlines, so a heading that is
# followed by further lines in the same string does not match.
HEADING_RX = re.compile(r'^\s*(Table\s+\d+\.\d+\.\d+|[A-Z][A-Z \-]{6,}|[0-9]+\.[0-9]+.*)$')

def split_into_paragraphs(text):
    """Split `text` on blank-line gaps (two or more newlines).

    Returns the stripped, non-empty pieces in their original order; single
    newlines inside a piece are left untouched.
    """
    stripped = (piece.strip() for piece in re.split(r'\n{2,}', text))
    return [piece for piece in stripped if piece]

def detect_section(paras):
    """Return the first of the leading three paragraphs that HEADING_RX accepts.

    The match is truncated to 120 characters; an empty string is returned when
    none of the first three paragraphs looks like a heading.
    """
    heading = next((para for para in paras[:3] if HEADING_RX.match(para)), None)
    return "" if heading is None else heading[:120]

def fine_chunks(pages, max_chars=600):
    """Split page texts into retrieval chunks of roughly `max_chars` characters.

    Args:
        pages: Iterable of {"page": ..., "text": ...} dicts.
        max_chars: Paragraphs no longer than this become one chunk each; longer
            paragraphs are split at sentence ends (after ".", "!" or "?") and
            sentences are packed greedily into chunks.

    Returns:
        A list of {"text": ..., "page": ...} dicts in document order. A single
        sentence longer than `max_chars` is kept whole as its own chunk.
        No chunk has empty text.
    """
    chunks = []
    for p in pages:
        page_no = p["page"]
        paras = [para.strip() for para in re.split(r"\n{2,}", p["text"]) if para.strip()]
        for para in paras:
            if len(para) <= max_chars:
                chunks.append({"text": para, "page": page_no})
                continue

            buf = ""
            for s in re.split(r'(?<=[.!?])\s+', para):
                if len(buf) + len(s) < max_chars:
                    buf += " " + s
                else:
                    # Flush only a non-empty buffer: when the very first sentence is
                    # already too long, the buffer is still "" and flushing it would
                    # create a chunk with empty text.
                    if buf.strip():
                        chunks.append({"text": buf.strip(), "page": page_no})
                    buf = s
            if buf.strip():
                chunks.append({"text": buf.strip(), "page": page_no})
    return chunks

# Build the chunk list; META is a second name for the same list object and is
# what the search code indexes by row number.
chunks = fine_chunks(pages)
META = chunks
print("Number of chunks:", len(chunks))

In [6]:
from collections import Counter

# Financial vocabulary used to filter token counts. Because the text is split on
# whitespace, multi-word entries ("net assets", "gross state product", ...) end
# up in the set as their individual words; hyphenated "own-source" stays whole.
FIN_TERMS = set("""
debt borrowing liability liabilities net assets taxation tax gsp gross state product capital works infrastructure
superannuation funding credit rating balanced budget operating result own-source revenue interest cash reserve
""".split())

def important_phrases(chunks, topk=50):
    """Count how often each FIN_TERMS word occurs across all chunk texts.

    Tokens are lower-cased runs of letters/hyphens (at least two characters,
    starting with a letter). Returns up to `topk` (word, count) pairs, most
    frequent first.
    """
    token_rx = r"[A-Za-z][A-Za-z\-]+"
    counts = Counter(
        token
        for chunk in chunks
        for token in re.findall(token_rx, chunk["text"].lower())
        if token in FIN_TERMS
    )
    return counts.most_common()[:topk]

# Frequency of the financial vocabulary in this document; show the top 20.
key_terms = important_phrases(chunks)
key_terms[:20]

[('budget', 32),
 ('net', 15),
 ('liabilities', 14),
 ('interest', 13),
 ('infrastructure', 12),
 ('assets', 12),
 ('capital', 9),
 ('revenue', 9),
 ('operating', 8),
 ('balanced', 7),
 ('debt', 6),
 ('taxation', 6),
 ('works', 5),
 ('superannuation', 5),
 ('gsp', 5),
 ('result', 5),
 ('own-source', 4),
 ('funding', 4),
 ('credit', 2),
 ('rating', 2)]

## Embeddings & FAISS Index


In [7]:
# Encode every chunk (one row per chunk, in `chunks` order) with the MiniLM sentence encoder.
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
emb = model.encode([c["text"] for c in chunks], convert_to_numpy=True, show_progress_bar=True)
# Inner-product index sized to the embedding width. The rows are L2-normalised
# in place before being added, so inner-product scores equal cosine similarity.
index = faiss.IndexFlatIP(emb.shape[1])
faiss.normalize_L2(emb)
index.add(emb)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

In [8]:
# Row i of the index corresponds to META[i]. (META was already bound to `chunks`
# in the chunking cell; this re-assignment keeps the mapping explicit.)
META = chunks  # same ordering as embeddings

## Hybrid Search Function

In [14]:
# Terms (some multi-word) that earn a keyword bonus in hybrid_search when they
# occur in both the query and a chunk.
# NOTE(review): this rebinds FIN_TERMS, which the key-term cell defined as a set
# of single words read by important_phrases — re-running that cell after this
# one changes which terms it counts. Consider a distinct name.
FIN_TERMS = ["debt","tax","taxation","gsp","net assets",
             "superannuation","credit rating","balanced budget","infrastructure"]

def hybrid_search(query: str, k: int = 5):
    """Rank chunks by embedding similarity plus a small keyword bonus.

    Args:
        query: Free-text question.
        k: Maximum number of hits to return.

    Returns:
        Up to `k` dicts, best first. Each is the chunk's META entry plus a
        "score" key holding the FAISS similarity + 0.1 for every FIN_TERMS
        entry that occurs (as a substring) in both the query and the chunk.
        Returns [] when `k` is not positive or there are no chunks.
    """
    # Nothing to search, or nothing requested: avoid calling FAISS with k <= 0.
    if k <= 0 or len(META) == 0:
        return []

    # semantic similarity (FAISS)
    qv = model.encode([query], convert_to_numpy=True)
    faiss.normalize_L2(qv)
    # over-fetch to give keyword boosting a chance, capped at the corpus size
    fetch = min(k * 3, len(META))
    D, I = index.search(qv, fetch)

    q_lower = query.lower()
    # Only terms present in the query can ever contribute; compute them once.
    query_terms = [term for term in FIN_TERMS if term in q_lower]

    hits = []
    for score, idx in zip(D[0], I[0]):
        # FAISS pads missing results with label -1; META[-1] would silently
        # return the last chunk, so skip those slots.
        if idx < 0:
            continue
        m = META[idx]
        text_lower = m["text"].lower()
        kw_bonus = sum(1 for term in query_terms if term in text_lower)
        final_score = float(score) + 0.1 * kw_bonus
        hits.append({"score": final_score, **m})
    hits.sort(key=lambda x: -x["score"])
    return hits[:k]

## Post Processing & Builder Function

In [74]:
# === STRICT KEYWORD FILTERED ANSWER BUILDER (drop-in) ===
import re

# Domain vocabulary (you can expand)
# Maps a canonical topic key to the phrases treated as evidence of that topic.
# Lookups are lower-case substring tests, so every entry must be lower-case.
# Aliases overlap between topics (e.g. "gsp" is listed under both "taxation"
# and "gsp"), so one phrase in a query can activate more than one key.
_CANON = {
    "debt": ["debt", "borrowings", "net interest", "interest expense", "interest revenue"],
    "taxation": ["tax", "taxation", "gsp", "gross state product", "taxation as a % of gsp"],
    "gsp": ["gsp", "gross state product"],
    "net assets": ["net assets", "total assets", "total liabilities"],
    "superannuation": ["superannuation", "liabilities", "funded", "percentage funding"],
    "credit rating": ["credit rating", "triple a", "aaa"],
    "balanced budget": ["balanced budget", "operating result", "surplus", "economic cycle"],
    "infrastructure": ["infrastructure", "capital works", "property, plant and equipment"],
}

# derive must-have terms from query using the domain map above
def _derive_must_terms(query: str):
    """Work out which canonical topics a query refers to.

    Returns:
        (must, toks) where `must` is the set of _CANON keys whose name or any
        alias occurs as a substring of the lower-cased query, and `toks` is the
        set of lower-case alphabetic words in the query.
    """
    lowered = query.lower()

    must = {
        key
        for key, aliases in _CANON.items()
        if key in lowered or any(alias in lowered for alias in aliases)
    }

    # A query that mentions both tax and GSP must be answered from text covering both.
    mentions_tax = "taxation" in lowered or "tax" in lowered
    mentions_gsp = "gsp" in lowered or "gross state product" in lowered
    if mentions_tax and mentions_gsp:
        must |= {"taxation", "gsp"}

    return must, set(re.findall(r"[a-z]+", lowered))

# helper: does text contain ANY alias for a canonical key?
def _text_has_key(text_l: str, key: str) -> bool:
    """True when `text_l` contains `key` itself or any alias listed for it in _CANON.

    Matching is a plain substring test, so `text_l` is expected to be lower-case already.
    """
    if key in text_l:
        return True
    for alias in _CANON.get(key, []):
        if alias in text_l:
            return True
    return False

# helper: keep only hits that contain all "must" keys (strict mode)
def _filter_hits_by_must(hits, must_keys):
    """Narrow `hits` to those whose text covers the required topics.

    Preference order:
      1. hits covering every key in `must_keys`;
      2. if none, hits covering at least one key;
      3. if still none (or `must_keys` is empty), the original `hits`.
    """
    if not must_keys:
        return hits

    def covers(hit, combine):
        # `combine` is all() for strict mode and any() for relaxed mode.
        lowered = hit["text"].lower()
        return combine(_text_has_key(lowered, key) for key in must_keys)

    strict = [hit for hit in hits if covers(hit, all)]
    if strict:
        return strict

    relaxed = [hit for hit in hits if covers(hit, any)]
    return relaxed or hits

# extractive selection: keep only sentences/lines that include query tokens or aliases
def _select_spans(text: str, q_tokens: set, must_keys: set, max_chars: int = 700):
    """Pick the sentences/lines of `text` that look relevant to the query.

    `text` is cut at sentence ends and at newlines (the latter helps with
    tables). Pieces with fewer than four words are ignored. A piece is relevant
    when it contains any query token longer than two characters (substring
    match) or any alias of a key in `must_keys`. Relevant pieces are taken in
    order while they fit the `max_chars` budget; one that does not fit is
    skipped, and later, shorter pieces may still be added.
    """
    pieces = (piece.strip() for piece in re.split(r'(?<=[.!?])\s+|\n', text))
    candidates = [piece for piece in pieces if piece and len(piece.split()) >= 4]
    usable_tokens = [tok for tok in q_tokens if len(tok) > 2]

    chosen = []
    used = 0  # total characters of the pieces accepted so far
    for piece in candidates:
        lowered = piece.lower()
        relevant = any(tok in lowered for tok in usable_tokens) or any(
            _text_has_key(lowered, key) for key in must_keys
        )
        if relevant and used + len(piece) + 1 <= max_chars:
            chosen.append(piece)
            used += len(piece)
    return chosen

def _postprocess_answer(question_l: str, selected_spans: list, full_hits: list = None):
    """
    Apply topic-specific clean-up to the extracted spans and return the answer text.

    Args:
        question_l: The question, already lower-cased; topics are detected with
            plain substring tests against it.
        selected_spans: Evidence strings chosen for the answer.
        full_hits: Optional ranked hits (dicts). Only the "text" of the first
            hit is consulted, as extra context for the bullet/sentence scans.

    Returns:
        - "strategic" and "priorit" in the question: a fixed lead-in sentence
          followed by up to six bullets (wrapped lines stitched back together),
          when any bullets are found.
        - tax and GSP in the question: only the lines that mention both
          "taxation" and "gsp", or that contain a single-digit percentage such
          as "4.5%"; falls back to the joined spans when no line qualifies.
        - "superannuation" in the question: up to two sentences mentioning 90%,
          2039, 2040 or "percentage funding", skipping sentences about
          net/total assets or total liabilities.
        - otherwise: the spans joined with single spaces.

    There is no debt-specific branch (an earlier one was disabled); debt
    questions receive the default joined-span answer.
    """

    def dedupe(seq):
        # Order-preserving de-duplication.
        seen = set()
        out = []
        for x in seq:
            if x not in seen:
                seen.add(x)
                out.append(x)
        return out

    def is_bullet(line):
        return line.startswith("•") or line.startswith("- ")

    ans = " ".join(selected_spans).strip()
    top_text = full_hits[0]["text"] if (full_hits and "text" in full_hits[0]) else ""
    combined_text = ("\n".join(selected_spans) + ("\n" + top_text if top_text else "")).strip()

    # --- Strategic priorities (stitch wrapped bullets from top chunk) ---
    if "strategic" in question_l and "priorit" in question_l:
        bullets = []
        current = None  # bullet being assembled, or None when outside a bullet
        for raw in (top_text or combined_text).splitlines():
            line = raw.strip()
            if is_bullet(line):
                if current is not None:
                    bullets.append(current)
                current = line
            elif not line:
                # A blank line ends the current bullet.
                if current is not None:
                    bullets.append(current)
                    current = None
            elif current is not None:
                # Continuation of a wrapped bullet line.
                current += " " + line
        if current is not None:
            bullets.append(current)

        bullets = dedupe(bullets)
        if bullets:
            bullets = bullets[:6]  # the doc has 6 bullets
            return "Strategic priorities, as they relate to the Territory’s Budget, are summarised as:\n" + " ".join(bullets)

    # --- Taxation vs GSP: keep lines mentioning both words, or a percent value ---
    if "tax" in question_l and ("gsp" in question_l or "gross state product" in question_l):
        # No trailing \b here: "%" is not a word character, so r"%\b" would only
        # match when a letter/digit directly follows the percent sign, and values
        # such as "4.5%" at the end of a line or before a space were never found.
        percent_rx = re.compile(r"\b\d\.\d%")
        lines = []
        for s in selected_spans:
            for ln in s.splitlines():
                l = ln.strip()
                ll = l.lower()
                # "taxation as a % of gsp" is already covered by the both-words test.
                if ("taxation" in ll and "gsp" in ll) or percent_rx.search(l):
                    lines.append(l)
        if lines:
            ans = "\n".join(lines)

    # --- Superannuation: keep target; drop Assets/Liabilities noise ---
    if "superannuation" in question_l:
        sentences = re.split(r'(?<=[.!?])\s+', combined_text)
        keep = []
        for s in sentences:
            sl = s.lower()
            if any(bad in sl for bad in ["net assets", "total assets", "total liabilities"]):
                continue
            if ("90%" in s) or ("2039" in sl) or ("2040" in sl) or ("percentage funding" in sl):
                keep.append(s.strip())
        keep = dedupe(keep)
        if keep:
            return " ".join(keep[:2])

    # default
    return ans


# memory-aware augmentation (works with your existing Mem)
def _augment_with_memory(question: str) -> str:
    """Pass `question` through the global `mem.augment` hook when one exists.

    If no global `mem` is defined, or it has no `augment` attribute, the
    question is returned unchanged.
    """
    memory = globals().get("mem")
    has_hook = "mem" in globals() and hasattr(memory, "augment")
    return memory.augment(question) if has_hook else question

def build_answer(question: str, k: int = 5):
    """Answer `question` extractively and append the pages used as sources.

    Pipeline: memory augmentation -> topic/keyword derivation -> hybrid search
    (at least 6 hits requested) -> strict topic filter -> span selection ->
    topic-specific post-processing.

    Returns:
        "<answer>\\n\\nSources: p.X, p.Y" with page references de-duplicated and
        sorted by page number.
    """
    augmented = _augment_with_memory(question)
    must_keys, q_tokens = _derive_must_terms(augmented)

    # Over-fetch, then keep only hits that cover the required topics.
    candidates = hybrid_search(augmented, k=max(k, 6))
    candidates = _filter_hits_by_must(candidates, must_keys)

    evidence = []
    pages_cited = []
    for hit in candidates:
        spans = _select_spans(hit["text"], q_tokens, must_keys, max_chars=700)
        if not spans:
            continue
        evidence += spans
        pages_cited.append(f"p.{hit['page']}")
        # Stop once enough evidence text has been gathered.
        if len(" ".join(evidence)) > 600:
            break

    # Nothing matched inside any hit: fall back to the top hit's full text.
    if candidates and not evidence:
        top = candidates[0]
        evidence = [top["text"].strip()]
        pages_cited.append(f"p.{top['page']}")

    answer = _postprocess_answer(augmented.lower(), evidence, full_hits=candidates).strip()
    unique_pages = sorted(set(pages_cited), key=lambda ref: int(ref.split(".")[1]))
    cite_str = ", ".join(unique_pages)
    return f"{answer}\n\nSources: {cite_str}"

# wrapper that saves to memory if available (unchanged)
def chat(user_q: str):
    """Answer one user turn and, when a global `mem` with a `history` exists, record it.

    The pair stored in `mem.history` is (original question, answer); the
    answer is returned either way.
    """
    reply = build_answer(user_q)
    memory = globals().get("mem")
    if "mem" in globals() and hasattr(memory, "history"):
        memory.history.append((user_q, reply))
    return reply

  sentences = re.split(r'(?<=[.!?])\s+', combined_text)


## Minimal Memory (last-topic heuristic)


In [75]:
import re
from collections import deque

class Mem:
    """Tiny conversation memory: keeps the last few (user, bot) turns and
    guesses the topic of the previous user question from a fixed term list."""

    def __init__(self, max_turns=4):
        # Bounded history; the oldest turn is discarded once max_turns is exceeded.
        self.history = deque(maxlen=max_turns)  # [(user, bot)]

    def last_topic(self):
        """Return the last known topic term in the previous user turn, or ""."""
        if not self.history:
            return ""
        previous_question = self.history[-1][0].lower()
        topic_rx = r"\b(debt|tax|taxation|net assets|infrastructure|superannuation|interest|operating result|credit rating)\b"
        topic = ""
        # Keep overwriting so the final value is the right-most match.
        for found in re.finditer(topic_rx, previous_question):
            topic = found.group(1)
        return topic

    def augment(self, q):
        """Append the previous topic to `q` when `q` looks like a follow-up.

        A follow-up is a question containing a referring word (it, that, those,
        them, this, one) or the phrase "what about". Without a previous topic,
        `q` is returned unchanged.
        """
        topic = self.last_topic()
        if not topic:
            return q
        lowered = q.lower()
        refers_back = (
            re.search(r"\b(it|that|those|them|this|one)\b", lowered) is not None
            or "what about" in lowered
        )
        return f"{q} (context topic: {topic})" if refers_back else q


# Shared instance picked up by the chat helpers through globals().
mem = Mem()

## Demo: Ask Questions (1)


In [76]:
# Two-turn check of the memory heuristic: the follow-up contains "it" and
# "what about", so Mem.augment appends the topic found in the previous question.
print(chat("Tell me about taxation vs GSP."))
print(chat("What about it?"))  # should augment with the last topic

## Demo: Ask Questions (2)

In [77]:
# Scripted walkthrough: each question is echoed, then answered through chat().
demo_questions = [
    "What are the strategic priorities?",
    "What about debt?",
    "What does the Budget say about taxation vs GSP?",
    "What's the superannuation funding target?",
]
for position, demo_q in enumerate(demo_questions):
    # Every question after the first is preceded by a blank line.
    lead = "" if position == 0 else "\n"
    print(f"{lead}Q: {demo_q}")
    print("A:", chat(demo_q))