In [2]:
#!pip install fastapi
#!pip install statsmodels

Impor Library

In [2]:
import os
import json
import math
import uuid
import time
import sqlite3
import tempfile
from typing import List, Dict, Any, Optional, Tuple
from pathlib import Path
from dataclasses import dataclass, asdict

import numpy as np
import pandas as pd

# Tokenization
try:
    import tiktoken
    _TIKTOKEN_AVAILABLE = True
except Exception:
    _TIKTOKEN_AVAILABLE = False

# OpenAI
import openai

# FAISS
import faiss

# LangChain + SQLAlchemy + LLM wrappers
from sqlalchemy import create_engine, text
from langchain_core.documents import Document
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain_openai import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.agents import create_sql_agent
from langchain_community.utilities.sql_database import SQLDatabase

# FastAPI for demo endpoints
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn

# Plotly
import plotly.express as px
import plotly.io as pio

# Statsmodels
from statsmodels.tsa.seasonal import seasonal_decompose

# Load dotenv if present
from dotenv import load_dotenv
load_dotenv()


True

CONFIG

In [3]:
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", None) 
if not OPENAI_API_KEY:
    raise EnvironmentError("Set OPENAI_API_KEY in environment variables")
openai.api_key = OPENAI_API_KEY

LLM config

In [4]:
LLM_MODEL_NAME = os.getenv("LLM_MODEL_NAME", "gpt-4o-mini")  # change to available model
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text-embedding-3-small")

FAISS persist directory

In [5]:
FAISS_INDEX_DIR = Path("./faiss_data")
FAISS_INDEX_DIR.mkdir(exist_ok=True, parents=True)

UTIL: Tokenizer & chunker

