# Comparing RAG Retrieval Strategies on the Indian Constitution

We compare five retrieval strategies — No RAG, Naive RAG, Reranker, Hybrid Search (BM25 + semantic), and Hierarchical RAG — on a 404-page PDF of the Indian Constitution, evaluated against a 35-question benchmark using GPT-4o as judge.

In [None]:
import os
from dotenv import load_dotenv

# Load API credentials from the .env file one directory up.
load_dotenv(dotenv_path="../.env")

# Report only whether each key is present; the secret values are never printed.
for key_name in ("OPENAI_API_KEY", "TOGETHER_API_KEY"):
    print(f"{key_name} is set:", bool(os.environ.get(key_name)))

## Setup

Load API keys from a `.env` file in the parent directory. You'll need:
- **OPENAI_API_KEY** — for `text-embedding-3-small` embeddings and GPT-4o evaluation
- **TOGETHER_API_KEY** — for `Llama-3.2-3B-Instruct-Turbo` generation via Together AI

In [None]:
import fitz  # pymupdf

# Open the source PDF. The handle stays open because its page count is reported below.
doc = fitz.open("indiaconstitution.pdf")

# One extracted text string per page, in document order.
pages = [page.get_text() for page in doc]

total_chars = sum(map(len, pages))
print(f"Total pages: {len(doc)}")
print(f"Total characters: {total_chars:,}")

## Load and preview the document

In [None]:
# Print the first 1000 characters of a few sample pages (zero-based indices),
# skipping any index that lies beyond the end of the document.
for page_num in (0, 5, 50):
    if page_num >= len(pages):
        continue
    banner = "=" * 60
    print(banner)
    print(f"PAGE {page_num + 1}")
    print(banner)
    print(pages[page_num][:1000])
    print("...\n")

# The whole document as a single string, with pages separated by a newline.
all_text = "\n".join(pages)

## RAG Pipeline — Base Class

Abstract base class that all pipelines inherit from. It provides the embedding helper (OpenAI `text-embedding-3-small`), vector storage (ChromaDB), and the Together AI client used for generation. Subclasses must implement `chunk()`, `retrieve()`, and `generate()`.

In [None]:
import json
import os
import re
from abc import ABC, abstractmethod

import chromadb
from openai import OpenAI


