# Session 2: Embeddings and RAG Assignment

## Overview
This notebook implements a Retrieval Augmented Generation (RAG) system using Python, OpenAI embeddings, and vector similarity search.

## Tasks
1. **Imports and Utilities** - Set up required libraries and helper functions
2. **Documents** - Load and process text documents
3. **Embeddings and Vectors** - Generate embeddings and create vector database
4. **Prompts** - Create effective prompts for the RAG system
5. **Retrieval Augmented Generation** - Implement the complete RAG pipeline

## Activity #1: Augment RAG
Enhance the RAG system with additional features like PDF support, metadata, or different embedding models.


## Task 1: Imports and Utilities


In [None]:
# Core libraries
import os
import json
import asyncio
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import numpy as np
from pathlib import Path

# OpenAI and embeddings
import openai
from openai import AsyncOpenAI

# Vector operations
from sklearn.metrics.pairwise import cosine_similarity
import pickle

# Document processing
import PyPDF2
import requests
from bs4 import BeautifulSoup

# Set up OpenAI client
client = AsyncOpenAI(api_key=os.getenv('OPENAI_API_KEY'))

print("✅ Imports completed successfully!")


### Question 1: What is the purpose of using async/await in the OpenAI client setup?

**Answer:** Async/await makes API calls to OpenAI non-blocking: while one request waits on the network, the event loop can start or resume others. In this notebook the benefit appears when embedding many documents, where `asyncio.gather` issues the requests concurrently instead of waiting for each one to finish before sending the next. Because the work is I/O-bound, total time is roughly that of the slowest request rather than the sum of all requests, subject to the API's rate limits.
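
The difference can be seen without calling the API at all. The sketch below is only an illustration: `fake_embedding_call` is a hypothetical stand-in that sleeps instead of contacting OpenAI, and the 0.2 second delay is an arbitrary assumption.

```python
import asyncio
import time
from typing import List


async def fake_embedding_call(text: str, delay: float = 0.2) -> int:
    """Hypothetical stand-in for a network request: waits, then returns the text length."""
    await asyncio.sleep(delay)
    return len(text)


async def run_sequentially(texts: List[str]) -> List[int]:
    # Each call finishes before the next one starts.
    return [await fake_embedding_call(t) for t in texts]


async def run_concurrently(texts: List[str]) -> List[int]:
    # gather schedules all coroutines at once; results keep the input order.
    return await asyncio.gather(*(fake_embedding_call(t) for t in texts))


def timed(coro) -> tuple:
    """Run a coroutine to completion and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


texts = ["alpha", "beta", "gamma", "delta"]
seq_result, seq_seconds = timed(run_sequentially(texts))
con_result, con_seconds = timed(run_concurrently(texts))
print(f"sequential: {seq_seconds:.2f}s, concurrent: {con_seconds:.2f}s")
```

As a script this runs as written. Inside a notebook cell an event loop is already running, so `asyncio.run` would raise an error there; in that case call `await run_concurrently(texts)` directly.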


## Task 2: Documents


In [None]:
@dataclass
class Document:
    """Represents a document with content and metadata."""
    content: str
    source: str
    page_number: Optional[int] = None
    file_type: str = "text"
    
    def __post_init__(self):
        # Clean and normalize content
        self.content = self.content.strip()
        if len(self.content) < 10:
            raise ValueError("Document content too short")

def load_text_file(file_path: str) -> Optional[Document]:
    """Load a text file and return a Document object, or None if loading fails."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        return Document(
            content=content,
            source=file_path,
            file_type="text"
        )
    except Exception as e:
        print(f"Error loading {file_path}: {e}")
        return None

def load_pdf_file(file_path: str) -> List[Document]:
    """Load a PDF file and return a list of Document objects (one per page)."""
    documents = []
    try:
        with open(file_path, 'rb') as f:
            pdf_reader = PyPDF2.PdfReader(f)
            for page_num, page in enumerate(pdf_reader.pages, 1):
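                # extract_text() may return None or an empty string for image-only pages
                content = (page.extract_text() or "").strip()
                # Same minimum length that Document.__post_init__ enforces
                if len(content) >= 10: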
                    documents.append(Document(
                        content=content,
                        source=file_path,
                        page_number=page_num,
                        file_type="pdf"
                    ))
    except Exception as e:
        print(f"Error loading PDF {file_path}: {e}")
    return documents

print("✅ Document classes and loading functions created!")


### Question 2: Why do we use a dataclass for the Document class instead of a regular class?

**Answer:** Dataclasses provide several advantages: 1) Automatic generation of `__init__`, `__repr__`, and `__eq__` methods, reducing boilerplate code; 2) Type-annotated fields that document the structure and help IDEs and static checkers (the annotations are not enforced at runtime); 3) Cleaner, more readable code structure; 4) Optional immutability with `frozen=True`; 5) Easy conversion to plain dictionaries with `dataclasses.asdict`, which simplifies serialization. The `__post_init__` hook also gives a natural place for validation, as `Document` does. For a simple data structure like `Document`, dataclasses balance functionality and simplicity well.
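
A short sketch of these behaviors follows. `Chunk` is a hypothetical, simplified stand-in for the notebook's `Document`, used here only so the example is self-contained.

```python
from dataclasses import FrozenInstanceError, asdict, dataclass
from typing import Optional


@dataclass(frozen=True)
class Chunk:
    """Hypothetical, simplified stand-in for the notebook's Document."""
    content: str
    source: str
    page_number: Optional[int] = None


a = Chunk("hello world", "notes.txt")
b = Chunk("hello world", "notes.txt")

print(a)          # uses the generated __repr__
print(a == b)     # generated __eq__ compares field values, not identity
print(asdict(a))  # plain dict, ready for json.dumps

# Annotations are not checked at runtime: this constructs without error.
wrong = Chunk(content=123, source="notes.txt")

# frozen=True rejects attribute assignment after construction.
try:
    a.content = "changed"
    mutated = True
except FrozenInstanceError:
    mutated = False
print("mutation allowed:", mutated)
```

Note that `Document` itself could not simply be marked `frozen=True`, because its `__post_init__` assigns to `self.content`; a frozen variant would need `object.__setattr__` for that normalization step.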


## Task 3: Embeddings and Vectors


In [None]:
async def generate_embedding(text: str, model: str = "text-embedding-3-small") -> Optional[List[float]]:
    """Generate an embedding for a single text using the OpenAI API; returns None on failure."""
    try:
        response = await client.embeddings.create(
            model=model,
            input=text
        )
        return response.data[0].embedding
    except Exception as e:
        print(f"Error generating embedding: {e}")
        return None

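async def generate_embeddings_batch(documents: List[Document], model: str = "text-embedding-3-small") -> List[Optional[List[float]]]:
    """Generate embeddings for multiple documents concurrently.

    The result is aligned with `documents`: entry i is the embedding for
    documents[i], or None if that request failed.
    """
    tasks = [generate_embedding(doc.content, model) for doc in documents]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Keep positions aligned with the input; mark failures as None instead of dropping them,
    # otherwise later embeddings would be paired with the wrong documents
    embeddings = [emb if isinstance(emb, list) else None for emb in results]
    succeeded = sum(emb is not None for emb in embeddings)
    print(f"Generated {succeeded} embeddings out of {len(documents)} documents")
    return embeddings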

class VectorDatabase:
    """Simple in-memory vector database for storing and searching embeddings."""
    
    def __init__(self):
        self.documents: List[Document] = []
        self.embeddings: List[List[float]] = []
        self.metadata: List[Dict[str, Any]] = []
    
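    def add_documents(self, documents: List[Document], embeddings: List[List[float]]):
        """Add documents and their embeddings to the database."""
        # Search results are looked up by index, so the two lists must stay in step
        if len(documents) != len(embeddings):
            raise ValueError("documents and embeddings must have the same length")
        self.documents.extend(documents)
        self.embeddings.extend(embeddings)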
        
        # Create metadata for each document
        for doc in documents:
            self.metadata.append({
                "source": doc.source,
                "page_number": doc.page_number,
                "file_type": doc.file_type,
                "content_length": len(doc.content)
            })
    
    def search(self, query_embedding: List[float], top_k: int = 5) -> List[Dict[str, Any]]:
        """Search for similar documents using cosine similarity."""
        if not self.embeddings:
            return []
        
        # Calculate cosine similarities
        similarities = cosine_similarity([query_embedding], self.embeddings)[0]
        
        # Get top-k results
        top_indices = np.argsort(similarities)[::-1][:top_k]
        
        results = []
        for idx in top_indices:
            results.append({
                "document": self.documents[idx],
                "similarity": float(similarities[idx]),
                "metadata": self.metadata[idx]
            })
        
        return results

print("✅ Embedding functions and VectorDatabase class created!")


### Question 3: What are the advantages of using cosine similarity for document retrieval?

**Answer:** Cosine similarity is well suited to document retrieval because: 1) It measures the angle between vectors rather than their magnitude, so scaling a vector does not change its score; 2) Its values always lie between -1 and 1, giving scores that are comparable across queries; 3) It is cheap to compute for high-dimensional vectors, requiring one dot product and two norms; 4) It fits text embeddings, where direction carries the semantic meaning and magnitude carries little.
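
The scale invariance can be checked by hand. The sketch below uses only the standard library and made-up three-dimensional vectors (real embeddings have far more dimensions); the `cosine` helper is written out here for illustration and is not the `sklearn` function the notebook uses.

```python
import math
from typing import Sequence


def cosine(u: Sequence[float], v: Sequence[float]) -> float:
    """Cosine similarity: dot(u, v) / (|u| * |v|)."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)


def dot_product(u: Sequence[float], v: Sequence[float]) -> float:
    return sum(a * b for a, b in zip(u, v))


query = [1.0, 2.0, 0.0]
same_direction = [2.0, 4.0, 0.0]
scaled_up = [20.0, 40.0, 0.0]   # same direction, ten times the magnitude
orthogonal = [0.0, 0.0, 3.0]    # shares no direction with the query

for name, vec in [("same_direction", same_direction),
                  ("scaled_up", scaled_up),
                  ("orthogonal", orthogonal)]:
    print(f"{name}: cosine={cosine(query, vec):.3f}, dot={dot_product(query, vec):.1f}")
```

The raw dot product grows tenfold between `same_direction` and `scaled_up`, while the cosine score stays the same, which is the property the answer above relies on.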


## Task 4: Prompts


In [None]:
def create_rag_prompt(query: str, context_documents: List[Document], system_prompt: Optional[str] = None) -> str:
    """Create a comprehensive prompt for RAG with context and query."""
    
    if system_prompt is None:
        system_prompt = """You are a helpful AI assistant that answers questions based on the provided context. 
Use only the information from the context documents to answer questions. If the context doesn't contain 
enough information to answer the question, say so clearly. Always cite the source of your information 
when possible."""
    
    # Build context section
    context_sections = []
    for i, doc in enumerate(context_documents, 1):
        source_info = f"Source: {doc.source}"
        if doc.page_number is not None:
            source_info += f", Page {doc.page_number}"
        if doc.file_type:
            source_info += f" (Type: {doc.file_type})"
        
        context_sections.append(f"""
--- Context Document {i} ---
{source_info}
Content: {doc.content[:1000]}{'...' if len(doc.content) > 1000 else ''}
""")
    
    context = "\n".join(context_sections)
    
    # Create the full prompt
    prompt = f"""{system_prompt}

Context Documents:
{context}

Question: {query}

Answer: """
    
    return prompt

print("✅ Prompt creation functions implemented!")


### Question 4: Why is it important to include source information in RAG prompts?

**Answer:** Including source information in RAG prompts is crucial for several reasons: 1) **Transparency** - Users can verify the information and understand where it came from; 2) **Traceability** - Enables fact-checking and validation of claims; 3) **Trust** - Users are more likely to trust responses when they know the source; 4) **Context** - Source metadata (like page numbers, file types) provides additional context that can improve answer quality; 5) **Debugging** - Helps identify which documents are most relevant for specific queries; 6) **Professionalism** - Makes the system more suitable for production use where accountability matters.
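
One way to make citations traceable is to number each context block and keep a lookup table from number to source, so a citation such as `[2]` in the answer can be mapped back to a file and page. The following is a minimal sketch under that assumption; `build_cited_context` and its tuple-based input are hypothetical and are not part of the notebook's `create_rag_prompt`.

```python
from typing import Dict, List, Optional, Tuple


def build_cited_context(
    chunks: List[Tuple[str, str, Optional[int]]],
) -> Tuple[str, Dict[int, str]]:
    """Number each (content, source, page) chunk.

    Returns the context text plus a lookup table from citation number
    to a human-readable source label.
    """
    blocks: List[str] = []
    citations: Dict[int, str] = {}
    for number, (content, source, page) in enumerate(chunks, 1):
        label = source if page is None else f"{source}, page {page}"
        citations[number] = label
        blocks.append(f"[{number}] ({label})\n{content}")
    return "\n\n".join(blocks), citations


context, citations = build_cited_context([
    ("Cosine similarity compares vector direction.", "notes.txt", None),
    ("Embeddings map text to dense vectors.", "slides.pdf", 4),
])
print(context)
print(citations)
```

The `context` string would go into the prompt, while `citations` stays on the application side for display or fact-checking.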


## Task 5: Retrieval Augmented Generation


In [None]:
class RAGSystem:
    """Complete RAG system that combines retrieval and generation."""
    
    def __init__(self, embedding_model: str = "text-embedding-3-small", llm_model: str = "gpt-4o-mini"):
        self.embedding_model = embedding_model
        self.llm_model = llm_model
        self.vector_db = VectorDatabase()
        self.client = AsyncOpenAI(api_key=os.getenv('OPENAI_API_KEY'))
    
    async def add_documents(self, documents: List[Document]):
        """Add documents to the RAG system."""
        print(f"Generating embeddings for {len(documents)} documents...")
        embeddings = await generate_embeddings_batch(documents, self.embedding_model)
        
        # Filter documents to match successful embeddings
        valid_docs = [doc for i, doc in enumerate(documents) if i < len(embeddings) and embeddings[i] is not None]
        valid_embeddings = [emb for emb in embeddings if emb is not None]
        
        self.vector_db.add_documents(valid_docs, valid_embeddings)
        print(f"Added {len(valid_docs)} documents to the vector database")
    
    async def query(self, question: str, top_k: int = 3, include_sources: bool = True) -> Dict[str, Any]:
        """Query the RAG system with a question."""
        # Generate embedding for the question
        query_embedding = await generate_embedding(question, self.embedding_model)
        if not query_embedding:
            return {"error": "Failed to generate query embedding"}
        
        # Retrieve relevant documents
        search_results = self.vector_db.search(query_embedding, top_k=top_k)
        
        if not search_results:
            return {"answer": "No relevant documents found.", "sources": []}
        
        # Extract documents and create context
        context_documents = [result["document"] for result in search_results]
        
        # Create RAG prompt
        prompt = create_rag_prompt(question, context_documents)
        
        # Generate answer using LLM
        try:
            response = await self.client.chat.completions.create(
                model=self.llm_model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=500,
                temperature=0.1
            )
            
            answer = response.choices[0].message.content
            
            result = {"answer": answer}
            
            if include_sources:
                result["sources"] = [
                    {
                        "source": doc.source,
                        "page_number": doc.page_number,
                        "file_type": doc.file_type,
                        "similarity": search_results[i]["similarity"]
                    }
                    for i, doc in enumerate(context_documents)
                ]
            
            return result
            
        except Exception as e:
            return {"error": f"Failed to generate answer: {str(e)}"}

print("✅ Complete RAG system implemented!")
