In [1]:
from __future__ import annotations
import os, json, numpy as np, pandas as pd
from pathlib import Path
from typing import Dict, Any, List
from sentence_transformers import SentenceTransformer
import faiss  # type: ignore
_FAISS_OK = True


class RetrieverIndex:
    """
    Semantic retriever over a local vector-store directory.

    Loads a FAISS index if possible; otherwise loads/creates a local NumPy index:
      - uses documents.parquet and metadata.parquet
      - builds/saves embeddings.npz on first run if not present

    Attributes set by the constructor:
      backend -- "faiss" or "numpy"
      meta    -- metadata DataFrame (needs an 'id' column)
      docs    -- documents DataFrame (needs 'id' and 'text' columns)
      index   -- the FAISS index (faiss backend only)
      emb     -- float32 embedding matrix (numpy backend only); rows follow the
                 documents in ascending 'id' order when built here, and are
                 assumed to follow that order when loaded from embeddings.npz
    """
    def __init__(self, store_dir: Path, embed_model_name: str):
        """
        Open the store in `store_dir` and load the query embedder.

        Args:
            store_dir: directory holding metadata.parquet, documents.parquet and
                optionally index.faiss / embeddings.npz.
            embed_model_name: SentenceTransformer model name; loaded on CPU.

        Raises:
            FileNotFoundError: metadata.parquet or documents.parquet is missing.
            ValueError: FAISS backend selected but 'id' is missing from either table.
        """
        self.store_dir = Path(store_dir)
        self.embedder = SentenceTransformer(embed_model_name, device="cpu")

        self.index_faiss = self.store_dir / "index.faiss"
        self.meta_path   = self.store_dir / "metadata.parquet"
        self.docs_path   = self.store_dir / "documents.parquet"
        self.emb_npz     = self.store_dir / "embeddings.npz"  # optional

        if not self.meta_path.exists() or not self.docs_path.exists():
            raise FileNotFoundError(
                f"Missing metadata/documents parquet in: {self.store_dir}\n"
                f"Expected files: {self.meta_path.name}, {self.docs_path.name}"
            )

        self.meta = pd.read_parquet(self.meta_path)
        self.docs = pd.read_parquet(self.docs_path)

        # Choose backend
        self.backend = "faiss" if _FAISS_OK and self.index_faiss.exists() else "numpy"

        if self.backend == "faiss":
            self.index = faiss.read_index(str(self.index_faiss))
            # Ensure ids column exists in meta/docs
            if "id" not in self.meta.columns or "id" not in self.docs.columns:
                raise ValueError("Parquet metadata/documents must contain an 'id' column aligned to FAISS IDs.")
        else:
            # NumPy fallback: we need embeddings matrix
            if self.emb_npz.exists():
                self.emb = np.load(self.emb_npz)["emb"].astype(np.float32)
            else:
                # Build embeddings once, save to disk
                texts = self.docs.sort_values("id")["text"].tolist()
                embs: List[np.ndarray] = []
                B = 512  # documents handed to encode() per call
                for i in range(0, len(texts), B):
                    batch = self.embedder.encode(
                        texts[i:i+B],
                        batch_size=64,
                        normalize_embeddings=True,
                        convert_to_numpy=True,
                        show_progress_bar=False,
                    )
                    embs.append(batch.astype(np.float32))
                self.emb = np.vstack(embs)
                np.savez_compressed(self.emb_npz, emb=self.emb)

            # Align meta/docs by id ascending so that row position i in both
            # tables corresponds to row i of self.emb.
            self.meta = self.meta.sort_values("id").reset_index(drop=True)
            self.docs = self.docs.sort_values("id").reset_index(drop=True)

    def search(self, query: str, k: int = 10) -> List[Dict[str, Any]]:
        """
        Return up to `k` best-matching documents for `query`, best first.

        Each hit is the document's metadata row as a dict, plus:
          'id'    -- the document id (present for both backends),
          'score' -- similarity as a float,
          'text'  -- the first 240 characters of the document text.

        Fewer than `k` hits are returned when the index holds fewer vectors;
        `k <= 0` yields an empty list.
        """
        if k <= 0:
            return []

        q = self.embedder.encode([query], normalize_embeddings=True, convert_to_numpy=True).astype(np.float32)
        rows: List[Dict[str, Any]] = []

        if self.backend == "faiss":
            sims, ids = self.index.search(q, k)
            sims, ids = sims[0], ids[0]
            id_to_row = self.meta.set_index("id")
            txt_map = self.docs.set_index("id")["text"]
            for s, i in zip(sims, ids):
                # FAISS pads the result with label -1 when it has fewer than k
                # neighbours; those slots are not real documents.
                if i < 0:
                    continue
                md = id_to_row.loc[int(i)].to_dict()
                # set_index() moved 'id' out of the row; put it back so both
                # backends return the same keys.
                md["id"] = int(i)
                md["score"] = float(s)
                md["text"] = txt_map.loc[int(i)][:240]
                rows.append(md)
            return rows

        # cosine with normalized vectors => inner product
        sims = (self.emb @ q[0])
        n = int(sims.shape[0])
        k = min(k, n)
        if k == 0:
            return rows
        if k < n:
            # Partial selection of the k largest scores (order fixed below).
            idx = np.argpartition(-sims, k - 1)[:k]
        else:
            # argpartition rejects kth >= n; every vector is a candidate anyway.
            idx = np.arange(n)
        top = idx[np.argsort(-sims[idx])]
        for i in top:
            md = self.meta.iloc[int(i)].to_dict()
            md["score"] = float(sims[i])
            md["text"] = self.docs.iloc[int(i)]["text"][:240]
            rows.append(md)
        return rows

    def info(self) -> str:
        """One-line description: active backend and number of metadata rows."""
        n = len(self.meta)
        return f"RetrieverIndex(backend={self.backend}, vectors={n})"

  from tqdm.autonotebook import tqdm, trange