class RAGPipeline(ABC):
    """Base class for RAG pipelines. Subclass and override methods to swap techniques.

    Construction creates an OpenAI client (used by ``embed``), a second
    OpenAI-compatible client pointed at the Together AI endpoint (available to
    subclasses for generation), and a ChromaDB collection named after ``name``.

    Subclasses must implement ``chunk``, ``retrieve`` and ``generate``.
    """

    # Label used in log output and, once sanitised, as the ChromaDB collection name.
    name: str = "base"

    def __init__(self):
        self.openai_client = OpenAI()
        self.together_client = OpenAI(
            api_key=os.environ.get("TOGETHER_API_KEY"),
            base_url="https://api.together.xyz/v1",
        )
        self.chroma_client = chromadb.Client()
        # Replace every character outside [a-zA-Z0-9_] and cap the name at 60 characters.
        col_name = re.sub(r"[^a-zA-Z0-9_]", "_", self.name)[:60]
        self.collection = self.chroma_client.get_or_create_collection(
            name=col_name,
            metadata={"hnsw:space": "cosine"},
        )
        # Filled by build_index(); kept so subclasses can map ids back to text.
        self.chunks: list[str] = []

    # ── Override these in subclasses ──────────────────────────────

    @abstractmethod
    def chunk(self, text: str) -> list[str]:
        """Split raw text into chunks."""
        ...

    def embed(self, texts: list[str]) -> list[list[float]]:
        """Embed a batch of texts in sub-batches to stay within API limits.

        Each text is cut to its first 30,000 characters and the texts are sent
        to ``text-embedding-3-small`` 20 at a time.

        Returns:
            One embedding per input text, in input order. An empty input
            yields an empty list without calling the API.
        """
        MAX_CHARS = 30_000
        SUB_BATCH = 20
        truncated = [t[:MAX_CHARS] for t in texts]
        all_embeddings = []
        for i in range(0, len(truncated), SUB_BATCH):
            batch = truncated[i : i + SUB_BATCH]
            resp = self.openai_client.embeddings.create(
                model="text-embedding-3-small", input=batch
            )
            all_embeddings.extend([item.embedding for item in resp.data])
        return all_embeddings

    @abstractmethod
    def retrieve(self, question: str, n_results: int = 5) -> str:
        """Return context string for a question."""
        ...

    @abstractmethod
    def generate(self, question: str, context: str) -> str:
        """Generate an answer given question + retrieved context."""
        ...

    # ── Shared infrastructure (usually not overridden) ───────────

    def build_index(self, text: str, batch_size: int = 100):
        """Chunk text, embed, and upsert into ChromaDB.

        Args:
            text: Raw document text passed to ``chunk``.
            batch_size: Number of chunks embedded and upserted per round trip.

        Chunks are stored under ids ``chunk_<i>`` where ``i`` is the chunk's
        position in ``self.chunks``. If chunking produces nothing, the method
        logs that fact and returns without touching the collection.
        """
        self.chunks = self.chunk(text)
        if not self.chunks:
            # Nothing to embed; this also avoids dividing by zero in the average below.
            print(f"[{self.name}] 0 chunks — nothing to index")
            return

        avg_chars = sum(len(c) for c in self.chunks) // len(self.chunks)
        print(f"[{self.name}] {len(self.chunks)} chunks (avg {avg_chars} chars)")

        for i in range(0, len(self.chunks), batch_size):
            batch = self.chunks[i : i + batch_size]
            ids = [f"chunk_{j}" for j in range(i, i + len(batch))]
            embeddings = self.embed(batch)
            self.collection.upsert(ids=ids, documents=batch, embeddings=embeddings)
            print(f"  Embedded batch {i // batch_size + 1} ({len(batch)} chunks)")

        print(f"  Total in collection: {self.collection.count()}")

    def run(self, question: str) -> tuple[str, str]:
        """End-to-end: retrieve context then generate answer.

        Returns:
            ``(answer, context)`` — the context is returned too so callers can
            record what the answer was based on.
        """
        context = self.retrieve(question)
        answer = self.generate(question, context)
        return answer, context

    def generate_answers(self, val_path: str = "val.json") -> list[dict]:
        """Run the pipeline on a validation set and return results.

        Args:
            val_path: Path to a JSON file holding a list of items, each with
                the keys ``id``, ``category``, ``question``, ``answer`` and
                ``citation``.

        Returns:
            One dict per item: the input fields plus ``ai_response`` (the
            generated answer) and ``context`` (the retrieved text).
        """
        # Read as UTF-8 explicitly so the result does not depend on the
        # platform's default encoding.
        with open(val_path, encoding="utf-8") as f:
            val_data = json.load(f)

        print(f"[{self.name}] Running on {len(val_data)} questions")

        results = []
        for item in val_data:
            q = item["question"]
            print(f"\n[{item['id']}] {item['category']}")
            print(f"Q: {q}")
            answer, context = self.run(q)
            results.append({
                "id": item["id"],
                "category": item["category"],
                "question": q,
                "answer": item["answer"],
                "citation": item["citation"],
                "ai_response": answer,
                "context": context,
            })
            print(f"A: {answer[:200]}...")
            print("-" * 60)

        return results

