# Part 2: Production RAG

In this notebook, we'll build a production-ready RAG system with:

1. **Hybrid Search** — Vector + Keyword search combined
2. **Reranking** — Score and filter results
3. **Relevance Check** — Verify we found useful info
4. **Query Rewrite** — Retry with better phrasing
5. **Grounding Check** — Prevent hallucinations

This is how you go from 30% to 86% accuracy.

## Setup

In [None]:
!git clone https://github.com/i33ym/rag-workshop.git 2>/dev/null || echo "Already cloned"
%cd rag-workshop

In [None]:
!pip install -q openai langchain langchain-openai langchain-community chromadb rank-bm25

In [None]:
import os
from getpass import getpass

os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API key: ")

## Load and Prepare Documents

Same as Part 1 — load docs, split into chunks, create vector store.

In [None]:
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import Chroma

# Load
loader = DirectoryLoader("docs/", glob="**/*.md", loader_cls=TextLoader)
documents = loader.load()

# Split
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n## ", "\n### ", "\n\n", "\n", " "]
)
chunks = splitter.split_documents(documents)

# Embed
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vector_store = Chroma.from_documents(documents=chunks, embedding=embeddings)

# LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

print(f"Loaded {len(chunks)} chunks")

## Stage 1: Hybrid Search

**Problem:** Vector search finds similar meanings but misses exact terms.

**Solution:** Combine vector search + keyword search (BM25).

### What is BM25?
BM25 is a keyword search algorithm. It finds documents containing the exact words you searched for.

In [None]:
from langchain_community.retrievers import BM25Retriever

# Create BM25 retriever from same chunks
bm25_retriever = BM25Retriever.from_documents(chunks)
bm25_retriever.k = 5

In [None]:
# Compare: Vector vs BM25
query = "POST /api/payment"

print(f"Query: {query}\n")

print("=== Vector Search Results ===")
vector_results = vector_store.similarity_search(query, k=3)
for i, doc in enumerate(vector_results):
    print(f"{i+1}. {doc.page_content[:100]}...\n")

print("=== BM25 (Keyword) Results ===")
bm25_results = bm25_retriever.invoke(query)
for i, doc in enumerate(bm25_results[:3]):
    print(f"{i+1}. {doc.page_content[:100]}...\n")

### Reciprocal Rank Fusion (RRF)

RRF combines results from multiple searches. Documents that appear in both searches rank higher.

In [None]:
def hybrid_search(query, k=5):
    """Combine vector search and BM25 using RRF."""
    
    # Get results from both methods
    vector_results = vector_store.similarity_search(query, k=k)
    bm25_results = bm25_retriever.invoke(query)[:k]
    
    # Calculate RRF scores
    rrf_scores = {}
    k_constant = 60  # Standard RRF constant
    
    for rank, doc in enumerate(vector_results):
        doc_id = doc.page_content[:100]  # Use content as ID
        rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1 / (k_constant + rank + 1)
        rrf_scores[doc_id + "_doc"] = doc  # Store the document
    
    for rank, doc in enumerate(bm25_results):
        doc_id = doc.page_content[:100]
        rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1 / (k_constant + rank + 1)
        rrf_scores[doc_id + "_doc"] = doc
    
    # Sort by score and return documents
    sorted_ids = sorted(
        [k for k in rrf_scores.keys() if not k.endswith("_doc")],
        key=lambda x: rrf_scores[x],
        reverse=True
    )
    
    results = [rrf_scores[doc_id + "_doc"] for doc_id in sorted_ids[:k]]
    scores = [rrf_scores[doc_id] for doc_id in sorted_ids[:k]]
    
    return results, scores

In [None]:
# Test hybrid search
query = "How do I authenticate API requests?"

results, scores = hybrid_search(query, k=5)

print(f"Query: {query}\n")
print("=== Hybrid Search Results ===")
for i, (doc, score) in enumerate(zip(results, scores)):
    print(f"{i+1}. Score: {score:.4f}")
    print(f"   {doc.page_content[:80]}...\n")

## Stage 2: Reranking

**Problem:** Search returns documents by similarity, not by how well they answer the question.

**Solution:** Score each document's relevance to the question and keep only the best.

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

