RAG Pipeline: Data Ingestion to Vector DB

In [None]:
import os
from pathlib import Path
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader, DirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

: 

In [None]:
### Read all the PDF files from the directory

def process_all_pdfs(pdf_directory):
    """Load every PDF found under a directory into a single list of documents.

    The directory is searched recursively for ``*.pdf`` files and each file is
    loaded with ``PyMuPDFLoader``. A file that fails to load is reported and
    skipped, so one bad PDF does not abort the whole run.

    Args:
        pdf_directory: Path (str or path-like) of the directory to scan.

    Returns:
        list: All loaded documents, grouped by file in sorted path order.
        Every document's metadata has ``source`` overwritten with the PDF's
        file name and ``file_type`` set to ``"pdf"``.
    """
    all_documents = []
    pdf_dir = Path(pdf_directory)

    # glob() yields entries in filesystem order, which is not stable across
    # machines; sorting keeps document order (and any index derived from it
    # downstream) reproducible.
    pdf_files = sorted(pdf_dir.glob("**/*.pdf"))

    print(f"Found {len(pdf_files)} PDF files in directory {pdf_directory}")

    for pdf_file in pdf_files:
        print(f"Processing file: {pdf_file}")
        try:
            loader = PyMuPDFLoader(str(pdf_file))
            documents = loader.load()

            # Add source information to metadata
            for doc in documents:
                doc.metadata["source"] = pdf_file.name
                doc.metadata["file_type"] = "pdf"
            all_documents.extend(documents)
            print(f"Loaded {len(documents)} documents from {pdf_file}")
        except Exception as e:
            # Best effort: report the failure and continue with the next file.
            print(f"Error loading {pdf_file}: {e}")

    return all_documents


In [None]:
# Load every PDF under ../data and report how many documents were produced.
all_pdf_documents = process_all_pdfs("../data")
# Print the count, not the list itself: the message announces a total.
print(f"Total documents loaded: {len(all_pdf_documents)}")

In [None]:
def split_documents(documents, chunk_size=1000, chunk_overlap=200):
    """Split documents into smaller chunks and print a short preview.

    Args:
        documents: Documents to split; handed unchanged to
            ``RecursiveCharacterTextSplitter.split_documents``.
        chunk_size: ``chunk_size`` value forwarded to the splitter.
        chunk_overlap: ``chunk_overlap`` value forwarded to the splitter.

    Returns:
        The chunks produced by the splitter.
    """
    splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n", "\n", " ", ""],
        chunk_overlap=chunk_overlap,
        chunk_size=chunk_size,
    )

    pieces = splitter.split_documents(documents)
    print(f"Split into {len(pieces)} chunks.")

    # Nothing to preview when the splitter produced no chunks.
    if not pieces:
        return pieces

    # Show the first chunk (text truncated to 500 characters) as a sanity check.
    first_piece = pieces[0]
    print("Example chunk:")
    print("Chunks : ", first_piece.page_content[:500])
    print("Metadata:", first_piece.metadata)
    return pieces


In [None]:
# Split the loaded PDF documents using the default chunk_size and chunk_overlap.
chunks = split_documents(all_pdf_documents)

Embedding and VectorStore DB

In [None]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
from typing import List, Dict, Any, Tuple
import uuid
from sklearn.metrics.pairwise import cosine_similarity


