### RAG Pipeline: Data Ingestion to Vector DB

In [None]:
import os
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pathlib import Path


In [None]:
### Read all PDFs in the directory (searched recursively)
def process_all_pdfs(pdf_directory):
    """Load every PDF under ``pdf_directory`` (recursively) with PyPDFLoader.

    Each document returned by the loader gets two extra metadata keys:
    ``source_file`` (the PDF's file name) and ``file_type`` (``'pdf'``).
    PDFs that fail to load are reported and skipped.

    Args:
        pdf_directory: Directory (str or Path) to search for PDF files.

    Returns:
        A list of all loaded documents, grouped by file in sorted path order.
        An empty list if the directory does not exist or contains no PDFs.
    """
    all_documents = []
    pdf_dir = Path(pdf_directory)

    if not pdf_dir.is_dir():
        print(f"PDF directory not found: {pdf_dir}")
        return all_documents

    # Case-insensitive suffix match so files like "report.PDF" are not skipped;
    # sorting makes the load order (and therefore chunk order) deterministic.
    pdf_files = sorted(
        p for p in pdf_dir.rglob("*") if p.is_file() and p.suffix.lower() == ".pdf"
    )

    print(f"Found {len(pdf_files)} PDF files to Process")

    for pdf_file in pdf_files:
        print(f"\nProcessing: {pdf_file.name}")
        try:
            documents = PyPDFLoader(str(pdf_file)).load()
        except Exception as e:
            # Best-effort ingestion: one unreadable PDF must not abort the whole run.
            print(f"X Error loading {pdf_file}: {e}")
            continue

        # Add source information to metadata
        for doc in documents:
            doc.metadata['source_file'] = pdf_file.name
            doc.metadata['file_type'] = 'pdf'

        all_documents.extend(documents)
        print(f"Loaded {len(documents)} pages")

    print(f"\nTotal documents loaded: {len(all_documents)}")
    return all_documents

all_pdf_documents = process_all_pdfs(pdf_directory="../data/pdf")

In [None]:
### Split documents into text chunks

def split_documents(documents, chunk_size=1000, chunk_overlap=200):
    """Split documents into smaller, overlapping chunks for retrieval.

    Args:
        documents: Documents to split (as accepted by the splitter's
            ``split_documents``).
        chunk_size: Maximum chunk length, measured with ``len``.
        chunk_overlap: Overlap between consecutive chunks, measured with ``len``.

    Returns:
        The list of chunk documents produced by the splitter. When it is
        non-empty, a preview of the first chunk is printed.
    """
    splitter_config = {
        "chunk_size": chunk_size,
        "chunk_overlap": chunk_overlap,
        "length_function": len,
        # Try paragraph breaks first, then line breaks, then spaces, then any character.
        "separators": ["\n\n", "\n", " ", ""],
    }
    splitter = RecursiveCharacterTextSplitter(**splitter_config)

    chunked = splitter.split_documents(documents)
    print(f"Split {len(documents)} documents into {len(chunked)} chunks")

    if not chunked:
        return chunked

    first = chunked[0]
    print("\n Example chunk: ")
    print(f"Content: {first.page_content[:200]}...")
    print(f"Metadata: {first.metadata}")
    return chunked

In [None]:
chunks = split_documents(documents=all_pdf_documents)

In [None]:
# Notebook display: show the chunk list produced by split_documents above.
chunks

### Embeddings and Vector Store DB