In [4]:
import time
from typing import List, Dict, Any
import yfinance as yf
import feedparser

from config import VECTOR_FAISS_DIR, EMBEDDING_MODEL, TOP_K

class DataAgent:
    """
    Gathers everything needed to answer a market question in one bundle:
    vector-store evidence, recent prices from yfinance and Google News RSS
    headlines.
    """

    # Closes requested per symbol by run_pipeline so the 10-close volatility
    # window in _latest_summary is actually filled.
    _SUMMARY_ROWS = 10
    # Most recent rows per symbol kept in the bundle's "timeseries".
    _TIMESERIES_ROWS = 5

    def __init__(self):
        """Open the vector index configured by VECTOR_FAISS_DIR / EMBEDDING_MODEL."""
        self.index = RetrieverIndex(VECTOR_FAISS_DIR, EMBEDDING_MODEL)
        self._tokenizer = None  # loaded on first run_pipeline call

    # ---------- Vector retrieval ----------
    def retrieve(self, query: str, k: int = TOP_K) -> List[Dict[str, Any]]:
        """Return the top-`k` index hits for `query` (see RetrieverIndex.search)."""
        return self.index.search(query, k=k)

    # ---------- Live prices ----------
    def fetch_prices(
        self,
        symbols: List[str],
        period: str = "1mo",
        interval: str = "1d",
        tail_rows: int = 5,
    ) -> Dict[str, List[Dict[str, Any]]]:
        """
        Download recent OHLCV rows for each symbol via yfinance.

        Args:
            symbols: ticker symbols to query.
            period, interval: forwarded to `Ticker.history`.
            tail_rows: number of most recent rows to keep per symbol.

        Returns:
            Mapping symbol -> list of row dicts with keys date, open, high, low,
            close, adj_close, volume ('date' is a timezone-naive timestamp).
            Symbols with no data, or whose download failed, are left out; a
            failure is reported as a RuntimeWarning instead of being silently
            discarded.
        """
        import warnings

        out: Dict[str, List[Dict[str, Any]]] = {}
        for s in symbols:
            try:
                df = yf.Ticker(s).history(period=period, interval=interval, auto_adjust=False)
                if not df.empty:
                    df = df.rename_axis("date").reset_index()
                    df["date"] = pd.to_datetime(df["date"]).dt.tz_localize(None)
                    df = df.rename(columns={
                        "Open":"open","High":"high","Low":"low","Close":"close",
                        "Adj Close":"adj_close","Volume":"volume"
                    })
                    # Fall back to the raw close when no adjusted column is returned.
                    if "adj_close" not in df.columns and "close" in df.columns:
                        df["adj_close"] = df["close"]
                    out[s] = (
                        df[["date","open","high","low","close","adj_close","volume"]]
                        .tail(tail_rows)
                        .to_dict(orient="records")
                    )
            except Exception as ex:
                # Best effort: one bad symbol must not abort the others.
                warnings.warn(f"fetch_prices: skipping {s!r}: {ex}", RuntimeWarning, stacklevel=2)
            # Throttle after every request, including empty or failed ones.
            time.sleep(0.15)
        return out

    # ---------- Google News RSS ----------
    @staticmethod
    def _rss_url(q: str, hl="en-IN", gl="IN", ceid="IN:en") -> str:
        """Build the Google News RSS search URL for query `q` (URL-encoded)."""
        from urllib.parse import quote_plus
        return f"https://news.google.com/rss/search?q={quote_plus(q)}&hl={hl}&gl={gl}&ceid={ceid}"

    def fetch_rss(self, queries: List[str], per_query_limit: int = 30, sleep_s: float = 0.25) -> List[Dict[str, Any]]:
        """
        Fetch headlines for each query from Google News RSS.

        At most `per_query_limit` entries are read per query; entries without a
        link, or whose link was already seen in this call, are dropped. Sleeps
        `sleep_s` seconds after each query.

        Returns:
            List of dicts with keys title, link, published, source_feed, query.
        """
        items, seen = [], set()
        for q in queries:
            feed_url = self._rss_url(q)
            feed = feedparser.parse(feed_url)
            entries = feed.entries[:per_query_limit] if getattr(feed, "entries", None) else []
            for e in entries:
                link = (e.get("link") or "").strip()
                if not link or link in seen:
                    continue
                seen.add(link)
                items.append({
                    "title": (e.get("title") or "").strip(),
                    "link": link,
                    "published": (e.get("published") or e.get("updated") or "").strip(),
                    "source_feed": feed_url,
                    "query": q,
                })
            time.sleep(sleep_s)
        return items

    # ---------- One-call pipeline ----------
    def _latest_summary(self, df: pd.DataFrame) -> dict:
        """
        Summarise a non-empty price frame (columns 'date' and 'close').

        Returns the latest date (ISO string) and close, the one-row percentage
        change (0.0 when there is a single row or the previous close is 0) and
        the annualised standard deviation of returns over the last 10 closes
        (None when fewer than two returns are available).
        """
        s = df.copy()
        s["date"] = pd.to_datetime(s["date"]).dt.tz_localize(None)
        s = s.sort_values("date")
        last = s.iloc[-1]
        prev = s.iloc[-2] if len(s) > 1 else last
        # realized vol over last 10 closes (simple annualization with sqrt(252))
        closes = s["close"].astype(float).tail(self._SUMMARY_ROWS).pct_change().dropna()
        vol10 = float(closes.std() * (252 ** 0.5)) if len(closes) > 1 else None
        pct = float((last["close"] - prev["close"]) / prev["close"]) if prev["close"] else 0.0
        return {
            "as_of": last["date"].isoformat(),
            "latest_close": float(last["close"]),
            "pct_change_1d": pct,
            "realized_vol_10d": vol10,
            "vendor": "yfinance"
        }

    def _trim_text(self, txt: str, max_tokens: int, tokenizer) -> str:
        """Cut `txt` to at most `max_tokens` tokenizer tokens; a falsy limit disables trimming."""
        if not max_tokens:
            return txt
        ids = tokenizer.encode(txt, add_special_tokens=False)
        if len(ids) <= max_tokens:
            return txt
        return tokenizer.decode(ids[:max_tokens], skip_special_tokens=True)

    @staticmethod
    def _to_int(value: Any, default: int = 0) -> int:
        """Convert `value` to int, returning `default` for None, NaN or non-numeric input."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return default

    def _get_tokenizer(self):
        """Load the EMBEDDING_MODEL tokenizer once and reuse it on later calls."""
        tok = getattr(self, "_tokenizer", None)
        if tok is None:
            from transformers import AutoTokenizer
            tok = AutoTokenizer.from_pretrained(EMBEDDING_MODEL)
            self._tokenizer = tok
        return tok

    def run_pipeline(
        self,
        user_query: str,
        tickers: List[str],
        rss_queries: List[str],
        k: int = TOP_K,
        limit_tokens_for_evidence: int = 256,
        max_latency_seconds: int = 8
    ) -> Dict[str, Any]:
        """
        Run retrieval, price fetch and RSS fetch and return one JSON-friendly bundle.

        Args:
            user_query: question used for vector retrieval (and for the run file name).
            tickers: symbols passed to fetch_prices.
            rss_queries: queries passed to fetch_rss.
            k: number of evidence chunks to retrieve.
            limit_tokens_for_evidence: per-chunk token cap applied to evidence text.
            max_latency_seconds: accepted for interface compatibility; no
                timeout is enforced by this method.

        Returns:
            Dict with keys 'query', 'evidence', 'market' (per-symbol summary and
            recent rows with ISO date strings), 'news' and 'diagnostics'
            (backend, vector count, timings in ms, errors, and 'persisted' when
            the bundle was written under ./runs).
        """
        import time
        t0 = time.time()
        tok = self._get_tokenizer()

        t1 = time.time()
        evidence = self.retrieve(user_query, k=k)
        # enrich and trim evidence
        ev_out = []
        for e in evidence:
            ev_out.append({
                # Fall back to the chunk number when a hit carries no id.
                "id": self._to_int(e.get("id", e.get("chunk", 0))),
                "external_id": f"{e.get('url','')}|{e.get('chunk',0)}",
                "url": e.get("url",""),
                "title": e.get("title",""),
                "published": e.get("published",""),
                "domain": e.get("domain",""),
                "score": float(e.get("score", 0.0)),
                "chunk": self._to_int(e.get("chunk", 0)),
                "text": self._trim_text(e.get("text",""), limit_tokens_for_evidence, tok)
            })
        t2 = time.time()
        # Ask for enough rows to fill the volatility window; only the most
        # recent few are kept in the bundle below.
        prices_raw = self.fetch_prices(tickers, tail_rows=self._SUMMARY_ROWS)
        market = {"symbols": {}, "timeseries": {}}
        for sym, rows in prices_raw.items():
            df = pd.DataFrame(rows)
            if df.empty:
                continue
            market["symbols"][sym] = self._latest_summary(df)
            # keep a small tail for context; dates become ISO strings so the
            # bundle can be serialised to JSON
            recent = df.tail(self._TIMESERIES_ROWS).copy()
            recent["date"] = pd.to_datetime(recent["date"]).map(lambda d: d.isoformat())
            market["timeseries"][sym] = recent.to_dict(orient="records")
        t3 = time.time()
        headlines = self.fetch_rss(rss_queries)
        t4 = time.time()

        # Surface symbols that produced no usable data instead of dropping them silently.
        errors = [
            f"prices_missing: {sym}"
            for sym in dict.fromkeys(tickers)
            if sym not in market["symbols"]
        ]

        bundle = {
            "query": {
                "text": user_query,
                "timestamp": pd.Timestamp.now(tz="Asia/Kolkata").isoformat()
            },
            "evidence": ev_out,
            "market": market,
            "news": {
                "rss": headlines,
                "source": "GoogleNewsRSS"
            },
            "diagnostics": {
                "index_backend": self.index.backend if hasattr(self, "index") else "unknown",
                "vectors": len(getattr(self.index, "meta", [])) if hasattr(self.index, "meta") else None,
                "timing_ms": {
                    "total": int((t4 - t0) * 1000),
                    "retrieve": int((t2 - t1) * 1000),
                    "prices": int((t3 - t2) * 1000),
                    "rss": int((t4 - t3) * 1000)
                },
                "errors": errors
            }
        }
        # optional persistence for reproducibility
        try:
            from pathlib import Path
            import json, re
            slug = re.sub(r"[^a-z0-9]+", "-", user_query.lower()).strip("-")[:60]
            runs_dir = Path("runs"); runs_dir.mkdir(exist_ok=True)
            out_path = runs_dir / f"{pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')}_{slug}.json"
            # Serialise fully before touching the file so a failure cannot leave
            # a truncated run behind; default=str covers any remaining
            # non-JSON values coming from the metadata table.
            payload = json.dumps(bundle, ensure_ascii=False, indent=2, default=str)
            out_path.write_text(payload, encoding="utf-8")
            bundle["diagnostics"]["persisted"] = str(out_path)
        except Exception as ex:
            bundle["diagnostics"]["errors"].append(f"persist_fail: {ex}")
        return bundle

In [8]:
from config import NSE_TICKERS, RSS_QUERIES

# Build the agent (this loads the vector index) and show which backend it uses.
agent = DataAgent()
print(agent.index.info())

# Run the full pipeline once for a sample question.
user_query = "Should I buy Infosys stocks?"
bundle = agent.run_pipeline(user_query, tickers=NSE_TICKERS, rss_queries=RSS_QUERIES, k=10)

# --- Retrieval: top five chunks, one header line, the URL, then a short snippet ---
print("\n=== Retrieval (top chunks) ===")
# Current bundles use "evidence"; older ones used "retrieval".
evidence = bundle["evidence"] if "evidence" in bundle else bundle.get("retrieval", [])
for rank, hit in enumerate(evidence[:5], start=1):
    snippet = (hit.get("text", "") or "")[:220].replace("\n", " ")
    print(f"[{rank}] score={hit.get('score', 0.0):.4f} | {hit.get('title', '')}")
    print(hit.get("url", ""), snippet + "...\n", sep="\n")

# --- Prices: the rows kept per symbol ---
print("=== Prices (last 5 rows) ===")
market = bundle.get("market", {})
# Older bundles stored the rows under a top-level "prices" key.
timeseries = market["timeseries"] if "timeseries" in market else bundle.get("prices", {})
for sym, rows in timeseries.items():
    print(f"{sym} → {rows}")

# --- News: first five headlines ---
print("\n=== RSS (sample) ===")
news = bundle.get("news", {})
# Older bundles stored headlines under a top-level "rss" key.
rss = news["rss"] if "rss" in news else bundle.get("rss", [])
for h in rss[:5]:
    print(f"- {h.get('title', '')} -> {h.get('link', '')}")

# --- Diagnostics: backend, timings and any recorded errors ---
print("\n=== Diagnostics ===")
print(bundle.get("diagnostics", {}))

RetrieverIndex(backend=faiss, vectors=3544)

=== Retrieval (top chunks) ===
[1] score=0.7860 | infosys share - Infosys down 3 % today ; should you buy this stock ahead of Q4 results on April 17 ? - Infosys down 3 % today ; should you buy this stock ahead of Q4 results on April 17 ? BusinessToday
https://www.businesstoday.in/markets/stocks/story/infosys-down-3-today-should-you-buy-this-stock-ahead-of-q4-results-on-april-17-470106-2025-04-01
infosys share - Infosys down 3 % today ; should you buy this stock ahead of Q4 results on April 17 ? - Infosys down 3 % today ; should you buy this stock ahead of Q4 results on April 17 ? BusinessToday...

[2] score=0.7503 | Infosys plunges 16 % in 2025 : 5 key factors investors need to know ahead of Q4 earnings
https://www.financialexpress.com/shorts/market/infosys-plunges-16-in-2025-5-key-factors-investors-need-to-know-ahead-of-q4-earnings-3779157/
Infosys plunges 16 % in 2025 : 5 key factors investors need to know ahead of Q4 earnings...

[3] scor