In [None]:
class EmbeddingManager:
    """Handles document embedding generation using SentenceTransformer."""

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        """
        Initialize the embedding manager and load the model immediately.

        Args:
            model_name (str): HuggingFace model name for sentence embeddings

        Raises:
            Exception: Whatever ``SentenceTransformer`` raises when the model
                cannot be loaded (re-raised by ``_load_model``).
        """
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self):
        """Load the SentenceTransformer model named by ``self.model_name``.

        Raises:
            Exception: Any loading error is printed and then re-raised.
        """
        try:
            print(f"Loading model: {self.model_name}")
            self.model = SentenceTransformer(self.model_name)
            print(f"Model loaded successfully. Embedding dimension: {self.model.get_sentence_embedding_dimension()}")

        except Exception as e:
            print(f"Error loading model {self.model_name}: {e}")
            raise

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Generate embeddings for a list of texts

        Args:
            texts (List[str]): List of text strings to embed
        Returns:
            np.ndarray: Array of embeddings

        Raises:
            ValueError: If no model has been loaded.
        """
        # Compare against None explicitly: truthiness of a model object may
        # depend on its length, which is not what "is it loaded" means.
        if self.model is None:
            raise ValueError("Model is not loaded.")

        print(f"Generating embeddings for {len(texts)} texts......")
        embeddings = self.model.encode(texts, show_progress_bar=True)
        print(f"Generated embeddings with shape: {embeddings.shape}")
        return embeddings

    def add_documents(self, documents: List[Any], embeddings: np.ndarray):
        """
        Add documents and their embeddings to an attached collection.

        This class does not create a collection itself; the method only works
        when a ``collection`` attribute has been assigned to the instance.
        NOTE(review): this looks like a copy of ``VectorStore.add_documents``;
        prefer that method for persisting embeddings.

        Args:
            documents (List[Any]): Objects exposing ``page_content`` (str) and
                ``metadata`` (mapping).
            embeddings (np.ndarray): One embedding per document, in the same
                order as ``documents``.

        Raises:
            ValueError: If the number of documents and embeddings differ.
            AttributeError: If no collection is attached to this instance.
            Exception: Any error raised by the collection's ``add`` call is
                printed and re-raised.
        """
        if len(documents) != len(embeddings):
            raise ValueError("Number of documents must match number of embeddings")

        # Fail early with a clear message instead of part-way through the work.
        collection = getattr(self, "collection", None)
        if collection is None:
            raise AttributeError(
                "EmbeddingManager has no collection attached; "
                "use VectorStore.add_documents to store embeddings."
            )

        print(f"Adding {len(documents)} documents to vector store....")

        ids = []
        metadatas = []
        documents_text = []
        embeddings_list = []

        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            # Random prefix plus position keeps ids unique within and across batches.
            doc_id = f"doc_{uuid.uuid4().hex[:8]}_{i}"
            ids.append(doc_id)

            # Copy the metadata so the caller's document is not mutated.
            metadata = dict(doc.metadata)
            metadata['doc_index'] = i
            metadata['content_length'] = len(doc.page_content)
            metadatas.append(metadata)

            # Document content
            documents_text.append(doc.page_content)

            # The collection receives plain Python lists rather than ndarrays.
            embeddings_list.append(embedding.tolist())

        try:
            collection.add(
                ids=ids,
                metadatas=metadatas,
                documents=documents_text,
                embeddings=embeddings_list
            )
            print(f"Successfully added {len(documents)} documents to vector store.")
            print(f"Total documents in collection: {collection.count()}")

        except Exception as e:
            print(f"Error adding documents to vector store: {e}")
            raise
    

## Initialize the embedding manager; the constructor loads the default model
embedding_manager = EmbeddingManager()
# Bare expression: shows the object as the notebook cell's output
embedding_manager

Vector Store

In [None]:
class VectorStore:
    """Manages document embeddings in a ChromaDB vector store."""

    def __init__(self, collection_name: str = "pdf_documents", persist_directory: str = "../data/vector_store"):
        """
        Initialize the vector store and open (or create) its collection.

        Args:
            collection_name: Name of the ChromaDB collection
            persist_directory: Directory to persist the vector store

        Raises:
            Exception: Any error raised while creating the client or the
                collection (re-raised by ``_initialize_store``).
        """
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        """Initialize the ChromaDB client and collection.

        Creates ``persist_directory`` if it does not exist, then gets or
        creates the collection named ``collection_name``.

        Raises:
            Exception: Any initialization error is printed and re-raised.
        """
        try:
            # Create the persistent ChromaDB client
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path=self.persist_directory)

            # Get or create collection.
            # NOTE(review): no distance space is configured here, so the
            # library default applies; confirm it matches how callers turn
            # distances into similarity scores.
            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "Pdf document embeddings for RAG"}
            )
            print(f"Vector Store initialized. Collection: {self.collection_name}")
            print(f"Existing documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f"Error initializing vector store: {e}")
            raise

    def add_documents(self, documents: List[Any], embeddings: np.ndarray):
        """
        Add documents and their embeddings to the vector store

        Each document is stored under a freshly generated id together with a
        copy of its metadata extended with ``doc_index`` (position in this
        batch) and ``content_length`` (length of ``page_content``).

        Args:
            documents (List[Any]): Objects exposing ``page_content`` (str) and
                ``metadata`` (mapping).
            embeddings (np.ndarray): One embedding per document, in the same
                order as ``documents``.

        Raises:
            ValueError: If the number of documents and embeddings differ.
            Exception: Any error raised by the collection's ``add`` call is
                printed and re-raised.
        """
        if len(documents) != len(embeddings):
            raise ValueError("Number of documents must match number of embeddings")

        # An empty batch is a no-op rather than a request to the collection.
        if len(documents) == 0:
            print("No documents to add to vector store.")
            return

        print(f"Adding {len(documents)} documents to vector store....")

        ids = []
        metadatas = []
        documents_text = []
        embeddings_list = []

        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            # Random prefix plus position keeps ids unique within and across batches.
            doc_id = f"doc_{uuid.uuid4().hex[:8]}_{i}"
            ids.append(doc_id)

            # Copy the metadata so the caller's document is not mutated.
            metadata = dict(doc.metadata)
            metadata['doc_index'] = i
            metadata['content_length'] = len(doc.page_content)
            metadatas.append(metadata)

            # Document content
            documents_text.append(doc.page_content)

            # The collection receives plain Python lists rather than ndarrays.
            embeddings_list.append(embedding.tolist())

        try:
            self.collection.add(
                ids=ids,
                metadatas=metadatas,
                documents=documents_text,
                embeddings=embeddings_list
            )
            print(f"Successfully added {len(documents)} documents to vector store.")
            print(f"Total documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f"Error adding documents to vector store: {e}")
            raise
            
            
# Create the vector store with the default collection name and persist directory
vectorstore = VectorStore()
# Bare expression: shows the object as the notebook cell's output
vectorstore
        
            

In [None]:
# Bare expression: shows the chunk list as the notebook cell's output
chunks

In [None]:
### Convert the chunk text to embeddings and store them

# Collect the raw text of every chunk, preserving chunk order
texts = [doc.page_content for doc in chunks]

## Generate embeddings for the collected texts
embeddings = embedding_manager.generate_embeddings(texts)

## Store the chunks with their embeddings in the vector DB
## (add_documents raises ValueError if the two lengths differ)
vectorstore.add_documents(chunks, embeddings)


Retriever Pipeline From VectorStore

In [None]:
class RAGRetriever:
    """Handles retrieval of relevant documents from the vector store for RAG."""

    def __init__(self, vector_store: "VectorStore", embedding_manager: "EmbeddingManager"):
        """
        Initialize the RAG retriever

        Args:
            vector_store (VectorStore): Instance of the VectorStore
            embedding_manager (EmbeddingManager): Instance of the EmbeddingManager
        """
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    def retrieve(self, query: str, top_k: int = 2, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
        """
        Retrieve up to top_k similar documents for the given query

        Args:
            query (str): User query string
            top_k (int): Number of results requested from the vector store
            score_threshold (float): Minimum similarity score (computed as
                ``1 - distance``) a result needs in order to be returned
        Returns:
            List[Dict[str, Any]]: One dict per kept result with the keys
            ``id``, ``document``, ``metadata``, ``similarity_score``,
            ``distance`` and ``rank``. ``rank`` is the 1-based position in the
            store's result list before threshold filtering. An empty list is
            returned when nothing matches or when the query fails.
        """
        print(f"Generating embedding for query: {query}")
        print(f"Top k : {top_k}, Score threshold: {score_threshold}")

        # Embed the query with the embedding manager supplied at construction
        query_embedding = self.embedding_manager.generate_embeddings([query])[0]

        # Search query in vector store
        try:
            results = self.vector_store.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=top_k
            )

            # Process results
            retrieved_docs = []

            # Every result field holds one inner list per query embedding;
            # a single query was sent, so index 0 is the only entry.
            if results['documents'] and results['documents'][0]:
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]

                rows = zip(ids, documents, metadatas, distances)
                for rank, (doc_id, document, metadata, distance) in enumerate(rows, start=1):

                    # NOTE(review): 1 - distance is a cosine similarity only if
                    # the collection uses cosine distance; confirm the
                    # collection's configured distance space.
                    similarity_score = 1 - distance

                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            "id": doc_id,
                            "document": document,
                            "metadata": metadata,
                            "similarity_score": similarity_score,
                            'distance': distance,
                            'rank': rank
                        })

                print(f"Retrieved {len(retrieved_docs)} documents after applying score threshold.")
            else:
                print("No documents retrieved from vector store.")

            return retrieved_docs

        except Exception as e:
            # Best effort: report the failure and return no results.
            print(f"Error retrieving documents: {e}")
            return []
         
         
# Build the retriever from the vector store and embedding manager created earlier
ragRetriever = RAGRetriever(vectorstore, embedding_manager)

In [None]:
# Bare expression: shows the retriever object as the notebook cell's output
ragRetriever

In [None]:
# Example query using the default top_k and score_threshold
ragRetriever.retrieve("Give me an overview of the document")