In [2]:
!pip install feedparser
!pip install duckduckgo-search
!pip install ftfy html5lib
!pip install sentence-transformers scikit-learn numpy tqdm
!pip install chromadb
!pip install langchain langgraph google-generativeai

Collecting feedparser
  Downloading feedparser-6.0.12-py3-none-any.whl.metadata (2.7 kB)
Collecting sgmllib3k (from feedparser)
  Downloading sgmllib3k-1.0.0.tar.gz (5.8 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading feedparser-6.0.12-py3-none-any.whl (81 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m81.5/81.5 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: sgmllib3k
  Building wheel for sgmllib3k (setup.py) ... [?25l[?25hdone
  Created wheel for sgmllib3k: filename=sgmllib3k-1.0.0-py3-none-any.whl size=6046 sha256=53ba8478bcb025bd6ca280c7b1a3455b1caef01b6aa33a02ffd7c301f9ffbed7
  Stored in directory: /root/.cache/pip/wheels/03/f5/1a/23761066dac1d0e8e683e5fdb27e12de53209d05a4a37e6246
Successfully built sgmllib3k
Installing collected packages: sgmllib3k, feedparser
Successfully installed feedparser-6.0.12 sgmllib3k-1.0.0
Collecting duckduckgo-search
  Downloading duckduckgo_search-8.1.1-py3-none-any.



In [3]:
import json
import os
import time
from datetime import datetime
from urllib.parse import urlparse

import feedparser

# --- CONFIG ---
RSS_URLS = [
    "https://economictimes.indiatimes.com/markets/rssfeeds/1977021501.cms",
    "https://economictimes.indiatimes.com/industry/banking/finance/banking/rssfeeds/3948143.cms",

    "https://www.business-standard.com/rss/markets-106.rss",
    "https://www.business-standard.com/rss/companies-102.rss",

    "https://www.financialexpress.com/market/feed/",
    "https://www.financialexpress.com/economy/feed/",

    "https://www.livemint.com/rss/news",
    "https://www.livemint.com/rss/money",
]

MAX_ITEMS_TOTAL = 100
OUTPUT_PATH = "data/rss_news.json"
# ---------------


def parse_entry_to_item(entry, source_name):
    """Normalize a feedparser entry to our schema."""
    title = entry.get("title", "").strip()
    # Some feeds use 'summary' others use 'description' or 'content'
    content = entry.get("summary") or entry.get("description") or ""
    # if content is a list or dict in some feeds, coerce
    if isinstance(content, list):
        content = " ".join([c.get("value", "") if isinstance(c, dict) else str(c) for c in content])
    link = entry.get("link", "").strip()
    published = entry.get("published") or entry.get("updated") or ""
    # fallback: try published_parsed
    if not published and entry.get("published_parsed"):
        published = datetime(*entry.published_parsed[:6]).isoformat()
    # normalized id
    uid = None
    if entry.get("id"):
        uid = entry["id"]
    else:
        # create simple deterministic id from link or title
        key = link or title
        uid = f"rss::{abs(hash(key))}"

    item = {
        "id": uid,
        "title": title,
        "content": content,
        "source": source_name,
        "link": link,
        "published": published,
    }
    return item


def fetch_rss_feed(url):
    """Fetch a single RSS feed and return normalized items."""
    # derive source name from domain for simplicity
    parsed = urlparse(url)
    domain = parsed.netloc or url
    feed = feedparser.parse(url)
    entries = feed.entries or []
    items = []
    for e in entries:
        try:
            item = parse_entry_to_item(e, domain)
            # skip empty titles
            if not item["title"]:
                continue
            items.append(item)
        except Exception as ex:
            # don't fail the whole run for one bad entry
            print(f"[WARN] Failed to parse entry from {domain}: {ex}")
    return items


def collect_rss(urls, max_total=12, sleep_between=0.5):
    """Collect items across feeds, deduplicate by (title, link), limit total."""
    seen = set()
    collected = []
    for url in urls:
        print(f"[INFO] Fetching: {url}")
        items = fetch_rss_feed(url)
        # optional: sleep a bit to be polite
        time.sleep(sleep_between)
        for it in items:
            dedup_key = (it.get("title", "").lower().strip(), it.get("link", "").strip())
            if dedup_key in seen:
                continue
            seen.add(dedup_key)
            collected.append(it)
            if len(collected) >= max_total:
                print(f"[INFO] Reached limit of {max_total} items.")
                return collected
    print(f"[INFO] Collected {len(collected)} RSS items from {len(urls)} feeds.")
    return collected


def save_json(items, out_path="data/rss_news.json"):
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    with open(out_path, "w", encoding="utf8") as f:
        json.dump(items, f, ensure_ascii=False, indent=2)
    print(f"[INFO] Saved {len(items)} items to {out_path}")


if __name__ == "__main__":
    items = collect_rss(RSS_URLS, max_total=MAX_ITEMS_TOTAL)
    # basic normalization: ensure content field exists
    for it in items:
        if not it.get("content"):
            it["content"] = it.get("title", "")
    save_json(items, OUTPUT_PATH)


[INFO] Fetching: https://economictimes.indiatimes.com/markets/rssfeeds/1977021501.cms
[INFO] Fetching: https://economictimes.indiatimes.com/industry/banking/finance/banking/rssfeeds/3948143.cms
[INFO] Fetching: https://www.business-standard.com/rss/markets-106.rss
[INFO] Fetching: https://www.business-standard.com/rss/companies-102.rss
[INFO] Fetching: https://www.financialexpress.com/market/feed/
[INFO] Fetching: https://www.financialexpress.com/economy/feed/
[INFO] Fetching: https://www.livemint.com/rss/news
[INFO] Fetching: https://www.livemint.com/rss/money
[INFO] Reached limit of 100 items.
[INFO] Saved 100 items to data/rss_news.json


In [4]:
# ddg_fetcher.py
import json
import os
from duckduckgo_search import DDGS
from datetime import datetime

import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)




QUERY = "Indian stock market news OR RBI updates OR HDFC Bank news"
MAX_RESULTS = 100
OUTPUT_PATH = "data/ddg_news.json"


def ddg_to_item(entry):
    return {
        "id": entry.get("url"),
        "title": entry.get("title"),
        "content": entry.get("body") or "",
        "source": entry.get("source") or "duckduckgo",
        "link": entry.get("url"),
        "published": entry.get("date") or "",
    }


def fetch_ddg_news(query, max_results=100):
    ddg = DDGS()
    results = ddg.news(query, max_results=max_results)
    items = []

    for r in results:
        try:
            item = ddg_to_item(r)
            if item["title"]:
                items.append(item)
        except Exception as e:
            print("[WARN] Error: ", e)
            continue

    return items


items = fetch_ddg_news(QUERY, MAX_RESULTS)

os.makedirs("data", exist_ok=True)
with open(OUTPUT_PATH, "w", encoding="utf-8") as f:
    json.dump(items, f, indent=2, ensure_ascii=False)

print(f"Saved {len(items)} DDG news items → {OUTPUT_PATH}")


  ddg = DDGS()
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


Saved 36 DDG news items → data/ddg_news.json


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


In [5]:
import pandas as pd
import json
import os
from datetime import datetime
from datetime import datetime, timezone

import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)


### Paths
NSE_ANN = "data/CF-AN-equities-HDFCBANK.csv"
NSE_PRESS = "data/press-release.csv"
RSS_PATH = "data/rss_news.json"
DDG_PATH = "data/ddg_news.json"
OUT_PATH = "data/all_news.json"

os.makedirs("data", exist_ok=True)

# ----------------------------------------------------
# Helper to convert date formats safely
# ----------------------------------------------------
def safe_date(dt):
    if not dt:
        return ""
    try:
        return str(pd.to_datetime(dt))
    except:
        return ""

# ----------------------------------------------------
# 1. Load NSE Equity Announcements
# ----------------------------------------------------
df_ann = pd.read_csv(NSE_ANN)

ann_items = []
for _, row in df_ann.iterrows():
    item = {
        "id": row.get("ATTACHMENT", "") or f"nse_announce::{_}",
        "title": str(row.get("SUBJECT", "")),
        "content": str(row.get("DETAILS", "")),
        "source": "nse_announce",
        "date": safe_date(row.get("BROADCAST DATE/TIME")),
        "link": str(row.get("ATTACHMENT", "")),
    }
    ann_items.append(item)

# ----------------------------------------------------
# 2. Load NSE Press Releases
# ----------------------------------------------------
df_press = pd.read_csv(NSE_PRESS)

press_items = []
for _, row in df_press.iterrows():
    item = {
        "id": row.get("DOWNLOAD", "") or f"nse_press::{_}",
        "title": str(row.get("SUBJECT", "")),
        "content": f"{row.get('CATEGORY', '')} — {row.get('SUBJECT', '')}",
        "source": "nse_press",
        "date": safe_date(row.get("DATE")),
        "link": str(row.get("DOWNLOAD", "")),
    }
    press_items.append(item)

# ----------------------------------------------------
# 3. Load RSS
# ----------------------------------------------------
with open(RSS_PATH, "r") as f:
    rss_items_raw = json.load(f)

rss_items = []
for r in rss_items_raw:
    rss_items.append({
        "id": r.get("id"),
        "title": r.get("title"),
        "content": r.get("content"),
        "source": "rss",
        "date": safe_date(r.get("published")),
        "link": r.get("link"),
    })

# ----------------------------------------------------
# 4. Load DuckDuckGo News
# ----------------------------------------------------
with open(DDG_PATH, "r") as f:
    ddg_items_raw = json.load(f)

ddg_items = []
for r in ddg_items_raw:
    ddg_items.append({
        "id": r.get("id"),
        "title": r.get("title"),
        "content": r.get("content"),
        "source": "ddg",
        "date": safe_date(r.get("published")),
        "link": r.get("link"),
    })

# ----------------------------------------------------
# MERGE ALL
# ----------------------------------------------------
all_items = ann_items + press_items + rss_items + ddg_items

with open(OUT_PATH, "w", encoding="utf8") as f:
    json.dump(all_items, f, indent=2, ensure_ascii=False)

print("Merged dataset saved to:", OUT_PATH)
print("Total records:", len(all_items))


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


Merged dataset saved to: data/all_news.json
Total records: 7773


# **Data audit : understand your collection**



In [6]:
import json
from collections import Counter
from datetime import datetime

import warnings
warnings.filterwarnings(
    "ignore",
    category=DeprecationWarning,
    message="datetime.datetime.utcnow() is deprecated"
)

DATA_PATH = "data/all_news.json"
with open(DATA_PATH, "r", encoding="utf8") as f:
    items = json.load(f)

print("Total items:", len(items))
# sample 5
for i in items[:5]:
    print(i["source"], "-", i["date"], "-", (i["title"][:80] if i["title"] else "<no title>"))

# source distribution
src_counts = Counter([it.get("source","") for it in items])
print("Sources:", src_counts)

# empty content count
empty_cnt = sum(1 for it in items if not it.get("content"))
print("Empty content:", empty_cnt)

# short content (under 20 chars)
short_cnt = sum(1 for it in items if isinstance(it.get("content",""), str) and len(it.get("content",""))<20)
print("Short content (<20 chars):", short_cnt)

Total items: 7773
nse_announce - 2025-11-28 20:49:38 - General Updates
nse_announce - 2025-11-28 20:44:12 - News Verification
nse_announce - 2025-11-28 18:55:36 - Appointment
nse_announce - 2025-11-28 18:13:23 - News Verification
nse_announce - 2025-11-27 14:24:12 - ESOP/ESOS/ESPS
Sources: Counter({'nse_press': 5418, 'nse_announce': 2219, 'rss': 100, 'ddg': 36})
Empty content: 0
Short content (<20 chars): 1


# **Basic cleaning (trim, remove HTML, normalize whitespace)**

In [7]:
import re
import ftfy
from html import unescape

def clean_text(s):
    if not s:
        return ""
    s = str(s)
    s = ftfy.fix_text(s)
    s = unescape(s)
    # remove html tags
    s = re.sub(r"<[^>]+>", " ", s)
    # replace multiple whitespace/newlines
    s = re.sub(r"\s+", " ", s).strip()
    return s

# apply to dataset and save a preview
for it in items:
    it["title"] = clean_text(it.get("title",""))
    it["content"] = clean_text(it.get("content",""))

# **Fix IDs & dates : unique ids, ISO dates**

In [8]:
import uuid
import pandas as pd

def ensure_id(it):
    if it.get("id") and it["id"]!="-":
        return it["id"]
    # else create one
    return f"{it.get('source','unk')}::{uuid.uuid4().hex}"

def ensure_date(it):
    d = it.get("date") or it.get("published") or ""
    if not d:
        return ""
    try:
        return str(pd.to_datetime(d, utc=True))
    except:
        return ""

for it in items:
    it["id"] = ensure_id(it)
    it["date"] = ensure_date(it)

# **Remove empty / too-short content & drop bad rows**

In [9]:
cleaned = []
for it in items:
    content = (it.get("content") or "").strip()
    title = (it.get("title") or "").strip()
    if not content or len(content) < 30:
        # optionally skip if title is descriptive enough:
        if not title or len(title) < 20:
            continue
    cleaned.append(it)

print("Before:", len(items), "After:", len(cleaned))
items = cleaned

Before: 7773 After: 7770


# **Normalize source labels & optional mapping**

In [10]:
def normalize_source(s):
    if not s: return "unknown"
    s = s.lower()
    if "nse" in s and "press" in s: return "nse_press"
    if "nse" in s and "announce" in s: return "nse_announce"
    if "economictimes" in s or "rss" in s: return "rss"
    if "duckduckgo" in s or s=="ddg": return "ddg"
    return s.replace("https://","").split("/")[0]

for it in items:
    it["source"] = normalize_source(it.get("source",""))


# **Chunking : split long docs into retrieval-friendly pieces**

In [11]:
from sentence_transformers import SentenceTransformer
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm

model = SentenceTransformer("all-MiniLM-L6-v2")

# create a representative text for embedding (title + content)
texts = [(it["title"] + ". " + it["content"])[:2000] for it in items]  # cut long
embs = model.encode(texts, show_progress_bar=True, batch_size=64, convert_to_numpy=True)

# greedy dedup: sort by length or date, keep first, drop near neighbors
threshold = 0.90
keep = []
removed = set()
for i in tqdm(range(len(embs))):
    if i in removed:
        continue
    keep.append(i)
    sims = cosine_similarity(embs[i:i+1], embs)[0]
    # mark as removed those with sim >= threshold (except itself)
    to_rm = [j for j,s in enumerate(sims) if s>=threshold and j!=i]
    for j in to_rm:
        removed.add(j)

items_dedup = [items[i] for i in keep]
print("Before dedup:", len(items), "After dedup:", len(items_dedup))

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]



tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Batches:   0%|          | 0/122 [00:00<?, ?it/s]

100%|██████████| 7770/7770 [01:14<00:00, 104.81it/s]

Before dedup: 7770 After dedup: 3606





In [12]:
import nltk
nltk.download('punkt')
nltk.download('punkt_tab')

from nltk.tokenize import sent_tokenize

def chunk_text(text, max_sent=6):
    sents = sent_tokenize(text)
    chunks = []
    cur = []
    for sent in sents:
        cur.append(sent)
        if len(cur) >= max_sent:
            chunks.append(" ".join(cur))
            cur=[]
    if cur:
        chunks.append(" ".join(cur))
    return chunks

# produce chunked items with metadata
chunked = []
for it in items_dedup:
    chunks = chunk_text(it["title"] + ". " + it["content"], max_sent=6)
    for idx, ch in enumerate(chunks):
        chunked.append({
            "chunk_id": f"{it['id']}::c{idx}",
            "text": ch,
            "meta": {
                "source": it["source"],
                "orig_id": it["id"],
                "title": it["title"],
                "date": it["date"],
                "link": it.get("link","")
            }
        })

print("Total chunks:", len(chunked))

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


Total chunks: 3627


# **Embedding + Store into ChromaDB**

In [13]:
import chromadb
from chromadb.config import Settings

# Chroma client
client = chromadb.PersistentClient(
    path="chroma_db",
    settings=Settings()
)

collection = client.get_or_create_collection("financial_news")

# ---------------------------------------
# 1. Build lists directly from chunked
# ---------------------------------------

texts = [c["text"] for c in chunked]
metas = [c["meta"] for c in chunked]
ids = [c["chunk_id"] for c in chunked]

# Compute embeddings
embeddings = model.encode(texts, show_progress_bar=True, convert_to_numpy=True)
emb_list = embeddings.tolist()

# ---------------------------------------
# 2. Deduplicate before inserting
# ---------------------------------------

unique_chunks = {}

for chunk_id, text, meta, emb in zip(ids, texts, metas, emb_list):
    if chunk_id not in unique_chunks:
        unique_chunks[chunk_id] = {"text": text, "meta": meta, "embedding": emb}

# Final lists
ids = list(unique_chunks.keys())
texts = [v["text"] for v in unique_chunks.values()]
metas = [v["meta"] for v in unique_chunks.values()]
emb_list = [v["embedding"] for v in unique_chunks.values()]

# ---------------------------------------
# 3. Upsert into Chroma
# ---------------------------------------

collection.upsert(
    ids=ids,
    documents=texts,
    metadatas=metas,
    embeddings=emb_list
)

print("Stored", len(ids), "unique chunks in Chroma")


Batches:   0%|          | 0/114 [00:00<?, ?it/s]

Stored 3614 unique chunks in Chroma


# **Save final artifacts + small tests**

In [14]:
import json, os
os.makedirs("data", exist_ok=True)
with open("data/all_news_cleaned.json","w",encoding="utf8") as f:
    json.dump(items_dedup, f, ensure_ascii=False, indent=2)

with open("data/chunks.json","w",encoding="utf8") as f:
    json.dump(chunked, f, ensure_ascii=False, indent=2)

# **LLM Configuration**

In [15]:
import google.generativeai as genai
genai.configure(api_key="*********")
model = genai.GenerativeModel("gemini-2.5-flash-lite")

# **ROUTER AGENT**

In [16]:
# ROUTER AGENT USING GEMINI FLASH LITE

from langchain_core.messages import HumanMessage, SystemMessage
from google.generativeai import GenerativeModel

router_model = model

def route_query(query):

    system_prompt = """
    You are a routing agent for a financial news AI system.

    Your job is to analyze the user query and classify it into ONE
    of these categories:

    1. nse_announce  → corporate actions, penalties, ESOP, board meetings
    2. nse_press     → press releases, surveillance announcements, indices
    3. rss_news      → general market news, Indian economy, global markets
    4. duckduckgo    → broad internet search if query is general
    5. unknown       → if none match

    Respond ONLY with the category name.
    """

    response = router_model.generate_content(
        [system_prompt, query]
    )

    return response.text.strip().lower()


# **Loading Data into Chroma DB**

In [17]:
import json
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer

# Load embeddings model (why this one? fast + good quality)
embedder = SentenceTransformer("all-MiniLM-L6-v2")

chroma_client = chromadb.PersistentClient(path="vectorstore")
collection = chroma_client.get_or_create_collection(
    name="news_rag",
    metadata={"hnsw:space": "cosine"}
)
print("Chroma DB ready.")

Chroma DB ready.


In [18]:
import uuid

with open("data/all_news_cleaned.json", "r") as f:
    data = json.load(f)

for row in data:

    # for now use 'content' field as full text
    if "content" not in row or not row["content"]:
        continue

    text = row["content"]
    source = row.get("source", "unknown")
    original_id = row.get("id", "")

    embedding = embedder.encode(text).tolist()

    collection.add(
        ids=[str(uuid.uuid4())],
        embeddings=[embedding],
        metadatas=[{
            "source": source,
            "original_id": original_id
        }],
        documents=[text]
    )

print("Inserted into Chroma:", len(data))


Inserted into Chroma: 3606


# **semantic search**

In [19]:
def semantic_search(query, dataset, top_k=5):
    """
    dataset → 'nse_announce', 'nse_press', 'rss_news', 'duckduckgo'
    """

    # Step 1: Encode query
    q_emb = embedder.encode(query).tolist()

    # Step 2: Query Chroma with metadata filter
    results = collection.query(
        query_embeddings=[q_emb],
        n_results=top_k,
        where={"source": dataset}
    )

    hits = []

    for i in range(len(results["documents"][0])):
        hits.append({
            "text": results["documents"][0][i],
            "score": results["distances"][0][i],
            "metadata": results["metadatas"][0][i],
        })

    return hits

**Testing Route**

In [20]:
router_output = route_query("Why did RBI penalize HDFC Bank?")
hits = semantic_search("Why did RBI penalize HDFC Bank?", router_output)

for h in hits:
    print(h["metadata"]["source"], " | ", h["metadata"]["original_id"])
    print(h["text"][:200], "...\n")


nse_announce  |  https://nsearchives.nseindia.com/corporate/HDFCBANK_28112025204119_RBI_Penalty_-_November_28_2025.pdf
The Exchange has sought clarification from HDFC Bank Limited with respect to recent news item captioned RBI imposes monetary penalty on HDFC Bank for lapses in KYC, interest rate and outsourcing compl ...

nse_announce  |  https://nsearchives.nseindia.com/corporate/HDFCBANK_29102018181933_SEIntimationMDReapp_348.pdf
HDFC Bank Limited has informed the Exchange regarding 'Pursuant to Regulation 30 of the SEBI Listing Regulations, we wish to inform you that Reserve Bank of India, vide its letter dated 22nd October,  ...

nse_announce  |  https://nsearchives.nseindia.com/corporate/SEIntimation_22092016122717_125.zip
HDFC Bank Limited has informed the Exchange that the Bank has issued and allotted on September 21, 2016 on a private placement basis Senior, Unsecured, Redeemable, Long Term, Non-Convertible Bonds in  ...

nse_announce  |  nse_announce::39146054538b4fc19166a507

In [21]:
def format_chunks(chunks):
    out = []
    for r in chunks:
        out.append(
            f"Source: {r['metadata']['source']}\n"
            f"ID: {r['metadata']['original_id']}\n"
            f"Text: {r['text']}\n"
            "--------------------"
        )
    return "\n".join(out)


# **Generate Answer using Route Agent**

In [22]:
def generate_answer(query, retrieved_chunks):

    context_text = format_chunks(retrieved_chunks)

    system_prompt = """
You are a financial news RAG assistant.
Use ONLY the context provided.
If the answer is not found, reply:
"Information not available in the provided data."
No outside knowledge. No assumptions.
"""

    final_prompt = f"""
{system_prompt}

### CONTEXT:
{context_text}

### QUERY:
{query}

### INSTRUCTIONS:
Give a factual answer. Do not hallucinate.
"""

    response = model.generate_content(final_prompt)

    return response.text


In [23]:
def ask_system(query, top_k=5):

    dataset = route_query(query)
    chunks = semantic_search(query, dataset)
    answer = generate_answer(query, chunks)

    return {
        "router": dataset,
        "chunks_used": len(chunks),
        "answer": answer
    }

**Testing Route Agent**

In [24]:
res = ask_system("Why did RBI penalize HDFC Bank?")
print(res["router"])
print(res["answer"])

nse_announce
RBI imposes monetary penalty on HDFC Bank for lapses in KYC, interest rate and outsourcing compliance.


# **Defining Multiple Agents :**


*   Ingestion Agent
*   Duplication Agent


*   Entity Agent
*   Impact Agent

*   Storage Agent
*   Query Agent












**Ingestion Agent**

In [25]:
%%writefile ingest_agent.py
import json
from typing import List, Dict, Any


def load_cleaned_news(path: str = "data/all_news_cleaned.json") -> List[Dict[str, Any]]:
    """
    Load cleaned dataset.
    Supports BOTH:
    - JSONL (each line = JSON object)
    - JSON array ([ {...}, {...} ])
    """

    print(f"[Ingestion Agent] Loading cleaned dataset from {path} ...")

    with open(path, "r") as f:

        first_char = f.read(1)
        f.seek(0)

        # Case 1: JSON Array  → starts with '['
        if first_char == "[":
            print("[Ingestion Agent] Detected JSON ARRAY format")
            data = json.load(f)

        # Case 2: JSONL → each line is an object
        else:
            print("[Ingestion Agent] Detected JSONL (line-delimited) format")
            data = []
            for line in f:
                line = line.strip()
                if not line:
                    continue
                data.append(json.loads(line))

    print(f"[Ingestion Agent] Loaded {len(data)} cleaned items")
    return data


Writing ingest_agent.py


In [26]:
from ingest_agent import load_cleaned_news

news = load_cleaned_news()
print(len(news))
print(news[0])


[Ingestion Agent] Loading cleaned dataset from data/all_news_cleaned.json ...
[Ingestion Agent] Detected JSON ARRAY format
[Ingestion Agent] Loaded 3606 cleaned items
3606
{'id': 'https://nsearchives.nseindia.com/corporate/HDFCBANK_28112025204843_RBI_Penalty_-_November_28_2025.pdf', 'title': 'General Updates', 'content': 'HDFC Bank Limited has informed the Exchange about Intimation under Regulation 30 of SEBI (Listing Obligations and Disclosure Requirements) Regulations, 2015 ( SEBI Listing Regulations )', 'source': 'nse_announce', 'date': '2025-11-28 20:49:38+00:00', 'link': 'https://nsearchives.nseindia.com/corporate/HDFCBANK_28112025204843_RBI_Penalty_-_November_28_2025.pdf'}


**Duplication Agent**

In [27]:
%%writefile dedupe_agent.py
import uuid
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

model = SentenceTransformer("all-MiniLM-L6-v2")

def dedupe_cluster(stories, threshold=0.82):
    """
    Performs semantic deduplication:
      - clusters similar stories
      - assigns NEW story_id
      - merges fields
    """

    print("[Dedupe Agent] Starting semantic clustering...")

    texts = [s["content"] for s in stories]
    embeddings = model.encode(texts)
    sim_matrix = cosine_similarity(embeddings)

    n = len(stories)
    used = set()
    clusters = []

    for i in range(n):
        if i in used:
            continue

        cluster = [i]
        used.add(i)

        for j in range(i+1, n):
            if j not in used and sim_matrix[i][j] > threshold:
                cluster.append(j)
                used.add(j)

        clusters.append(cluster)

    print(f"[Dedupe Agent] Formed {len(clusters)} unique story clusters.")

    # --------------------
    # Merge each cluster into a single unified story
    # --------------------
    merged_stories = []

    for cluster in clusters:
        # generate NEW unique story id
        new_id = "story_" + uuid.uuid4().hex[:12]

        # consolidate fields
        merged = {
            "id": new_id,
            "title": stories[cluster[0]].get("title", ""),
            "content": " ".join([stories[i]["content"] for i in cluster]),
            "source": stories[cluster[0]].get("source", ""),
            "date": max([stories[i].get("date", "") for i in cluster]),
            "cluster_size": len(cluster)
        }

        merged_stories.append(merged)

    print(f"[Dedupe Agent] Final deduplicated count: {len(merged_stories)}")

    return merged_stories

Writing dedupe_agent.py


In [28]:
from dedupe_agent import dedupe_cluster
stories = load_cleaned_news()
deduped = dedupe_cluster(stories)
print(len(deduped))

[Ingestion Agent] Loading cleaned dataset from data/all_news_cleaned.json ...
[Ingestion Agent] Detected JSON ARRAY format
[Ingestion Agent] Loaded 3606 cleaned items
[Dedupe Agent] Starting semantic clustering...
[Dedupe Agent] Formed 2496 unique story clusters.
[Dedupe Agent] Final deduplicated count: 2496
2496


**Entity Agent**

In [29]:
# entity_agent_hybrid.py
%%writefile entity_agent_hybrid.py


"""
Hybrid Entity Extraction Agent (SpaCy + optional LLM fallback)
- Main entry: extract_entities(stories, use_llm=False, llm=None)
- stories: list of dicts with keys like "title", "content", "id", "date", ...
- returns: list of enriched story dicts with "entities" key:
    entities = {
        "companies": [{"name":..., "symbol":..., "span":(start,end)} ...],
        "sectors": [...],
        "regulators": [...],
        "events": [...],
        "confidence": float  # aggregate confidence 0..1
    }
"""

from typing import List, Dict, Any, Optional
import spacy
from spacy.matcher import PhraseMatcher
import re
import json
import time

# -------------------------
# Configuration / mappings
# -------------------------
# Minimal company->symbol map. Extend this from your NSE/BSE CSV.
COMPANY_TO_SYMBOL = {
    "HDFC BANK": "HDFCBANK",
    "HDFC": "HDFCBANK",
    "ICICI BANK": "ICICIBANK",
    "INFOSYS": "INFY",
    "TATA CONSULTANCY SERVICES": "TCS",
    "TCS": "TCS",
    "WIPRO": "WIPRO",
    "RELIANCE": "RELIANCE",
    "AXIS BANK": "AXISBANK",
    "KOTAK MAHINDRA BANK": "KOTAKBANK",
    "STATE BANK": "SBIN",
    "STATE BANK OF INDIA": "SBIN",
    # add from authoritative mapping CSV as you have it
}

SECTOR_KEYWORDS = {
    "banking": ["bank", "lender", "banking", "banking sector"],
    "it": ["IT", "technology", "software", "IT services", "tech"],
    "energy": ["oil", "gas", "refinery", "energy"],
    "finance": ["finance", "financial", "NBFC", "lending"],
}

REGULATOR_KEYWORDS = {
    "RBI": ["Reserve Bank of India", r"\bRBI\b"],
    "SEBI": ["Securities and Exchange Board", r"\bSEBI\b"],
}

EVENT_KEYWORDS = [
    "merger", "acquisition", "penalty", "fine", "dividend", "default", "fraud",
    "buyback", "rights issue", "split", "bonus issue", "listing", "delisting"
]

# -------------------------
# Load spaCy once (small model for speed)
# -------------------------
try:
    nlp = spacy.load("en_core_web_sm")
except Exception:
    import spacy.cli
    spacy.cli.download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")

# Build phrase matcher for companies (from mapping) to increase recall
_phrase_matcher = PhraseMatcher(nlp.vocab, attr="LOWER")
_company_patterns = [nlp.make_doc(name) for name in COMPANY_TO_SYMBOL.keys()]
if _company_patterns:
    _phrase_matcher.add("COMPANY_MAP", _company_patterns)

# -------------------------
# Helper functions
# -------------------------
def _find_companies_spacy(text: str) -> List[Dict[str, Any]]:
    """Use spaCy NER + phrase matcher for company detection."""
    doc = nlp(text)
    comps = []

    # spaCy ORG entities
    for ent in doc.ents:
        if ent.label_ in ("ORG", "PRODUCT"):
            name = ent.text.strip()
            symbol = COMPANY_TO_SYMBOL.get(name.upper())
            comps.append({"name": name, "symbol": symbol, "span": (ent.start_char, ent.end_char), "source": "spacy"})

    # phrase matcher to cover common abbreviations / names
    matches = _phrase_matcher(doc)
    for match_id, start, end in matches:
        span = doc[start:end]
        name = span.text.strip()
        # avoid duplicates
        if not any(c["name"].lower() == name.lower() for c in comps):
            comps.append({"name": name, "symbol": COMPANY_TO_SYMBOL.get(name.upper()), "span": (span.start_char, span.end_char), "source": "matcher"})

    seen=set(); uniq=[]
    for c in comps:
        key = c.get("symbol") or c["name"].lower()
        if key in seen: continue
        seen.add(key)
        uniq.append(c)
    return uniq

def _find_sectors(text: str) -> List[str]:
    found=set()
    tl = text.lower()
    for sector, words in SECTOR_KEYWORDS.items():
        for w in words:
            if w.lower() in tl:
                found.add(sector)
                break
    return list(found)

def _find_regulators(text: str) -> List[str]:
    found=set()
    for reg, keys in REGULATOR_KEYWORDS.items():
        for k in keys:
            if re.search(k, text, flags=re.IGNORECASE):
                found.add(reg)
                break
    return list(found)

def _find_events(text: str) -> List[str]:
    found=set()
    tl = text.lower()
    for ev in EVENT_KEYWORDS:
        if ev.lower() in tl:
            found.add(ev)
    return list(found)

# -------------------------
# OPTIONAL: LLM fallback helper
# -------------------------
LLM_PROMPT = """Extract entities from the following news text.
Return a JSON object with fields: companies (list of names), sectors (list), regulators (list), events (list).
Example output:
{{"companies":["HDFC Bank"], "sectors":["banking"], "regulators":["RBI"], "events":["penalty"]}}
Text:
\"\"\"{text}\"\"\"
"""

def _call_llm_extract(text: str, llm) -> Dict[str, Any]:
    """
    Minimal LLM wrapper. Expects `llm` to have generate_content(prompt) -> returns object with .text
    The response text must be valid JSON object (we try to parse).
    """
    prompt = LLM_PROMPT.format(text=text[:3000])  # limit prompt size
    resp = llm.generate_content(prompt)
    # resp may be an object; try str(resp.text)
    content = getattr(resp, "text", str(resp))
    try:
        out = json.loads(content)
    except Exception:
        # if the LLM responds with backticks or explanation, try to extract JSON substring
        m = re.search(r"\{.*\}", content, flags=re.DOTALL)
        if m:
            try:
                out = json.loads(m.group(0))
            except Exception:
                out = {}
        else:
            out = {}
    # normalize keys
    return {
        "companies": out.get("companies", []) if isinstance(out.get("companies", []), list) else [],
        "sectors": out.get("sectors", []) if isinstance(out.get("sectors", []), list) else [],
        "regulators": out.get("regulators", []) if isinstance(out.get("regulators", []), list) else [],
        "events": out.get("events", []) if isinstance(out.get("events", []), list) else []
    }

# -------------------------
# Main extraction functions
# -------------------------
def extract_entities_from_text(text: str, use_llm: bool=False, llm: Optional[Any]=None) -> Dict[str, Any]:
    """
    Extract entities for a single text span (title + content).
    Returns a dict {companies: [...], sectors: [...], regulators: [...], events: [...], confidence: float}
    confidence is heuristic: 0..1 (1 when many signals)
    """
    text = (text or "").strip()
    out = {"companies": [], "sectors": [], "regulators": [], "events": [], "confidence": 0.0}

    if not text:
        return out

    # spaCy + rule-based
    companies = _find_companies_spacy(text)
    sectors = _find_sectors(text)
    regulators = _find_regulators(text)
    events = _find_events(text)

    # baseline confidence calculation
    conf = 0.0
    if companies: conf += 0.4
    if sectors: conf += 0.2
    if regulators: conf += 0.2
    if events: conf += 0.2
    conf = min(1.0, conf)

    out["companies"] = companies
    out["sectors"] = sectors
    out["regulators"] = regulators
    out["events"] = events
    out["confidence"] = conf

    # LLM fallback when confidence low and user asked for LLM
    if use_llm and llm and conf < 0.7:
        try:
            llm_res = _call_llm_extract(text, llm)
            # merge LLM results with spaCy (avoid duplicates)
            # LLM returns plain names; attach symbol mapping if available
            for name in llm_res.get("companies", []):
                if not any(c["name"].lower() == name.lower() for c in out["companies"]):
                    out["companies"].append({
                        "name": name,
                        "symbol": COMPANY_TO_SYMBOL.get(name.upper()),
                        "span": None,
                        "source": "llm"
                    })
            # sectors/regulators/events -> just append unique
            for k in ("sectors","regulators","events"):
                for v in llm_res.get(k, []):
                    if v and v not in out[k]:
                        out[k].append(v)
            # bump confidence
            out["confidence"] = min(1.0, max(out["confidence"], 0.85))
        except Exception as e:
            # LLM error -> ignore and keep spaCy result
            pass

    return out

def extract_entities(stories: List[Dict[str, Any]], use_llm: bool=False, llm: Optional[Any]=None, batch_size: int=128) -> List[Dict[str, Any]]:
    """
    Batch extractor. stories: list of dicts which must contain 'title' or 'content' keys.
    Returns new list where each story has 'entities' key added (dict).
    """
    enriched=[]
    t0=time.time()
    for i, s in enumerate(stories):
        text = (s.get("title","") or "") + "\n" + (s.get("content","") or "")
        ent = extract_entities_from_text(text, use_llm=use_llm, llm=llm)
        new_s = s.copy()
        new_s["entities"] = ent
        enriched.append(new_s)
        # optional: small progress print
        if (i+1) % 500 == 0:
            print(f"[entity_agent] processed {i+1}/{len(stories)}")
    dt=time.time()-t0
    print(f"[entity_agent] Finished {len(stories)} stories in {dt:.1f}s (avg {dt/len(stories):.3f}s per story)")
    return enriched


Writing entity_agent_hybrid.py


In [30]:
from entity_agent_hybrid import extract_entities
stories_with_entities = extract_entities(deduped, use_llm=False)   # 'news' loaded earlier from ingestion agent
print(len(stories_with_entities))
print(stories_with_entities[0]["entities"])


[entity_agent] processed 500/2496
[entity_agent] processed 1000/2496
[entity_agent] processed 1500/2496
[entity_agent] processed 2000/2496
[entity_agent] Finished 2496 stories in 46.0s (avg 0.018s per story)
2496
{'companies': [{'name': 'Exchange', 'symbol': None, 'span': (51, 59), 'source': 'spacy'}, {'name': 'Intimation under Regulation 30', 'symbol': None, 'span': (66, 96), 'source': 'spacy'}, {'name': 'the Securities and Exchange Board of India (Prohibition of Insider Trading) Regulations', 'symbol': None, 'span': (292, 379), 'source': 'spacy'}, {'name': 'HDFC Bank Limited', 'symbol': None, 'span': (571, 588), 'source': 'spacy'}, {'name': 'National Stock Exchange of', 'symbol': None, 'span': (762, 788), 'source': 'spacy'}, {'name': 'BSE Limited', 'symbol': None, 'span': (807, 818), 'source': 'spacy'}, {'name': 'HDFC Life Insurance Company Limited', 'symbol': None, 'span': (822, 857), 'source': 'spacy'}, {'name': 'Mauritius Holdings', 'symbol': None, 'span': (889, 907), 'source': 's

**Impact Agent**

In [31]:
# impact_agent.py
%%writefile impact_agent.py

# -------------------------------------------------------
# 1. COMPANY → STOCK Symbol Mapping
# -------------------------------------------------------

COMPANY_TO_SYMBOL = {
    "HDFC BANK": "HDFCBANK",
    "HDFC": "HDFCBANK",
    "ICICI BANK": "ICICIBANK",
    "INFOSYS": "INFY",
    "TATA CONSULTANCY SERVICES": "TCS",
    "WIPRO": "WIPRO",
    "RELIANCE": "RELIANCE",
    "AXIS BANK": "AXISBANK",
    "KOTAK MAHINDRA BANK": "KOTAKBANK",
    # Add more as needed
}

# -------------------------------------------------------
# 2. Sector → Multiple Stock Symbols
# -------------------------------------------------------

SECTOR_TO_SYMBOLS = {
    "banking": ["HDFCBANK", "ICICIBANK", "AXISBANK", "KOTAKBANK"],
    "it": ["INFY", "TCS", "WIPRO"],
    "oil_gas": ["RELIANCE"],
    "finance": ["HDFCBANK", "ICICIBANK"],
    # Extend depending on data
}

# -------------------------------------------------------
# 3. Regulator → Market-Wide Index Impact
# -------------------------------------------------------

REGULATOR_IMPACT = {
    "RBI": "^NIFTYBANK",
    "SEBI": "^NIFTY50",
}


# -------------------------------------------------------
# ★ Main Agent Function
# -------------------------------------------------------

def map_stock_impact(stories_with_entities):
    """
    Input:
        [
          {
             "id": "...",
             "title": "...",
             "content": "...",
             "date": "...",
             "entities": {
                "companies": [...],
                "sectors": [...],
                "regulators": [...],
                "events": [...]
             }
          },
          ...
        ]

    Output:
        Same list BUT each story has an additional key:
        story["impact"] = [ {symbol, confidence, type, ...}, ... ]
    """

    final_stories = []

    for story in stories_with_entities:
        entities = story.get("entities", {})
        impact_list = []

        # ------------------------------
        # 1. Direct Company → Equity Impact
        # ------------------------------
        for comp in entities.get("companies", []):
            if isinstance(comp, dict):
                comp_text = comp.get("text", "")
            else:
                comp_text = comp

            key = comp_text.upper().strip()
            if key in COMPANY_TO_SYMBOL:
                impact_list.append({
                    "symbol": COMPANY_TO_SYMBOL[key],
                    "confidence": 1.0,
                    "type": "direct_company",
                    "entity": comp_text
                })

        # ------------------------------
        # 2. Sector → Multi-stock Impact
        # ------------------------------
        for sector in entities.get("sectors", []):
          if isinstance(sector, dict):
                  sec = sector.get("text", "")
          else:
              sec = sector

          sec_key = sec.lower().strip()
          if sec_key in SECTOR_TO_SYMBOLS:
              for sym in SECTOR_TO_SYMBOLS[sec_key]:
                  impact_list.append({
                      "symbol": sym,
                      "confidence": 0.7,
                      "type": "sector",
                      "sector": sec
                  })

        # ------------------------------
        # 3. Regulator Activity → Market Index Impact
        # ------------------------------
        for reg in entities.get("regulators", []):
            if isinstance(reg, dict):
                reg_text = reg.get("text", "")
            else:
                reg_text = reg

            reg_key = reg_text.upper().strip()
            if reg_key in REGULATOR_IMPACT:
                impact_list.append({
                    "symbol": REGULATOR_IMPACT[reg_key],
                    "confidence": 0.5,
                    "type": "regulator",
                    "regulator": reg_text
                })

        # ------------------------------
        # 4. No Entities? → Assign No Impact
        # ------------------------------
        if not impact_list:
            impact_list = []

        # ------------------------------
        # Add impact to story
        # ------------------------------
        enriched_story = story.copy()
        enriched_story["impact"] = impact_list

        final_stories.append(enriched_story)

    return final_stories

def compute_impact_for_stories(stories_with_entities):
    return map_stock_impact(stories_with_entities)


Writing impact_agent.py


In [32]:
from impact_agent import compute_impact_for_stories
stories_with_impact = compute_impact_for_stories(stories_with_entities)
print(stories_with_impact[0]["entities"])

{'companies': [{'name': 'Exchange', 'symbol': None, 'span': (51, 59), 'source': 'spacy'}, {'name': 'Intimation under Regulation 30', 'symbol': None, 'span': (66, 96), 'source': 'spacy'}, {'name': 'the Securities and Exchange Board of India (Prohibition of Insider Trading) Regulations', 'symbol': None, 'span': (292, 379), 'source': 'spacy'}, {'name': 'HDFC Bank Limited', 'symbol': None, 'span': (571, 588), 'source': 'spacy'}, {'name': 'National Stock Exchange of', 'symbol': None, 'span': (762, 788), 'source': 'spacy'}, {'name': 'BSE Limited', 'symbol': None, 'span': (807, 818), 'source': 'spacy'}, {'name': 'HDFC Life Insurance Company Limited', 'symbol': None, 'span': (822, 857), 'source': 'spacy'}, {'name': 'Mauritius Holdings', 'symbol': None, 'span': (889, 907), 'source': 'spacy'}, {'name': "HDFC Credila Financial Services Limited'", 'symbol': None, 'span': (1221, 1261), 'source': 'spacy'}, {'name': 'the Securities and Exchange Board of India', 'symbol': None, 'span': (1930, 1972), '

**Storage Agent**

In [33]:
# storage.py (fixed)
%%writefile storage.py

import json
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer

embedder = SentenceTransformer("all-MiniLM-L6-v2")

def init_chroma():
    return chromadb.PersistentClient(path="vectorstore")

def store_stories_chroma(stories):
    chroma_client = init_chroma()

    collection = chroma_client.get_or_create_collection(
        name="news_index",
        metadata={"hnsw:space": "cosine"}
    )

    ids = []
    docs = []
    metas = []

    for story in stories:
        story_id = story.get("id") or story.get("title")
        ids.append(str(story_id))

        text_block = story.get("title", "") + "\n" + story.get("content", "")
        docs.append(text_block)

        # --- FIX: convert LIST → STRING for impact ---
        impact = story.get("impact", [])
        impact_str = json.dumps(impact)  # Chroma-safe

        metas.append({
            "title": story.get("title", ""),
            "date": story.get("date", ""),
            "source": story.get("source", ""),
            "impact": impact_str
        })


    # ---- FIX: Generate 1 embedding per document ----
    embeddings = embedder.encode(docs, convert_to_numpy=True).tolist()

    if not (len(ids) == len(docs) == len(metas) == len(embeddings)):
        raise ValueError("Length mismatch detected before adding to Chroma")

    collection.add(
        ids=ids,
        documents=docs,
        metadatas=metas,
        embeddings=embeddings   # <-- CORRECT
    )

    print(f"Stored {len(ids)} stories in Chroma.")


Writing storage.py


In [34]:
from storage import init_chroma, store_stories_chroma, store_stories_chroma

conn = init_chroma()
# store_stories_chroma(conn, consolidated_annotated_stories)
store_stories_chroma(stories_with_impact)


Stored 2496 stories in Chroma.


**Query Agent**

In [35]:
# query_agent2.py
%%writefile query_agent.py

import chromadb
from sentence_transformers import SentenceTransformer
import numpy as np
from datetime import datetime

model = SentenceTransformer("all-MiniLM-L6-v2")

client = chromadb.PersistentClient(path="vectorstore")
collection = client.get_collection("news_index")   # << your correct collection

# -------------------------------------
# Helper: embed
# -------------------------------------
def embed(text: str):
    return model.encode(text).tolist()

# -------------------------------------
# 1) Retrieve documents
# -------------------------------------
def retrieve(query: str, top_k: int = 10):
    q_emb = embed(query)
    res = collection.query(
        query_embeddings=[q_emb],
        n_results=top_k
    )

    matches = []
    for i in range(len(res["ids"][0])):
        matches.append({
            "id": res["ids"][0][i],
            "text": res["documents"][0][i],
            "metadata": res["metadatas"][0][i],
            "score": res["distances"][0][i]
        })

    return matches

# -------------------------------------
# 2) Filter using entities (strict)
# -------------------------------------
def entity_filter(matches, required_terms):
    """
    required_terms = ["RBI", "HDFC"]
    """
    filtered = []
    for m in matches:
        text = m["text"].lower()
        if all(term.lower() in text for term in required_terms):
            filtered.append(m)
    return filtered if filtered else matches   # fallback

# -------------------------------------
# 3) Date sort (recent > old)
# -------------------------------------
def sort_by_date(matches):
    def extract_date(m):
        dt = m["metadata"].get("date", "")
        try:
            return datetime.fromisoformat(dt.replace("Z",""))
        except:
            return datetime.min
    return sorted(matches, key=extract_date, reverse=True)

# -------------------------------------
# 4) Summarization (LLM)
# -------------------------------------
# import google.generativeai as genai
# genai.configure(api_key=os.environ["GOOGLE_API_KEY"])

import google.generativeai as genai
genai.configure(api_key="*******")


def summarize_context(query, context):
    # model = genai.GenerativeModel("gemini-2.5-flash-lite")
    model = genai.GenerativeModel("gemini-2.5-flash-lite")

    prompt = f"""
You are a factual financial assistant. Use ONLY the CONTEXT below to answer the user's question.
If the answer is not present in the context, reply exactly: "Information not available in the indexed news."

User question:
{query}

CONTEXT (do not invent anything):
{context}

Return:
1) A Polished human factual answer.
2) A short evidence line citing the source title and date.
3) A confidence score between 0.0 and 1.0 (based only on context quality).
Format the output as JSON with keys: answer, evidence, confidence.
"""

    resp = model.generate_content(prompt)
    return resp.text

# -------------------------------------
# Main agent
# -------------------------------------
def ask_financial_agent(query: str):
    # Step 1: retrieve
    retrieved = retrieve(query, top_k=12)

    # Step 2: entity filter
    important_terms = ["RBI", "HDFC"]
    filtered = entity_filter(retrieved, important_terms)

    # Step 3: sort by recency
    final_list = sort_by_date(filtered)

    # Step 4: build context
    context = "\n\n".join([m["text"] for m in final_list[:5]])

    # Step 5: LLM summary
    answer = summarize_context(query, context)

    return {
        "query": query,
        "answer": answer,
        "matches": final_list[:5]
    }


Writing query_agent.py


In [36]:
from query_agent import ask_financial_agent

res = ask_financial_agent("Why did RBI penalize HDFC Bank?")
print(res["answer"])


```json
{
 "answer": "The Reserve Bank of India (RBI) imposed a penalty on HDFC Bank because the bank violated provisions of the Banking Regulation Act, 1949, and failed to comply with certain RBI directions. Specific lapses mentioned include KYC, interest rate, and outsourcing compliance.",
 "evidence": "Rs 91 lakh PENALTY on HDFC Bank: RBI's strict action against India's largest lender for THIS reason - DETAILS. According to the central bank, HDFC Bank violated provisions of the Banking Regulation Act, 1949, and failed to comply with certain RBI directions. HDFC Bank faces Rs 91 lakh RBI penalty for regulatory violations; stock in focus on Dec 1. The Exchange has sought clarification from HDFC Bank Limited with respect to recent news item captioned RBI imposes monetary penalty on HDFC Bank for lapses in KYC, interest rate and outsourcing compliance.",
 "confidence": 1.0
}
```


***Question to Test***

In [None]:
Why did RBI penalize HDFC Bank?
What penalties were imposed on HDFC Bank by RBI?
What are the recent market predictions for Nifty 50?
Which stocks did SEBI recently investigate for irregular trading?
What is the latest announcement from Reliance Industries regarding Jio?
What new regulations has SEBI introduced for mutual funds?
What are the penalties imposed on banks by RBI in the last 6 months?

# **Multi Agent Pipeline**


*   Implementation of Agents Sequence
*   Generation Response using Query Agent

*   Creating Node and Edges for Pipeline





In [37]:
# news_pipeline.py
%%writefile news_pipeline.py

import json
from typing import List, Dict, Any


# ---- IMPORT your existing agent functions ----


from ingest_agent import load_cleaned_news
from dedupe_agent import dedupe_cluster
from entity_agent_hybrid import extract_entities
from impact_agent import compute_impact_for_stories
from storage import store_stories_chroma


# -------------------------
# Node: ingest_node
# -------------------------
def ingest_node(path: str = "data/all_news_cleaned.json") -> List[Dict[str, Any]]:
    """
    Load cleaned dataset (single source-of-truth).
    """
    print("[ingest_node] Loading cleaned dataset...")
    stories = load_cleaned_news(path)
    print(f"[ingest_node] Loaded {len(stories)} items.")
    return stories

# -------------------------
# Node: dedupe_node
# -------------------------
def dedupe_node(stories: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Semantic deduplication; returns merged stories with NEW unique ids.
    """
    print("[dedupe_node] Running semantic deduplication...")
    merged = dedupe_cluster(stories)
    print(f"[dedupe_node] Dedupe result: {len(merged)} clusters.")
    return merged

# -------------------------
# Node: entity_node
# -------------------------
def entity_node(stories: List[Dict[str, Any]], use_llm: bool=False, llm=None) -> List[Dict[str, Any]]:
    """
    Run entity extraction (hybrid). Adds key 'entities' to each story.
    """
    print("[entity_node] Extracting entities (spaCy + optional LLM)...")
    enriched = extract_entities(stories, use_llm=use_llm, llm=llm)
    print(f"[entity_node] Completed entity extraction for {len(enriched)} stories.")
    return enriched

# -------------------------
# Node: impact_node
# -------------------------
def impact_node(stories_with_entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Compute stock impact and add 'impact' field to each story.
    """
    print("[impact_node] Computing stock impact...")
    impacted = compute_impact_for_stories(stories_with_entities)
    print(f"[impact_node] Impact computed for {len(impacted)} stories.")
    return impacted

# -------------------------
# Node: store_node
# -------------------------
def store_node(stories_with_impact: List[Dict[str, Any]], to_chroma: bool=True, to_sqlite: bool=False) -> Dict[str, Any]:
    """
    Store final enriched stories into Chroma (and optionally SQLite).
    Returns metrics dict.
    """
    metrics = {"stored_chroma": 0, "stored_sqlite": 0}
    if to_chroma:
        print("[store_node] Storing to Chroma...")
        store_stories_chroma(stories_with_impact)
        metrics["stored_chroma"] = len(stories_with_impact)
    if to_sqlite:
        print("[store_node] Storing to SQLite (if implemented)...")
        conn = init_sqlite()
        store_stories_sqlite(conn, stories_with_impact)
        metrics["stored_sqlite"] = len(stories_with_impact)
    print("[store_node] Storage complete.")
    return metrics

# -------------------------
# Orchestrator: run_news_pipeline
# -------------------------
def run_news_pipeline(path: str = "data/all_news_cleaned.json", use_llm_for_entities: bool=False, llm=None, to_chroma: bool=True, to_sqlite: bool=False):
    """
    Executes the sequential LangGraph-style pipeline (news processing).
    Returns final stored count and sample item.
    """
    # Node 1: ingest
    stories = ingest_node(path)

    # Node 2: dedupe
    deduped = dedupe_node(stories)

    # Node 3: entity extraction
    stories_with_entities = entity_node(deduped, use_llm=use_llm_for_entities, llm=llm)

    # Node 4: impact
    stories_with_impact = impact_node(stories_with_entities)

    # Node 5: store
    metrics = store_node(stories_with_impact, to_chroma=to_chroma, to_sqlite=to_sqlite)

    # Return summary
    return {
        "input_count": len(stories),
        "deduped_count": len(deduped),
        "stored_chroma": metrics["stored_chroma"],
        "sample_output": stories_with_impact[0] if stories_with_impact else None
    }


Writing news_pipeline.py


**Generation Response using Query Agent**

In [38]:
# query_pipeline.py
%%writefile query_pipeline.py

import json
import re
import chromadb
from datetime import datetime
from google import generativeai as genai

# -------------------------------
# Configure LLM
# -------------------------------
genai.configure(api_key="*********")
llm = genai.GenerativeModel("gemini-2.5-flash-lite")

# -------------------------------
# Chroma Client
# -------------------------------
CHROMA_PATH = "vectorstore"
chroma_client = chromadb.PersistentClient(path=CHROMA_PATH)
collection = chroma_client.get_collection("news_index")


# -------------------------------
# Evidence dedupe
# -------------------------------
def dedupe_evidence(evidence_list):
    seen = {}

    for ev in evidence_list:
        key = ev.get("id") or ev.get("title", "").strip().lower()

        try:
            dt = datetime.fromisoformat(ev.get("date", "").replace("Z", ""))
        except:
            dt = None

        if key in seen:
            prev = seen[key]
            try:
                prev_dt = datetime.fromisoformat(prev.get("date", "").replace("Z", ""))
            except:
                prev_dt = None

            if dt and (not prev_dt or dt > prev_dt):
                seen[key] = ev
        else:
            seen[key] = ev

    def _sort(e):
        try:
            return datetime.fromisoformat(e.get("date", "").replace("Z", ""))
        except:
            return datetime.min

    return sorted(seen.values(), key=_sort, reverse=True)


# -------------------------------
# Confidence score
# -------------------------------
def compute_confidence_from_evidence(evidence_list):
    if not evidence_list:
        return 0.0

    base = 0.4
    sources = {ev.get("source") or ev.get("id") for ev in evidence_list}

    base += min(0.95 - base, 0.15 * (len(sources) - 1))

    try:
        dt = datetime.fromisoformat(evidence_list[0]["date"].replace("Z", ""))
        if (datetime.utcnow() - dt).days <= 30:
            base += 0.05
    except:
        pass

    return round(min(base, 1.0), 2)


# -------------------------------
# JSON-STRICT Prompt
# -------------------------------
PROMPT_TEMPLATE = """
You are a financial news analysis assistant.

Using ONLY the provided evidence below, answer the user's question.
Summarize in 2–3 factual sentences. DO NOT add external information.

Your ENTIRE output MUST be ONLY valid JSON.
It MUST start with an opening curly brace and end with a closing curly brace.
Do NOT include backticks, quotes around braces, or any explanation.
Output MUST be strictly valid JSON parseable using json.loads().


USER QUESTION:
{query}

EVIDENCE (multiple documents):
{context}

Return JSON with this exact shape:
{{
  "query": "...",
  "answer": "...",          # short factual answer ONLY from evidence
  "confidence": 0.0,        # 1.0 if answer clearly exists, else 0.0
  "evidence": []            # list of items you used to produce answer
}}

Rules:
- If the evidence clearly explains the reason → summarize it.
- If the evidence does NOT contain the answer → "Information not available in the indexed news."
- DO NOT hallucinate.
- DO NOT write anything outside JSON.
"""


# -------------------------------
# Retrieve from Chroma
# -------------------------------
def retrieve_news(query, k=5):
    q = collection.query(query_texts=[query], n_results=k)
    docs = []
    for i in range(len(q["documents"][0])):
        docs.append({
            "id": q["ids"][0][i],
            "text": q["documents"][0][i],
            "metadata": q["metadatas"][0][i],
            "score": q["distances"][0][i]
        })
    return docs


# -------------------------------
# JSON Extractor
# -------------------------------
def safe_parse_json(text: str):
    try:
        m = re.search(r"\{.*\}", text, flags=re.DOTALL)
        if m:
            return json.loads(m.group())
    except Exception:
        return None
    return None


# -------------------------------
# Main Query Function
# -------------------------------
def ask_financial_agent(user_query: str, k=5):
    matches = retrieve_news(user_query, k=k)

    if not matches:
        return {
            "query": user_query,
            "answer": "Information not available in the indexed news.",
            "confidence": 0.0,
            "evidence": []
        }

    # Build clean context — DO NOT TRUNCATE
    context_chunks = []
    for m in matches:
        meta = m.get("metadata", {}) or {}
        context_chunks.append(
            f"ID: {m.get('id','')}\n"
            f"TITLE: {meta.get('title','')}\n"
            f"DATE: {meta.get('date','')}\n"
            f"SOURCE: {meta.get('source','')}\n"
            f"CONTENT:\n{m.get('text','')}\n"
            f"---------------------------"
        )

    context = "\n\n".join(context_chunks)

    # Run LLM
    prompt = PROMPT_TEMPLATE.format(query=user_query, context=context)

    try:
        llm_resp = llm.generate_content(prompt)
        raw = llm_resp.text.strip()
        parsed = safe_parse_json(raw)

        if parsed:
            if "query" not in parsed:
                parsed["query"] = user_query
            if "evidence" not in parsed:
                parsed["evidence"] = matches[:5]
            return parsed

    except Exception as e:
        print("[LLM error]", e)

    # -------------------------------
    # Fallback (rare)
    # -------------------------------
    evidence = [{
        "id": m["id"],
        "title": (m.get("metadata") or {}).get("title", ""),
        "date": (m.get("metadata") or {}).get("date", ""),
        "snippet": m.get("text", ""),
        "source": (m.get("metadata") or {}).get("source", "")
    } for m in matches]

    return {
        "query": user_query,
        "answer": matches[0]["text"],
        "confidence": compute_confidence_from_evidence(evidence),
        "evidence": dedupe_evidence(evidence)
    }


Writing query_pipeline.py


**Creating Node and Edges for Pipeline**

In [39]:
# news_graph_pipeline.py

from langgraph.graph import StateGraph, END
from news_pipeline import ingest_node, dedupe_node, entity_node, impact_node, store_node

from query_pipeline import retrieve_news, ask_financial_agent
# -------------------------
# NODE WRAPPERS
# -------------------------
def node_ingest(state):
    stories = ingest_node()
    state["raw_news"] = stories
    return state

def node_dedupe(state):
    stories = state.get("raw_news", [])
    deduped = dedupe_node(stories) if stories else []
    state["deduped_news"] = deduped
    return state

def node_entities(state):
    stories = state.get("deduped_news", [])
    enriched = entity_node(stories) if stories else []
    state["enriched_news"] = enriched
    return state

def node_impact(state):
    stories = state.get("enriched_news", [])
    impacted = impact_node(stories) if stories else []
    state["impacted_news"] = impacted
    return state

def node_storage(state):
    stories = state.get("impacted_news", [])
    if stories:
        store_node(stories)
    state["stored"] = True
    return state

# -------------------------
# BUILD GRAPH
# -------------------------
news_graph = StateGraph(dict)
news_graph.add_node("ingest", node_ingest)
news_graph.add_node("dedupe", node_dedupe)
news_graph.add_node("entities", node_entities)
news_graph.add_node("impact", node_impact)
news_graph.add_node("storage", node_storage)

news_graph.set_entry_point("ingest")
news_graph.add_edge("ingest", "dedupe")
news_graph.add_edge("dedupe", "entities")
news_graph.add_edge("entities", "impact")
news_graph.add_edge("impact", "storage")
news_graph.add_edge("storage", END)

news_pipeline = news_graph.compile()


# ===========================================
# PART B — QUERY PROCESSING GRAPH
# ===========================================

# ---------- STATE for query pipeline ----------
class QueryState(dict):
    """
    Keys:
    - query
    - docs
    - answer
    """
    pass


# ---------- Node 1: Retrieval ----------
def node_retrieve(state):
    if state is None:
        state = {}
    new_state = dict(state)

    query = new_state.get("query", "")
    docs = retrieve_news(query)

    new_state["docs"] = docs
    return new_state


# ---------- Node 2: LLM Answer ----------
def node_llm_answer(state: QueryState):
    if state is None:
        state = {}

    state = dict(state)

    query = state.get("query", "")
    docs = state.get("docs", [])

    if not query:
        state["answer"] = {"error": "Query missing"}
        return state

    try:
        answer = ask_financial_agent(query) or {}
    except Exception as e:
        print("[ERROR in ask_financial_agent]", e)
        answer = {"error": str(e)}

    state["answer"] = answer
    return state




# ---------- Build Query Graph ----------
query_graph_builder = StateGraph(QueryState)

# query_graph_builder.add_node("retrieve", node_retrieve)
# query_graph_builder.add_node("answer", node_llm_answer)

query_graph_builder.add_node("retrieve", lambda s: print("STATE AFTER RETRIEVE:", s) or node_retrieve(s))
query_graph_builder.add_node("answer", lambda s: print("STATE BEFORE ANSWER:", s) or node_llm_answer(s))


query_graph_builder.set_entry_point("retrieve")
query_graph_builder.add_edge("retrieve", "answer")
query_graph_builder.add_edge("answer", END)


query_pipeline = query_graph_builder.compile()


# ===========================================
# HOW TO RUN BOTH PIPELINES
# ===========================================

def run_news_pipeline():
    print("\n Running News Processing Pipeline...")
    result = news_pipeline.invoke({})
    print("✔ Completed news pipeline")
    return result

def run_query(query):
    print(f"\n Running Query Pipeline for: {query}")

    state = {"query": query}

    # Retrieve step
    state = node_retrieve(state)
    if not isinstance(state, dict):
        state = {"query": query, "docs": []}

    # LLM step (THIS RETURNS FINAL CLEAN RESULT)
    result = node_llm_answer(state)
    if not isinstance(result, dict):
        result = {
            "query": query,
            "answer": "Information not available in the indexed news.",
            "confidence": 0.0,
            "evidence": []
        }

    # -------------------------------
    # Pretty Print
    # -------------------------------
    print("\n FINAL ANSWER:")
    print(f"Query: {result.get('query')}\n")

    print("Answer:")
    print(result.get("answer"), "\n")

    print("Confidence:", result.get("confidence"), "\n")

    print("Evidence:\n")
    for ev in result.get("evidence", []):
        print(f" • ID: {ev.get('id')}")
        print(f"   Title: {ev.get('title')}")
        print(f"   Date: {ev.get('date')}")
        print(f"   Snippet: {ev.get('snippet')}\n")

    return result


if __name__ == "__main__":
    run_news_pipeline()

    sample_query = "Why did RBI penalize HDFC Bank?"
    answer = run_query(sample_query)
    print("\n FINAL ANSWER:", answer)



 Running News Processing Pipeline...
[ingest_node] Loading cleaned dataset...
[Ingestion Agent] Loading cleaned dataset from data/all_news_cleaned.json ...
[Ingestion Agent] Detected JSON ARRAY format
[Ingestion Agent] Loaded 3606 cleaned items
[ingest_node] Loaded 3606 items.
[dedupe_node] Running semantic deduplication...
[Dedupe Agent] Starting semantic clustering...
[Dedupe Agent] Formed 2496 unique story clusters.
[Dedupe Agent] Final deduplicated count: 2496
[dedupe_node] Dedupe result: 2496 clusters.
[entity_node] Extracting entities (spaCy + optional LLM)...
[entity_agent] processed 500/2496
[entity_agent] processed 1000/2496
[entity_agent] processed 1500/2496
[entity_agent] processed 2000/2496
[entity_agent] Finished 2496 stories in 45.7s (avg 0.018s per story)
[entity_node] Completed entity extraction for 2496 stories.
[impact_node] Computing stock impact...
[impact_node] Impact computed for 2496 stories.
[store_node] Storing to Chroma...
Stored 2496 stories in Chroma.
[stor

/root/.cache/chroma/onnx_models/all-MiniLM-L6-v2/onnx.tar.gz: 100%|██████████| 79.3M/79.3M [00:01<00:00, 43.9MiB/s]



 FINAL ANSWER:
Query: Why did RBI penalize HDFC Bank?

Answer:
{'query': 'Why did RBI penalize HDFC Bank?', 'answer': 'The Reserve Bank of India (RBI) penalized HDFC Bank with Rs 91 lakh for violating provisions of the Banking Regulation Act, 1949, and failing to comply with certain RBI directions. These violations included lapses in Know Your Customer (KYC), interest rate, and outsourcing compliance.', 'confidence': 1.0, 'evidence': ['story_9c7a2ef17eff', 'story_a0e686faae23', 'story_cb29204f507e']} 

Confidence: None 

Evidence:


 FINAL ANSWER: {'query': 'Why did RBI penalize HDFC Bank?', 'docs': [{'id': 'story_9c7a2ef17eff', 'text': "Rs 91 lakh PENALTY on HDFC Bank: RBI's strict action against India's largest lender for THIS reason - DETAILS\nAccording to the central bank, HDFC Bank violated provisions of the Banking Regulation Act, 1949, and failed to comply with certain RBI directions.", 'metadata': {'title': "Rs 91 lakh PENALTY on HDFC Bank: RBI's strict action against India's 