In [None]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Any , Tuple
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
class EmbeddingManager:
    """Generate sentence embeddings with a SentenceTransformer model.

    The model is loaded eagerly in ``__init__``; construction re-raises any
    error raised while loading it.
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        Initialize the embedding manager and load the model.

        Args:
            model_name: Hugging Face model name for sentence embeddings.
        """
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self):
        """Load the SentenceTransformer model, logging and re-raising any failure."""
        try:
            # Progress message printed before the (potentially slow) load.
            print(f"Loading model: {self.model_name}")
            self.model = SentenceTransformer(self.model_name)
            print(f"Model loaded successfully. Embedding dimension {self.model.get_sentence_embedding_dimension()}")

        except Exception as e:
            print(f"Error loading model: {self.model_name}: {e}")
            raise

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Generate embeddings for a list of texts.

        Args:
            texts: Text strings to embed.

        Returns:
            numpy array of embeddings with shape (len(texts), embedding_dim).
            An empty input yields an empty array of shape (0, embedding_dim).

        Raises:
            ValueError: If the model is not loaded.
        """
        if self.model is None:
            raise ValueError("Model not loaded")

        if len(texts) == 0:
            # Return a well-formed (0, dim) array rather than calling encode on
            # nothing, so callers can rely on .shape / len() for any input.
            return np.empty((0, self.get_embedding_dimension()), dtype=np.float32)

        print(f"Generating embeddings for {len(texts)} text...")
        embeddings = self.model.encode(texts, show_progress_bar=True)
        print(f"Generated embeddings with shape {embeddings.shape}")
        return embeddings

    def get_embedding_dimension(self) -> int:
        """Return the embedding dimension reported by the loaded model.

        Raises:
            ValueError: If the model is not loaded.
        """
        if self.model is None:
            raise ValueError("Model not loaded")
        return self.model.get_sentence_embedding_dimension()
    

## Initialize the embedding manager
embedding_manager = EmbeddingManager(model_name="all-MiniLM-L6-v2")
embedding_manager  # notebook display of the manager object

#### VECTOR STORE


In [None]:
class VectorStore:
    """Manage document embeddings in a persistent ChromaDB collection."""

    def __init__(self, collection_name: str = 'pdf_documents', persist_directory: str = "../data/vector_store"):
        """
        Initialize the vector store (creates the directory, client and collection).

        Args:
            collection_name: Name of the ChromaDB collection.
            persist_directory: Directory in which ChromaDB persists its data.
        """
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        """Create the persistent ChromaDB client and get or create the collection."""
        try:
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path=self.persist_directory)

            # Chroma's default index space is squared L2; request cosine so that
            # `1 - distance` is the cosine similarity used at retrieval time.
            # The space is fixed when a collection is created, so a collection
            # already persisted with the default space keeps it — delete and
            # rebuild it to switch.
            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={
                    "description": "PDF Documents embedding for RAG",
                    "hnsw:space": "cosine",
                },
            )

            print(f" Vector Store initialized. Collection: {self.collection_name}")
            print(f" Existing documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f"Error initializing Vector store: {e}")
            raise

    def add_documents(self, documents: List[Any], embeddings: np.ndarray):
        """
        Add documents and their embeddings to the vector store.

        Args:
            documents: Documents exposing ``page_content`` and ``metadata``
                (e.g. LangChain documents). Their metadata dicts are copied,
                not mutated.
            embeddings: One embedding per document, in the same order
                (ndarray rows or plain sequences).

        Raises:
            ValueError: If the number of documents and embeddings differ.
        """
        if len(documents) != len(embeddings):
            raise ValueError("Number of documents must match number of embeddings")

        if len(documents) == 0:
            # Nothing to store; avoid calling collection.add with empty lists.
            print("No documents to add to vector store")
            return

        print(f"Adding {len(documents)} documents to vector store...")

        ids = []
        metadatas = []
        documents_text = []
        embeddings_list = []

        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            # Random prefix plus batch index keeps IDs unique within and across batches.
            ids.append(f"doc_{uuid.uuid4().hex[:8]}_{i}")

            # Copy so the caller's document metadata is left untouched.
            metadata = dict(doc.metadata)
            metadata["doc_index"] = i  # position within this batch
            metadata["content_length"] = len(doc.page_content)
            metadatas.append(metadata)

            documents_text.append(doc.page_content)

            # np.asarray accepts both ndarray rows and plain Python sequences.
            embeddings_list.append(np.asarray(embedding).tolist())

        try:
            self.collection.add(
                ids=ids,
                embeddings=embeddings_list,
                metadatas=metadatas,
                documents=documents_text,
            )
            print(f"Successfully added {len(documents)} documents to vector store")
            print(f" Total documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f" Error adding documents to vector store: {e}")
            raise

vector_Store = VectorStore(collection_name="pdf_documents", persist_directory="../data/vector_store")
vector_Store  # notebook display of the store object


In [None]:
# Notebook display: re-inspect the chunk list before embedding it below.
chunks

In [None]:
### Pull the raw text out of every chunk so it can be embedded in one batch
texts = [chunk.page_content for chunk in chunks]

## Embed every chunk text with the shared embedding manager
embeddings = embedding_manager.generate_embeddings(texts)

## Persist the chunks alongside their embeddings
vector_Store.add_documents(chunks, embeddings)

### Retriever Pipeline From Vector Database

In [None]:
class RAGRetriever:
    """Handle query-based retrieval from a VectorStore."""

    def __init__(self, vector_Store: "VectorStore", embedding_manager: "EmbeddingManager"):
        """
        Initialize the retriever.

        Args:
            vector_Store: Vector store whose ``collection`` holds the embeddings.
            embedding_manager: Manager used to embed queries.
        """
        self.vector_store = vector_Store
        self.embedding_manager = embedding_manager

    @staticmethod
    def _cosine_similarity(a, b) -> float:
        """Return the cosine similarity of two 1-D vectors (0.0 if either has zero norm)."""
        a = np.asarray(a, dtype=float)
        b = np.asarray(b, dtype=float)
        denom = np.linalg.norm(a) * np.linalg.norm(b)
        if denom == 0:
            return 0.0
        return float(np.dot(a, b) / denom)

    def retrieve(self, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
        """
        Retrieve the documents most relevant to a query.

        Args:
            query: The search query.
            top_k: Maximum number of results requested from the vector store.
            score_threshold: Minimum cosine similarity a result must reach to be kept.

        Returns:
            A list of dicts with keys 'id', 'content', 'metadata',
            'similarity_score' (cosine similarity between query and document
            embeddings), 'distance' (raw distance reported by Chroma) and
            'rank' (1-based position in Chroma's result order, counted before
            threshold filtering). Returns [] if the vector-store query fails.
        """
        print(f"Retrieving documents for query: '{query}'  ")
        print(f"Top K: {top_k},  Score Threshold: {score_threshold}")

        query_embedding = self.embedding_manager.generate_embeddings([query])[0]

        try:
            results = self.vector_store.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=top_k,
                # Also fetch the stored vectors so similarity is computed directly,
                # rather than inferred from a distance whose meaning depends on the
                # collection's space (Chroma defaults to squared L2, not cosine).
                include=["documents", "metadatas", "distances", "embeddings"],
            )

            retrieved_docs = []

            if results['documents'] and results['documents'][0]:
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]
                all_embeddings = results.get('embeddings')
                doc_embeddings = all_embeddings[0] if all_embeddings is not None else [None] * len(ids)

                for i, (doc_id, document, metadata, distance, doc_embedding) in enumerate(
                    zip(ids, documents, metadatas, distances, doc_embeddings)
                ):
                    if doc_embedding is not None:
                        similarity_score = self._cosine_similarity(query_embedding, doc_embedding)
                    else:
                        # Fallback when vectors were not returned; only valid for
                        # collections created in cosine space.
                        similarity_score = 1 - distance

                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            'id': doc_id,
                            'content': document,
                            'metadata': metadata,
                            'similarity_score': similarity_score,
                            'distance': distance,
                            'rank': i + 1
                        })

                print(f"Retrieved {len(retrieved_docs)} documents (after filtering)")

            else:
                print("No documents found!")

            return retrieved_docs

        except Exception as e:
            # Best-effort: a failed query is reported and yields no results.
            print(f"Error during retrieval, {e}")
            return []
        
rag_retriever = RAGRetriever(vector_Store=vector_Store, embedding_manager=embedding_manager)


In [None]:
rag_retriever.retrieve(query="What is Greg's email address? ")