def rerank_documents(query, documents, top_n=3):
    """Score each document's relevance and return top results."""
    
    rerank_prompt = ChatPromptTemplate.from_template("""
Rate how relevant this document is to answering the question.
Reply with only a number from 0 to 10.

Question: {question}

Document: {document}

Relevance score (0-10):""")
    
    chain = rerank_prompt | llm | StrOutputParser()
    
    scored_docs = []
    for doc in documents:
        try:
            score_str = chain.invoke({
                "question": query,
                "document": doc.page_content[:500]
            })
            score = float(score_str.strip())
        except:
            score = 5.0
        
        scored_docs.append((doc, score))
    
    # Sort by score (highest first)
    scored_docs.sort(key=lambda x: x[1], reverse=True)
    
    return scored_docs[:top_n]

In [None]:
# Test reranking
query = "How do I create a payment?"

# First, get hybrid results
hybrid_results, _ = hybrid_search(query, k=6)

print(f"Query: {query}\n")
print("Reranking documents...\n")

# Rerank them
reranked = rerank_documents(query, hybrid_results, top_n=3)

print("=== Reranked Results ===")
scores = []
for i, (doc, score) in enumerate(reranked):
    scores.append(score)
    print(f"{i+1}. Score: {score}/10")
    print(f"   {doc.page_content[:80]}...\n")

print(f"\nRerank scores: {scores}")

## Stage 3: Relevance Check

**Problem:** Sometimes no documents are truly relevant. Simple RAG generates an answer anyway.

**Solution:** Check if documents can actually answer the question before proceeding.

In [None]:
def check_relevance(query, documents):
    """Check if documents can answer the question."""
    
    context = "\n\n".join([doc.page_content[:300] for doc in documents])
    
    relevance_prompt = ChatPromptTemplate.from_template("""
Can this context answer the question? Reply only 'yes' or 'no'.

Question: {question}

Context: {context}

Can this context answer the question?""")
    
    chain = relevance_prompt | llm | StrOutputParser()
    result = chain.invoke({"question": query, "context": context})
    
    is_relevant = "yes" in result.lower()
    return is_relevant, result

In [None]:
# Test with a relevant question
query = "How do I authenticate?"
docs, _ = hybrid_search(query, k=3)

is_relevant, raw = check_relevance(query, docs)
print(f"Query: {query}")
print(f"Is relevant: {is_relevant} (raw: {raw})")

In [None]:
# Test with an irrelevant question
query = "How do I integrate with PayPal?"
docs, _ = hybrid_search(query, k=3)

is_relevant, raw = check_relevance(query, docs)
print(f"Query: {query}")
print(f"Is relevant: {is_relevant} (raw: {raw})")

## Stage 4: Query Rewrite

**Problem:** Users don't always use the same terminology as the docs.

**Solution:** If relevance check fails, rewrite the query and try again.

In [None]:
def rewrite_query(query):
    """Rewrite query for better retrieval."""
    
    rewrite_prompt = ChatPromptTemplate.from_template("""
Rewrite this question to be better for searching documentation.
Use technical terms if applicable. Be specific.
Reply with only the rewritten question.

Original question: {question}

Rewritten question:""")
    
    chain = rewrite_prompt | llm | StrOutputParser()
    return chain.invoke({"question": query})

In [None]:
# Test query rewrite
test_queries = [
    "How do I log in?",
    "What happens when something goes wrong?",
    "How do I get money from a customer?"
]

for q in test_queries:
    rewritten = rewrite_query(q)
    print(f"Original:  {q}")
    print(f"Rewritten: {rewritten}\n")

## Stage 5: Grounding Check

**Problem:** LLMs sometimes "hallucinate" — generate info not in the documents.

**Solution:** Verify the answer is supported by the retrieved documents.

In [None]:
def check_grounding(answer, documents):
    """Verify answer is supported by documents."""
    
    context = "\n\n".join([doc.page_content for doc in documents])
    
    grounding_prompt = ChatPromptTemplate.from_template("""
Is this answer fully supported by the context? Reply only 'yes' or 'no'.

Context:
{context}

Answer:
{answer}

Is the answer fully supported?""")
    
    chain = grounding_prompt | llm | StrOutputParser()
    result = chain.invoke({"context": context, "answer": answer})
    
    is_grounded = "yes" in result.lower()
    return is_grounded, result

## Putting It All Together: Production RAG Pipeline

In [None]:
def generate_answer(query, documents):
    """Generate answer from documents."""
    
    context = "\n\n---\n\n".join([doc.page_content for doc in documents])
    
    answer_prompt = ChatPromptTemplate.from_template("""
Answer the question based only on the following context.
If you can't find the answer, say "I don't have information about this."
Include code examples if relevant.

Context:
{context}

Question: {question}

Answer:""")
    
    chain = answer_prompt | llm | StrOutputParser()
    return chain.invoke({"context": context, "question": query})

