# Level 3: Advanced RAG Implementation (Python)

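This notebook demonstrates an advanced, modular implementation of a Retrieval-Augmented Generation (RAG) system for PDF documents that follows common best practices. It uses object-oriented design with a clear separation of concerns, and includes comments and documentation aimed at advanced learners. It integrates with real services: OpenAI for answer generation, ChromaDB for vector storage, and Hugging Face models for tokenization and embeddings. To keep the example self-contained, PDF extraction is simulated with pre-formatted text in which each line stands for one page.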

---

## Key Features
- **Class-based design** for extensibility and maintainability
- **Dependency injection** for easy testing and swapping components
- **Docstrings and type hints** for clarity
- **Reusable utility functions**
- **Real-world integrations** with OpenAI API, ChromaDB, and Hugging Face
- **Testable with example cases**

---

## Implementation
Below, we define the main components as classes and show how they interact.

In [None]:
import os
import re
import tempfile
import uuid
import warnings
from typing import List, Dict, Any, Optional, Tuple

import numpy as np

# Suppress warnings for cleaner notebook output
warnings.filterwarnings('ignore')

# Install required packages if not already installed
!pip install -q transformers sentence-transformers chromadb openai tiktoken

# Import necessary libraries
from transformers import AutoTokenizer
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.utils import embedding_functions
from openai import OpenAI

# Configuration class for managing settings
class Config:
    """Central configuration for the RAG system.

    Every setting is a plain instance attribute, so callers can override any
    of them after construction (for example ``config.chunk_size = 200``).
    """

    def __init__(self):
        # OpenAI settings. The key is read from the OPENAI_API_KEY environment
        # variable so it never has to be written into the notebook. When the
        # variable is unset or empty, the placeholder is kept; LLMService
        # treats that placeholder as "no API key configured".
        self.openai_api_key = os.environ.get("OPENAI_API_KEY") or "your_api_key_here"
        self.openai_model = "gpt-3.5-turbo"  # Or gpt-4

        # Embedding settings
        self.use_local_embeddings = True
        self.embedding_model = "all-MiniLM-L6-v2"  # HuggingFace model

        # Vector DB settings.
        # NOTE: mkdtemp() creates a fresh directory on every Config() call and
        # nothing in this class removes it afterwards.
        self.vector_db_path = tempfile.mkdtemp()

        # Chunking settings (token counts)
        self.chunk_size = 500
        self.chunk_overlap = 100

        # RAG settings: number of chunks retrieved per question
        self.top_k_results = 3

class PDFProcessor:
    """Extracts and chunks text from PDF files."""

    def __init__(self, config: "Config"):
        self.config = config
        # Tokenizer is only used to count tokens when sizing chunks
        self.tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

    def extract_text(self, pdf_text: str) -> List[Dict[str, Any]]:
        """Simulate PDF extraction: every non-blank line becomes one page.

        In a real implementation, this would use PyMuPDF (fitz) to extract text.

        Args:
            pdf_text: Pre-formatted text, one page per line.

        Returns:
            A list of ``{"content": str, "metadata": {"page": int}}`` dicts,
            numbered from 1 in order of appearance.
        """
        pages: List[Dict[str, Any]] = []
        for line in pdf_text.splitlines():
            # Blank separator lines are not pages; counting them would shift
            # the page numbers and index empty documents.
            if not line.strip():
                continue
            pages.append({"content": line, "metadata": {"page": len(pages) + 1}})
        return pages

    def count_tokens(self, text: str) -> int:
        """Count the number of tokens the tokenizer produces for ``text``."""
        return len(self.tokenizer.encode(text))

    @staticmethod
    def _make_chunk(content: str, page_num: int, chunk_id: int) -> Dict[str, Any]:
        """Build one chunk record with its page number and chunk identifier."""
        return {
            "content": content,
            "metadata": {"page": page_num, "chunk_id": f"page_{page_num}_chunk_{chunk_id}"},
        }

    def chunk_text(self, pages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Split pages into chunks of at most ``config.chunk_size`` tokens.

        A page that already fits is kept as a single chunk. Larger pages are
        split on ``'. '`` and sentences are greedily packed into chunks. A
        single sentence that exceeds the limit on its own is still emitted as
        one (oversized) chunk. ``config.chunk_overlap`` is not applied here.

        Args:
            pages: Page dicts as produced by ``extract_text``.

        Returns:
            Chunk dicts with ``content`` and ``metadata`` (``page``, ``chunk_id``).
        """
        chunks: List[Dict[str, Any]] = []
        limit = self.config.chunk_size

        for page in pages:
            text = page["content"]
            page_num = page["metadata"]["page"]

            # If text is small enough, keep it as one chunk
            if self.count_tokens(text) <= limit:
                chunks.append(self._make_chunk(text, page_num, 1))
                continue

            # Split by sentences (simple approximation)
            sentences = text.split('. ')
            last_index = len(sentences) - 1
            current_chunk = ""
            chunk_id = 1

            for index, sentence in enumerate(sentences):
                # Skip empty pieces (e.g. text ending in '. ') so no blank
                # or punctuation-only chunk is produced.
                if not sentence.strip():
                    continue

                # split('. ') removed the period from every piece except the
                # last one, which keeps whatever terminator the text had.
                if index < last_index:
                    sentence += '.'

                # Check if adding this sentence would exceed chunk size
                potential_chunk = f"{current_chunk} {sentence}" if current_chunk else sentence
                if self.count_tokens(potential_chunk) <= limit:
                    current_chunk = potential_chunk
                    continue

                # Flush what has been accumulated and start a new chunk
                if current_chunk:
                    chunks.append(self._make_chunk(current_chunk, page_num, chunk_id))
                    chunk_id += 1
                current_chunk = sentence

            # Don't forget the last chunk
            if current_chunk:
                chunks.append(self._make_chunk(current_chunk, page_num, chunk_id))

        return chunks

class EmbeddingGenerator:
    """Generates embeddings for text chunks using HuggingFace models."""

    def __init__(self, config: "Config"):
        self.config = config
        # Load the sentence-transformers model named in the configuration
        self.model = SentenceTransformer(config.embedding_model)

    def generate(self, text: str) -> np.ndarray:
        """Generate the embedding for a single text.

        Args:
            text: The text to embed.

        Returns:
            Whatever ``self.model.encode`` returns for a single string.
        """
        return self.model.encode(text)

    def generate_batch(self, texts: List[str]) -> List[np.ndarray]:
        """Generate embeddings for a batch of texts.

        Args:
            texts: The texts to embed.

        Returns:
            A list with one embedding per input text, in input order. An empty
            input yields an empty list without calling the model.
        """
        if not texts:
            return []
        # encode() returns the batch as a single object; iterating over it
        # yields one embedding per text, which matches the declared return type.
        return list(self.model.encode(texts))

class VectorStore:
    """Stores and retrieves chunks using ChromaDB."""

    def __init__(self, config: "Config"):
        self.config = config
        # Initialize ChromaDB client with persistent storage
        self.client = chromadb.PersistentClient(path=config.vector_db_path)

        # The collection embeds documents and queries itself, using the same
        # HuggingFace model name as the rest of the configuration.
        self.embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=config.embedding_model
        )

        # Create or get the collection
        try:
            self.collection = self.client.get_or_create_collection(
                name="pdf_chunks",
                embedding_function=self.embedding_function
            )
        except Exception as e:
            print(f"Error initializing ChromaDB: {e}")
            # Fallback: create a new collection under a unique name
            self.collection = self.client.create_collection(
                name=f"pdf_chunks_{uuid.uuid4().hex[:8]}",
                embedding_function=self.embedding_function
            )

    def add(self, chunks: List[Dict[str, Any]]) -> None:
        """Add chunks to the vector store.

        Args:
            chunks: Dicts with a ``content`` string and a ``metadata`` dict.
                An empty list is a no-op.
        """
        if not chunks:
            return

        # A random suffix keeps ids unique across repeated add() calls
        ids = [f"chunk_{i}_{uuid.uuid4().hex[:8]}" for i in range(len(chunks))]
        documents = [chunk["content"] for chunk in chunks]
        metadatas = [chunk["metadata"] for chunk in chunks]

        self.collection.add(
            ids=ids,
            documents=documents,
            metadatas=metadatas
        )

    def query(self, query_text: str, top_k: Optional[int] = None) -> List[Dict[str, Any]]:
        """Query the vector store for similar chunks.

        Args:
            query_text: The text to search for.
            top_k: Maximum number of chunks to return; defaults to
                ``config.top_k_results``.

        Returns:
            Up to ``top_k`` dicts with ``content`` and ``metadata``. Returns an
            empty list when ``top_k`` is not positive, the collection is empty,
            or the query fails (the error is printed, not raised).
        """
        if top_k is None:
            top_k = self.config.top_k_results
        if top_k <= 0:
            return []

        try:
            # Never ask for more results than the collection holds
            available = self.collection.count()
            if available == 0:
                return []

            results = self.collection.query(
                query_texts=[query_text],
                n_results=min(top_k, available)
            )

            # Results are nested per query text; only one query was sent
            documents = results["documents"][0]
            metadatas = results["metadatas"][0]
            return [
                {"content": document, "metadata": metadata}
                for document, metadata in zip(documents, metadatas)
            ]
        except Exception as e:
            print(f"Error querying vector store: {e}")
            return []

class LLMService:
    """Interface with the OpenAI API."""

    def __init__(self, config: "Config"):
        self.config = config
        # The client stays None when no real key is configured, which makes
        # generate_answer() use the local fallback instead of calling the API.
        self.client = None
        if config.openai_api_key and config.openai_api_key != "your_api_key_here":
            self.client = OpenAI(api_key=config.openai_api_key)

    def generate_answer(self, question: str, context: str) -> str:
        """Generate an answer using the OpenAI API.

        Args:
            question: The user's question.
            context: Retrieved excerpts, each prefixed with ``[Page N]``.

        Returns:
            The model's answer, or the fallback answer when no client is
            configured or the API call raises (the error is printed).
        """
        # If no client is available, return a fallback answer
        if not self.client:
            return self._generate_fallback_answer(question, context)

        try:
            prompt = self._build_prompt(question, context)
            response = self.client.chat.completions.create(
                model=self.config.openai_model,
                messages=[
                    {"role": "system", "content": "You are a helpful assistant that answers questions based on the provided document excerpts."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                max_tokens=500
            )

            return response.choices[0].message.content
        except Exception as e:
            print(f"Error with OpenAI API: {e}")
            return self._generate_fallback_answer(question, context)

    def _build_prompt(self, question: str, context: str) -> str:
        """Build the user prompt: context, question, then answering rules."""
        return f"""Answer the following question based ONLY on the provided context:
        
CONTEXT:
{context}

QUESTION: {question}

INSTRUCTIONS:
1. Answer the question using ONLY information from the provided context.
2. If the context doesn't contain the information needed, respond with 'I cannot answer this question based on the provided documents.'
3. Cite the specific page numbers (e.g., [Page X]).
4. Be concise and accurate.

ANSWER:"""

    @staticmethod
    def _page_sort_key(page: str) -> Tuple[int, int, str]:
        """Order numeric page labels by value, then any non-numeric labels."""
        if page.isdigit():
            return (0, int(page), "")
        return (1, 0, page)

    def _generate_fallback_answer(self, question: str, context: str) -> str:
        """Generate a fallback answer when the API is not available.

        The answer echoes the retrieved context and lists every distinct page
        cited in it as ``[Page N]``, sorted in page order.
        """
        # Collect every citation, including several on the same line
        page_citations = set(re.findall(r"\[Page ([^\]\n]+)\]", context))

        if page_citations:
            ordered_pages = sorted(page_citations, key=self._page_sort_key)
            page_citations_str = ", ".join(f"Page {p}" for p in ordered_pages)
            location_line = f"The relevant information was found on {page_citations_str}."
        else:
            # Avoid the dangling "found on ." when nothing is cited
            location_line = "No page citations were found in the provided context."

        return f"""Based on the provided context, I found some information related to your question about '{question}'.
        
{context}

{location_line}

Note: This is a fallback answer as the OpenAI API is not configured. For more accurate answers, please provide an OpenAI API key."""

class RAGPipeline:
    """Coordinates the RAG workflow: chunk, index, retrieve, then answer."""

    def __init__(
        self,
        config: Optional["Config"] = None,
        *,
        processor: Optional["PDFProcessor"] = None,
        embedder: Optional["EmbeddingGenerator"] = None,
        store: Optional["VectorStore"] = None,
        llm: Optional["LLMService"] = None,
    ):
        """Wire up the pipeline components.

        Args:
            config: Shared configuration; a default ``Config`` is created when
                omitted.
            processor: Optional replacement for the default ``PDFProcessor``.
            embedder: Optional replacement for the default ``EmbeddingGenerator``.
            store: Optional replacement for the default ``VectorStore``.
            llm: Optional replacement for the default ``LLMService``.

        Any component left as ``None`` is built from ``config``, so existing
        ``RAGPipeline()`` and ``RAGPipeline(config)`` calls behave as before.
        Passing a component lets tests or callers swap in an alternative
        without loading models or opening a database.
        """
        # Initialize with default config if none provided
        self.config = config if config is not None else Config()

        # Use the injected component when given, otherwise build the default
        self.processor = processor if processor is not None else PDFProcessor(self.config)
        self.embedder = embedder if embedder is not None else EmbeddingGenerator(self.config)
        self.store = store if store is not None else VectorStore(self.config)
        self.llm = llm if llm is not None else LLMService(self.config)

    def process_document(self, text: str) -> None:
        """Process a document and add it to the vector store.

        Args:
            text: The document text handed to the processor's ``extract_text``.
        """
        print("Processing document...")
        # Extract text and create initial pages
        pages = self.processor.extract_text(text)

        # Split into chunks
        print("Splitting into chunks...")
        chunks = self.processor.chunk_text(pages)
        print(f"Created {len(chunks)} chunks")

        # Add to vector store
        print("Adding chunks to vector store...")
        self.store.add(chunks)
        print("Document processed and indexed successfully!")

    def answer_question(self, question: str) -> Tuple[str, List[Dict[str, Any]]]:
        """Answer a question using the RAG pipeline.

        Args:
            question: The question to answer.

        Returns:
            A tuple of the answer text and the chunks used as context. When
            nothing is retrieved, returns
            ``("No relevant information found.", [])`` without calling the LLM.
        """
        print(f"\nQuestion: {question}")
        print("Retrieving relevant context...")

        # Retrieve relevant chunks
        relevant_chunks = self.store.query(question)

        if not relevant_chunks:
            return "No relevant information found.", []

        # Prefix each excerpt with its page so the answer can cite it
        context = "\n\n".join(
            f"[Page {c['metadata']['page']}]: {c['content']}" for c in relevant_chunks
        )

        # Generate answer
        print("Generating answer...")
        answer = self.llm.generate_answer(question, context)

        return answer, relevant_chunks

# ---
# Example usage and test
# Build the pipeline from a default configuration
config = Config()
print("Initializing RAG pipeline...")
pipeline = RAGPipeline(config)

# Sample text standing in for the content of a PDF
# (in a real scenario this would be extracted from the file)
pdf_text = """
Page 1: The sun is the center of our solar system. It provides light and heat to the planets through nuclear fusion in its core. The temperature at the core of the sun reaches about 15 million degrees Celsius.

Page 2: The Earth orbits the sun once every 365.25 days, which we call a year. This orbit is slightly elliptical rather than perfectly circular. The moon orbits the Earth approximately every 27.3 days, completing a full cycle of phases in about 29.5 days.

Page 3: Solar energy can be converted into electricity using solar panels. This renewable energy source is becoming increasingly popular as technology improves and costs decrease. Solar panels work through the photovoltaic effect, where sunlight knocks electrons free from atoms, generating electricity.
"""

# Index the sample document
pipeline.process_document(pdf_text)

# Questions used to exercise retrieval and answer generation
questions = [
    "How does the Earth move around the sun?",
    "What is the temperature at the core of the sun?",
    "How do solar panels work?"
]

for question in questions:
    answer, chunks = pipeline.answer_question(question)

    # Show which chunks were retrieved, with a 100-character preview of each
    print("\nRelevant chunks:")
    for i, chunk in enumerate(chunks, 1):
        page = chunk['metadata']['page']
        preview = chunk['content'][:100]
        print(f"  {i}. [Page {page}] {preview}...")

    print("\nAnswer:")
    print(answer)
    print("\n" + "-"*50)

# ---
# Design Rationale:
# - Each class has a single responsibility and can be swapped or extended
# - Real-world integrations: ChromaDB for vector storage, HuggingFace for embeddings, OpenAI for LLM
# - Configuration is centralized and injectable
# - The pipeline is testable and easy to maintain
# - Error handling for robustness
# - Fallbacks when external services aren't available