In [None]:
class NoRAG(RAGPipeline):
    """Retrieval-free baseline: the generator is given the question and no document text."""

    name = "no_rag"

    GEN_MODEL = "meta-llama/Llama-3.2-3B-Instruct-Turbo"

    def chunk(self, text: str) -> list[str]:
        """Return no chunks; this pipeline does not index the document."""
        return []

    def build_index(self, text: str, batch_size: int = 100):
        """Skip indexing and only log that nothing was built."""
        print(f"[{self.name}] Skipping index build (no retrieval)")

    def retrieve(self, question: str, n_results: int = 5) -> str:
        """Return an empty context string for every question."""
        return ""

    def generate(self, question: str, context: str) -> str:
        """Answer ``question`` with ``GEN_MODEL``; ``context`` is not used."""
        messages = [
            {
                "role": "system",
                "content": (
                    "You are a helpful assistant that answers questions about the Indian Constitution. "
                    "Be precise and cite relevant articles when possible."
                ),
            },
            {"role": "user", "content": f"Question: {question}\n\nAnswer:"},
        ]
        completion = self.together_client.chat.completions.create(
            model=self.GEN_MODEL,
            messages=messages,
        )
        return completion.choices[0].message.content

### No RAG (Baseline)

No retrieval — the LLM answers purely from its parametric knowledge.

In [None]:
class NaiveRAG(RAGPipeline):
    """Plain vector-search RAG: overlapping fixed-width chunks, nearest-neighbour lookup, no rerank."""

    name = "naive_rag"

    CHUNK_SIZE = 1000
    CHUNK_OVERLAP = 200
    GEN_MODEL = "meta-llama/Llama-3.2-3B-Instruct-Turbo"

    def chunk(self, text: str) -> list[str]:
        """Slice ``text`` into windows of ``CHUNK_SIZE`` characters.

        Consecutive windows start ``CHUNK_SIZE - CHUNK_OVERLAP`` characters
        apart, so neighbours share ``CHUNK_OVERLAP`` characters. Expects
        ``CHUNK_OVERLAP`` to be smaller than ``CHUNK_SIZE``.
        """
        stride = self.CHUNK_SIZE - self.CHUNK_OVERLAP
        return [
            text[offset : offset + self.CHUNK_SIZE]
            for offset in range(0, len(text), stride)
        ]

    def retrieve(self, question: str, n_results: int = 5) -> str:
        """Embed the question and return the ``n_results`` closest chunks as one string."""
        query_vector = self.embed([question])[0]
        hits = self.collection.query(query_embeddings=[query_vector], n_results=n_results)
        top_documents = hits["documents"][0]
        return "\n\n---\n\n".join(top_documents)

    def generate(self, question: str, context: str) -> str:
        """Ask ``GEN_MODEL`` to answer ``question`` from ``context``."""
        messages = [
            {
                "role": "system",
                "content": (
                    "You are a helpful assistant that answers questions about the Indian Constitution. "
                    "Use ONLY the provided context to answer. If the context doesn't contain enough "
                    "information, say so. Be precise and cite relevant articles when possible."
                ),
            },
            {
                "role": "user",
                "content": f"Context:\n{context}\n\nQuestion: {question}\n\nAnswer:",
            },
        ]
        completion = self.together_client.chat.completions.create(
            model=self.GEN_MODEL,
            messages=messages,
        )
        return completion.choices[0].message.content

### Naive RAG

Fixed-size chunking (1000 chars, 200 overlap) with cosine similarity retrieval. No reranking.