In [6]:
def get_token_count(text: str, model: str = "gpt-4o-mini") -> int:
    if _TIKTOKEN_AVAILABLE:
        enc = tiktoken.encoding_for_model(model)
        return len(enc.encode(text))
    # fallback approx: 1 token ~ 4 chars
    return max(1, len(text) // 4)

In [7]:
def chunk_text_token_aware(text: str, min_tokens=300, max_tokens=500, model="gpt-4o-mini") -> List[str]:
    """
    Token-aware chunker: uses tiktoken when available, otherwise a simple word-based fallback.
    Keeps overlap to preserve context.
    """
    if _TIKTOKEN_AVAILABLE:
        enc = tiktoken.encoding_for_model(model)
        toks = enc.encode(text)
        chunks = []
        i = 0
        overlap = int(min_tokens * 0.1)
        while i < len(toks):
            end = min(i + max_tokens, len(toks))
            chunk_toks = toks[i:end]
            chunks.append(enc.decode(chunk_toks))
            if end == len(toks):
                break
            i = end - overlap
        return chunks
    else:
        words = text.split()
        chunks = []
        i = 0
        overlap = max(10, int(min_tokens*0.1))
        while i < len(words):
            end = min(i + max_tokens, len(words))
            chunks.append(" ".join(words[i:end]))
            if end == len(words):
                break
            i = end - overlap
        return chunks

DOC LOADER & METADATA

In [8]:
@dataclass
class DocChunk:
    id: str
    doc_id: str
    chunk_id: int
    text: str
    metadata: Dict[str, Any]

In [9]:
def load_text_docs_from_folder(folder: Path, pattern: str = "*.txt") -> List[Tuple[str,str]]:
    docs = []
    for f in folder.glob(pattern):
        docs.append((str(f), f.read_text(encoding="utf-8")))
    return docs

In [10]:
def create_chunks_from_doc(doc_id: str, text: str, min_tokens=300, max_tokens=500) -> List[DocChunk]:
    raw_chunks = chunk_text_token_aware(text, min_tokens=min_tokens, max_tokens=max_tokens)
    chunks = []
    for i, c in enumerate(raw_chunks):
        chunks.append(DocChunk(
            id=str(uuid.uuid4()),
            doc_id=doc_id,
            chunk_id=i,
            text=c,
            metadata={"doc_id": doc_id, "chunk_id": i}
        ))
    return chunks

EMBEDDING CLIENT

In [12]:
class EmbeddingClient:
    def __init__(self, model_name=EMBEDDING_MODEL):
        self.model = model_name

    def embed_batch(self, texts: List[str]) -> List[List[float]]:
        # OpenAI embeddings
        resp = openai.Embedding.create(model=self.model, input=texts)
        embeds = [r["embedding"] for r in resp["data"]]
        return embeds

VECTORSTORE: FAISS wrappe

In [14]:
class FaissVectorStore:
    def __init__(self, dim: int, persist_dir: Path = FAISS_INDEX_DIR, index_name: str = "default"):
        self.dim = dim
        self.index_name = index_name
        self.persist_dir = persist_dir
        self.index_path = persist_dir / f"{index_name}.index"
        self.id_to_meta_path = persist_dir / f"{index_name}_meta.json"

        # IndexFlatIP + L2-normalized embeddings for cosine similarity
        self.index = faiss.IndexFlatIP(dim)
        self.ids = []  # parallel list of ids
        self.metadict = {}

        if self.index_path.exists() and self.id_to_meta_path.exists():
            try:
                self.load()
            except Exception as e:
                print("Failed to load existing FAISS index:", e)

    def add(self, ids: List[str], vectors: np.ndarray, metadatas: List[dict]):
        assert vectors.shape[1] == self.dim
        # normalize
        faiss.normalize_L2(vectors)
        self.index.add(vectors)
        self.ids.extend(ids)
        for i, _id in enumerate(ids):
            self.metadict[_id] = metadatas[i]
        self._save()

    def search(self, qvec: np.ndarray, top_k=10) -> List[Tuple[str, float]]:
        # qvec expected shape (dim,)
        q = qvec.reshape(1, -1).astype('float32')
        faiss.normalize_L2(q)
        dists, idxs = self.index.search(q, top_k)
        results = []
        for score, idx in zip(dists[0], idxs[0]):
            if idx < 0 or idx >= len(self.ids):
                continue
            results.append((self.ids[idx], float(score)))
        return results

    def _save(self):
        faiss.write_index(self.index, str(self.index_path))
        with open(self.id_to_meta_path, "w", encoding="utf-8") as f:
            json.dump(self.metadict, f)

    def load(self):
        self.index = faiss.read_index(str(self.index_path))
        with open(self.id_to_meta_path, "r", encoding="utf-8") as f:
            self.metadict = json.load(f)
        self.ids = list(self.metadict.keys())

RETRIEVER: Dense + Keyword overlap hybrid

In [15]:
class HybridRetriever:
    def __init__(self, vectorstore: FaissVectorStore, embedding_client: EmbeddingClient):
        self.vs = vectorstore
        self.emb = embedding_client

    def _keyword_score(self, query: str, text: str) -> float:
        # cheap token overlap score normalized
        qset = set(query.lower().split())
        tset = set(text.lower().split())
        if not qset:
            return 0.0
        return len(qset.intersection(tset)) / len(qset)

    def retrieve(self, query: str, top_k=5, dense_k=30) -> List[Dict[str, Any]]:
        # 1. dense search
        q_embed = np.array(self.emb.embed_batch([query]), dtype='float32')
        qvec = q_embed[0]
        dense_results = self.vs.search(qvec, top_k=dense_k)  # returns (id, score)
        # build candidate list
        candidates = []
        for _id, score in dense_results:
            meta = self.vs.metadict.get(_id, {})
            text = meta.get("text", "")
            kw_score = self._keyword_score(query, text)
            combined_score = 0.7 * score + 0.3 * kw_score
            candidates.append({"id": _id, "score": combined_score, "meta": meta, "text": text})
        # rank and return top_k
        candidates = sorted(candidates, key=lambda x: x["score"], reverse=True)[:top_k]
        return candidates


RAG / RetrievalQA wrapper using LangChain

In [16]:
class RAGService:
    def __init__(self, retriever: HybridRetriever, llm_model_name=LLM_MODEL_NAME, temperature=0.0):
        self.retriever = retriever
        # wrap OpenAI chat model via LangChain
        self.llm = ChatOpenAI(model=llm_model_name, temperature=temperature)

    def answer(self, query: str, top_k=5) -> Dict[str, Any]:
        docs = self.retriever.retrieve(query, top_k=top_k)
        # convert to langchain Documents
        lc_docs = []
        for d in docs:
            metadata = d["meta"].copy()
            # include provenance
            provenance = {"id": d["id"], "score": d["score"]}
            metadata["provenance"] = provenance
            lc_docs.append(Document(page_content=d["text"], metadata=metadata))
        # Build a simple RetrievalQA chain
        # For large scale: use map_rerank or custom chain
        qa = RetrievalQA.from_chain_type(llm=self.llm, chain_type="map_rerank", retriever=None)
        # Since we don't use a langchain vectorstore retriever here, we directly run LLM with docs as context
        # Compose a prompt manually: system + context + user query
        context_text = "\n\n---\n\n".join([f"[{doc.metadata.get('provenance')}] {doc.page_content}" for doc in lc_docs])
        system_prompt = (
            "You are an expert data scientist and knowledge assistant. Use the provided context strictly, "
            "cite provenance in square brackets (doc_id,chunk_id). Answer concisely and show steps if applicable."
        )
        user_prompt = f"Context:\n{context_text}\n\nUser question: {query}\n\nAnswer with provenance."
        resp = self.llm.predict_messages([{"role":"system","content":system_prompt}, {"role":"user","content":user_prompt}])
        return {"answer": resp.content, "provenance": [doc.metadata.get("provenance") for doc in lc_docs], "docs": [asdict(d) for d in docs]}


SQL Agent & DB setup

In [17]:
class SQLAgentService:
    def __init__(self, db_engine):
        self.engine = db_engine
        self.db = SQLDatabase(self.engine)
        self.llm = ChatOpenAI(model=LLM_MODEL_NAME, temperature=0)

        # create agent
        self.agent = create_sql_agent(self.llm, self.db, verbose=False)

    def run_nl_query(self, nl: str) -> str:
        # produce SQL & run it via agent
        try:
            res = self.agent.run(nl)
            return res
        except Exception as e:
            return f"Error running NL->SQL agent: {e}"

Analytics Engine

In [18]:
class AnalyticsEngine:
    def __init__(self):
        pass

    def compute_kpis(self, df: pd.DataFrame) -> Dict[str, Any]:
        kpis = {}
        if "revenue" in df.columns:
            kpis["total_revenue"] = float(df["revenue"].sum())
            kpis["avg_revenue"] = float(df["revenue"].mean())
        if "user_id" in df.columns:
            kpis["unique_users"] = int(df["user_id"].nunique())
        return kpis

    def trend_and_decompose(self, df: pd.DataFrame, date_col: str, value_col: str, freq: str = "D"):
        df2 = df.copy()
        df2[date_col] = pd.to_datetime(df2[date_col])
        ts = df2.set_index(date_col).resample(freq).sum()[value_col].fillna(0)
        # decomposition
        decomposition = seasonal_decompose(ts, period=7, model="additive", extrapolate_trend='freq')
        # make plotly figure
        fig = px.line(ts.reset_index(), x=date_col, y=value_col, title=f"{value_col} trend")
        return {
            "decomposition": {
                "trend": decomposition.trend.dropna().to_dict(),
                "seasonal": decomposition.seasonal.dropna().to_dict(),
                "resid": decomposition.resid.dropna().to_dict()
            },
            "fig_json": fig.to_json()
        }

    def detect_anomalies_zscore(self, series: pd.Series, z_thresh: float = 3.0) -> List[Dict[str, Any]]:
        mean = series.mean()
        std = series.std(ddof=0)
        z = (series - mean) / (std + 1e-9)
        anomalies = series[abs(z) > z_thresh]
        return [{"index": str(idx), "value": float(val), "zscore": float(z.loc[idx])} for idx, val in anomalies.items()]

Demo app wiring (FastAPI)

In [19]:
app = FastAPI(title="Advanced RAG + Analytics Prototype")

 Global in-memory objects (for demo / POC)

In [20]:
EMB_CLIENT = EmbeddingClient()
DIM = 1536  # typical for many embed models; we'll infer later
FAISS_STORE: Optional[FaissVectorStore] = None
HYBRID_RETRIEVER: Optional[HybridRetriever] = None
RAG_SERVICE: Optional[RAGService] = None
SQL_AGENT: Optional[SQLAgentService] = None
ANALYTICS = AnalyticsEngine()

For demo, create a temporary sqlite DB and seed with sample data

In [21]:
def create_demo_sqlite(path="demo_data.db"):
    engine = create_engine(f"sqlite:///{path}")
    df = pd.DataFrame({
        "date": pd.date_range("2025-01-01", periods=120, freq="D"),
        "order_id": [f"ORD{1000+i}" for i in range(120)],
        "user_id": [f"U{(i%10)}" for i in range(120)],
        "product": ["A" if i%3==0 else "B" if i%3==1 else "C" for i in range(120)],
        "revenue": np.abs(np.random.randn(120).cumsum()) + 50
    })
    df.to_sql("sales", engine, if_exists="replace", index=False)
    return engine

@app.on_event("startup")
def startup_event():
    global FAISS_STORE, HYBRID_RETRIEVER, RAG_SERVICE, SQL_AGENT, DIM, EMB_CLIENT
    # set DIM by doing a dummy embedding of small text
    try:
        vec = EMB_CLIENT.embed_batch(["test"])
        DIM = len(vec[0])
        print("Embedding dim detected:", DIM)
    except Exception as e:
        print("Warning: embedding call failed at startup:", e)
        DIM = 1536
    FAISS_STORE = FaissVectorStore(dim=DIM, persist_dir=FAISS_INDEX_DIR, index_name="demo")
    HYBRID_RETRIEVER = HybridRetriever(FAISS_STORE, EMB_CLIENT)
    RAG_SERVICE = RAGService(HYBRID_RETRIEVER)
    engine = create_demo_sqlite()
    SQL_AGENT = SQLAgentService(engine)
    print("Startup complete. Services ready.")

# Pydantic models
class UploadDocRequest(BaseModel):
    doc_id: str
    text: str

class QueryRequest(BaseModel):
    query: str
    top_k: Optional[int] = 5

class NLSQLRequest(BaseModel):
    nl: str

class AnalyticsRequest(BaseModel):
    date_col: str
    value_col: str
    freq: Optional[str] = "D"

# Endpoints
@app.post("/upload_doc")
def upload_doc(req: UploadDocRequest):
    """Chunk, embed, and upsert to FAISS"""
    global EMB_CLIENT, FAISS_STORE
    chunks = create_chunks_from_doc(req.doc_id, req.text)
    texts = [c.text for c in chunks]
    meta = []
    ids = []
    try:
        embeds = EMB_CLIENT.embed_batch(texts)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Embedding failed: {e}")

    arr = np.array(embeds).astype("float32")
    for i, c in enumerate(chunks):
        c.metadata["text"] = c.text  # store the short text for cheap keyword scoring
        ids.append(c.id)
        meta.append(c.metadata)
    # init vectorstore dim check
    if FAISS_STORE.dim != arr.shape[1]:
        print("WARNING: embedding dim mismatch, recreating FAISS store")
        FAISS_STORE = FaissVectorStore(dim=arr.shape[1], persist_dir=FAISS_INDEX_DIR, index_name="demo")
    FAISS_STORE.add(ids, arr, meta)
    return {"status": "ok", "n_chunks": len(chunks)}

@app.post("/rag_query")
def rag_query(req: QueryRequest):
    global RAG_SERVICE
    return RAG_SERVICE.answer(req.query, top_k=req.top_k)

@app.post("/nl_sql")
def nl_sql(req: NLSQLRequest):
    global SQL_AGENT
    return {"result": SQL_AGENT.run_nl_query(req.nl)}

@app.post("/analytics/run")
def run_analytics(req: AnalyticsRequest):
    # For demo, read the sales table from demo sqlite and run analytics
    engine = SQL_AGENT.engine
    df = pd.read_sql("SELECT * FROM sales", engine)
    kpis = ANALYTICS.compute_kpis(df)
    dec = ANALYTICS.trend_and_decompose(df, req.date_col, req.value_col, freq=req.freq)
    # convert chart to JSON for client rendering
    return {"kpis": kpis, "decomposition": dec["decomposition"], "fig_json": dec["fig_json"]}

# Local run helper (seed docs)
def seed_demo_docs():
    demo_folder = Path("./demo_docs")
    demo_folder.mkdir(exist_ok=True)
    # create a few sample long docs
    doc1 = (
        "Billing policy: Refunds are allowed within 30 days of purchase with proof of purchase. "
        "For subscription churn, prorated refunds are applied. The finance team must approve exceptions. "
    ) * 50
    doc2 = (
        "Deployment notes: The ML model uses FAISS for vector search. Embeddings are generated via OpenAI. "
        "Indexing pipeline runs nightly and stores metadata including doc_id and chunk_id. "
    ) * 50
    (demo_folder / "billing.txt").write_text(doc1)
    (demo_folder / "infra.txt").write_text(doc2)
    return demo_folder

if __name__ == "__main__":
    # seed demo docs and upload them using the API programmatically
    # Start server with uvicorn
    demo_folder = seed_demo_docs()
    # start uvicorn server
    uvicorn.run("rag_analytics_pipeline:app", host="0.0.0.0", port=8000, reload=False)


        on_event is deprecated, use lifespan event handlers instead.

        Read more about it in the
        [FastAPI docs for Lifespan Events](https://fastapi.tiangolo.com/advanced/events/).
        
  @app.on_event("startup")


RuntimeError: asyncio.run() cannot be called from a running event loop