
# Agentic RAG End-to-End Demo (PDF vs SQL routing)

**Purpose:** A reproducible, self-contained Jupyter notebook that demonstrates an *agentic* RAG pipeline:
- Data preparation (SQL sample DB + sample PDF-like docs)
- Ingestion & chunking
- Embedding & vector index creation (FAISS with `sentence-transformers` embeddings)
- Separate vector indexes: `pdf_index` (document chunks) and `sql_index` (table schema descriptions)
- Router agent (embedding-based; optional OpenAI LLM router)
- PDF retrieval pipeline (retrieve chunks → summarize/polish by LLM or fallback)
- SQL pipeline (retrieve schema snippets → generate SQL with LLM or fallback → execute → polish)
- Final orchestration that shows the difference between Traditional RAG and Agentic RAG

**Notes:** This notebook uses local embeddings (`sentence-transformers/all-MiniLM-L6-v2`) and FAISS, so once the model weights have been downloaded on the first run and cached, it runs offline. Optional OpenAI functionality is included (router, SQL generator, polish step); set `OPENAI_API_KEY` in your environment to enable those steps.


## 0) Install (one-time) and environment notes

Run the following in your environment if you don't have the required packages:

```bash
pip install sentence-transformers faiss-cpu numpy pandas "openai>=1.0"
# Optional extras (not imported by the cells below):
pip install python-dotenv reportlab python-docx PyMuPDF
# If faiss-cpu is not available for your platform, use chromadb instead.
```

Set `OPENAI_API_KEY` env var if you want to use OpenAI steps.

## 1) Imports and helper utilities

```python
# Imports & helper functions
import os, sqlite3, json, uuid, textwrap, time
from pathlib import Path
from typing import List, Dict, Any
import numpy as np
import pandas as pd

# Embedding model
from sentence_transformers import SentenceTransformer

# FAISS (vector index)
import faiss

# small helpers
def cosine_sim(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-10))

print("Imports OK")
```


## 2) Create sample SQL database (SQLite)

```python
# Create a small demo SQLite DB with multiple tables and relationships
DB_PATH = "/mnt/data/demo_claims.db"
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)  # /mnt/data may not exist outside hosted notebooks
if os.path.exists(DB_PATH):
    os.remove(DB_PATH)
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()

cur.executescript("""
CREATE TABLE policy_header (
    policy_id INTEGER PRIMARY KEY,
    policy_holder TEXT,
    start_date TEXT,
    end_date TEXT
);

CREATE TABLE claim_header (
    claim_id INTEGER PRIMARY KEY,
    policy_id INTEGER,
    loss_date TEXT,
    claim_status TEXT,
    loss_amount REAL,
    FOREIGN KEY(policy_id) REFERENCES policy_header(policy_id)
);

CREATE TABLE transaction_log (
    transaction_id INTEGER PRIMARY KEY,
    claim_id INTEGER,
    payment_date TEXT,
    amount_paid REAL,
    FOREIGN KEY(claim_id) REFERENCES claim_header(claim_id)
);
""")

cur.executemany("INSERT INTO policy_header(policy_id, policy_holder, start_date, end_date) VALUES (?,?,?,?)", [
    (1, "Acme Corp", "2023-01-01", "2024-12-31"),
    (2, "John Doe", "2022-05-10", "2025-05-09"),
])
cur.executemany("INSERT INTO claim_header(claim_id, policy_id, loss_date, claim_status, loss_amount) VALUES (?,?,?,?,?)", [
    (101, 1, "2024-01-20", "open", 15000.0),
    (102, 2, "2024-03-05", "closed", 4000.0),
    (103, 1, "2024-06-10", "closed", 2200.0),
])
cur.executemany("INSERT INTO transaction_log(transaction_id, claim_id, payment_date, amount_paid) VALUES (?,?,?,?)", [
    (1001, 102, "2024-03-10", 4000.0),
    (1002, 103, "2024-06-15", 2200.0)
])
conn.commit()
print("Created demo DB at", DB_PATH)
```
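The schema above declares `FOREIGN KEY` constraints, but SQLite only enforces them on connections where `PRAGMA foreign_keys = ON` has been set; the setting is off by default. A minimal sketch on a throwaway in-memory database (a cut-down two-table copy of the demo schema; `orphan_insert_rejected` is a hypothetical helper) shows the difference:

```python
import sqlite3

def orphan_insert_rejected(enforce_fks: bool) -> bool:
    """Return True if inserting a claim for a non-existent policy raises IntegrityError."""
    conn = sqlite3.connect(":memory:")
    if enforce_fks:
        conn.execute("PRAGMA foreign_keys = ON")  # per-connection setting, off by default
    conn.executescript("""
    CREATE TABLE policy_header (policy_id INTEGER PRIMARY KEY, policy_holder TEXT);
    CREATE TABLE claim_header (
        claim_id INTEGER PRIMARY KEY,
        policy_id INTEGER,
        FOREIGN KEY(policy_id) REFERENCES policy_header(policy_id)
    );
    """)
    try:
        # policy_id 999 does not exist in policy_header
        conn.execute("INSERT INTO claim_header(claim_id, policy_id) VALUES (1, 999)")
        return False
    except sqlite3.IntegrityError:
        return True
    finally:
        conn.close()

print("without PRAGMA:", orphan_insert_rejected(False))  # False
print("with PRAGMA:   ", orphan_insert_rejected(True))   # True
```

Every demo row inserted above references an existing parent, so enabling the pragma right after `sqlite3.connect(DB_PATH)` would not change this notebook's data; it only guards against bad inserts later.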


## 3) Create sample document texts (PDF-like)

```python
# We'll use simple .txt files to represent PDF contents for this demo
DOC_DIR = "/mnt/data/demo_docs"
os.makedirs(DOC_DIR, exist_ok=True)

policy_doc = """Policy Document - Claims Process
1. Claim filing: The claimant should notify within 30 days.
2. Initial review: Documents collected and verified.
3. Field inspection: If damage is physical, an inspector visits within 5 days.
4. Approval & payment: Once approved, payment processed within 10 business days.
"""

notes_doc = """Claims handling - internal notes
- For complex claims escalate to Senior Adjuster.
- Authentication & fraud checks necessary for claims > $10,000.
"""

with open(os.path.join(DOC_DIR, "policy_claims.txt"), "w", encoding="utf-8") as f:
    f.write(policy_doc)
with open(os.path.join(DOC_DIR, "notes_claims.txt"), "w", encoding="utf-8") as f:
    f.write(notes_doc)

print("Wrote demo docs to", DOC_DIR)
for p in sorted(os.listdir(DOC_DIR)):
    print("-", p)
```


## 4) Chunking function (split long text into overlapping chunks)

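```python
def chunk_text(text: str, chunk_size_words: int = 150, overlap_words: int = 30) -> List[str]:
    """Split text into windows of chunk_size_words words; consecutive windows share overlap_words words."""
    if not 0 <= overlap_words < chunk_size_words:
        # overlap >= chunk size would never advance i (infinite loop)
        raise ValueError("overlap_words must be >= 0 and smaller than chunk_size_words")
    words = text.split()
    chunks = []
    step = chunk_size_words - overlap_words
    i = 0
    while i < len(words):
        chunks.append(" ".join(words[i:i + chunk_size_words]))
        if i + chunk_size_words >= len(words):
            break  # this window reached the end; avoid a tail chunk fully contained in it
        i += step
    return chunks

# quick test: 400 words -> windows starting at 0, 80, 160, 240, 320
print(chunk_text(' '.join(['word']*400), chunk_size_words=100, overlap_words=20)[:2])
```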
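A fixed word window can cut a numbered step in half. Because the demo documents are line-oriented (one step or bullet per line), a line-aware variant may keep each step intact. A minimal sketch; `chunk_lines` is a hypothetical helper, not part of the notebook, and it counts words with `str.split()` just like `chunk_text`:

```python
from typing import List

def chunk_lines(text: str, max_words: int = 150, overlap_lines: int = 1) -> List[str]:
    """Pack whole non-empty lines into chunks of roughly max_words words.

    The last `overlap_lines` lines of a chunk are repeated at the start of the next
    one (dropped if they would push that chunk over budget). A single line longer
    than max_words becomes its own oversized chunk.
    """
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    chunks: List[str] = []
    current: List[str] = []
    for line in lines:
        n_line = len(line.split())
        n_current = sum(len(l.split()) for l in current)
        if current and n_current + n_line > max_words:
            chunks.append("\n".join(current))
            carry = current[-overlap_lines:] if overlap_lines > 0 else []
            # drop the overlap if it alone would push the new chunk over budget
            if sum(len(l.split()) for l in carry) + n_line > max_words:
                carry = []
            current = list(carry)
        current.append(line)
    if current:
        chunks.append("\n".join(current))
    return chunks
```

In the notebook you could compare `chunk_lines(policy_doc, max_words=40)` with `chunk_text(policy_doc, 40, 10)` to see which boundaries each produces. The trade-off is uneven chunk sizes in exchange for never splitting a line.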


## 5) Load embedding model (sentence-transformers)

```python
EMBED_MODEL_NAME = "all-MiniLM-L6-v2"
print("Loading embedding model:", EMBED_MODEL_NAME)
embedder = SentenceTransformer(EMBED_MODEL_NAME)

def embed_texts(texts: List[str]) -> np.ndarray:
    # normalize_embeddings=True returns unit-length vectors, so inner product == cosine similarity
    embs = embedder.encode(texts, convert_to_numpy=True, show_progress_bar=False, normalize_embeddings=True)
    return embs
print("Embedding model ready.")
```


## 6) FAISS index wrapper (store embeddings + metadata)

```python
class FaissIndex:
    def __init__(self, dim: int):
        self.dim = dim
        self.index = faiss.IndexFlatIP(dim)  # inner product on normalized vectors => cosine similarity
        self.metadatas = []

    def add(self, embeddings: np.ndarray, metadatas: List[dict]):
        if embeddings.ndim == 1:
            embeddings = embeddings.reshape(1, -1)
        self.index.add(embeddings.astype('float32'))
        self.metadatas.extend(metadatas)

    def search(self, query_emb: np.ndarray, top_k: int = 5):
        if query_emb.ndim == 1:
            query_emb = query_emb.reshape(1, -1)
        D, I = self.index.search(query_emb.astype('float32'), top_k)
        out = []
        for ids, scores in zip(I, D):
            hits = []
            for idx, score in zip(ids, scores):
                if idx < 0 or idx >= len(self.metadatas):
                    continue
                md = dict(self.metadatas[idx])
                md['_score'] = float(score)
                md['_id'] = int(idx)
                hits.append(md)
            out.append(hits)
        return out
```
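The `IndexFlatIP` comment relies on a simple identity: once query and document vectors both have unit length, their inner product equals their cosine similarity, so an exact inner-product index ranks exactly like `cosine_sim`. A NumPy-only check on random vectors (no FAISS or model needed; `l2_normalize` is a hypothetical helper):

```python
import numpy as np

def l2_normalize(x: np.ndarray) -> np.ndarray:
    """Scale each row to unit L2 norm (the effect of normalize_embeddings=True)."""
    return x / np.linalg.norm(x, axis=1, keepdims=True)

rng = np.random.default_rng(0)
docs = rng.normal(size=(4, 8)).astype("float32")
query = rng.normal(size=(1, 8)).astype("float32")

# cosine similarity computed directly on the raw vectors
cos = (docs @ query.T).ravel() / (np.linalg.norm(docs, axis=1) * np.linalg.norm(query))
# plain inner product after normalizing both sides (what IndexFlatIP scores)
ip = (l2_normalize(docs) @ l2_normalize(query).T).ravel()

print(np.allclose(cos, ip, atol=1e-5))  # True
```

The identity only holds if every vector added to the index is normalized; mixing in unnormalized vectors (for example from a different encoder call) would silently turn the scores back into raw dot products.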


## 7) Ingest docs → chunk → embed → build `pdf_index`

```python
# Read docs, chunk, embed, add to index
pdf_index = None
all_chunks = []
for fname in sorted(os.listdir(DOC_DIR)):  # sorted => deterministic chunk ids across runs
    path = os.path.join(DOC_DIR, fname)
    with open(path, 'r', encoding="utf-8") as f:
        txt = f.read()
    chunks = chunk_text(txt, chunk_size_words=120, overlap_words=30)
    if not chunks:
        continue  # skip empty files
    embs = embed_texts(chunks)
    if pdf_index is None:
        pdf_index = FaissIndex(dim=embs.shape[1])
    metas = [{"source": "pdf", "doc": fname, "chunk": i, "text": chunks[i]} for i in range(len(chunks))]
    pdf_index.add(embs, metas)
    all_chunks.extend(metas)

print("PDF index built, total chunks:", len(pdf_index.metadatas))
```
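Re-embedding every document on each kernel restart gets wasteful as the corpus grows. One option, sketched here under the assumption that you can get the embeddings back as an array, is to save them as a `.npy` file plus the row-aligned metadata as JSON, then rebuild a `FaissIndex` on load. `save_corpus` and `load_corpus` are hypothetical helpers, not part of the notebook. FAISS also provides `faiss.write_index` / `faiss.read_index` for the index itself, but those do not store the `metadatas` list.

```python
import json
import os
from typing import List, Tuple

import numpy as np

def save_corpus(embeddings: np.ndarray, metadatas: List[dict], out_dir: str) -> None:
    """Persist embeddings (float32 .npy) and their row-aligned metadata (JSON)."""
    if len(embeddings) != len(metadatas):
        raise ValueError("embeddings and metadatas must have the same length")
    os.makedirs(out_dir, exist_ok=True)
    np.save(os.path.join(out_dir, "embeddings.npy"), embeddings.astype("float32"))
    with open(os.path.join(out_dir, "metadatas.json"), "w", encoding="utf-8") as f:
        json.dump(metadatas, f)

def load_corpus(out_dir: str) -> Tuple[np.ndarray, List[dict]]:
    """Load what save_corpus wrote; row i of the array belongs to metadatas[i]."""
    embeddings = np.load(os.path.join(out_dir, "embeddings.npy"))
    with open(os.path.join(out_dir, "metadatas.json"), encoding="utf-8") as f:
        metadatas = json.load(f)
    return embeddings, metadatas
```

For a flat index like the one above, `pdf_index.index.reconstruct_n(0, pdf_index.index.ntotal)` should return the stored vectors; after `load_corpus`, `FaissIndex(dim=embs.shape[1]).add(embs, metas)` restores an equivalent index. Row alignment is the key invariant, since `search` maps FAISS ids back to positions in `metadatas`.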


## 8) Extract SQL schema metadata → create textual docs → embed → build `sql_index`

```python
def extract_schema_docs(sql_conn: sqlite3.Connection) -> List[dict]:
    """Build one text document per table: columns (with sample values) and foreign keys."""
    cur = sql_conn.cursor()
    cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';")
    tables = [r[0] for r in cur.fetchall()]
    docs = []
    for table in tables:
        cur.execute(f'PRAGMA table_info("{table}");')
        cols = cur.fetchall()
        col_lines = []
        for cid, name, ctype, notnull, dflt, pk in cols:
            # sample values (non-exhaustive)
            try:
                cur.execute(f'SELECT "{name}" FROM "{table}" WHERE "{name}" IS NOT NULL LIMIT 3;')
                sample = [str(r[0]) for r in cur.fetchall()]
            except sqlite3.Error:
                sample = []
            col_lines.append(f"- {name} ({ctype}) sample: {', '.join(sample)}")
        cur.execute(f'PRAGMA foreign_key_list("{table}");')
        fkeys = cur.fetchall()
        fk_lines = []
        for fk in fkeys:
            # fk row: (id, seq, table, from, to, on_update, on_delete, match)
            # i.e. column fk[3] of this table references fk[2].fk[4]
            fk_lines.append(f"- {table}.{fk[3]} -> {fk[2]}.{fk[4]}")
        text = f"Table: {table}\nColumns:\n" + "\n".join(col_lines)
        if fk_lines:
            text += "\nForeignKeys:\n" + "\n".join(fk_lines)
        docs.append({"table": table, "text": text})
    return docs

sql_schema_docs = extract_schema_docs(conn)
schema_texts = [d["text"] for d in sql_schema_docs]
schema_embs = embed_texts(schema_texts)
sql_index = FaissIndex(dim=schema_embs.shape[1])
metas = [{"source": "sql_schema", "table": sql_schema_docs[i]["table"], "text": schema_texts[i]} for i in range(len(schema_texts))]
sql_index.add(schema_embs, metas)
print("Built SQL schema index for tables:", [m['table'] for m in metas])
```
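The foreign-key lines in each schema doc depend on the column order of `PRAGMA foreign_key_list`, which returns one row per constrained column: `(id, seq, table, from, to, on_update, on_delete, match)`. Here `table` is the *referenced* table and `from` is the column in the table being inspected. A small check on an in-memory copy of two demo tables:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE claim_header (claim_id INTEGER PRIMARY KEY);
CREATE TABLE transaction_log (
    transaction_id INTEGER PRIMARY KEY,
    claim_id INTEGER,
    FOREIGN KEY(claim_id) REFERENCES claim_header(claim_id)
);
""")
rows = conn.execute('PRAGMA foreign_key_list("transaction_log")').fetchall()
fk = rows[0]
# fk[2] = referenced table, fk[3] = local column, fk[4] = referenced column
line = f"- transaction_log.{fk[3]} -> {fk[2]}.{fk[4]}"
print(line)  # - transaction_log.claim_id -> claim_header.claim_id
conn.close()
```

Rendering joins as `table.column -> table.column` gives the SQL generator the exact join condition it needs, which is the main point of embedding foreign keys into the schema docs.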


## 9) Router agent — embedding classifier (fast offline) + optional OpenAI router

```python
# Prototype texts for 'pdf' vs 'sql' routing
proto_texts = [
    ("pdf", "Where is the claims filing process described in the policy document?"),
    ("sql", "Give total paid amount per policy for 2024; aggregate by policy")
]
proto_embs = embed_texts([p[1] for p in proto_texts])
proto_labels = [p[0] for p in proto_texts]

def route_with_embedding(query: str) -> str:
    q_emb = embed_texts([query])[0]
    sims = [cosine_sim(q_emb, pe) for pe in proto_embs]
    best = int(np.argmax(sims))
    return proto_labels[best]

# Optional OpenAI router (requires OPENAI_API_KEY and the openai>=1.0 client)
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
def route_with_openai(query: str) -> str:
    if not OPENAI_API_KEY:
        raise RuntimeError("OPENAI_API_KEY not set")
    from openai import OpenAI
    client = OpenAI(api_key=OPENAI_API_KEY)
    prompt = f"""You are a router. Decide ONLY one label for the user question: 'pdf' or 'sql'. Provide only the label.
User question: {query}
"""
    resp = client.chat.completions.create(model="gpt-4o-mini", messages=[{"role": "user", "content": prompt}], temperature=0)
    # tolerate replies such as "'pdf'" or "SQL."
    label = resp.choices[0].message.content.strip().strip("'\".").lower()
    if label not in {"pdf", "sql"}:
        raise ValueError(f"Unexpected router label: {label!r}")
    return label

# quick tests
print("Route test 1:", route_with_embedding("How does the claims filing process work?"))
print("Route test 2:", route_with_embedding("Show total paid amount per policy for 2024"))
```
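With a single prototype per label, the embedding router is sensitive to how each prototype happens to be phrased, and it always picks a label even when neither fits. A common refinement, sketched below with NumPy only, averages several prototype embeddings per label into a centroid and returns `"unknown"` when the best and second-best similarities are too close. `route_by_centroid` and the `min_margin=0.05` default are illustrative assumptions; with the notebook's model you would pass `embed_texts([...])` outputs instead of the toy 2-D vectors.

```python
from typing import Dict

import numpy as np

def route_by_centroid(q_emb: np.ndarray, label_embs: Dict[str, np.ndarray], min_margin: float = 0.05) -> str:
    """Route to the label whose mean prototype is most cosine-similar to q_emb.

    label_embs maps label -> (n_prototypes, dim) array. Returns "unknown" when the
    top two similarities differ by less than min_margin.
    """
    q = q_emb / np.linalg.norm(q_emb)
    scores = {}
    for label, embs in label_embs.items():
        centroid = embs.mean(axis=0)
        scores[label] = float(q @ (centroid / np.linalg.norm(centroid)))
    ranked = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
    if len(ranked) > 1 and ranked[0][1] - ranked[1][1] < min_margin:
        return "unknown"
    return ranked[0][0]

# toy 2-D "embeddings": pdf prototypes point along x, sql prototypes along y
protos = {
    "pdf": np.array([[1.0, 0.1], [0.9, -0.1]]),
    "sql": np.array([[0.1, 1.0], [-0.1, 0.9]]),
}
print(route_by_centroid(np.array([1.0, 0.2]), protos))  # pdf
print(route_by_centroid(np.array([0.2, 1.0]), protos))  # sql
print(route_by_centroid(np.array([1.0, 1.0]), protos))  # unknown (equidistant)
```

An `"unknown"` result is already handled by the orchestration's fallback branch ("Could not route the query."), so this router could be swapped in without changing the pipelines.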


## 10) PDF retrieval pipeline (retrieve top chunks → (optional) LLM summarize/polish)

```python
def pdf_pipeline(query: str, top_k: int = 3, use_openai=False) -> str:
    q_emb = embed_texts([query])
    hits = pdf_index.search(q_emb, top_k=top_k)[0]
    context = "\n\n".join([f"[doc:{h['doc']}|chunk:{h['chunk']}] {h['text']}" for h in hits])
    if use_openai and OPENAI_API_KEY:
        from openai import OpenAI
        client = OpenAI(api_key=OPENAI_API_KEY)
        prompt = (
            "You are an assistant. Use the following context (from documents) to answer the user's question "
            "in a concise, client-ready way. Include citations to documents where appropriate.\n\n"
            f"Context:\n{context}\n\nQuestion: {query}\n\nAnswer:"
        )
        resp = client.chat.completions.create(model="gpt-4o-mini", messages=[{"role": "user", "content": prompt}], temperature=0)
        return resp.choices[0].message.content
    else:
        return "RETRIEVED (top chunks):\n\n" + context

# demo
print(pdf_pipeline("How does the claims filing process work?", top_k=2, use_openai=False)[:500])
```
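`pdf_pipeline` concatenates every retrieved chunk into the prompt. With a larger `top_k` or longer chunks you may want to cap the context and give each chunk a short numbered tag the LLM can cite. A minimal sketch; `build_cited_context` is a hypothetical helper, and the budget is counted in whitespace-separated words as a rough stand-in for tokens. It expects hits shaped like `FaissIndex.search` output (`_score`, `doc`, `chunk`, `text`).

```python
from typing import Dict, List, Tuple

def build_cited_context(hits: List[Dict], max_words: int = 300) -> Tuple[str, Dict[str, str]]:
    """Join hits (highest score first) into '[n] text' blocks until max_words is reached.

    The first hit is always included, even if it alone exceeds the budget.
    Returns the context string and a map from tag to 'doc|chunk:i' for rendering citations.
    """
    blocks: List[str] = []
    sources: Dict[str, str] = {}
    used = 0
    for hit in sorted(hits, key=lambda h: h["_score"], reverse=True):
        n_words = len(hit["text"].split())
        if blocks and used + n_words > max_words:
            break
        tag = f"[{len(blocks) + 1}]"
        blocks.append(f"{tag} {hit['text']}")
        sources[tag] = f"{hit['doc']}|chunk:{hit['chunk']}"
        used += n_words
    return "\n\n".join(blocks), sources
```

The prompt can then ask the model to cite `[1]`, `[2]`, and so on, and `sources` turns those tags back into document names when displaying the answer.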


## 11) SQL pipeline — retrieve schema snippets → generate SQL → execute → polish

```python
def retrieve_relevant_schema(query: str, top_k: int = 3):
    q_emb = embed_texts([query])
    hits = sql_index.search(q_emb, top_k=top_k)[0]
    return hits

def generate_sql_fallback(query: str, schema_hits: List[dict]) -> str:
    # Simple heuristic SQL generator for demo purposes (handles the "total paid per policy" example only)
    schema_text = " ".join(h["text"] for h in schema_hits).lower()
    q = query.lower()
    needed_tables = ("transaction_log", "claim_header", "policy_header")
    if "total" in q and ("amount" in q or "paid" in q) and all(t in schema_text for t in needed_tables):
        # uses table names from our demo DB; loss_date serves as the 2024 filter
        sql = """SELECT p.policy_holder, SUM(t.amount_paid) AS total_paid
FROM transaction_log t
JOIN claim_header c ON t.claim_id = c.claim_id
JOIN policy_header p ON c.policy_id = p.policy_id
WHERE c.loss_date LIKE '2024-%'
GROUP BY p.policy_holder
ORDER BY total_paid DESC;"""
        return sql
    return "SELECT * FROM claim_header LIMIT 5;"

def generate_sql_with_openai(query: str, schema_snippets: List[str]) -> str:
    if not OPENAI_API_KEY:
        raise RuntimeError("OPENAI_API_KEY not set")
    from openai import OpenAI
    client = OpenAI(api_key=OPENAI_API_KEY)
    schema_context = "\n\n".join(schema_snippets)
    prompt = (
        "You are an SQL generator for SQLite. Given the schema snippets below, produce a syntactically correct "
        "SQL query (ONLY SQL) that answers the user's question. Use only tables/columns from the schema snippets."
        f"\n\nSchema:\n{schema_context}\n\nUser question: {query}\n\nSQL:"
    )
    resp = client.chat.completions.create(model="gpt-4o-mini", messages=[{"role": "user", "content": prompt}], temperature=0)
    return resp.choices[0].message.content.strip()

def execute_sql(conn: sqlite3.Connection, sql: str) -> pd.DataFrame:
    try:
        df = pd.read_sql_query(sql, conn)
        return df
    except Exception as e:
        return pd.DataFrame({"error": [str(e)], "sql": [sql]})

def sql_pipeline(query: str, use_openai_sql=False):
    schema_hits = retrieve_relevant_schema(query, top_k=5)
    schema_snips = [h["text"] for h in schema_hits]
    # Choose SQL generator
    if use_openai_sql and OPENAI_API_KEY:
        sql = generate_sql_with_openai(query, schema_snips)
    else:
        sql = generate_sql_fallback(query, schema_hits)
    df = execute_sql(conn, sql)
    # Polishing: uses OpenAI whenever OPENAI_API_KEY is set, independent of use_openai_sql
    if OPENAI_API_KEY:
        from openai import OpenAI
        client = OpenAI(api_key=OPENAI_API_KEY)
        context = f"SQL:\n{sql}\n\nResults:\n{df.head(20).to_string(index=False)}"
        prompt = f"Polish the SQL results into a user-friendly summary. Highlight anomalies and key numbers.\n\n{context}\n\nSummary:"
        resp = client.chat.completions.create(model="gpt-4o-mini", messages=[{"role": "user", "content": prompt}], temperature=0)
        polished = resp.choices[0].message.content
    else:
        polished = df.head(10).to_string(index=False)
    return {"sql": sql, "raw": df, "polished": polished}

# Demo SQL pipeline
demo_q = "Show total paid amount per policy for claims opened in 2024"
out = sql_pipeline(demo_q, use_openai_sql=False)
print("Generated SQL:\n", out["sql"])
print("\nPolished / result preview:\n", out["polished"][:800])
```
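Chat models often wrap SQL in a markdown code fence even when the prompt asks for ONLY SQL, and fenced text handed to `pd.read_sql_query` would most likely fail with a syntax error. A minimal sketch of a cleanup step; `clean_llm_sql` is a hypothetical helper, and the regex only handles a fence that surrounds the whole reply:

```python
import re

# optional language tag after the opening fence, e.g. "sql"; `{3} matches three backticks
_FENCE_RE = re.compile(r"^\s*`{3}[a-zA-Z]*\s*\n(.*?)\n?\s*`{3}\s*$", re.DOTALL)

def clean_llm_sql(text: str) -> str:
    """Strip a surrounding markdown code fence from an LLM reply; otherwise just trim whitespace."""
    text = text.strip()
    m = _FENCE_RE.match(text)
    if m:
        text = m.group(1).strip()
    return text
```

`generate_sql_with_openai` could return `clean_llm_sql(resp.choices[0].message.content)` instead of the raw content. Replies that mix prose and SQL still need a stricter prompt or a validation step before execution.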


## 12) Orchestration: routing → appropriate pipeline → final output

This cell demonstrates the full orchestration and compares **Traditional RAG** (single vector DB search) vs **Agentic RAG** (router → specialized indexes).

```python
def traditional_rag(query: str, top_k: int = 3):
    # Naive single-index approach: search both collections and keep the overall top_k hits,
    # as if PDF chunks and schema docs lived in one vector DB. Scores are comparable because
    # both indexes use the same model and normalized embeddings.
    q_emb = embed_texts([query])
    hits = pdf_index.search(q_emb, top_k=top_k)[0] + sql_index.search(q_emb, top_k=top_k)[0]
    hits = sorted(hits, key=lambda h: h["_score"], reverse=True)[:top_k]
    context = "\n\n".join([f"[{h['source']}:{h.get('doc', h.get('table'))}] {h['text']}" for h in hits])
    return {"route": "traditional", "context": context}

def agentic_rag(query: str, prefer_openai_router=False, prefer_openai_sql=False, prefer_openai_pdf=False):
    # routing: LLM router if requested and available; embedding router otherwise (and on any LLM error)
    try:
        if prefer_openai_router and OPENAI_API_KEY:
            route = route_with_openai(query)
        else:
            route = route_with_embedding(query)
    except Exception:
        route = route_with_embedding(query)
    route = route.strip().lower()
    if route == "pdf":
        answer = pdf_pipeline(query, top_k=3, use_openai=prefer_openai_pdf and bool(OPENAI_API_KEY))
        return {"route": "pdf", "answer": answer}
    elif route == "sql":
        sql_out = sql_pipeline(query, use_openai_sql=prefer_openai_sql and bool(OPENAI_API_KEY))
        return {"route": "sql", "sql": sql_out["sql"], "result": sql_out["polished"], "raw_df": sql_out["raw"]}
    else:
        return {"route": "unknown", "answer": "Could not route the query."}

# Try examples
q_pdf = "How does the claims filing process work according to the policy?"
q_sql = "Show total paid amount per policy for claims opened in 2024"

print('--- Traditional RAG (naive) for PDF question ---')
t1 = traditional_rag(q_pdf)
print(t1["context"][:800])

print('\n--- Agentic RAG for PDF question ---')
a1 = agentic_rag(q_pdf)
print('Routed to:', a1['route'])
print(str(a1.get('answer', a1.get('result', '')))[:800])

print('\n--- Agentic RAG for SQL question ---')
a2 = agentic_rag(q_sql)
print('Routed to:', a2['route'])
print('SQL used:\n', a2.get('sql', '(n/a)'))
print('Result summary:\n', str(a2.get('result', a2.get('answer', '')))[:800])
```
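Two example questions show the router working, but they say little about how reliably it routes. A minimal sketch of a routing check: label a handful of questions yourself and measure accuracy for any router function (for example `route_with_embedding`). `routing_report` and the keyword stub below are hypothetical; replace the stub with the notebook's router and the labels with your own questions.

```python
from typing import Callable, List, Tuple

def routing_report(router: Callable[[str], str],
                   labeled: List[Tuple[str, str]]) -> Tuple[float, List[Tuple[str, str, str]]]:
    """Return (accuracy, misroutes), where each misroute is (query, expected, got)."""
    misroutes = []
    for query, expected in labeled:
        got = router(query)
        if got != expected:
            misroutes.append((query, expected, got))
    accuracy = 1 - len(misroutes) / len(labeled) if labeled else 0.0
    return accuracy, misroutes

# stub router for illustration only: anything mentioning totals/sums goes to SQL
def keyword_router(query: str) -> str:
    q = query.lower()
    return "sql" if any(w in q for w in ("total", "count", "sum", "per policy")) else "pdf"

labeled = [
    ("How does the claims filing process work?", "pdf"),
    ("Show total paid amount per policy for 2024", "sql"),
    ("When is a field inspection required?", "pdf"),
    ("How many claims are still open?", "sql"),
]
acc, misses = routing_report(keyword_router, labeled)
print(acc, misses)  # 0.75 [('How many claims are still open?', 'sql', 'pdf')]
```

Listing the misroutes, not just the accuracy, shows which phrasings need another prototype (for example a "how many" question for the SQL label).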



## 13) Teaching notes & what to explain to your junior

- **Why separate indexes?** PDFs vs SQL schema have different chunking and retrieval needs. Keeping them separate avoids noisy matches and allows tailored chunking/embedding.
- **Schema-first approach:** The SQL agent first retrieves *schema snippets*, so the generator sees the real table and column names. This reduces (but does not eliminate) hallucinated identifiers and is a prerequisite for safe SQL generation.
- **Safety:** Always validate generated SQL (allowlist SELECTs, use read-only roles, run EXPLAIN or dry-run). For production, prefer a SQL generator LLM prompt that restricts to given schema and forbids destructive statements.
- **Polish step:** Never send raw DB rows to users; send a human-friendly summary. The polish step reduces cognitive load and provides context.
- **Extensibility:** You can add more indexing spaces (images, code repos, APIs) and extend the router to more classes.


## 14) Save & download

The notebook saves the demo DB and any generated outputs under `/mnt/data`. You can download this notebook file from the environment.