# State-of-the-Art RAG Pipeline: From Document to Dialogue

This notebook demonstrates a complete, advanced Retrieval-Augmented Generation (RAG) pipeline. We will take a raw document (like a PDF insurance policy), process it, store its knowledge permanently, and then build an intelligent query engine that can answer questions about it.

**Key Features:**
- **Persistent Knowledge:** Uses `ChromaDB` to store document embeddings on disk, so the knowledge base survives kernel restarts.
- **Hybrid Search:** Combines keyword search (`BM25`) with embedding-based semantic search, so both exact terms and paraphrases can be matched.
- **Advanced Reranking:** Uses a `Cross-Encoder` to rescore the initial search results against the query and keep the most relevant context.
- **Intelligent Routing:** Can determine if a question is related to the document or is a general knowledge query.

## Step 1: Installing Dependencies

First, we install the required Python libraries. Running the notebook inside a virtual environment keeps these dependencies isolated from other projects.

In [17]:
%pip install jupyter notebook ipywidgets
# PyMuPDF provides the `pymupdf` module; the separate `fitz` package on PyPI is unrelated and not needed.
%pip install PyMuPDF python-docx
%pip install sentence-transformers transformers
%pip install rank_bm25
%pip install chromadb
%pip install ollama

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Note: you may need to restart the kernel to use updated packages.



## Step 2: Imports and Global Setup

Now, we import all the libraries and load our AI models into memory. We also set up the paths for our persistent ChromaDB database. Loading the models once at the start is crucial for performance.
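
One common way to guarantee that a model is loaded only once, even when a setup cell is re-run or a helper is called from several places, is to put the loader behind a cache. The sketch below uses `functools.lru_cache` with a stand-in loader. `load_model`, `FakeModel`, and `LOAD_CALLS` are hypothetical names used for illustration; in this notebook the loader body would construct the real `SentenceTransformer` or `CrossEncoder` instead.

```python
from functools import lru_cache


class FakeModel:
    """Stand-in for an expensive model object (hypothetical, for illustration)."""

    def __init__(self, name: str):
        self.name = name


LOAD_CALLS = []  # records every real load so the caching is observable


@lru_cache(maxsize=None)
def load_model(name: str) -> FakeModel:
    """Build the model on the first call; later calls return the cached object."""
    LOAD_CALLS.append(name)
    return FakeModel(name)


first = load_model("all-MiniLM-L6-v2")
second = load_model("all-MiniLM-L6-v2")
print(first is second, len(LOAD_CALLS))  # True 1
```

The notebook reaches the same effect more simply by assigning the models to module-level globals; the cached loader only matters if loading is moved into a function.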

In [18]:
# --- Core Libraries ---
import pymupdf
import docx
import re
import os
import pickle
import json
from typing import List, Dict, Tuple

# --- AI & NLP Libraries ---
import numpy as np
import ollama
import chromadb
from sentence_transformers import SentenceTransformer, CrossEncoder
from rank_bm25 import BM25Okapi
from transformers import AutoTokenizer

# --- Global Setup ---
print("Loading models into memory... This might take a moment.")
TOKENIZER = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
EMBEDDING_MODEL = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
RERANKER = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
OLLAMA_MODEL = 'phi3'

# --- Database Setup ---
DB_PATH = "./rag_database"
os.makedirs(DB_PATH, exist_ok=True)
CHROMA_CLIENT = chromadb.PersistentClient(path=DB_PATH)
BM25_INDEX_PATH = os.path.join(DB_PATH, "bm25_index.pkl")
METADATA_STORE_PATH = os.path.join(DB_PATH, "metadata_store.pkl")
COLLECTION_NAME = "documents"
print("✅ Models and database ready.")

Loading models into memory... This might take a moment.
✅ Models and database ready.


## Step 3: Helper Functions (The RAG Toolkit)

These are the core functions that perform the heavy lifting of our pipeline. Each function has a specific job, from extracting text to generating an answer.
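
The chunking helper is the least obvious of these functions, so here is its core idea in isolation: fill a window up to a token budget, emit it as a chunk, and repeat the last few tokens at the start of the next window. This is a simplified sketch, not the notebook's `semantic_chunker`: it splits on whitespace instead of using the model tokenizer and ignores headings and clause numbers. `chunk_with_overlap` is a hypothetical name.

```python
from typing import List


def chunk_with_overlap(text: str, max_tokens: int = 8, overlap_tokens: int = 2) -> List[str]:
    """Split text into windows of at most max_tokens whitespace tokens,
    repeating the last overlap_tokens tokens at the start of the next window."""
    if not 0 <= overlap_tokens < max_tokens:
        raise ValueError("overlap_tokens must be in [0, max_tokens)")
    tokens = text.split()
    step = max_tokens - overlap_tokens
    chunks = []
    for start in range(0, len(tokens), step):
        window = tokens[start:start + max_tokens]
        chunks.append(" ".join(window))
        if start + max_tokens >= len(tokens):
            break  # the final window already reached the end of the text
    return chunks


sample = "a b c d e f g h i j k l"
for chunk in chunk_with_overlap(sample, max_tokens=5, overlap_tokens=2):
    print(chunk)
```

The overlap means a sentence that straddles a chunk boundary still appears whole in at least one chunk, at the cost of storing some text twice.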

In [19]:
def extract_pdf_with_structure(pdf_path: str) -> List[Dict]:
    """Opens a PDF and extracts text, keeping track of pages and headings."""
    doc = pymupdf.open(pdf_path)
    results = []
    for pageno in range(doc.page_count):
        page = doc.load_page(pageno)
        blocks = page.get_text("dict")["blocks"]
        for b_idx, block in enumerate(blocks):
            if block.get("lines"):
                lines = [span.get("text", "") for line in block.get("lines", []) for span in line.get("spans", [])]
                text = " ".join(lines).strip()
                if text:
                    heading = text if len(text) < 120 and (text.isupper() or text.endswith(":")) else None
                    results.append({"text": text, "page": pageno + 1, "block": b_idx, "heading": heading})
    return results

def extract_docx_with_structure(docx_path: str) -> List[Dict]:
    """Opens a .docx file and extracts paragraphs, checking for heading styles."""
    doc = docx.Document(docx_path)
    results = []
    for i, para in enumerate(doc.paragraphs):
        text = para.text.strip()
        if text:
            style = para.style.name.lower() if para.style else ""
            heading = text if "heading" in style else None
            results.append({"text": text, "para_idx": i, "heading": heading})
    return results

def semantic_chunker(structured_parts: List[Dict], max_tokens=350, overlap_tokens=50) -> List[Dict]:
    """Breaks down the extracted text into smart, overlapping chunks."""
    CLAUSE_RE = re.compile(r'^\s*\d+(\.\d+){0,}\s+')
    chunks, buffer, buffer_meta = [], "", {"pages": set(), "headings": [], "sources": []}
    def flush_buffer():
        nonlocal buffer, buffer_meta
        if not buffer.strip(): return
        chunks.append({"text": buffer.strip(), "pages": sorted(list(buffer_meta["pages"])), "headings": list(dict.fromkeys(h for h in buffer_meta["headings"] if h)), "sources": buffer_meta["sources"]})
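        # Exclude [CLS]/[SEP] so special tokens do not leak into the overlap text.
        toks = TOKENIZER.encode(buffer, add_special_tokens=False)
        overlap_toks = toks[-overlap_tokens:] if len(toks) > overlap_tokens else []
        buffer = TOKENIZER.decode(overlap_toks, skip_special_tokens=True) if overlap_toks else ""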
        buffer_meta = {"pages": set(), "headings": [], "sources": []}
    for part in structured_parts:
        text = part["text"]
        part_meta = {"page": part.get("page", 0), "heading": part.get("heading")}
        if part.get("heading") or CLAUSE_RE.match(text):
            if len(TOKENIZER.encode(buffer)) > overlap_tokens: flush_buffer()
        if len(TOKENIZER.encode(buffer + " " + text)) > max_tokens: flush_buffer()
        buffer += (" " + text)
        buffer_meta["pages"].add(part_meta["page"])
        if part_meta["heading"]: buffer_meta["headings"].append(part_meta["heading"])
        buffer_meta["sources"].append((part_meta["page"], part.get("block", part.get("para_idx", 0))))
    if buffer.strip(): flush_buffer()
    return chunks

def embed_texts(texts: List[str]) -> List[List[float]]:
    """Turns text into numerical vectors (embeddings)."""
    embs = EMBEDDING_MODEL.encode(texts, show_progress_bar=False, convert_to_numpy=True)
    norms = np.linalg.norm(embs, axis=1, keepdims=True)
    norms[norms == 0] = 1e-12
    return (embs / norms).tolist()

def hybrid_retrieve(query: str, collection, bm25, metadata_store) -> Tuple[List, float]:
    """Performs hybrid search and reranking to find the best context."""
    num_candidates = 25
    tokenized_q = TOKENIZER.tokenize(query)
    bm25_scores = bm25.get_scores(tokenized_q)
    bm25_doc_indices = np.argsort(bm25_scores)[::-1][:num_candidates]
    bm25_texts = [metadata_store[str(i)]['text'] for i in bm25_doc_indices]

    query_embedding = embed_texts([query])[0]
    results = collection.query(query_embeddings=[query_embedding], n_results=num_candidates, include=["documents"])
    semantic_texts = results["documents"][0]
    
    combined_texts = list(dict.fromkeys(semantic_texts + bm25_texts))
    if not combined_texts: return [], 0.0

    pairs = [(query, t) for t in combined_texts]
    scores = RERANKER.predict(pairs, show_progress_bar=False)
    
    ranked_results = sorted(zip(combined_texts, scores), key=lambda x: x[1], reverse=True)
    final_chunks = [(i, text, score) for i, (text, score) in enumerate(ranked_results)]
    top_score = float(final_chunks[0][2]) if final_chunks else 0.0
    return final_chunks[:8], top_score

def llm_call_fn(prompt: str) -> Dict:
    """Calls the local LLM and ensures a JSON response."""
    try:
        response = ollama.chat(model=OLLAMA_MODEL, messages=[{'role': 'user', 'content': prompt}], format='json')
        return json.loads(response['message']['content'])
    except Exception as e:
        print(f"LLM call failed: {e}")
        return {"answer": "Error: Could not generate a response.", "evidence": [], "explanation": "", "confidence": 0.0}

def generate_answer(query: str, top_chunks: List) -> Dict:
    """Builds the final prompt for the LLM to generate a factual answer."""
    sources_text = "\n\n".join([f"[Source {i+1}]:\n{txt}" for i, (txt, _) in enumerate(top_chunks)])
    prompt = f"""You are a helpful assistant. First, critically evaluate if the provided SOURCES are relevant to the user's QUERY. If not, your answer must be "The provided documents do not contain specific information on this topic."
If the sources are relevant, use ONLY the provided sources to answer. Respond in a valid JSON format with keys: "answer", "evidence", "explanation", "confidence".
The "answer" must be a complete sentence that rephrases the query.
SOURCES:\n---\n{sources_text}\n---\nQUERY: {query}"""
    return llm_call_fn(prompt)
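
`hybrid_retrieve` merges the semantic and BM25 candidates with `dict.fromkeys`, which removes duplicates while preserving first-seen order, so each chunk is scored by the reranker only once. The toy example below isolates that merge step. `merge_candidates` is a hypothetical helper name and the chunk strings are made up for illustration.

```python
from typing import List


def merge_candidates(semantic: List[str], keyword: List[str]) -> List[str]:
    """Union of two candidate lists, de-duplicated, keeping first-seen order."""
    return list(dict.fromkeys(semantic + keyword))


semantic_hits = ["grace period clause", "premium payment clause", "renewal clause"]
keyword_hits = ["premium payment clause", "cataract clause"]

merged = merge_candidates(semantic_hits, keyword_hits)
print(merged)
# ['grace period clause', 'premium payment clause', 'renewal clause', 'cataract clause']
```

Because the merged list is rescored afterwards, the order produced here does not determine the final ranking; it only controls which candidates reach the reranker.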

## Step 4: The Processing Pipeline (One-Time Ingestion)

This is where we run the first major part of our pipeline. We take a source document, process it, and store its knowledge permanently in our ChromaDB database. You only need to run this once per document. Note that the function deletes any existing collection before ingesting, so each run replaces the stored knowledge base with the new document rather than adding to it.
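
The ingestion function converts list-valued metadata to JSON strings before handing it to ChromaDB, as its inline comment explains. The sketch below shows that round trip in isolation. `sanitize_metadata` and `restore_metadata` are hypothetical helper names, and the restore step is an addition for illustration that the notebook itself does not perform.

```python
import json
from typing import Any, Dict, Tuple

SCALARS = (str, int, float, bool)


def sanitize_metadata(chunk: Dict[str, Any]) -> Dict[str, Any]:
    """Keep scalar values as they are; JSON-encode everything else."""
    return {k: v if isinstance(v, SCALARS) else json.dumps(v) for k, v in chunk.items()}


def restore_metadata(
    meta: Dict[str, Any],
    list_keys: Tuple[str, ...] = ("pages", "headings", "sources"),
) -> Dict[str, Any]:
    """Decode the JSON-encoded fields back into Python lists."""
    return {k: json.loads(v) if k in list_keys else v for k, v in meta.items()}


chunk = {
    "text": "Grace period text",
    "pages": [3, 4],
    "headings": ["RENEWAL:"],
    "sources": [(3, 7), (4, 0)],
}
flat = sanitize_metadata(chunk)
print(repr(flat["pages"]))                # '[3, 4]' (now a string)
print(restore_metadata(flat)["sources"])  # [[3, 7], [4, 0]] (tuples come back as lists)
```

One consequence worth keeping in mind is that JSON has no tuple type, so the `(page, block)` pairs in `sources` are lists after decoding.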

In [20]:
def process_and_store_document(file_path: str):
    """Handles the document upload, processing, and storage."""
    if not os.path.exists(file_path):
        print(f"Error: File not found at {file_path}")
        return
    
    print(f"Processing document: {os.path.basename(file_path)}")
    try:
        CHROMA_CLIENT.delete_collection(name=COLLECTION_NAME)
        print(f"Deleted existing collection: '{COLLECTION_NAME}' to start fresh.")
    except Exception:
        print("No existing collection found. Creating new one.")
    
    collection = CHROMA_CLIENT.create_collection(name=COLLECTION_NAME)
    
    file_extension = os.path.splitext(file_path)[-1].lower()
    if file_extension == '.pdf': structured = extract_pdf_with_structure(file_path)
    elif file_extension == '.docx': structured = extract_docx_with_structure(file_path)
    else: 
        print("Error: Unsupported file type.")
        return
    
    chunks = semantic_chunker(structured)
    texts = [c["text"] for c in chunks]
    embeddings = embed_texts(texts)
    
    # We need to make sure all metadata values are simple types (str, int, float, bool)
    # We'll convert any lists into JSON strings.
    sanitized_chunks = []
    for chunk in chunks:
        sanitized_chunk = {}
        for key, value in chunk.items():
            if isinstance(value, list):
                sanitized_chunk[key] = json.dumps(value)  # Convert list to JSON string
            else:
                sanitized_chunk[key] = value
        sanitized_chunks.append(sanitized_chunk)
    
    ids = [str(i) for i in range(len(texts))]
    collection.add(embeddings=embeddings, documents=texts, metadatas=sanitized_chunks, ids=ids)
    
    tokenized_corpus = [TOKENIZER.tokenize(t) for t in texts]
    bm25 = BM25Okapi(tokenized_corpus)
    metadata_store = {id: chunk for id, chunk in zip(ids, sanitized_chunks)}
    
    with open(BM25_INDEX_PATH, "wb") as f: pickle.dump(bm25, f)
    with open(METADATA_STORE_PATH, "wb") as f: pickle.dump(metadata_store, f)
        
    print(f"✅ Successfully processed and stored '{os.path.basename(file_path)}'.")

# --- Run the processing pipeline ---
# Make sure you have a PDF file named 'policy.pdf' in the same directory
process_and_store_document("policy.pdf")

Processing document: policy.pdf
Deleted existing collection: 'documents' to start fresh.


Token indices sequence length is longer than the specified maximum sequence length for this model (527 > 512). Running this sequence through the model will result in indexing errors


✅ Successfully processed and stored 'policy.pdf'.


## Step 5: The Query Pipeline (Answering Questions)

Now that our knowledge base is built, we can ask questions. This function loads the pre-built indexes from our database and uses the intelligent routing logic to answer a user's query.
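
The routing decision compares the top reranker score with a fixed threshold. The sample run further down reports a score of -8.25, which suggests the reranker returns unbounded logits rather than probabilities, so a cutoff is easier to reason about after squashing the score with a sigmoid. The sketch below is one way to do that, not the notebook's implementation. `route_query` and the 0.5 default threshold are assumptions chosen for illustration.

```python
import math


def sigmoid(x: float) -> float:
    """Numerically stable logistic function mapping a logit to the range 0..1."""
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    z = math.exp(x)
    return z / (1.0 + z)


def route_query(top_logit: float, threshold: float = 0.5) -> str:
    """Return 'document' when the best chunk looks relevant, else 'general'."""
    return "document" if sigmoid(top_logit) >= threshold else "general"


for logit in (-8.25, 0.0, 4.0):
    print(f"{logit:+.2f} -> p={sigmoid(logit):.4f} -> {route_query(logit)}")
```

Whatever scale is used, the threshold is best tuned on a handful of known in-document and out-of-document questions rather than picked by hand.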

In [21]:
def answer_query(query: str):
    """Handles a user's question, performs the search, and generates the final answer."""
    if not query: 
        print("Please provide a question.")
        return
    try:
        collection = CHROMA_CLIENT.get_collection(name=COLLECTION_NAME)
        with open(BM25_INDEX_PATH, "rb") as f: bm25 = pickle.load(f)
        with open(METADATA_STORE_PATH, "rb") as f: metadata_store = pickle.load(f)
    except Exception:
        print("Error: No document has been processed. Please run the processing pipeline first.")
        return

    top_chunks, top_score = hybrid_retrieve(query, collection, bm25, metadata_store)
    
    # The reranker score is an unbounded logit (e.g. -8.25 in the run below), not a probability.
    RELEVANCE_THRESHOLD = 0.1
    answer_text, mode = "", ""

    print(f"\n[ ❓ Query ]: {query}")
    print(f"Relevance Score: {top_score:.2f}")

    if top_score < RELEVANCE_THRESHOLD:
        mode = "General Mode"
        print("--> Query seems unrelated to the document. Switching to General Mode.")
        response = ollama.chat(model=OLLAMA_MODEL, messages=[{'role': 'user', 'content': query}])
        answer_text = response['message']['content']
    else:
        mode = "Document Mode"
        print("--> Query is relevant. Using document sources to answer.")
        response_json = generate_answer(query, [(chunk[1], chunk[2]) for chunk in top_chunks])
        answer_text = response_json.get("answer", "Could not generate an answer from the document.")
        
    print(f"\n[ 💬 Answer ({mode}) ]: {answer_text}")

# --- Ask some questions ---
answer_query("What is the document about? and is maternity leave applicabble")
print("\n---\n")
answer_query("I have done an IVF for Rs 56,000. Is it covered?")
print("\n---\n")
answer_query("Give me a python function to calculate the factorial of a number.")


[ ❓ Query ]: What is the document about? and is maternity leave applicabble
Relevance Score: -8.25
--> Query seems unrelated to the document. Switching to General Mode.

[ 💬 Answer (General Mode) ]: Title: The Impact of Maternity Leave Policies on Women'semotivation, Well-being, and Return to Work Outcomes in Japan. by Kiyomi Nakamura (Japanese Journal of Hygiene Science & Toxicology, March 2023)

Abstract: This study investigates the effects that maternity leave policies have on Japanese working mothers' motivation, well-being, and their return to work outcomes. Using a sample size of 576 respondents from various industries in Japan who gave birth within two years prior, data were collected through online questionnaires completed by participants between January 2021 and December 2021 regarding maternity leave experiences.

Key findings: The researchers found that extended paid-maternity leaves led to increased job satisfaction among the respondents but did not significantly affect th
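
The evaluation cell that follows prints expected and actual answers side by side for manual comparison. A rough automatic signal can complement that; the sketch below computes token-level F1 between two answers. It is a coarse heuristic (lowercased word overlap, no stemming or synonym handling), and `token_f1` is a hypothetical helper that is not part of the notebook. The two sample strings are abbreviated from the first test case.

```python
import re
from collections import Counter


def token_f1(expected: str, actual: str) -> float:
    """Harmonic mean of token precision and recall between two answers."""
    exp_tokens = re.findall(r"\w+", expected.lower())
    act_tokens = re.findall(r"\w+", actual.lower())
    if not exp_tokens or not act_tokens:
        return 0.0
    # Multiset intersection counts each shared token at most as often as it appears in both.
    overlap = sum((Counter(exp_tokens) & Counter(act_tokens)).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(act_tokens)
    recall = overlap / len(exp_tokens)
    return 2 * precision * recall / (precision + recall)


expected = "A grace period of thirty days is provided for premium payment."
actual = "The Grace Period for payment of the premium shall be thirty days."
print(f"F1 = {token_f1(expected, actual):.2f}")
```

A score like this can flag obvious misses across many test cases, but it cannot tell a correct paraphrase from a wrong answer that reuses the same words, so it does not replace reading the answers.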

In [22]:
# =====================================================================================
# Step 6: Final Evaluation
# =====================================================================================
# We'll first slightly modify our answer_query function to return the final answer
# This makes it easier to use in a testing loop.

def evaluate_query(query: str) -> str:
    """
    This is a modified version of answer_query that returns the answer text
    instead of printing it, making it suitable for automated testing.
    """
    try:
        collection = CHROMA_CLIENT.get_collection(name=COLLECTION_NAME)
        with open(BM25_INDEX_PATH, "rb") as f: bm25 = pickle.load(f)
        with open(METADATA_STORE_PATH, "rb") as f: metadata_store = pickle.load(f)
    except Exception:
        return "Error: Could not load the processed document. Please run the ingestion cell first."

    top_chunks, top_score = hybrid_retrieve(query, collection, bm25, metadata_store)
    
    RELEVANCE_THRESHOLD = 0.1
    answer_text = ""

    if top_score < RELEVANCE_THRESHOLD:
        response = ollama.chat(model=OLLAMA_MODEL, messages=[{'role': 'user', 'content': query}])
        answer_text = response['message']['content']
    else:
        response_json = generate_answer(query, [(chunk[1], chunk[2]) for chunk in top_chunks])
        answer_text = response_json.get("answer", "Could not generate an answer from the document.")
        
    return answer_text

# --- Test Data ---
test_cases = [
    {
        "question": "What is the grace period for premium payment under the National Parivar Mediclaim Plus Policy?",
        "expected_answer": "A grace period of thirty days is provided for premium payment after the due date to renew or continue the policy without losing continuity benefits."
    },
    {
        "question": "What is the waiting period for pre-existing diseases (PED) to be covered?",
        "expected_answer": "There is a waiting period of thirty-six (36) months of continuous coverage from the first policy inception for pre-existing diseases and their direct complications to be covered."
    },
    {
        "question": "Does this policy cover maternity expenses, and what are the conditions?",
        "expected_answer": "Yes, the policy covers maternity expenses, including childbirth and lawful medical termination of pregnancy. To be eligible, the female insured person must have been continuously covered for at least 24 months. The benefit is limited to two deliveries or terminations during the policy period."
    },
    {
        "question": "What is the waiting period for cataract surgery?",
        "expected_answer": "The policy has a specific waiting period of two (2) years for cataract surgery."
    },
    {
        "question": "Are the medical expenses for an organ donor covered under this policy?",
        "expected_answer": "Yes, the policy indemnifies the medical expenses for the organ donor's hospitalization for the purpose of harvesting the organ, provided the organ is for an insured person and the donation complies with the Transplantation of Human Organs Act, 1994."
    },
    {
        "question": "What is the No Claim Discount (NCD) offered in this policy?",
        "expected_answer": "A No Claim Discount of 5% on the base premium is offered on renewal for a one-year policy term if no claims were made in the preceding year. The maximum aggregate NCD is capped at 5% of the total base premium."
    },
    {
        "question": "Is there a benefit for preventive health check-ups?",
        "expected_answer": "Yes, the policy reimburses expenses for health check-ups at the end of every block of two continuous policy years, provided the policy has been renewed without a break. The amount is subject to the limits specified in the Table of Benefits."
    },
    {
        "question": "How does the policy define a 'Hospital'?",
        "expected_answer": "A hospital is defined as an institution with at least 10 inpatient beds (in towns with a population below ten lakhs) or 15 beds (in all other places), with qualified nursing staff and medical practitioners available 24/7, a fully equipped operation theatre, and which maintains daily records of patients."
    },
    {
        "question": "What is the extent of coverage for AYUSH treatments?",
        "expected_answer": "The policy covers medical expenses for inpatient treatment under Ayurveda, Yoga, Naturopathy, Unani, Siddha, and Homeopathy systems up to the Sum Insured limit, provided the treatment is taken in an AYUSH Hospital."
    },
    {
        "question": "Are there any sub-limits on room rent and ICU charges for Plan A?",
        "expected_answer": "Yes, for Plan A, the daily room rent is capped at 1% of the Sum Insured, and ICU charges are capped at 2% of the Sum Insured. These limits do not apply if the treatment is for a listed procedure in a Preferred Provider Network (PPN)."
    }
]

# --- Run the Evaluation Loop ---
print("="*50)
print("RUNNING PIPELINE EVALUATION")
print("="*50)

for i, case in enumerate(test_cases):
    question = case["question"]
    expected = case["expected_answer"]
    
    print(f"\n--- [ Test Case {i+1}/{len(test_cases)} ] ---")
    print(f"❓ QUESTION:\n{question}")
    
    # Get the actual answer from your pipeline
    actual_answer = evaluate_query(question)
    
    print(f"\n✅ EXPECTED ANSWER:\n{expected}")
    print(f"\n🤖 ACTUAL ANSWER:\n{actual_answer}")
    print("-"*(20 + len(str(i+1)) + len(str(len(test_cases))) + 6))

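==================================================
RUNNING PIPELINE EVALUATION
==================================================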

--- [ Test Case 1/10 ] ---
❓ QUESTION:
What is the grace period for premium payment under the National Parivar Mediclaim Plus Policy?

✅ EXPECTED ANSWER:
A grace period of thirty days is provided for premium payment after the due date to renew or continue the policy without losing continuity benefits.

🤖 ACTUAL ANSWER:
The Grace Period for payment of the premium shall be thirty days.
-----------------------------

--- [ Test Case 2/10 ] ---
❓ QUESTION:
What is the waiting period for pre-existing diseases (PED) to be covered?

✅ EXPECTED ANSWER:
There is a waiting period of thirty-six (36) months of continuous coverage from the first policy inception for pre-existing diseases and their direct complications to be covered.

🤖 ACTUAL ANSWER:
The document stipulates that expenses related to PED and its direct complications are excluded until a patient has been continuously insured without any break, which reduces the waiting period for these conditions. If there