In [None]:
## Read PDFs from S3 bucket using PyPDFLoader
import boto3
import tempfile
import os
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from typing import List, Dict, Any

class DocumentProcessor:
    """Load PDF documents from an S3 prefix and split them into text chunks.

    Every PDF found under ``s3://<bucket_name>/<prefix>`` is downloaded to a
    temporary file, parsed with ``PyPDFLoader`` and tagged with its S3 origin
    (``s3_bucket``, ``s3_key``, ``source_file``) in the document metadata.
    """

    def __init__(self, bucket_name: str, prefix: str = "pdfs/"):
        """Create the S3 client and remember where the PDFs live.

        Args:
            bucket_name: Name of the S3 bucket to read from.
            prefix: Key prefix under which PDFs are searched.
        """
        self.bucket_name = bucket_name
        self.prefix = prefix
        self.s3_client = boto3.client('s3')
        self.documents = []

    @staticmethod
    def _is_pdf_key(key: str) -> bool:
        """Return True when an S3 key names a PDF, ignoring extension case."""
        return key.lower().endswith('.pdf')

    def _list_object_keys(self) -> List[str]:
        """Return every object key under the prefix, following pagination."""
        # A single list_objects_v2 call is capped at 1000 keys; the paginator
        # keeps requesting pages so larger prefixes are not silently truncated.
        paginator = self.s3_client.get_paginator('list_objects_v2')
        pages = paginator.paginate(Bucket=self.bucket_name, Prefix=self.prefix)
        return [obj['Key'] for page in pages for obj in page.get('Contents', [])]

    def _load_single_pdf(self, pdf_key: str) -> List[Any]:
        """Download one PDF, parse it and tag the result with its S3 origin."""
        # The descriptor is closed right away so the download can reopen the
        # path by name; the download itself sits inside the try block so the
        # temporary file is removed even when the transfer fails.
        fd, tmp_path = tempfile.mkstemp(suffix='.pdf')
        os.close(fd)
        try:
            self.s3_client.download_file(self.bucket_name, pdf_key, tmp_path)
            documents = PyPDFLoader(tmp_path).load()
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

        source_file = pdf_key.split('/')[-1]
        for doc in documents:
            doc.metadata['s3_bucket'] = self.bucket_name
            doc.metadata['s3_key'] = pdf_key
            doc.metadata['source_file'] = source_file
        return documents

    def load_documents(self) -> List[Any]:
        """Load all PDF documents from the S3 prefix using PyPDFLoader.

        Returns:
            The loaded documents (also stored on ``self.documents``), or an
            empty list when the prefix contains no objects at all.

        Raises:
            Exception: Any S3 or PDF parsing error is printed and re-raised.
        """
        try:
            print(f"Loading PDFs from s3://{self.bucket_name}/{self.prefix}")

            object_keys = self._list_object_keys()
            if not object_keys:
                print(f"No files found in s3://{self.bucket_name}/{self.prefix}")
                return []

            pdf_files = [key for key in object_keys if self._is_pdf_key(key)]
            print(f"Found {len(pdf_files)} PDF files\n")

            all_documents = []
            for pdf_key in pdf_files:
                print(f"Processing: {pdf_key}")
                documents = self._load_single_pdf(pdf_key)
                all_documents.extend(documents)

                print(f"  - Loaded {len(documents)} page(s)")
                print(f"  - Total characters: {sum(len(doc.page_content) for doc in documents)}")

            self.documents = all_documents
            print(f"\nTotal documents loaded: {len(self.documents)} pages from {len(pdf_files)} PDF(s)")
            print(f"Total characters: {sum(len(doc.page_content) for doc in self.documents)}")

            if self.documents:
                print(f"\nFirst document metadata: {self.documents[0].metadata}")
                print(f"First page preview: {self.documents[0].page_content[:150]}...")

            return self.documents

        except Exception as e:
            print(f"Error loading documents: {e}")
            raise

    def split_documents_into_chunks(self, chunk_size: int = 500, chunk_overlap: int = 50) -> List[Any]:
        """Split the loaded documents into smaller, overlapping chunks.

        Args:
            chunk_size: Maximum chunk length, measured with ``len``.
            chunk_overlap: Number of characters shared by neighbouring chunks.

        Returns:
            The chunks produced by ``RecursiveCharacterTextSplitter``.

        Raises:
            ValueError: If no documents have been loaded yet.
        """
        try:
            if not self.documents:
                raise ValueError("No documents loaded. Call load_documents() first.")

            print(f"\nSplitting documents into chunks (size={chunk_size}, overlap={chunk_overlap})")

            # Separators are tried in order: paragraphs, lines, words, characters.
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                length_function=len,
                separators=["\n\n", "\n", " ", ""]
            )

            chunks = text_splitter.split_documents(self.documents)

            print(f"Split into {len(chunks)} chunks")
            # Pages without extractable text can yield zero chunks; skip the
            # average instead of dividing by zero.
            if chunks:
                total_chars = sum(len(chunk.page_content) for chunk in chunks)
                print(f"Average chunk size: {total_chars / len(chunks):.0f} characters")

            return chunks

        except Exception as e:
            print(f"Error splitting documents: {e}")
            raise

