Pipeline of the RAG System

1. Loading the documents
2. Text Preprocessing
3. Text Chunking
4. Embedding Generation
5. Vector Store
6. Query Processing
7. Similarity Search
8. Answer Generation


# Loading documents

In [None]:
from langchain_community.document_loaders import DirectoryLoader
from langchain_community.document_loaders import PyMuPDFLoader

# Build a directory loader that matches PDFs recursively and hands each
# matched file to PyMuPDFLoader, then load everything and dump the result.
loader = DirectoryLoader(
    "../updatedgenerativeai/pdf",
    glob="**/*.pdf",
    loader_cls=PyMuPDFLoader,
    show_progress=True,
)
docs = loader.load()
print(docs)

In [None]:
import os
from langchain_community.document_loaders import DirectoryLoader,PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pathlib import Path


In [None]:
## Read All The pdf inside the directory
def process_all_pdf(pdf_directory):
    """Load every PDF found (recursively) under ``pdf_directory``.

    Each file is loaded with ``PyMuPDFLoader``; every document the loader
    returns is tagged with ``metadata["source"]`` (the PDF's file name) and
    ``metadata["file_type"] = "pdf"`` and added to the result.

    Args:
        pdf_directory: Directory (str or path-like) searched with ``**/*.pdf``.

    Returns:
        list: All documents from all PDFs that loaded successfully. A file
        that fails to load is reported on stdout and skipped.
    """
    all_documents = []
    pdf_dir = Path(pdf_directory)
    # Sort so the document order does not depend on filesystem listing order.
    pdf_files = sorted(pdf_dir.glob("**/*.pdf"))
    print(f"Found {len(pdf_files)} PDF files")

    for pdf_file in pdf_files:
        try:
            # Pass a plain string path rather than a Path object.
            loader = PyMuPDFLoader(str(pdf_file))
            docs = loader.load()
            for doc in docs:
                doc.metadata["source"] = pdf_file.name
                doc.metadata["file_type"] = "pdf"
            # Keep every loaded document, not only the last one of the file.
            all_documents.extend(docs)
            print(f"Processed {pdf_file}")
        except Exception as e:
            # Best-effort: one unreadable PDF must not abort the whole batch.
            print(f"Failed to process {pdf_file}: {e}")
    return all_documents


In [None]:
# Run the PDF loading helper over the project's PDF directory.
all_documents = process_all_pdf("../updatedgenerativeai/pdf")

In [None]:
## Making Chunks of the text