In [None]:
class RerankerRAG(RAGPipeline):
    """Reranks initial vector-search candidates using an LLM before generation."""

    name = "reranker_rag"

    CHUNK_SIZE = 1000
    CHUNK_OVERLAP = 200
    GEN_MODEL = "meta-llama/Llama-3.2-3B-Instruct-Turbo"
    RERANK_MODEL = "meta-llama/Llama-3.2-3B-Instruct-Turbo"
    INITIAL_K = 10   # candidates fetched from vector DB
    FINAL_K = 5      # candidates kept after reranking
    FALLBACK_SCORE = 1  # used when the reranker's reply cannot be interpreted

    # Finds a score field even when the reply wraps it in prose or code fences,
    # e.g. 'Sure! {"score": 7}' or 'Score: 7'.
    _SCORE_PATTERN = re.compile(r'"?score"?\s*[:=]\s*(-?\d+(?:\.\d+)?)', re.IGNORECASE)

    def chunk(self, text: str) -> list[str]:
        """Split ``text`` into ``CHUNK_SIZE``-character windows overlapping by ``CHUNK_OVERLAP``."""
        chunks, start = [], 0
        while start < len(text):
            chunks.append(text[start : start + self.CHUNK_SIZE])
            start += self.CHUNK_SIZE - self.CHUNK_OVERLAP
        return chunks

    @classmethod
    def _parse_score(cls, raw: str | None) -> int:
        """Extract a 1-10 relevance score from a reranker reply.

        Tries strict JSON first, then falls back to a pattern search so that a
        reply wrapped in extra text is still understood. Anything that cannot
        be interpreted yields ``FALLBACK_SCORE``; out-of-range values are
        clamped to 1-10 so a stray large number cannot dominate the ranking.
        """
        if not raw:
            return cls.FALLBACK_SCORE

        value = None
        try:
            payload = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            payload = None
        if isinstance(payload, dict):
            value = payload.get("score")
        elif isinstance(payload, (int, float, str)):
            # The model answered with a bare value such as 7 or "7".
            value = payload

        if value is None:
            found = cls._SCORE_PATTERN.search(raw)
            if found is None:
                return cls.FALLBACK_SCORE
            value = found.group(1)

        try:
            score = int(float(value))
        except (TypeError, ValueError, OverflowError):
            return cls.FALLBACK_SCORE
        return max(1, min(10, score))

    def _rerank_with_llm(self, query: str, documents: list[str], top_n: int) -> list[str]:
        """Ask an LLM to score each document's relevance, return top_n.

        One request is made per document, using only its first 800 characters.
        Documents with equal scores keep their incoming (vector-search) order.
        """
        scored = []
        for doc in documents:
            prompt = (
                f"On a scale of 1-10, rate how relevant the following document is to the query.\n"
                f"Query: {query}\n\n"
                f"Document:\n{doc[:800]}\n\n"
                f"Respond with ONLY a JSON object: {{\"score\": <integer 1-10>}}"
            )
            resp = self.together_client.chat.completions.create(
                model=self.RERANK_MODEL,
                messages=[{"role": "user", "content": prompt}],
                temperature=0,
            )
            scored.append((self._parse_score(resp.choices[0].message.content), doc))

        # The sort is stable, so ties preserve the original candidate order.
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [doc for _, doc in scored[:top_n]]

    def retrieve(self, question: str, n_results: int = 5) -> str:
        """Fetch ``INITIAL_K`` candidates, rerank them, and join the best ``FINAL_K``.

        ``n_results`` is accepted for interface compatibility but not used;
        the class constants control how many chunks are fetched and kept.
        """
        # Step 1: broad vector search
        q_emb = self.embed([question])[0]
        results = self.collection.query(query_embeddings=[q_emb], n_results=self.INITIAL_K)
        candidates = results["documents"][0]

        # Step 2: LLM rerank and keep top FINAL_K
        reranked = self._rerank_with_llm(question, candidates, self.FINAL_K)
        return "\n\n---\n\n".join(reranked)

    def generate(self, question: str, context: str) -> str:
        """Ask ``GEN_MODEL`` to answer ``question`` from ``context``."""
        system = (
            "You are a helpful assistant that answers questions about the Indian Constitution. "
            "Use ONLY the provided context to answer. If the context doesn't contain enough "
            "information, say so. Be precise and cite relevant articles when possible."
        )
        resp = self.together_client.chat.completions.create(
            model=self.GEN_MODEL,
            messages=[
                {"role": "system", "content": system},
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}\n\nAnswer:"},
            ],
        )
        return resp.choices[0].message.content