# Build the processor for the target bucket/prefix. "test" is a placeholder
# bucket name; point it at the bucket that actually holds the PDFs.
doc_processor = DocumentProcessor(bucket_name="test", prefix="pdfs/")

# Download and parse every PDF under the prefix (kept for later inspection)
all_documents = doc_processor.load_documents()

# Split into chunks; `document_chunks` is reused by the embedding cells below
document_chunks = doc_processor.split_documents_into_chunks(chunk_size=1500, chunk_overlap=50)
document_chunks

In [4]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List,    Dict,   Any,Tuple
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
class EmbeddingManager:
    """Wrap a SentenceTransformer model used to embed chunks and queries."""

    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        """Remember the model name and load the model immediately.

        Args:
            model_name: Name passed to ``SentenceTransformer``.
        """
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self):
        """Load the sentence transformer model.

        Raises:
            Exception: Any loading error is printed and re-raised.
        """
        try:
            self.model = SentenceTransformer(self.model_name)
            print(f"Loaded model: {self.model_name}")
            print("Embedding dimension:", self.model.get_sentence_embedding_dimension())
        except Exception as e:
            print(f"Error loading model {self.model_name}: {e}")
            raise

    def generate_embedding(self, texts: List[str]) -> np.ndarray:
        """Generate embeddings for a list of texts.

        Args:
            texts: Strings to encode.

        Returns:
            The array returned by ``model.encode`` with ``convert_to_numpy=True``.

        Raises:
            ValueError: If the model has not been loaded.
        """
        # Compare against None explicitly: truthiness of a model object can be
        # driven by its length rather than by whether it was loaded.
        if self.model is None:
            raise ValueError("Model not loaded.")
        embeddings = self.model.encode(texts, convert_to_numpy=True, show_progress_bar=True)
        print(f"Generated embeddings for {len(texts)} texts.")
        print(f"Embedding shape: {embeddings.shape}")
        return embeddings

    def get_sentence_embedding_dimension(self) -> int:
        """Get the dimension of the sentence embeddings.

        Raises:
            ValueError: If the model has not been loaded.
        """
        if self.model is None:
            raise ValueError("Model not loaded.")
        return self.model.get_sentence_embedding_dimension()
    

## Initialize the embedding manager (constructing it loads the model)

embedding_manager = EmbeddingManager(model_name='all-MiniLM-L6-v2')
embedding_manager

### Vector Store

