# RAG Engineering Demo Notebook

A complete, runnable demonstration of production RAG patterns:
- Document ingestion and normalization
- Semantic chunking
- Embedding generation
- Hybrid retrieval (vector + lexical)
- Reranking
- Context assembly
- Answer verification


## Setup

Install dependencies (uncomment for Colab):


In [None]:
# !pip install chromadb sentence-transformers tiktoken numpy


In [None]:
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
import tiktoken
import numpy as np
import re
from typing import List, Dict
import hashlib
from datetime import datetime


## 1. Sample Corpus

Enterprise documents for demonstration:


In [None]:
docs = [
    {
        "doc_id": "contract_001",
        "title": "Payment Terms",
        "text": """Payment Terms and Conditions

The customer must pay within 30 days of invoice date. Late fees apply after 45 days at a rate of 1.5% per month.

Payment Methods:
- Wire transfer (preferred)
- ACH direct debit
- Credit card (3% processing fee applies)

Early payment discount: 2% discount for payment within 10 days.""",
    },
    {
        "doc_id": "policy_legal_01",
        "title": "Data Retention Policy",
        "text": """Data Retention Policy

Customer data is retained for 7 years for audit and compliance purposes. This includes:
- Transaction records
- Communication logs
- Account information

Data can be deleted upon written request, subject to legal retention requirements.
Backup data is retained for an additional 90 days after primary deletion.""",
    },
    {
        "doc_id": "faq_01",
        "title": "Password Reset FAQ",
        "text": """Frequently Asked Questions - Account Security

Q: How can I reset my password?
A: Use the self-service portal at /reset. You will receive a verification email within 5 minutes.

Q: What are the password requirements?
A: Minimum 12 characters, including uppercase, lowercase, number, and special character.

Q: How often should I change my password?
A: We recommend changing passwords every 90 days.""",
    },
    {
        "doc_id": "sla_001",
        "title": "Service Level Agreement",
        "text": """Service Level Agreement (SLA)

Uptime Guarantee: 99.9% availability measured monthly.

Response Times:
- Critical issues: 15 minutes initial response
- High priority: 1 hour initial response
- Medium priority: 4 hours initial response
- Low priority: 24 hours initial response

Service Credits:
- Below 99.9%: 10% credit
- Below 99.5%: 25% credit
- Below 99.0%: 50% credit""",
    },
    {
        "doc_id": "product_spec_001",
        "title": "API Rate Limits",
        "text": """API Rate Limits and Quotas

Standard Tier:
- 1000 requests per minute
- 100,000 requests per day
- Maximum payload: 10MB

Enterprise Tier:
- 10,000 requests per minute
- Unlimited daily requests
- Maximum payload: 100MB

Rate limit headers are included in all responses: X-RateLimit-Remaining, X-RateLimit-Reset.""",
    },
]

print(f"Loaded {len(docs)} documents")


## 2. Document Normalization

Clean input beats clever retrieval:


In [None]:
BOILERPLATE_PATTERNS = [
    r"(?i)confidential\s+information",
    r"(?i)all\s+rights\s+reserved",
    r"(?i)page\s+\d+\s+of\s+\d+",
]

def strip_boilerplate(text: str) -> str:
    """Remove common boilerplate text."""
    for pat in BOILERPLATE_PATTERNS:
        text = re.sub(pat, " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()

def normalize_document(doc: dict, tenant_id: str = "demo") -> dict:
    """Full document normalization pipeline."""
    cleaned = strip_boilerplate(doc["text"])
    doc_hash = hashlib.md5(cleaned.encode()).hexdigest()
    
    return {
        "doc_id": doc["doc_id"],
        "title": doc.get("title", ""),
        "text": cleaned,
        "metadata": {
            "content_hash": doc_hash,
            "tenant_id": tenant_id,
            "ingested_at": datetime.utcnow().isoformat() + "Z",
            "char_count": len(cleaned),
        }
    }

# Normalize all documents
normalized_docs = [normalize_document(doc) for doc in docs]
print(f"Normalized {len(normalized_docs)} documents")
print(f"Sample metadata: {normalized_docs[0]['metadata']}")
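
The `content_hash` stored in the metadata can also drive incremental ingestion. A minimal sketch, assuming a hypothetical in-memory `seen_hashes` dict keyed by `doc_id` (neither it nor `select_changed` is part of the notebook), that skips documents whose cleaned text has not changed since the last run. It uses SHA-256 rather than the MD5 above only as an illustrative choice:

```python
import hashlib
from typing import Dict, List

def content_hash(text: str) -> str:
    """Hash the cleaned text; used for change detection only."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def select_changed(docs: List[dict], seen_hashes: Dict[str, str]) -> List[dict]:
    """Return docs that are new or whose text changed; updates seen_hashes in place."""
    changed = []
    for doc in docs:
        h = content_hash(doc["text"])
        if seen_hashes.get(doc["doc_id"]) != h:
            changed.append(doc)
            seen_hashes[doc["doc_id"]] = h
    return changed

# Illustrative documents (made up for this sketch)
seen: Dict[str, str] = {}
batch = [{"doc_id": "a", "text": "hello"}, {"doc_id": "b", "text": "world"}]

first = select_changed(batch, seen)    # both documents are new
second = select_changed(batch, seen)   # nothing changed
batch[1]["text"] = "world, updated"
third = select_changed(batch, seen)    # only "b" changed

print(len(first), len(second), [d["doc_id"] for d in third])  # 2 0 ['b']
```

In a real deployment the hash map would presumably live next to the index (for example in chunk metadata) rather than in process memory.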


## 3. Semantic Chunking

Chunking determines what the retriever can find:


In [None]:
enc = tiktoken.get_encoding("cl100k_base")

def num_tokens(text: str) -> int:
    """Count tokens using cl100k_base encoding."""
    return len(enc.encode(text))

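def semantic_chunk(text: str, max_tokens: int = 300, overlap_tokens: int = 50) -> List[str]:
    """
    Paragraph-based chunking with overlap.
    
    Rules:
    - Chunk size is budgeted in tokens, not words (default cap: 300 tokens)
    - Overlap of roughly 5-15% is typical; the default 50 tokens is about 17% of 300
    - Split only on blank lines so headings and lists stay with their paragraph
    
    Limitations: a single paragraph longer than max_tokens is kept whole, and
    overlap is carried in whole paragraphs, so a chunk can exceed max_tokens.
    """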
    if num_tokens(text) <= max_tokens:
        return [text]
    
    paragraphs = text.split("\n\n")
    chunks = []
    current = []
    current_tokens = 0
    
    for para in paragraphs:
        para_tokens = num_tokens(para)
        
        if current_tokens + para_tokens > max_tokens and current:
            chunks.append("\n\n".join(current))
            # Keep overlap
            overlap = []
            overlap_tok = 0
            for p in reversed(current):
                overlap.insert(0, p)
                overlap_tok += num_tokens(p)
                if overlap_tok >= overlap_tokens:
                    break
            current = overlap
            current_tokens = sum(num_tokens(p) for p in current)
        
        current.append(para)
        current_tokens += para_tokens
    
    if current:
        chunks.append("\n\n".join(current))
    
    return chunks

# Chunk all documents
all_chunks = []
for doc in normalized_docs:
    text_chunks = semantic_chunk(doc["text"])
    for i, chunk_text in enumerate(text_chunks):
        all_chunks.append({
            "id": f"{doc['doc_id']}#chunk_{i}",
            "doc_id": doc["doc_id"],
            "title": doc["title"],
            "text": chunk_text,
            "token_count": num_tokens(chunk_text),
            **doc["metadata"]
        })

print(f"Created {len(all_chunks)} chunks from {len(normalized_docs)} documents")
print(f"\nChunk token distribution:")
token_counts = [c["token_count"] for c in all_chunks]
print(f"  Min: {min(token_counts)}, Max: {max(token_counts)}, Avg: {np.mean(token_counts):.1f}")
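
Because the chunker above only splits on blank lines, one very long paragraph would end up as a single oversized chunk. A minimal sketch of a sliding-window fallback for that case follows. `window_split` is a hypothetical helper, and whitespace-separated words stand in for tokens here; in the notebook the same function could presumably be applied to `enc.encode(text)` and the windows turned back into text with `enc.decode`.

```python
from typing import List, Sequence

def window_split(tokens: Sequence, max_tokens: int = 300, overlap_tokens: int = 50) -> List[list]:
    """Split a token sequence into windows of at most max_tokens with fixed overlap."""
    if overlap_tokens >= max_tokens:
        raise ValueError("overlap_tokens must be smaller than max_tokens")
    step = max_tokens - overlap_tokens
    windows = []
    start = 0
    while start < len(tokens):
        windows.append(list(tokens[start:start + max_tokens]))
        if start + max_tokens >= len(tokens):
            break  # the last window already reaches the end
        start += step
    return windows

# Stand-in tokens: ten words, windows of 4 with 1 token of overlap
words = [f"w{i}" for i in range(10)]
parts = window_split(words, max_tokens=4, overlap_tokens=1)
for p in parts:
    print(p)
# ['w0', 'w1', 'w2', 'w3']
# ['w3', 'w4', 'w5', 'w6']
# ['w6', 'w7', 'w8', 'w9']
```

Unlike the paragraph-level overlap, every window here respects the hard cap, at the cost of cutting mid-sentence.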


## 4. Embedding Generation

Version embeddings and normalize vectors:


In [None]:
# Embedding configuration - version this with your index!
EMBEDDING_MODEL = "all-MiniLM-L6-v2"
EMBEDDING_VERSION = "2025-01-01"

print(f"Loading embedding model: {EMBEDDING_MODEL}")
embedder = SentenceTransformer(EMBEDDING_MODEL)

def embed_batch(texts: List[str]) -> np.ndarray:
    """Embed texts with normalization for cosine similarity."""
    vectors = embedder.encode(texts, show_progress_bar=False)
    vectors = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
    return vectors

# Generate embeddings
chunk_texts = [c["text"] for c in all_chunks]
chunk_embeddings = embed_batch(chunk_texts)

print(f"Generated {len(chunk_embeddings)} embeddings with dimension {chunk_embeddings.shape[1]}")
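
Normalizing to unit length is what lets a plain dot product act as cosine similarity later on. The sketch below shows this with plain Python lists instead of NumPy, and adds a small epsilon guard (an assumption, not something `embed_batch` does) so an all-zero vector does not cause a division by zero:

```python
import math
from typing import List

def l2_normalize(vec: List[float], eps: float = 1e-12) -> List[float]:
    """Scale a vector to unit length; eps avoids dividing by zero."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / max(norm, eps) for x in vec]

def dot(a: List[float], b: List[float]) -> float:
    return sum(x * y for x, y in zip(a, b))

a = l2_normalize([3.0, 4.0])   # [0.6, 0.8]
b = l2_normalize([4.0, 3.0])   # [0.8, 0.6]

# For unit vectors the dot product equals the cosine similarity: 24/25
print(round(dot(a, b), 6))          # 0.96
print(l2_normalize([0.0, 0.0]))     # [0.0, 0.0]
```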


## 5. Build Vector Index with ChromaDB


In [None]:
# Create ephemeral Chroma client
client = chromadb.EphemeralClient(Settings(anonymized_telemetry=False))

collection = client.create_collection(
    name="demo_rag",
    metadata={
        "embedding_model": EMBEDDING_MODEL,
        "embedding_version": EMBEDDING_VERSION,
        "hnsw:space": "cosine"
    }
)

collection.add(
    ids=[c["id"] for c in all_chunks],
    documents=[c["text"] for c in all_chunks],
    metadatas=[{"doc_id": c["doc_id"], "title": c["title"]} for c in all_chunks],
    embeddings=chunk_embeddings.tolist()
)

print(f"Added {collection.count()} chunks to vector index")
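
Storing the embedding model and version in the collection metadata only pays off if something checks them. A minimal sketch of such a check, written against plain dicts so it runs on its own; `index_mismatches` is a hypothetical helper, and in the notebook the stored side would presumably come from `collection.metadata`. The `2025-06-01` version is a made-up value for illustration.

```python
from typing import Dict, List

def index_mismatches(index_meta: Dict[str, str], expected: Dict[str, str]) -> List[str]:
    """List every expected key whose value differs from what the index recorded."""
    problems = []
    for key, want in expected.items():
        have = index_meta.get(key)
        if have != want:
            problems.append(f"{key}: index has {have!r}, query side expects {want!r}")
    return problems

stored = {
    "embedding_model": "all-MiniLM-L6-v2",
    "embedding_version": "2025-01-01",
    "hnsw:space": "cosine",
}

ok = index_mismatches(stored, {"embedding_model": "all-MiniLM-L6-v2",
                               "embedding_version": "2025-01-01"})
stale = index_mismatches(stored, {"embedding_model": "all-MiniLM-L6-v2",
                                  "embedding_version": "2025-06-01"})  # hypothetical newer version

print(ok)     # []
print(stale)
```

Refusing to query when the list is non-empty avoids silently comparing query vectors against chunks embedded with a different model.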


## 6. Hybrid Retrieval

Combine vector search with lexical scoring:


In [None]:
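def lexical_score(query: str, text: str) -> float:
    """Fraction of distinct query terms found in the text (case and punctuation ignored)."""
    # \w+ tokenization so that "terms?" in a query still matches "terms" in a document
    q_terms = set(re.findall(r"\w+", query.lower()))
    t_terms = set(re.findall(r"\w+", text.lower()))
    if not q_terms:
        return 0.0
    return len(q_terms & t_terms) / len(q_terms)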

def hybrid_retrieve(query: str, top_k: int = 5, vector_weight: float = 0.7, lexical_weight: float = 0.3) -> List[Dict]:
    """Hybrid retrieval with score fusion: score = w_vec * s_vec + w_lexical * s_lexical"""
    q_vec = embed_batch([query])[0].tolist()
    results = collection.query(
        query_embeddings=[q_vec],
        n_results=top_k * 2,
        include=["documents", "metadatas", "distances"]
    )
    
    scored = []
    for i in range(len(results["ids"][0])):
        doc_text = results["documents"][0][i]
        # Chroma's cosine distance is 1 - cos, in [0, 2]; map it to a score in [0, 1]
        vec_score = 1 - (results["distances"][0][i] / 2)
        lex_score = lexical_score(query, doc_text)
        combined = vector_weight * vec_score + lexical_weight * lex_score
        
        scored.append({
            "id": results["ids"][0][i],
            "doc_id": results["metadatas"][0][i]["doc_id"],
            "title": results["metadatas"][0][i]["title"],
            "text": doc_text,
            "vector_score": vec_score,
            "lexical_score": lex_score,
            "combined_score": combined,
        })
    
    scored.sort(key=lambda x: x["combined_score"], reverse=True)
    return scored[:top_k]

# Test hybrid retrieval
test_query = "what are the payment terms?"
results = hybrid_retrieve(test_query, top_k=3)

print(f"Query: '{test_query}'\n")
for i, r in enumerate(results):
    print(f"{i+1}. [{r['doc_id']}] {r['title']}")
    print(f"   Score: {r['combined_score']:.3f} (vec: {r['vector_score']:.3f}, lex: {r['lexical_score']:.3f})")
    print(f"   Text: {r['text'][:100]}...\n")
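
To make the fusion formula concrete, here is the arithmetic for a single candidate, using the same default weights as `hybrid_retrieve`. The distance (0.4) and lexical score (0.5) are made-up inputs, and the two helper names are introduced only for this sketch:

```python
def cosine_distance_to_score(distance: float) -> float:
    """Map a cosine distance in [0, 2] to a similarity-like score in [0, 1]."""
    return 1.0 - distance / 2.0

def fuse(vec_score: float, lex_score: float,
         vector_weight: float = 0.7, lexical_weight: float = 0.3) -> float:
    """Weighted sum: score = w_vec * s_vec + w_lexical * s_lexical."""
    return vector_weight * vec_score + lexical_weight * lex_score

vec = cosine_distance_to_score(0.4)   # 1 - 0.4 / 2 = 0.8
combined = fuse(vec, 0.5)             # 0.7 * 0.8 + 0.3 * 0.5 = 0.56 + 0.15

print(round(vec, 3), round(combined, 3))  # 0.8 0.71
```

Both inputs sit in [0, 1], so with weights that sum to 1 the combined score stays in [0, 1] as well.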


## 7. Reranking

Reranking spends a more expensive scoring pass on only the top candidates, where extra accuracy is cheapest to buy:


In [None]:
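def simple_rerank(query: str, candidates: List[Dict]) -> List[Dict]:
    """Simple reranking using embedding similarity.

    This reuses the retrieval embedder, so it mostly re-weights the vector
    score; a production reranker would typically be a cross-encoder.
    """
    if not candidates:
        return []
    
    # Embed the query and all candidates in one batch instead of one call per candidate
    vectors = embed_batch([query] + [c["text"] for c in candidates])
    q_vec, c_vecs = vectors[0], vectors[1:]
    reranked = []
    
    for c, c_vec in zip(candidates, c_vecs):
        rerank_score = float(np.dot(q_vec, c_vec))
        final_score = 0.4 * c["combined_score"] + 0.6 * rerank_score
        
        reranked.append({**c, "rerank_score": rerank_score, "final_score": final_score})
    
    reranked.sort(key=lambda x: x["final_score"], reverse=True)
    return reranked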

# Test reranking
test_query = "how long is customer data retained?"
candidates = hybrid_retrieve(test_query, top_k=5)
reranked = simple_rerank(test_query, candidates)

print(f"Query: '{test_query}'\n")
print("After reranking:")
for i, r in enumerate(reranked[:3]):
    print(f"{i+1}. [{r['doc_id']}] {r['title']}")
    print(f"   Final: {r['final_score']:.3f} (rerank: {r['rerank_score']:.3f})")
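
One way to keep the reranking step swappable is to pass the scorer in as a function that takes (query, passage) pairs and returns one score per pair. The sketch below is an assumption about how that could look, not the notebook's method: `rerank_with` and `toy_overlap_scorer` are hypothetical names, and the toy scorer only counts shared words so the example runs without any model. A cross-encoder such as sentence-transformers' `CrossEncoder(...).predict`, which accepts a list of text pairs, could presumably be passed in the same way.

```python
from typing import Callable, Dict, List, Sequence, Tuple

ScoreFn = Callable[[Sequence[Tuple[str, str]]], Sequence[float]]

def rerank_with(score_fn: ScoreFn, query: str, candidates: List[Dict], top_n: int = 3) -> List[Dict]:
    """Score (query, text) pairs in one batched call and keep the top_n candidates."""
    if not candidates:
        return []
    pairs = [(query, c["text"]) for c in candidates]
    scores = score_fn(pairs)
    ranked = [{**c, "rerank_score": float(s)} for c, s in zip(candidates, scores)]
    ranked.sort(key=lambda c: c["rerank_score"], reverse=True)
    return ranked[:top_n]

def toy_overlap_scorer(pairs: Sequence[Tuple[str, str]]) -> List[float]:
    """Stand-in scorer: number of lowercase words shared by query and passage."""
    return [float(len(set(q.lower().split()) & set(t.lower().split()))) for q, t in pairs]

# Illustrative candidates (shortened, made-up texts)
cands = [
    {"id": "a", "text": "uptime guarantee and credits"},
    {"id": "b", "text": "data retained for 7 years"},
]
top = rerank_with(toy_overlap_scorer, "how long is data retained", cands, top_n=1)
print([c["id"] for c in top])  # ['b']
```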


## 8. Context Assembly & Prompt Building

Prepare context for LLM with source citations:


In [None]:
def build_context(chunks: List[Dict], max_tokens: int = 2000) -> str:
    """Assemble context from chunks with headers."""
    blocks = []
    total_tokens = 0
    
    for i, c in enumerate(chunks):
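        # final_score exists only after reranking; fall back to the retrieval score
        score = c.get("final_score", c["combined_score"])
        header = f"[Source {i+1}] doc={c['doc_id']} | score: {score:.2f}"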
        if c.get("title"):
            header += f"\nTitle: {c['title']}"
        
        block = f"{header}\n\n{c['text']}"
        block_tokens = num_tokens(block)
        
        if total_tokens + block_tokens > max_tokens:
            break
        
        blocks.append(block)
        total_tokens += block_tokens
    
    return "\n\n---\n\n".join(blocks)

GROUNDED_PROMPT = """You are an enterprise assistant. Use ONLY the context below to answer.

Context:
{context}

Question: {query}

Instructions:
- Answer based solely on the provided context
- If the answer is not in the context, reply: "Not found in context"
- Cite source numbers when making claims

Answer:"""

# Build context and prompt
context = build_context(reranked[:3])
prompt = GROUNDED_PROMPT.format(context=context, query=test_query)

print(f"Context ({num_tokens(context)} tokens):\n")
print(context[:500] + "...")
print(f"\n\nFull prompt: {num_tokens(prompt)} tokens")
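
Since every block in the context carries a `[Source N]` header, citations in a model answer can be mapped back to document IDs. A minimal sketch, assuming the model cites in exactly that `[Source N]` form (the prompt only asks for source numbers, so this format is an assumption); `cited_sources` and the sample answer are made up for illustration:

```python
import re
from typing import Dict, List

def cited_sources(answer: str, sources: List[Dict]) -> Dict[str, list]:
    """Resolve [Source N] citations to doc_ids; report numbers with no matching source."""
    cited = sorted({int(n) for n in re.findall(r"\[Source (\d+)\]", answer)})
    valid = [n for n in cited if 1 <= n <= len(sources)]
    return {
        "doc_ids": [sources[n - 1]["doc_id"] for n in valid],
        "unknown": [n for n in cited if n not in valid],
    }

# Sources in the same order as the context blocks
sources = [{"doc_id": "policy_legal_01"}, {"doc_id": "sla_001"}]
answer = "Data is retained for 7 years [Source 1]. Backups add 90 days [Source 1][Source 4]."

report = cited_sources(answer, sources)
print(report)  # {'doc_ids': ['policy_legal_01'], 'unknown': [4]}
```

A non-empty `unknown` list is a cheap signal that the answer cites something that was never in the context.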


## 9. Answer Verification

Never trust a single LLM pass - verify deterministically:


In [None]:
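def verify_answer(answer: str, context: str) -> Dict:
    """Level 1: Deterministic verification - check numbers exist in context.

    Values only: a number counts as supported if it appears anywhere in the
    context (source headers included), whatever it refers to there.
    """
    issues = []
    
    # (?:%|\b) keeps a trailing % attached; with %?\b the % was always dropped
    number_pattern = r'\b\d+(?:\.\d+)?(?:%|\b)'
    answer_numbers = set(re.findall(number_pattern, answer))
    context_numbers = set(re.findall(number_pattern, context))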
    
    unsupported = answer_numbers - context_numbers
    if unsupported:
        issues.append(f"Numbers not in context: {unsupported}")
    
    return {
        "valid": len(issues) == 0,
        "issues": issues,
        "numbers_found": list(answer_numbers & context_numbers)
    }

# Test verification
test_answer = "Customer data is retained for 7 years for audit purposes."
verification = verify_answer(test_answer, context)

print(f"Answer: {test_answer}")
print(f"Verification: {verification}")

# Test with a hallucinated number (flagged only when the context contains no matching 10)
bad_answer = "Customer data is retained for 10 years."
bad_verification = verify_answer(bad_answer, context)
print(f"\nBad answer: {bad_answer}")
print(f"Verification: {bad_verification}")
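
A bare-number check passes whenever the same digits appear anywhere in the context, even with a different meaning. One possible tightening, sketched here under the assumption that quantities are written as a number followed by `%` or a unit word, is to compare (number, unit) pairs instead. `quantities` and `unsupported_quantities` are hypothetical helpers, and the context string is a shortened excerpt of the sample corpus:

```python
import re
from typing import Set, Tuple

# A number followed by "%" or a unit word, e.g. "7 years", "2%", "10 days"
QUANTITY = re.compile(r"\b(\d+(?:\.\d+)?)\s*(%|[A-Za-z]+)")

def quantities(text: str) -> Set[Tuple[str, str]]:
    """Extract (number, unit) pairs with the unit lowercased."""
    return {(num, unit.lower()) for num, unit in QUANTITY.findall(text)}

def unsupported_quantities(answer: str, context: str) -> Set[Tuple[str, str]]:
    """Pairs that appear in the answer but not in the context."""
    return quantities(answer) - quantities(context)

context = ("Customer data is retained for 7 years. "
           "Early payment discount: 2% discount for payment within 10 days.")

good = unsupported_quantities("Data is kept for 7 years.", context)
bad = unsupported_quantities("Data is kept for 10 years.", context)

print(good)  # set()
print(bad)   # {('10', 'years')}
```

This is still exact matching: "1 year" versus "years", or a number followed directly by punctuation, would need extra handling.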


## 10. Full RAG Pipeline

Putting it all together:


In [None]:
def rag_pipeline(query: str, top_k: int = 3, rerank: bool = True) -> Dict:
    """Complete RAG pipeline returning prompt and sources."""
    # 1. Retrieve
    candidates = hybrid_retrieve(query, top_k=top_k * 2 if rerank else top_k)
    
    if not candidates:
        return {"prompt": None, "context": "", "sources": [], "error": "No relevant documents found"}
    
    # 2. Rerank
    if rerank:
        candidates = simple_rerank(query, candidates)[:top_k]
    
    # 3. Build context
    context = build_context(candidates)
    
    # 4. Build prompt
    prompt = GROUNDED_PROMPT.format(context=context, query=query)
    
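    return {
        "prompt": prompt,
        "context": context,
        # final_score exists only when rerank=True; fall back to the retrieval score
        "sources": [{"doc_id": c["doc_id"], "title": c["title"], "score": c.get("final_score", c["combined_score"])} for c in candidates],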
        "retrieved_ids": [c["id"] for c in candidates]
    }

# Run pipeline on multiple queries
queries = [
    "What are the payment terms?",
    "How long is data retained?",
    "What are the API rate limits for enterprise?",
    "What is the uptime SLA guarantee?",
]

for q in queries:
    result = rag_pipeline(q)
    print(f"\n{'='*60}")
    print(f"Query: {q}")
    print(f"Sources: {[s['doc_id'] for s in result['sources']]}")
    print(f"Prompt tokens: {num_tokens(result['prompt'])}")
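
The loop above only prints what was retrieved. A small smoke test can turn that into a number by checking whether the expected document shows up among the top results for each query. The sketch below is self-contained: `hit_rate_at_k` is a hypothetical helper, the expected IDs follow the sample corpus, and the `retrieved` lists are illustrative stand-ins rather than real pipeline output.

```python
from typing import Dict, List

def hit_rate_at_k(retrieved: Dict[str, List[str]], expected: Dict[str, str], k: int = 3) -> float:
    """Share of queries whose expected doc_id appears in the top-k retrieved doc_ids."""
    if not expected:
        return 0.0
    hits = sum(1 for q, want in expected.items() if want in retrieved.get(q, [])[:k])
    return hits / len(expected)

expected = {
    "What are the payment terms?": "contract_001",
    "How long is data retained?": "policy_legal_01",
}

# Illustrative stand-in; in the notebook these lists could be built from
# [s["doc_id"] for s in rag_pipeline(q)["sources"]]
retrieved = {
    "What are the payment terms?": ["contract_001", "sla_001", "faq_01"],
    "How long is data retained?": ["faq_01", "sla_001", "product_spec_001"],
}

rate = hit_rate_at_k(retrieved, expected, k=3)
print(rate)  # 0.5
```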


## Summary

This notebook demonstrated key production RAG components:

1. **Ingestion & Normalization** - Clean input beats clever retrieval
2. **Semantic Chunking** - Determines what the retriever can find
3. **Embedding with Versioning** - Version embeddings with your index
4. **Hybrid Retrieval** - Vector search re-scored with lexical overlap, so exact keywords still count
5. **Reranking** - A second scoring pass over the top candidates only
6. **Context Assembly** - Budget-aware with citations
7. **Grounded Prompts** - Constrain answers to the retrieved context to reduce hallucinations
8. **Verification** - Never trust a single LLM pass; check answers deterministically

For production deployment, see the FastAPI service and Kubernetes manifests in the repository.