### Reranker RAG

Fetches 10 candidates via vector search, then uses an LLM to score each chunk's relevance (1-10) and keeps the top 5.

In [None]:
from collections import defaultdict
from rank_bm25 import BM25Okapi
import re as _re


class HybridSearchRAG(RAGPipeline):
    """Combines BM25 keyword search with semantic vector search using RRF fusion."""

    name = "hybrid_search_rag"

    CHUNK_SIZE = 1000
    CHUNK_OVERLAP = 200
    GEN_MODEL = "meta-llama/Llama-3.2-3B-Instruct-Turbo"
    SEMANTIC_K = 10  # candidates taken from vector search
    BM25_K = 10      # candidates taken from BM25
    FINAL_K = 5      # chunks kept after fusion
    RRF_K = 60  # constant for Reciprocal Rank Fusion

    def chunk(self, text: str) -> list[str]:
        """Split ``text`` into ``CHUNK_SIZE``-character windows overlapping by ``CHUNK_OVERLAP``."""
        chunks, start = [], 0
        while start < len(text):
            chunks.append(text[start : start + self.CHUNK_SIZE])
            start += self.CHUNK_SIZE - self.CHUNK_OVERLAP
        return chunks

    def build_index(self, text: str, batch_size: int = 100):
        """Build both vector index and BM25 index."""
        super().build_index(text, batch_size)

        # Build BM25 index over the same chunks
        tokenized = [self._tokenize(c) for c in self.chunks]
        self.bm25 = BM25Okapi(tokenized)
        print(f"  BM25 index built over {len(self.chunks)} chunks")

    @staticmethod
    def _tokenize(text: str) -> list[str]:
        """Lower-case ``text`` and return its runs of word characters."""
        return _re.findall(r"\w+", text.lower())

    def _bm25_search(self, query: str, k: int) -> list[tuple[int, float]]:
        """Return up to k (chunk_index, score) pairs from BM25, best first.

        Chunks whose score is not positive are dropped. Without this filter a
        query with fewer than ``k`` matching chunks would be padded with
        non-matching chunks, and those would still earn a rank-based boost in
        the fusion step.
        """
        tokens = self._tokenize(query)
        scores = self.bm25.get_scores(tokens)
        ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
        return [(i, scores[i]) for i in ranked[:k] if scores[i] > 0]

    def _semantic_search(self, query: str, k: int) -> list[tuple[int, float]]:
        """Return top-k (chunk_index, rank_position) pairs from vector search."""
        q_emb = self.embed([query])[0]
        results = self.collection.query(query_embeddings=[q_emb], n_results=k, include=["documents"])
        # Ids were written as "chunk_<index>" at index time — recover the index.
        indices = [int(cid.rsplit("_", 1)[1]) for cid in results["ids"][0]]
        return [(idx, rank) for rank, idx in enumerate(indices)]

    def _rrf_fuse(self, semantic_results: list[tuple[int, float]], bm25_results: list[tuple[int, float]]) -> list[int]:
        """Reciprocal Rank Fusion to merge two ranked lists.

        Each list contributes ``1 / (RRF_K + rank + 1)`` per chunk, with
        ``rank`` being the zero-based position in that list; only positions
        matter, the second tuple element is ignored. Returns the ``FINAL_K``
        chunk indices with the highest combined score.
        """
        scores = defaultdict(float)

        for ranking in (semantic_results, bm25_results):
            for rank, (idx, _) in enumerate(ranking):
                scores[idx] += 1.0 / (self.RRF_K + rank + 1)

        fused = sorted(scores, key=scores.get, reverse=True)
        return fused[: self.FINAL_K]

    def retrieve(self, question: str, n_results: int = 5) -> str:
        """Run both searches, fuse the rankings, and join the winning chunks.

        ``n_results`` is accepted for interface compatibility but not used;
        ``FINAL_K`` controls how many chunks are returned.
        """
        semantic = self._semantic_search(question, self.SEMANTIC_K)
        bm25 = self._bm25_search(question, self.BM25_K)
        top_indices = self._rrf_fuse(semantic, bm25)
        return "\n\n---\n\n".join(self.chunks[i] for i in top_indices)

    def generate(self, question: str, context: str) -> str:
        """Ask ``GEN_MODEL`` to answer ``question`` from ``context``."""
        system = (
            "You are a helpful assistant that answers questions about the Indian Constitution. "
            "Use ONLY the provided context to answer. If the context doesn't contain enough "
            "information, say so. Be precise and cite relevant articles when possible."
        )
        resp = self.together_client.chat.completions.create(
            model=self.GEN_MODEL,
            messages=[
                {"role": "system", "content": system},
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}\n\nAnswer:"},
            ],
        )
        return resp.choices[0].message.content