In [None]:
class VectorStore:
    """Manage a persistent ChromaDB collection of document embeddings."""

    def __init__(self, collection_name: str = "pdf_documents", persistent_directory: str = "../data/vector_store"):
        """Record the collection settings and open (or create) the store.

        Args:
            collection_name: Name of the ChromaDB collection to use.
            persistent_directory: Directory where ChromaDB persists its data.
        """
        self.collection_name = collection_name
        self.persistent_directory = persistent_directory
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        """Initialize the ChromaDB client and collection.

        Raises:
            Exception: Any initialization error is printed and re-raised.
        """
        try:
            # Create persistent ChromaDB client
            os.makedirs(self.persistent_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path=self.persistent_directory)

            # Get or create the collection so re-running the cell reuses it
            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF document embeddings for RAG system"}
            )
            print(f"Vector store initialized: {self.collection_name} at {self.persistent_directory}")
            print(f"Existing documents in collection: {self.collection.count()}")
        except Exception as e:
            print(f"Error initializing Vector Store: {e}")
            raise

    def add_documents(self, documents: List[Any], embeddings: np.ndarray, batch_size: int = 1000):
        """Add documents and their embeddings to the vector store.

        Args:
            documents: Objects exposing ``page_content`` and ``metadata``.
            embeddings: One embedding per document, in the same order.
            batch_size: Maximum number of records sent per ``collection.add``
                call, so large inputs are written in several smaller requests.

        Raises:
            ValueError: If the lengths differ or ``batch_size`` is below 1.
        """
        try:
            total = len(documents)
            if total != len(embeddings):
                raise ValueError("Number of documents and embeddings must match.")
            if batch_size < 1:
                raise ValueError("batch_size must be at least 1.")
            if total == 0:
                # Nothing to write; avoid handing empty lists to the collection.
                print("No documents to add to the vector store.")
                return

            print(f"Adding {total} documents to the vector store...")

            for start in range(0, total, batch_size):
                stop = start + batch_size
                ids = []
                metadatas = []
                documents_texts = []
                embeddings_list = []

                batch = zip(documents[start:stop], embeddings[start:stop])
                for i, (doc, emb) in enumerate(batch, start=start):
                    # Use the full 128-bit UUID; a truncated prefix makes ID
                    # collisions far more likely as the collection grows.
                    ids.append(uuid.uuid4().hex)

                    # Copy the metadata so the caller's document is not mutated
                    metadata = dict(doc.metadata)
                    metadata['doc_index'] = i
                    metadata['content_length'] = len(doc.page_content)
                    metadatas.append(metadata)

                    documents_texts.append(doc.page_content)
                    embeddings_list.append(emb.tolist())

                self.collection.add(
                    ids=ids,
                    embeddings=embeddings_list,
                    metadatas=metadatas,
                    documents=documents_texts
                )

            print(f"Added {total} documents to the vector store.")
        except Exception as e:
            print(f"Error adding documents to Vector Store: {e}")
            raise

# Open (or create) the persistent collection used by the rest of the notebook
vectorStore = VectorStore(collection_name="pdf_documents", persistent_directory="../data/vector_store")
vectorStore
# Display the chunks that are about to be embedded
document_chunks

In [None]:
### Extract the raw text of every chunk
texts = [doc.page_content for doc in document_chunks]

### Generate embeddings for the chunk texts (same order as `document_chunks`)
embeddings = embedding_manager.generate_embedding(texts)

### Store chunks and embeddings in the vector store.
### NOTE: each call generates fresh IDs, so re-running this cell adds the chunks again.
vectorStore.add_documents(document_chunks, embeddings)


### RAG (Retrieval-Augmented Generation) - Retrieval Pipeline


In [8]:
class RAGRetriever:
    """Answer similarity queries against the ChromaDB-backed vector store."""

    def __init__(self, vector_store: VectorStore, embedding_manager: EmbeddingManager):
        """Keep references to the store to search and the embedder for queries."""
        self.embedding_manager = embedding_manager
        self.vector_store = vector_store

    def retrieve(self, query: str, top_k: int = 5, score_threshold: float = 0.4) -> List[Dict[str, Any]]:
        """Embed ``query``, fetch its nearest chunks and keep the good matches.

        Args:
            query: Natural-language question to search for.
            top_k: Number of results requested from the collection.
            score_threshold: Minimum similarity a result needs to be returned.

        Returns:
            One dict per accepted result with the keys ``id``, ``content``,
            ``metadata``, ``similarity_score``, ``distance`` and ``rank``.
            Any error is printed with its traceback and yields an empty list.
        """
        try:
            print("\n=== Retrieving documents for query ===")
            print(f"Query: '{query}'")
            print(f"Top K: {top_k}, Score Threshold: {score_threshold}")

            # The embedder works on batches, so wrap the query and unwrap row 0.
            print("Generating query embedding...")
            query_vector = self.embedding_manager.generate_embedding([query])[0].tolist()
            print(f"Query embedding shape: {len(query_vector)}")

            print("\nQuerying vector store...")
            response = self.vector_store.collection.query(
                n_results=top_k,
                query_embeddings=[query_vector],
            )
            print(f"\nQuery returned {len(response['ids'][0])} results")

            # Guard clause: nothing came back for the (single) query.
            if not response['documents'] or len(response['documents'][0]) == 0:
                print("No documents retrieved.")
                return []

            # Results are nested per query; index 0 is the only query sent.
            texts = response['documents'][0]
            metas = response['metadatas'][0]
            dists = response['distances'][0]
            doc_ids = response['ids'][0]

            accepted = []
            print("\n=== Results ===")
            for rank, (doc_id, text, meta, dist) in enumerate(zip(doc_ids, texts, metas, dists), start=1):
                # Map the distance (lower is closer) onto a (0, 1] similarity.
                score = 1 / (1 + dist)

                print(f"\nRank {rank}:")
                print(f"  Doc ID: {doc_id}")
                print(f"  Distance: {dist:.4f}")
                print(f"  Similarity Score: {score:.4f}")
                print(f"  Source: {meta.get('source_file', 'Unknown')}")
                print(f"  Page: {meta.get('page', 'N/A')}")
                print(f"  Content preview: {text[:150]}...")

                if score >= score_threshold:
                    accepted.append({
                        "id": doc_id,
                        "content": text,
                        "metadata": meta,
                        "similarity_score": score,
                        "distance": dist,
                        "rank": rank,
                    })

            print("\n=== Summary ===")
            print(f"Documents above similarity threshold ({score_threshold}): {len(accepted)}")
            return accepted

        except Exception as e:
            # Best effort: report the failure and let callers see "no results".
            print(f"Error retrieving documents: {e}")
            import traceback
            traceback.print_exc()
            return []

