### Production-Ready RAG: A Conceptual Deep Dive

What this notebook demonstrates:
- Compression saves tokens → The character count of the retrieved context drops
- Routing saves money → Simple queries go to the free model
- Caching speeds things up → A repeated query is answered instantly
- Metrics make it visible → Every query's latency and source is recorded

In [1]:
import os
import time
import json
import hashlib
from datetime import datetime
from dotenv import load_dotenv

from langchain_groq import ChatGroq
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.chains import RetrievalQA
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

load_dotenv()

groq_api_key = os.getenv("GROQ_API_KEY")
if not groq_api_key:
    raise ValueError("⚠️ Add GROQ_API_KEY to .env file!")

print("✅ Setup complete!")

✅ Setup complete!


### Loading the docs

In [2]:
# Load PDFs
docs = []
for file in os.listdir("./policies"):
    if file.endswith(".pdf"):
        loader = PyPDFLoader(f"./policies/{file}")
        docs.extend(loader.load())

print(f"✅ Loaded {len(docs)} pages")

# Split into chunks
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = splitter.split_documents(docs)

print(f"✅ Created {len(chunks)} chunks")

✅ Loaded 15 pages
✅ Created 37 chunks
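
To make `chunk_size` and `chunk_overlap` concrete, here is a minimal sketch of fixed-size sliding-window chunking. It is a simplification, not what the splitter does internally: `RecursiveCharacterTextSplitter` also tries to break on separators such as paragraphs and line breaks, which the hypothetical `sliding_chunks` helper below ignores.

```python
def sliding_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Split text into fixed-size windows that share `chunk_overlap` characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
    return chunks

sample = "abcdefghijklmnopqrstuvwxyz"
pieces = sliding_chunks(sample, chunk_size=10, chunk_overlap=4)
for piece in pieces:
    print(piece)
```

Each piece starts with the last four characters of the previous one, so a sentence cut at a chunk boundary still appears whole in at least one chunk, as long as it is shorter than the overlap.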


### Creating a vector store

In [3]:
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
vectordb = Chroma.from_documents(chunks, embeddings, persist_directory="./chroma_prod")
retriever = vectordb.as_retriever(
    search_type="mmr",  # Maximal marginal relevance: diversifies results when k > 1
    search_kwargs={"k": 1, "fetch_k": 10}  # Fetch 10 candidates, return the top 1
)

print("✅ Vector store ready!")

✅ Vector store ready!
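
For intuition about `search_type="mmr"`, the sketch below implements maximal marginal relevance in plain Python on toy 2-D vectors. The helper names (`cosine`, `mmr_select`) and the vectors are made up for illustration, and the scoring formula is the textbook one; Chroma's own implementation may differ in detail.

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def mmr_select(query_vec, doc_vecs, k, lambda_mult=0.5):
    """Pick k document indices, trading relevance against redundancy."""
    candidates = list(range(len(doc_vecs)))
    selected = []
    while candidates and len(selected) < k:
        def score(i):
            relevance = cosine(query_vec, doc_vecs[i])
            # Similarity to the closest already-selected document (0 if none yet).
            redundancy = max(
                (cosine(doc_vecs[i], doc_vecs[j]) for j in selected), default=0.0
            )
            return lambda_mult * relevance - (1 - lambda_mult) * redundancy
        best = max(candidates, key=score)
        selected.append(best)
        candidates.remove(best)
    return selected

query_vec = [1.0, 0.3]
doc_vecs = [[1.0, 0.0], [0.99, 0.05], [0.6, 0.8]]  # first two are near-duplicates
picked = mmr_select(query_vec, doc_vecs, k=2)
print(picked)
```

The first pick is always the most relevant document, because nothing has been selected yet to be redundant with. That means with `k=1`, as configured in the cell above, MMR behaves like plain top-1 similarity search; the diversity term only starts to matter from the second pick onward.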


### Setting up the models

In [8]:
# TWO MODELS: Simple (free) and Complex (paid)
SIMPLE_MODEL = ChatGroq(model="llama-3.1-8b-instant", api_key=groq_api_key, temperature=0)
COMPLEX_MODEL = ChatGroq(model="llama-3.3-70b-versatile", api_key=groq_api_key, temperature=0)

print("✅ Models ready:")
print("  🟢 Simple (free): llama-3.1-8b-instant")
print("  🔴 Complex (fast): llama-3.3-70b-versatile")

# COMPRESSION: Extract only relevant parts from retrieved chunks
compressor = LLMChainExtractor.from_llm(SIMPLE_MODEL)
compressed_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever
)

print("✅ Compression enabled!")

# Quick test (invoke() replaces the deprecated get_relevant_documents())
test_docs_normal = retriever.invoke("What is casual leave?")
test_docs_compressed = compressed_retriever.invoke("What is casual leave?")
chars_normal = sum(len(d.page_content) for d in test_docs_normal)
chars_compressed = sum(len(d.page_content) for d in test_docs_compressed)

print(f"\n📊 Compression test:")
print(f"  Before: {chars_normal} chars")
print(f"  After: {chars_compressed} chars")
print(f"  Saved: {((chars_normal - chars_compressed) / chars_normal * 100):.0f}%")

✅ Models ready:
  🟢 Simple (free): llama-3.1-8b-instant
  🔴 Complex (fast): llama-3.3-70b-versatile
✅ Compression enabled!

📊 Compression test:
  Before: 939 chars
  After: 741 chars
  Saved: 21%
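
The savings calculation in the cell above divides by the uncompressed character count, so it would raise `ZeroDivisionError` if the retriever returned no documents. A small guarded helper (the name `compression_savings` is hypothetical) could look like this:

```python
def compression_savings(chars_before: int, chars_after: int) -> float:
    """Percentage of characters removed; 0.0 when there was nothing to compress."""
    if chars_before <= 0:
        return 0.0
    return (chars_before - chars_after) / chars_before * 100

# Same numbers as the compression test output above.
print(f"{compression_savings(939, 741):.0f}%")
```

Character count is only a rough proxy for token count, so the percentage is best read as an estimate of token savings rather than an exact figure.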


### Model Routing Function

In [9]:
def route_to_model(query: str):
    """
    Route query to appropriate model based on complexity
    
    COMPLEX = comparisons, analysis, recommendations
    SIMPLE = definitions, single facts
    """
    complex_keywords = ["compare", "difference", "recommend", "analyze", 
                        "evaluate", "best", "versus", "pros and cons"]
    
    query_lower = query.lower()
    
    # Substring match: any complex keyword anywhere in the query selects the larger model
    if any(keyword in query_lower for keyword in complex_keywords):
        print("  🔴 Using COMPLEX model")
        return COMPLEX_MODEL
    
    print("  🟢 Using SIMPLE model")
    return SIMPLE_MODEL

# Test it
print("🧪 Testing routing:\n")
print("Query: 'What is casual leave?'")
route_to_model("What is casual leave?")

print("\nQuery: 'Compare casual leave vs earned leave'")
route_to_model("Compare casual leave vs earned leave")

🧪 Testing routing:

Query: 'What is casual leave?'
  🟢 Using SIMPLE model

Query: 'Compare casual leave vs earned leave'
  🔴 Using COMPLEX model


ChatGroq(client=<groq.resources.chat.completions.Completions object at 0x000002504B12CB60>, async_client=<groq.resources.chat.completions.AsyncCompletions object at 0x000002504B12C680>, model_name='llama-3.3-70b-versatile', temperature=1e-08, model_kwargs={}, groq_api_key=SecretStr('**********'))
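
One caveat of the substring check in `route_to_model` is that a keyword such as "best" also matches inside unrelated words like "bestowed". The sketch below is one possible variant that matches whole words only and returns a label instead of a model object, which makes the routing decision easy to unit-test. The names `classify_query` and `COMPLEX_KEYWORDS` are illustrative, not part of the notebook.

```python
import re

COMPLEX_KEYWORDS = ["compare", "difference", "recommend", "analyze",
                    "evaluate", "best", "versus", "pros and cons"]

# One compiled pattern; \b keeps "best" from matching inside "bestowed".
_COMPLEX_PATTERN = re.compile(
    r"\b(?:" + "|".join(re.escape(k) for k in COMPLEX_KEYWORDS) + r")\b",
    re.IGNORECASE,
)

def classify_query(query: str) -> str:
    """Return "complex" or "simple" without touching any model objects."""
    return "complex" if _COMPLEX_PATTERN.search(query) else "simple"

print(classify_query("Compare casual leave vs earned leave"))
print(classify_query("What is casual leave?"))
print(classify_query("Who bestowed the award?"))
```

The trade-off is that whole-word matching no longer catches inflected forms such as "recommendation" or "differences", which the substring version does. If those matter, they would need to be added to the keyword list explicitly.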

### Simple Cache + Metrics Tracker

In [10]:
class ProductionRAG:
    """All-in-one: Cache + Metrics + Routing + Compression"""
    
    def __init__(self):
        self.cache = {}  # Store answers
        self.metrics = []  # Store query stats
        
    def _cache_key(self, query):
        """Create unique key for query"""
        return hashlib.md5(query.lower().strip().encode()).hexdigest()
    
    def ask(self, query: str):
        """Main function: handles everything!"""
        start_time = time.time()
        
        # 1. CHECK CACHE
        cache_key = self._cache_key(query)
        if cache_key in self.cache:
            print("  ✅ Cache HIT (instant!)")
            answer = self.cache[cache_key]
            latency = time.time() - start_time
            
            # Log metric
            self.metrics.append({
                "query": query[:50],
                "latency": round(latency, 3),
                "source": "cache"
            })
            
            return answer
        
        print("  ❌ Cache MISS (calling LLM...)")
        
        # 2. ROUTE TO MODEL
        model = route_to_model(query)
        model_name = "simple" if model is SIMPLE_MODEL else "complex"  # identity check, not field-by-field equality
        
        # 3. CREATE QA CHAIN (with compression!)
        qa_chain = RetrievalQA.from_chain_type(
            llm=model,
            retriever=compressed_retriever,  # Using compression!
            return_source_documents=True
        )
        
        # 4. GET ANSWER
        result = qa_chain.invoke({"query": query})
        answer = result["result"]
        sources = [d.metadata.get("source") for d in result["source_documents"]]
        
        # 5. CACHE IT
        self.cache[cache_key] = answer
        
        # 6. LOG METRICS
        latency = time.time() - start_time
        self.metrics.append({
            "query": query[:50],
            "latency": round(latency, 2),
            "source": model_name,
            "docs": sources
        })
        
        return answer
    
    def show_stats(self):
        """Display performance stats"""
        if not self.metrics:
            print("No queries yet!")
            return
        
        total = len(self.metrics)
        cache_hits = sum(1 for m in self.metrics if m["source"] == "cache")
        avg_latency = sum(m["latency"] for m in self.metrics) / total
        
        print("\n" + "="*50)
        print("📊 PERFORMANCE STATS")
        print("="*50)
        print(f"Total Queries: {total}")
        print(f"Cache Hits: {cache_hits} ({cache_hits/total*100:.0f}%)")
        print(f"Avg Latency: {avg_latency:.2f}s")
        print("="*50 + "\n")

# Initialize
rag = ProductionRAG()

print("✅ Production RAG initialized!")
print("   - Cache ready")
print("   - Metrics ready")
print("   - Routing ready")
print("   - Compression ready")

✅ Production RAG initialized!
   - Cache ready
   - Metrics ready
   - Routing ready
   - Compression ready
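
The dictionary cache in `ProductionRAG` never expires entries and grows without bound, which can serve stale answers after a policy document changes. As a rough sketch of one alternative, the class below adds a time-to-live and a size cap. `TTLCache` and its parameters are assumptions made for illustration; they are not part of the notebook or of LangChain.

```python
import hashlib
import time
from collections import OrderedDict

class TTLCache:
    """In-memory cache with expiry and a size cap (least recently used evicted first)."""

    def __init__(self, max_items=256, ttl_seconds=3600.0, clock=time.monotonic):
        self.max_items = max_items
        self.ttl_seconds = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._store = OrderedDict()  # key -> (stored_at, answer)

    @staticmethod
    def make_key(query: str) -> str:
        # Lowercase and collapse whitespace so trivially different queries share a key.
        normalized = " ".join(query.lower().split())
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    def get(self, query: str):
        key = self.make_key(query)
        entry = self._store.get(key)
        if entry is None:
            return None
        stored_at, answer = entry
        if self._clock() - stored_at > self.ttl_seconds:
            del self._store[key]  # expired
            return None
        self._store.move_to_end(key)  # mark as recently used
        return answer

    def put(self, query: str, answer: str) -> None:
        key = self.make_key(query)
        self._store[key] = (self._clock(), answer)
        self._store.move_to_end(key)
        while len(self._store) > self.max_items:
            self._store.popitem(last=False)  # evict least recently used

cache = TTLCache(max_items=2, ttl_seconds=60)
cache.put("What is casual leave?", "answer A")
print(cache.get("  what is   CASUAL leave? "))
```

Note that any exact-match cache, this one included, treats "What is casual leave?" and "Explain casual leave" as different queries; catching paraphrases would need a semantic cache keyed on embeddings.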


### Testing...

In [11]:
print("🧪 TESTING ALL FEATURES\n")
print("="*60)

# Test 1: First query (no cache, simple model)
print("\nFirst query (simple):")
print("Query: 'What is casual leave?'")
answer1 = rag.ask("What is casual leave?")
print(f"Answer: {answer1[:100]}...")

# Test 2: Same query (cache hit!)
print("\nSame query again (should be instant):")
print("Query: 'What is casual leave?'")
answer2 = rag.ask("What is casual leave?")
print(f"Answer: {answer2[:100]}...")

# Test 3: Complex query (uses complex model)
print("\nComplex query:")
print("Query: 'Compare casual leave and sick leave'")
answer3 = rag.ask("Compare casual leave and sick leave")
print(f"Answer: {answer3[:100]}...")

# Show stats
rag.show_stats()

print("✅ All features working!")

🧪 TESTING ALL FEATURES


First query (simple):
Query: 'What is casual leave?'
  ❌ Cache MISS (calling LLM...)
  🟢 Using SIMPLE model
Answer: Casual leave is a type of leave that employees can take for personal or miscellaneous reasons, such ...

Same query again (should be instant):
Query: 'What is casual leave?'
  ✅ Cache HIT (instant!)
Answer: Casual leave is a type of leave that employees can take for personal or miscellaneous reasons, such ...

Complex query:
Query: 'Compare casual leave and sick leave'
  ❌ Cache MISS (calling LLM...)
  🔴 Using COMPLEX model
Answer: Based on the given context, here's a comparison between casual leave and sick leave:

1. **Minimum d...

📊 PERFORMANCE STATS
Total Queries: 3
Cache Hits: 1 (33%)
Avg Latency: 0.44s

✅ All features working!
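
The average latency reported by `show_stats` mixes near-zero cache hits with real LLM calls, so a single mean can hide how slow uncached queries are. A sketch of a per-source breakdown is shown below; it assumes the same record shape as `rag.metrics` (`latency` and `source` keys), and the sample numbers are invented for illustration, not measured in this notebook.

```python
from collections import defaultdict
from statistics import mean, median

def latency_by_source(metrics):
    """Group latencies by the "source" field and summarize each group."""
    groups = defaultdict(list)
    for m in metrics:
        groups[m["source"]].append(m["latency"])
    return {
        source: {
            "count": len(vals),
            "mean": mean(vals),
            "median": median(vals),
            "max": max(vals),
        }
        for source, vals in groups.items()
    }

# Illustrative numbers only.
sample_metrics = [
    {"query": "q1", "latency": 0.8, "source": "simple"},
    {"query": "q1", "latency": 0.0, "source": "cache"},
    {"query": "q2", "latency": 1.2, "source": "complex"},
    {"query": "q3", "latency": 0.6, "source": "simple"},
]
summary = latency_by_source(sample_metrics)
for source, stats in summary.items():
    print(source, stats)
```

Calling `latency_by_source(rag.metrics)` after a session would show separate figures for cache hits, the simple model, and the complex model.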


In [12]:
print("\n" + "="*60)
print("\nCommands:")
print("  'stats' - Show performance stats")
print("  'exit' - Quit and see final summary")
print("="*60 + "\n")

while True:
    query = input("You: ").strip()
    
    if query.lower() == "exit":
        print("\n👋 Goodbye!\n")
        rag.show_stats()
        break
    
    if query.lower() == "stats":
        rag.show_stats()
        continue
    
    if not query:
        continue
    
    try:
        print()  # Blank line for routing message
        answer = rag.ask(query)
        print(f"\n🤖 Assistant: {answer}\n")
        
    except Exception as e:
        print(f"❌ Error: {e}\n")



Commands:
  'stats' - Show performance stats
  'exit' - Quit and see final summary



You:  What are POSH Policy Guidelines



  ❌ Cache MISS (calling LLM...)
  🟢 Using SIMPLE model

🤖 Assistant: Based on the provided context, the POSH Policy Guidelines are as follows:

1. **Purpose**: To create and maintain a safe work environment, free from sexual harassment and discrimination for all employees.
2. **Scope**: The policy applies to all employees of BBIL SYSTEMS, including those working in company premises or elsewhere in India or abroad, and extends to clients, vendors, and contractors.
3. **Applicability**: The policy applies to all employees of BBIL SYSTEMS, including those on permanent, temporary, contracted, or retainer ship basis, part-time basis, etc.
4. **Definition of Employee**: An employee of BBIL SYSTEMS includes anyone carrying out work on behalf of the company, whether directly or indirectly, or through a vendor organization.

These guidelines are based on the "The Sexual harassment of women at workplace (prevention, prohibition & redressal) Act, 2013" and aim to establish a zero-tolerance attit

You:  What is the job of Internal Committee? How do they work?



  ❌ Cache MISS (calling LLM...)
  🟢 Using SIMPLE model

🤖 Assistant: According to the given context, the job of the Internal Committee is to deal with complaints of Sexual Harassment in a confidential and urgent manner. They are responsible for conducting an official internal enquiry.

Here's a breakdown of their roles and responsibilities:

1. **Composition**: The Internal Committee consists of:
	* 1 Presiding Officer (1 member)
	* 3 Internal Members
	* 1 External Member (an NGO or Legal expert)
2. **Responsibilities**: Within 3 working days of receiving a complaint, the Internal Committee shall:
	* Commence an official internal enquiry
	* Handle the complaint with utmost confidentiality and urgency
3. **Composition of the Committee**: The committee is composed of a mix of internal and external members to ensure a fair and impartial investigation.

The Internal Committee's primary goal is to investigate complaints of Sexual Harassment, ensure a fair and confidential process, and take

You:  How many casual leaves I get during a year



  ❌ Cache MISS (calling LLM...)
  🟢 Using SIMPLE model

🤖 Assistant: Unfortunately, the given context doesn't mention the number of casual leaves you get per year. It only mentions the minimum and maximum duration of casual leave (0.5 to 3 days) and that there are no carry-forwards.



You:  Any idea on how my annual salary increses?



  ❌ Cache MISS (calling LLM...)
  🟢 Using SIMPLE model

🤖 Assistant: Based on the given context, it seems that your annual salary increase would be determined by two main factors:

1. Increase in the cost of living: This suggests that your salary may increase to keep pace with inflation and the rising cost of living.
2. The Company's economic situation: This implies that the company's financial health and performance may also impact your salary increase.

However, without more specific information on how these factors are applied to determine salary increases, it's difficult to provide a detailed answer. 

In general, it seems that your salary increase would be influenced by a combination of external (cost of living) and internal (company's economic situation) factors.



You:  stats



📊 PERFORMANCE STATS
Total Queries: 7
Cache Hits: 1 (14%)
Avg Latency: 0.56s



You:  exit



👋 Goodbye!


📊 PERFORMANCE STATS
Total Queries: 7
Cache Hits: 1 (14%)
Avg Latency: 0.56s



In [13]:
# Save all metrics to a file for later analysis

def save_metrics():
    """Save performance data to JSON"""
    
    if not rag.metrics:
        print("⚠️ No metrics to save. Try asking some questions first!")
        return
    
    # Calculate summary stats
    total = len(rag.metrics)
    cache_hits = sum(1 for m in rag.metrics if m["source"] == "cache")
    avg_latency = sum(m["latency"] for m in rag.metrics) / total
    
    data = {
        "timestamp": datetime.now().isoformat(),
        "summary": {
            "total_queries": total,
            "cache_hits": cache_hits,
            "cache_hit_rate": f"{cache_hits/total*100:.1f}%",
            "avg_latency": f"{avg_latency:.2f}s"
        },
        "all_queries": rag.metrics
    }
    
    # Save to file
    filename = f"rag_metrics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(filename, "w") as f:
        json.dump(data, f, indent=2)
    
    print(f"✅ Metrics saved to: {filename}")
    print(f"   Total queries: {total}")
    print(f"   Cache hit rate: {cache_hits/total*100:.0f}%")
    
    return filename

# Save metrics
save_metrics()

✅ Metrics saved to: rag_metrics_20251013_165043.json
   Total queries: 7
   Cache hit rate: 14%


'rag_metrics_20251013_165043.json'
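
For the "later analysis" step, a saved file can be read back and the hit rate recomputed from the raw records, since `save_metrics` stores the summary rate and latency as formatted strings such as `"14.0%"`. The following is a sketch under that assumption; `load_metrics` and `cache_hit_rate` are hypothetical helpers, and the demo writes made-up records to a temporary directory rather than reading a real metrics file.

```python
import json
import tempfile
from pathlib import Path

REQUIRED_KEYS = ("timestamp", "summary", "all_queries")

def load_metrics(path) -> dict:
    """Parse a metrics file and check it has the keys save_metrics() writes."""
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    missing = [k for k in REQUIRED_KEYS if k not in data]
    if missing:
        raise KeyError(f"metrics file is missing keys: {missing}")
    return data

def cache_hit_rate(data: dict) -> float:
    """Recompute the hit rate as a number from the per-query records."""
    queries = data["all_queries"]
    if not queries:
        return 0.0
    hits = sum(1 for q in queries if q["source"] == "cache")
    return hits / len(queries)

# Round-trip demo with made-up records.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = Path(tmp) / "rag_metrics_demo.json"
    demo = {
        "timestamp": "2025-01-01T00:00:00",
        "summary": {"total_queries": 2, "cache_hits": 1},
        "all_queries": [
            {"query": "q1", "latency": 0.9, "source": "simple"},
            {"query": "q1", "latency": 0.0, "source": "cache"},
        ],
    }
    demo_path.write_text(json.dumps(demo), encoding="utf-8")
    loaded = load_metrics(demo_path)
    rate = cache_hit_rate(loaded)
    print(f"hit rate: {rate:.0%}")
```

Keeping numeric values in the file and formatting them only at display time would make this kind of downstream analysis simpler.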