### Hybrid Search RAG

Combines BM25 keyword search with semantic vector search, merged via Reciprocal Rank Fusion (RRF).

In [None]:
class HierarchicalRAG(RAGPipeline):
    """Two-level retrieval: search summaries first, then detailed chunks within matched sections."""

    name = "hierarchical_rag"

    SECTION_SIZE = 5000       # large sections for summarisation
    CHUNK_SIZE = 1000         # detailed chunks within each section
    CHUNK_OVERLAP = 200
    SUMMARY_MODEL = "meta-llama/Llama-3.2-3B-Instruct-Turbo"
    GEN_MODEL = "meta-llama/Llama-3.2-3B-Instruct-Turbo"
    K_SUMMARIES = 3           # top sections to retrieve
    K_CHUNKS = 5              # detailed chunks returned in total across the selected sections
    # Characters of a section shown to the summariser. Tied to SECTION_SIZE so
    # the summary covers the whole section; a smaller cap would leave the tail
    # of every section invisible to the summary-level search.
    SUMMARY_INPUT_CHARS = SECTION_SIZE

    def chunk(self, text: str) -> list[str]:
        """Split ``text`` into ``CHUNK_SIZE``-character windows overlapping by ``CHUNK_OVERLAP``.

        ``build_index`` applies this to each section separately, so no chunk
        spans a section boundary.
        """
        chunks, start = [], 0
        while start < len(text):
            chunks.append(text[start : start + self.CHUNK_SIZE])
            start += self.CHUNK_SIZE - self.CHUNK_OVERLAP
        return chunks

    def _make_sections(self, text: str) -> list[str]:
        """Split text into large non-overlapping sections."""
        return [
            text[start : start + self.SECTION_SIZE]
            for start in range(0, len(text), self.SECTION_SIZE)
        ]

    def _summarise(self, section_text: str) -> str:
        """Ask the LLM for a concise summary of a section."""
        resp = self.together_client.chat.completions.create(
            model=self.SUMMARY_MODEL,
            messages=[
                {"role": "system", "content": "Summarise the following legal text in 2-3 sentences. Focus on the key articles, rights, or provisions mentioned."},
                {"role": "user", "content": section_text[: self.SUMMARY_INPUT_CHARS]},
            ],
            temperature=0,
        )
        return resp.choices[0].message.content

    def build_index(self, text: str, batch_size: int = 100):
        """Build two-level index: summary collection + detailed chunk collection.

        Level 1 stores one LLM summary per section under ``summary_<section>``.
        Level 2 stores every chunk under ``detail_<n>`` with a ``section_id``
        metadata field, so retrieval can be restricted to chosen sections.
        """
        sections = self._make_sections(text)
        print(f"[{self.name}] {len(sections)} sections of ~{self.SECTION_SIZE} chars")

        # ── Level 1: Summaries ──
        self.summary_collection = self.chroma_client.get_or_create_collection(
            name="hierarchical_summaries", metadata={"hnsw:space": "cosine"},
        )
        print("  Generating summaries...")
        self.section_texts = sections
        summaries = []
        for i, sec in enumerate(sections):
            summaries.append(self._summarise(sec))
            if (i + 1) % 20 == 0:
                print(f"    Summarised {i + 1}/{len(sections)} sections")

        # Embed and store summaries
        for i in range(0, len(summaries), batch_size):
            batch = summaries[i : i + batch_size]
            ids = [f"summary_{j}" for j in range(i, i + len(batch))]
            embeddings = self.embed(batch)
            self.summary_collection.upsert(ids=ids, documents=batch, embeddings=embeddings)
        print(f"  {len(summaries)} summaries indexed")

        # ── Level 2: Detailed chunks (with section_id metadata) ──
        self.detailed_collection = self.chroma_client.get_or_create_collection(
            name="hierarchical_details", metadata={"hnsw:space": "cosine"},
        )
        all_chunks, all_ids, all_meta = [], [], []
        for sec_idx, section in enumerate(sections):
            for piece in self.chunk(section):
                # Ids are numbered globally across sections.
                all_ids.append(f"detail_{len(all_chunks)}")
                all_chunks.append(piece)
                all_meta.append({"section_id": sec_idx})

        self.chunks = all_chunks  # keep for compatibility

        for i in range(0, len(all_chunks), batch_size):
            batch_docs = all_chunks[i : i + batch_size]
            batch_ids = all_ids[i : i + batch_size]
            batch_meta = all_meta[i : i + batch_size]
            embeddings = self.embed(batch_docs)
            self.detailed_collection.upsert(
                ids=batch_ids, documents=batch_docs,
                embeddings=embeddings, metadatas=batch_meta,
            )
            print(f"  Embedded detail batch {i // batch_size + 1} ({len(batch_docs)} chunks)")

        print(f"  {len(all_chunks)} detailed chunks indexed across {len(sections)} sections")

    def retrieve(self, question: str, n_results: int = 5) -> str:
        """Two-stage retrieval: summaries → detailed chunks within top sections.

        ``n_results`` is accepted for interface compatibility but not used;
        ``K_SUMMARIES`` and ``K_CHUNKS`` control the two stages.
        """
        q_emb = self.embed([question])[0]

        # Stage 1: find top-K summary sections
        summary_results = self.summary_collection.query(
            query_embeddings=[q_emb], n_results=self.K_SUMMARIES,
        )
        # Summary ids were written as "summary_<section index>".
        top_section_ids = [
            int(sid.split("_")[1]) for sid in summary_results["ids"][0]
        ]

        # Stage 2: search detailed chunks filtered to those sections
        detail_results = self.detailed_collection.query(
            query_embeddings=[q_emb],
            n_results=self.K_CHUNKS,
            where={"section_id": {"$in": top_section_ids}},
            include=["documents"],
        )

        return "\n\n---\n\n".join(detail_results["documents"][0])

    def generate(self, question: str, context: str) -> str:
        """Ask ``GEN_MODEL`` to answer ``question`` from ``context``."""
        system = (
            "You are a helpful assistant that answers questions about the Indian Constitution. "
            "Use ONLY the provided context to answer. If the context doesn't contain enough "
            "information, say so. Be precise and cite relevant articles when possible."
        )
        resp = self.together_client.chat.completions.create(
            model=self.GEN_MODEL,
            messages=[
                {"role": "system", "content": system},
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}\n\nAnswer:"},
            ],
        )
        return resp.choices[0].message.content