# Wire the retriever to the populated vector store and the shared embedder
rag_retriever = RAGRetriever(vector_store=vectorStore, embedding_manager=embedding_manager)
rag_retriever

<__main__.RAGRetriever at 0x129db12b0>

In [None]:
# Smoke-test retrieval with the default top_k and score_threshold
rag_retriever.retrieve(query="describe about Attention is All you Need")

In [None]:
## Debug: Check vector store contents and search
print("=== Vector Store Debug Info ===")
print(f"Total documents in vector store: {vectorStore.collection.count()}")

# Check the in-memory chunks (not the store) for the word 'attention', either
# in the chunk text or in the source file name
print("\n=== Checking for 'Attention' paper ===")
attention_chunks = [doc for doc in document_chunks if 'attention' in doc.page_content.lower() 
                    or 'attention' in doc.metadata.get('source_file', '').lower()]
print(f"Found {len(attention_chunks)} chunks mentioning 'attention'")

if attention_chunks:
    print(f"\nFirst attention chunk metadata: {attention_chunks[0].metadata}")
    print(f"First attention chunk preview: {attention_chunks[0].page_content[:200]}...")

# Re-run the query with a low threshold so that weak matches are returned too
print("\n=== Testing Query ===")
query = "describe about Attention is All you Need"
print(f"Query: {query}")
results = rag_retriever.retrieve(query=query, top_k=5, score_threshold=0.1)

In [None]:
## Check if documents were actually added to vector store
print("=== Verifying Vector Store Population ===")
print(f"Number of chunks created: {len(document_chunks)}")
print(f"Number of embeddings generated: {embeddings.shape[0] if embeddings is not None else 0}")
print(f"Documents in vector store: {vectorStore.collection.count()}")

# Sanity check: query the store with the first chunk's own embedding (not a
# random sample) and inspect the three nearest results
sample_result = vectorStore.collection.query(
    query_embeddings=[embeddings[0].tolist()],
    n_results=3
)
print(f"\nSample query returned {len(sample_result['ids'][0])} results")
if sample_result['documents']:
    print(f"Sample document preview: {sample_result['documents'][0][0][:200]}...")
    print(f"Sample distances: {sample_result['distances'][0]}")
    print(f"Sample metadata: {sample_result['metadatas'][0][0]}")

### Simple RAG with a Groq LLM

Implementing a complete RAG pipeline with an open-source LLM served by Groq.

In [None]:
## Simple RAG pipeline with Groq LLM

from langchain_groq import ChatGroq
import os
from dotenv import load_dotenv
load_dotenv()


### Initialize Groq LLM with API key from environment variable
# load_dotenv() above has already merged any .env file into the environment,
# so the key is read from there instead of being pasted into the notebook.
groq_api_key = os.getenv("GROQ_API_KEY")
if not groq_api_key:
    raise ValueError("GROQ_API_KEY is not set; export it or add it to your .env file.")

llm = ChatGroq(api_key=groq_api_key, model="llama-3.1-8b-instant", temperature=0.2, max_tokens=1000)

## Simple RAG function: retrieve context +  generate response