In [None]:
def production_rag(query, debug=False):
    """Full production RAG pipeline."""
    
    debug_info = {"query": query}
    
    # Stage 1: Hybrid Search
    if debug: print("[1] Hybrid Search...")
    docs, fusion_scores = hybrid_search(query, k=6)
    debug_info["hybrid_results"] = len(docs)
    
    # Stage 2: Reranking
    if debug: print("[2] Reranking...")
    reranked = rerank_documents(query, docs, top_n=3)
    rerank_scores = [score for _, score in reranked]
    top_docs = [doc for doc, _ in reranked]
    debug_info["rerank_scores"] = rerank_scores
    if debug: print(f"    Scores: {rerank_scores}")
    
    # Stage 3: Relevance Check
    if debug: print("[3] Checking relevance...")
    is_relevant, _ = check_relevance(query, top_docs)
    debug_info["is_relevant"] = is_relevant
    
    # Stage 4: Query Rewrite (if not relevant)
    if not is_relevant:
        if debug: print("[4] Not relevant, rewriting query...")
        new_query = rewrite_query(query)
        debug_info["rewritten_query"] = new_query
        if debug: print(f"    Rewritten: {new_query}")
        
        # Retry search
        docs, _ = hybrid_search(new_query, k=6)
        reranked = rerank_documents(new_query, docs, top_n=3)
        top_docs = [doc for doc, _ in reranked]
        
        # Check relevance again
        is_relevant, _ = check_relevance(new_query, top_docs)
        if not is_relevant:
            if debug: print("    Still not relevant. Giving up.")
            return {
                "answer": "I don't have information about this topic in the documentation.",
                "debug": debug_info
            }
    
    if debug: print(f"    Relevant: {is_relevant}")
    
    # Stage 5: Generate Answer
    if debug: print("[5] Generating answer...")
    answer = generate_answer(query, top_docs)
    
    # Stage 6: Grounding Check
    if debug: print("[6] Checking grounding...")
    is_grounded, _ = check_grounding(answer, top_docs)
    debug_info["is_grounded"] = is_grounded
    if debug: print(f"    Grounded: {is_grounded}")
    
    # Get sources
    sources = list(set([doc.metadata.get("source", "unknown") for doc in top_docs]))
    
    return {
        "answer": answer,
        "sources": sources,
        "is_grounded": is_grounded,
        "debug": debug_info
    }

## Test the Production Pipeline

In [None]:
# Test with debug output
result = production_rag("How do I authenticate API requests?", debug=True)

print("\n" + "="*50)
print("ANSWER:")
print("="*50)
print(result["answer"])

In [None]:
# Test multiple questions
test_questions = [
    "How do I create a payment?",
    "What error codes can the API return?",
    "How do I integrate with Stripe?"  # Not in docs!
]

for q in test_questions:
    print(f"\nQ: {q}")
    print("-" * 40)
    result = production_rag(q, debug=False)
    print(f"A: {result['answer'][:300]}...")
    print(f"Grounded: {result.get('is_grounded', 'N/A')}")

## Compare: Simple vs Production RAG

In [None]:
def simple_rag(query):
    """The simple approach from Part 1."""
    docs = vector_store.similarity_search(query, k=3)
    return generate_answer(query, docs)

# Compare on a tricky question
query = "How do I get a bearer token for authentication?"

print(f"Query: {query}\n")

print("=== Simple RAG ===")
simple_answer = simple_rag(query)
print(simple_answer[:300])

print("\n=== Production RAG ===")
prod_result = production_rag(query, debug=True)
print(f"\nAnswer: {prod_result['answer'][:300]}")

## Summary

**What we built:**

| Stage | What it does | Why it matters |
|-------|--------------|----------------|
| Hybrid Search | Vector + BM25 | Catches both semantic and exact matches |
| Reranking | Score relevance | Filters out noise |
| Relevance Check | Verify we can answer | Prevents confident wrong answers |
| Query Rewrite | Retry with better terms | Handles terminology mismatch |
| Grounding Check | Verify answer is supported | Prevents hallucinations |

**Benchmark improvement:**
- Simple RAG: 0.30
- Production RAG: 0.82-0.86

**Next notebook:** Evaluation and debugging techniques.

In [None]:
print("✅ Part 2 complete!")
print("Next: Open 03_evaluation.ipynb")