### Hierarchical RAG

Two-level retrieval: first searches LLM-generated section summaries, then retrieves detailed chunks within the matched sections.

## Evaluator

LLM-as-a-Judge using GPT-4o. Scores each response against ground truth as PASS (1), PARTIAL (0.5), or FAIL (0), with per-category breakdown.

In [None]:
import time


class Evaluator:
    """LLM-as-a-Judge scorer. Accepts RAG results and scores them against ground truth."""

    SYSTEM_PROMPT = (
        "You are an intelligent evaluation system tasked with assessing the AI assistant's responses. "
        "If the AI assistant's response is very close to the true response, assign a score of 1. "
        "If the response is incorrect or unsatisfactory in relation to the true response, assign a score of 0. "
        "If the response is partially aligned with the true response, assign a score of 0.5. "
        'Respond with ONLY a JSON object: {"score": <0 | 0.5 | 1>}'
    )

    def __init__(self, judge_model: str = "gpt-4o"):
        self.judge_model = judge_model
        self.client = OpenAI()

    def _score_one(self, question: str, ai_response: str, true_response: str) -> float:
        prompt = (
            f"User Query: {question}\n"
            f"AI Response: {ai_response}\n"
            f"True Response: {true_response}\n"
            f"{self.SYSTEM_PROMPT}"
        )
        resp = self.client.chat.completions.create(
            model=self.judge_model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            response_format={"type": "json_object"},
        )
        return float(json.loads(resp.choices[0].message.content)["score"])

    def evaluate(self, results: list[dict], generation_time: float | None = None) -> list[dict]:
        """Score a list of RAG results and print aggregate metrics with timing."""
        print(f"\n{'='*60}")
        print(f"  SCORING — Judge: {self.judge_model}")
        print(f"{'='*60}")

        eval_start = time.time()

        for r in results:
            r["score"] = self._score_one(r["question"], r["ai_response"], r["answer"])
            label = {1: "PASS", 0.5: "PARTIAL", 0: "FAIL"}[r["score"]]
            print(f"[{r['id']:>2}] {label:>7} ({r['score']})  {r['question'][:80]}")

        eval_time = time.time() - eval_start

        # ── Aggregate ──
        n = len(results)
        total = sum(r["score"] for r in results)
        print(f"\n{'='*60}")
        print(f"  AGGREGATE SCORE — {n} questions")
        print(f"{'='*60}")
        print(f"  Total: {total} / {n}  ({total / n:.1%})")
        print(f"  PASS: {sum(1 for r in results if r['score'] == 1)}")
        print(f"  PARTIAL: {sum(1 for r in results if r['score'] == 0.5)}")
        print(f"  FAIL: {sum(1 for r in results if r['score'] == 0)}")

        # ── Timing ──
        print(f"\n  TIMING")
        print(f"  {'─'*50}")
        if generation_time is not None:
            print(f"  Generation:  {generation_time:.1f}s ({generation_time / n:.2f}s per question)")
        print(f"  Evaluation:  {eval_time:.1f}s ({eval_time / n:.2f}s per question)")
        if generation_time is not None:
            total_time = generation_time + eval_time
            print(f"  Total:       {total_time:.1f}s")

        # ── Per-category breakdown ──
        categories = sorted(set(r["category"] for r in results))
        print(f"\n  PER-CATEGORY BREAKDOWN")
        print(f"  {'─'*50}")
        for cat in categories:
            cat_items = [r for r in results if r["category"] == cat]
            cat_total = sum(r["score"] for r in cat_items)
            cat_n = len(cat_items)
            print(f"  {cat} — {cat_total}/{cat_n} ({cat_total / cat_n:.1%})")

        return results

In [None]:
# Choose the strategy to benchmark and index the full document text.
pipeline = RerankerRAG()
pipeline.build_index(all_text)

# Time answer generation only; the index build above is not included.
gen_start = time.time()
results = pipeline.generate_answers()  # reads the default validation file, "val.json"
gen_time = time.time() - gen_start

# Grade every answer with the judge model and print the score/timing report.
evaluator = Evaluator(judge_model="gpt-4o")
eval_results = evaluator.evaluate(results, generation_time=gen_time)

## Run

Pick a pipeline, build the index, generate answers, and evaluate. To compare strategies, swap the pipeline class in the run cell (for example, replace `RerankerRAG()` with `HybridSearchRAG()`) and re-run it.