def rag_simple(query: str, retriever: "RAGRetriever", llm: "ChatGroq", top_k: int = 5, score_threshold: float = 0.5) -> str:
    """Simple RAG pipeline: retrieve context and generate an answer.

    Args:
        query: Question to answer.
        retriever: Object whose ``retrieve`` returns dicts with a ``content`` key.
        llm: Chat model whose ``invoke`` returns an object with ``content``.
        top_k: Number of chunks requested from the retriever.
        score_threshold: Minimum similarity forwarded to the retriever.

    Returns:
        The stripped model answer, or a fixed "cannot find" message when no
        usable context was retrieved.
    """
    # Retrieve relevant documents
    retrieved_docs = retriever.retrieve(query=query, top_k=top_k, score_threshold=score_threshold)

    if not retrieved_docs:
        return "I cannot find the answer in the provided documents."

    # Combine retrieved document contents
    context = "\n\n".join([doc['content'] for doc in retrieved_docs])

    if not context.strip():
        return "I cannot find the answer in the provided documents."

    # The f-string is already fully interpolated. Calling .format() on it again
    # would treat any '{' or '}' in the retrieved text or the question as a
    # placeholder and raise, so the prompt is passed to the model as-is.
    prompt = f"""Answer the following question based on the provided context. If the answer cannot be found in the context, say "I cannot find the answer in the provided documents."

        Context:
        {context}

        Question: {query}

        Answer:"""

    # Generate answer using LLM
    response = llm.invoke(prompt)

    return response.content.strip()



In [None]:
# Ask a question through the simple pipeline; a lower threshold and top_k=3
# are passed explicitly instead of the function defaults
answer= rag_simple(query="Explain Model Architecture of Attention is all you Need Paper", retriever=rag_retriever, llm=llm, top_k=3, score_threshold=0.3)
answer

In [None]:
def rag_advanced(query: str, retriever: "RAGRetriever", llm: "ChatGroq", top_k: int = 5, score_threshold: float = 0.5, return_context: bool = False) -> Any:
    """RAG pipeline that also reports sources and a confidence score.

    Args:
        query: Question to answer.
        retriever: Object whose ``retrieve`` returns dicts with ``content``,
            ``metadata``, ``similarity_score`` and ``rank`` keys.
        llm: Chat model whose ``invoke`` returns an object with ``content``.
        top_k: Number of chunks requested from the retriever.
        score_threshold: Minimum similarity forwarded to the retriever.
        return_context: When True, the context is additionally returned under
            the ``context`` key (it is always present as ``full_context``).

    Returns:
        A dict with ``answer``, ``sources``, ``confidence_score`` (mean
        similarity of the retrieved chunks) and ``full_context``. When no
        usable context is retrieved, the fixed "cannot find" message is
        returned as a plain string instead.
    """
    # Retrieve relevant documents
    retrieved_docs = retriever.retrieve(query=query, top_k=top_k, score_threshold=score_threshold)

    if not retrieved_docs:
        return "I cannot find the answer in the provided documents."

    # Combine retrieved document contents
    context = "\n\n".join([doc['content'] for doc in retrieved_docs])

    if not context.strip():
        return "I cannot find the answer in the provided documents."

    # Summarize where each piece of context came from
    sources = [{
            'source': doc['metadata'].get('source_file', 'Unknown'),
            'similarity_score': doc['similarity_score'],
            'rank': doc['rank'],
            'page': doc['metadata'].get('page', 'N/A'),
            'preview': doc['content'][:150]
        } for doc in retrieved_docs
    ]
    # Convert the numpy scalar to a plain float so the result displays cleanly
    confidence_score = float(np.mean([doc['similarity_score'] for doc in retrieved_docs]))

    # The f-string is already fully interpolated. Calling .format() on it again
    # would treat any '{' or '}' in the retrieved text or the question as a
    # placeholder and raise, so the prompt is passed to the model as-is.
    prompt = f"""Answer the following question based on the provided context. If the answer cannot be found in the context, say "I cannot find the answer in the provided documents."

        Context:
        {context}

        Question: {query}

        Answer:"""

    # Generate answer using LLM
    response = llm.invoke(prompt)

    output = {
        'answer': response.content.strip(),
        'sources': sources,
        'confidence_score': confidence_score,
        'full_context': context
    }

    if return_context:
        output['context'] = context
    return output

    # Example usage: the result is a dict with the answer, its sources and a
    # confidence score, or a plain message string when nothing was retrieved
result = rag_advanced(query="Explain Model Architecture of Attention is all you Need Paper", retriever=rag_retriever, llm=llm, top_k=3, score_threshold=0.3)
result   
    