def split_text_in_chunks(documents, chunk_size=2000, chunk_overlap=20):
    """Split documents into smaller chunks with a recursive character splitter.

    Args:
        documents: Documents to split (passed to ``split_documents``).
        chunk_size: Maximum chunk length, measured with ``len``.
        chunk_overlap: Overlap between consecutive chunks, measured with ``len``.

    Returns:
        list: The chunks produced by the splitter (possibly empty).
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""],
    )
    doc_splitter = text_splitter.split_documents(documents)
    print(f"splitted the {len(documents)} documents into {len(doc_splitter)} chunks")

    # Preview at most the first two chunks; slicing avoids an IndexError when
    # the split produced only one chunk (or none).
    for label, chunk in zip(("first", "second"), doc_splitter[:2]):
        print(f"\n Example split of {label} chunk")
        print(chunk.page_content)
    return doc_splitter



In [None]:
# Chunk the loaded documents using the helper's default size and overlap.
chunks = split_text_in_chunks(all_documents)


In [None]:
## lets do the embeddings

import numpy as np 
from sentence_transformers import SentenceTransformer
from typing import List,Dict,Any,Tuple
import chromadb
from chromadb.config import Settings
import uuid
from sklearn.metrics.pairwise import cosine_similarity




In [None]:
class EmbeddingManager:
    """Wrap a SentenceTransformer model and expose a single encode helper."""

    def __init__(self, model_name="all-MiniLM-L6-v2"):
        """Remember the model name and load the model immediately."""
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self):
        """Instantiate the model named by ``self.model_name``.

        Prints the model's sentence-embedding dimension on success; on any
        failure prints the error and re-raises it.
        """
        try:
            self.model = SentenceTransformer(self.model_name)
            dimension = self.model.get_sentence_embedding_dimension()
            print(f"Model loaded successfully: {dimension}")
        except Exception as exc:
            print(f"Error while loading the model {exc}")
            raise

    def get_embeddings(self, text: List[str]):
        """Encode the given strings with the loaded model.

        Args:
            text: List of strings to embed.

        Returns:
            Whatever ``self.model.encode`` returns for ``text``.

        Raises:
            ValueError: If no model is loaded.
        """
        if not self.model:
            raise ValueError("Model not loaded")

        try:
            return self.model.encode(text, show_progress_bar=True)
        except Exception as exc:
            print(f"Error while generating embeddings: {exc}")
            raise


    

In [None]:
# Instantiate the manager with its default model name, then embed two
# sample strings as a quick smoke test.
embed = EmbeddingManager()

embed.get_embeddings(["hello", "world"])

# Vector Store:

In [None]:
class VectorStore:
    """Persistent ChromaDB collection holding chunk texts, metadata, and embeddings."""

    def __init__(
        self,
        collection_name: str = "pdf_documents",
        persist_directory: str = "./vector_store",
    ):
        """Store the configuration and open (or create) the collection.

        Args:
            collection_name: Name of the Chroma collection to get or create.
            persist_directory: Directory used by the persistent Chroma client.
        """
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        """Create the persist directory, open the client, and get/create the collection.

        Prints the collection name and its current item count; on any failure
        prints the error and re-raises it.
        """
        try:
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(
                path=self.persist_directory
            )
            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF documents embeddings for RAG"},
            )
            print(f"Vector store initialized: {self.collection_name}")
            print(f"Existing documents: {self.collection.count()}")
        except Exception as e:
            print(f"Error initializing vector store: {e}")
            raise

    def add_documents(self, documents: list[Any], embeddings: np.ndarray):
        """Add documents and their embeddings to the collection.

        Args:
            documents: Objects exposing ``page_content`` and ``metadata``.
            embeddings: One embedding per document, in the same order.

        Raises:
            ValueError: If the number of documents and embeddings differ.
            Exception: Any error raised by ``collection.add`` is printed and
                re-raised.
        """
        if len(documents) != len(embeddings):
            raise ValueError("Documents and embeddings count mismatch")
        if not documents:
            # Nothing to store; skip the collection call entirely.
            print("No documents to add")
            return

        ids = []
        embeddings_list = []
        documents_text = []
        metadatas = []

        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            # Random prefix plus position index gives a distinct id per item.
            ids.append(f"doc_{uuid.uuid4().hex[:8]}_{i}")

            # Copy so the caller's metadata dict is not mutated.
            metadata = dict(doc.metadata)
            metadata["doc_index"] = i
            metadata["content_length"] = len(doc.page_content)
            metadatas.append(metadata)

            documents_text.append(doc.page_content)
            # asarray lets plain lists through as well as ndarray rows.
            embeddings_list.append(np.asarray(embedding).tolist())

        try:
            self.collection.add(
                ids=ids,
                embeddings=embeddings_list,
                documents=documents_text,
                metadatas=metadatas,
            )
            print(f"Successfully added {len(ids)} documents")
        except Exception as e:
            print(f"Error adding documents: {e}")
            raise

    def query(self, query_embedding: np.ndarray, k: int = 5):
        """Return the collection's query result for one embedding.

        Args:
            query_embedding: Embedding vector of the query.
            k: Number of results requested (``n_results``).

        Returns:
            The value returned by ``collection.query``.

        Raises:
            Exception: Any error raised by the query is printed and re-raised.
        """
        try:
            return self.collection.query(
                query_embeddings=[np.asarray(query_embedding).tolist()],
                n_results=k,
            )
        except Exception as e:
            print(f"Error querying vector store: {e}")
            raise

In [None]:
# Create the store with its default collection name and persist directory.
vec = VectorStore()


In [None]:
# Collect the text of every chunk, embed all of it in one call, and report
# the shape of the resulting array.
texts = [chunk.page_content for chunk in chunks]

embeddings = embed.get_embeddings(texts)
print(embeddings.shape)

Retrieval pipeline from the vector store