# RAG Pipelines - Data Ingestion to Vector DB Pipeline 

In [1]:
import os 
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader 
from langchain_text_splitters import RecursiveCharacterTextSplitter 
from pathlib import Path 

In [2]:
# Read all the pdf's inside the directory 
def process_all_pdfs(pdf_directory): 
    # Process all PDF files in a directory 
    all_documents = [] 
    pdf_dir = Path(pdf_directory) 

    # Find all PDF files recursively 
    pdf_files = list(pdf_dir.glob("**/*.pdf")) 

    print(f"Found {len(pdf_files)} PDF files to process") 

    for pdf_file in pdf_files: 
        print(f"\nProcessing: {pdf_file.name}")
        try: 
            loader = PyPDFLoader(str(pdf_file)) 
            documents = loader.load() 

            # Add source information to metadata 
            for doc in documents: 
                doc.metadata['source_file'] = pdf_file.name 
                doc.metadata['file_type'] = 'pdf'

            all_documents.extend(documents) 
            print(f"Loaded {len(documents)} pages") 
        except Exception as e: 
            print(f"Error: {e}") 

    print(f"\nTotal documents loaded: {len(all_documents)}") 
    return all_documents 

# Process all PDFs in the data directory 
all_pdf_documents = process_all_pdfs("../data") 

Found 3 PDF files to process

Processing: UGCF-Flowchart.pdf
Loaded 4 pages

Processing: Guidelines_DAVP.pdf
Loaded 3 pages

Processing: VAC ethics.pdf
Loaded 3 pages

Total documents loaded: 10


In [3]:
all_pdf_documents

[Document(metadata={'producer': 'PDFium', 'creator': 'PDFium', 'creationdate': 'D:20221113231027', 'source': '../data/pdf/UGCF-Flowchart.pdf', 'total_pages': 4, 'page': 0, 'page_label': '1', 'source_file': 'UGCF-Flowchart.pdf', 'file_type': 'pdf'}, page_content='GE 2 \nCredit - 4\nAEC \nCredit - 2\nVAC 2 \nCredit - 2\nDSE \nCredit - 4\nSEC 2 \nCredit - 2\nIAPC \nCredit - 2\nOne from the Pool of GE Courses\nSEM - II\nIL - 1 / Environment Sc - 1\nOne from the Pool of SEC Courses\nOne from the Pool of VAC Courses\nFirst Year\nSecond Year\nUndergraduate Certificate \nin the field of Study/Discipline\nGE 4 \nCredit - 4\nAEC \nCredit - 2 VAC 4 \nCredit - 2\nDSE 2 \nCredit - 4\nSEC 4 \nCredit - 2\nIAPC \nCredit - 2\nOne from the Pool of GE Courses\nSEM - IV\nIL - 2/ Environment Sc - 2\nOne from the Pool of SEC Courses\nOne from the Pool of VAC Courses\nOR\nOne from the Pool of DSE Courses\nOR\nGE 5 \nCredit - 4\nAEC \nCredit - 2 VAC 1 \nCredit - 2\nDSE 3 \nCredit - 4\nSEC 5 \nCredit - 2\nIAPC

In [4]:
# Text splitting get into chunks 
def split_documents(documents, chunk_size=1000, chunk_overlap=200): 
    # Split documents into smaller chunks for better RAG perform performance 
    text_splitter = RecursiveCharacterTextSplitter( 
        chunk_size=chunk_size, 
        chunk_overlap=chunk_overlap,
        length_function=len, 
        separators=["\n\n", "\n", " ", ""]
    )
    split_docs = text_splitter.split_documents(documents) 
    print(f"Split {len(documents)} documents into {len(split_docs)} chunks")

    # Exmaple of a chunk 
    if split_docs: 
        print(f"\nExample chunk:")
        print(f"Content: {split_docs[0].page_content[:200]}...") 
        print(f"Metadata: {split_docs[0].metadata}")
    return split_docs

In [5]:
chunks = split_documents(all_pdf_documents) 
chunks 

Split 10 documents into 49 chunks

Example chunk:
Content: GE 2 
Credit - 4
AEC 
Credit - 2
VAC 2 
Credit - 2
DSE 
Credit - 4
SEC 2 
Credit - 2
IAPC 
Credit - 2
One from the Pool of GE Courses
SEM - II
IL - 1 / Environment Sc - 1
One from the Pool of SEC Cour...
Metadata: {'producer': 'PDFium', 'creator': 'PDFium', 'creationdate': 'D:20221113231027', 'source': '../data/pdf/UGCF-Flowchart.pdf', 'total_pages': 4, 'page': 0, 'page_label': '1', 'source_file': 'UGCF-Flowchart.pdf', 'file_type': 'pdf'}


[Document(metadata={'producer': 'PDFium', 'creator': 'PDFium', 'creationdate': 'D:20221113231027', 'source': '../data/pdf/UGCF-Flowchart.pdf', 'total_pages': 4, 'page': 0, 'page_label': '1', 'source_file': 'UGCF-Flowchart.pdf', 'file_type': 'pdf'}, page_content='GE 2 \nCredit - 4\nAEC \nCredit - 2\nVAC 2 \nCredit - 2\nDSE \nCredit - 4\nSEC 2 \nCredit - 2\nIAPC \nCredit - 2\nOne from the Pool of GE Courses\nSEM - II\nIL - 1 / Environment Sc - 1\nOne from the Pool of SEC Courses\nOne from the Pool of VAC Courses\nFirst Year\nSecond Year\nUndergraduate Certificate \nin the field of Study/Discipline\nGE 4 \nCredit - 4\nAEC \nCredit - 2 VAC 4 \nCredit - 2\nDSE 2 \nCredit - 4\nSEC 4 \nCredit - 2\nIAPC \nCredit - 2\nOne from the Pool of GE Courses\nSEM - IV\nIL - 2/ Environment Sc - 2\nOne from the Pool of SEC Courses\nOne from the Pool of VAC Courses\nOR\nOne from the Pool of DSE Courses\nOR\nGE 5 \nCredit - 4\nAEC \nCredit - 2 VAC 1 \nCredit - 2\nDSE 3 \nCredit - 4\nSEC 5 \nCredit - 2\nIAPC

# Embedding and VectorStoreDB 

In [6]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity

In [7]:
class EmbeddingManager:
    """Handles document embedding generation using SentenceTransformer"""
    
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        Initialize the embedding manager
        
        Args:
            model_name: HuggingFace model name for sentence embeddings
        """
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self):
        """Load the SentenceTransformer model"""
        try:
            print(f"Loading embedding model: {self.model_name}")
            self.model = SentenceTransformer(self.model_name)
            print(f"Model loaded successfully. Embedding dimension: {self.model.get_sentence_embedding_dimension()}")
        except Exception as e:
            print(f"Error loading model {self.model_name}: {e}")
            raise

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Generate embeddings for a list of texts
        
        Args:
            texts: List of text strings to embed
            
        Returns:
            numpy array of embeddings with shape (len(texts), embedding_dim)
        """
        if not self.model:
            raise ValueError("Model not loaded")
        
        print(f"Generating embeddings for {len(texts)} texts...")
        embeddings = self.model.encode(texts, show_progress_bar=True)
        print(f"Generated embeddings with shape: {embeddings.shape}")
        return embeddings


## initialize the embedding manager

embedding_manager=EmbeddingManager()
embedding_manager

Loading embedding model: all-MiniLM-L6-v2


Loading weights: 100%|██████████| 103/103 [00:00<00:00, 1146.19it/s, Materializing param=pooler.dense.weight]                             
[1mBertModel LOAD REPORT[0m from: sentence-transformers/all-MiniLM-L6-v2
Key                     | Status     |  | 
------------------------+------------+--+-
embeddings.position_ids | UNEXPECTED |  | 

[3mNotes:
- UNEXPECTED[3m	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.[0m


Model loaded successfully. Embedding dimension: 384


<__main__.EmbeddingManager at 0x7504f9ee8740>

In [8]:
class VectorStore:
    """Manages document embeddings in a ChromaDB vector store"""
    
    def __init__(self, collection_name: str = "pdf_documents", persist_directory: str = "../data/vector_store"):
        """
        Initialize the vector store
        
        Args:
            collection_name: Name of the ChromaDB collection
            persist_directory: Directory to persist the vector store
        """
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        """Initialize ChromaDB client and collection"""
        try:
            # Create persistent ChromaDB client
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path=self.persist_directory)
            
            # Get or create collection
            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF document embeddings for RAG"}
            )
            print(f"Vector store initialized. Collection: {self.collection_name}")
            print(f"Existing documents in collection: {self.collection.count()}")
            
        except Exception as e:
            print(f"Error initializing vector store: {e}")
            raise

    def add_documents(self, documents: List[Any], embeddings: np.ndarray):
        """
        Add documents and their embeddings to the vector store
        
        Args:
            documents: List of LangChain documents
            embeddings: Corresponding embeddings for the documents
        """
        if len(documents) != len(embeddings):
            raise ValueError("Number of documents must match number of embeddings")
        
        print(f"Adding {len(documents)} documents to vector store...")
        
        # Prepare data for ChromaDB
        ids = []
        metadatas = []
        documents_text = []
        embeddings_list = []
        
        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            # Generate unique ID
            doc_id = f"doc_{uuid.uuid4().hex[:8]}_{i}"
            ids.append(doc_id)
            
            # Prepare metadata
            metadata = dict(doc.metadata)
            metadata['doc_index'] = i
            metadata['content_length'] = len(doc.page_content)
            metadatas.append(metadata)
            
            # Document content
            documents_text.append(doc.page_content)
            
            # Embedding
            embeddings_list.append(embedding.tolist())
        
        # Add to collection
        try:
            self.collection.add(
                ids=ids,
                embeddings=embeddings_list,
                metadatas=metadatas,
                documents=documents_text
            )
            print(f"Successfully added {len(documents)} documents to vector store")
            print(f"Total documents in collection: {self.collection.count()}")
            
        except Exception as e:
            print(f"Error adding documents to vector store: {e}")
            raise

vectorstore=VectorStore()
vectorstore

Vector store initialized. Collection: pdf_documents
Existing documents in collection: 0


<__main__.VectorStore at 0x7504f9a8b650>

In [9]:
chunks

[Document(metadata={'producer': 'PDFium', 'creator': 'PDFium', 'creationdate': 'D:20221113231027', 'source': '../data/pdf/UGCF-Flowchart.pdf', 'total_pages': 4, 'page': 0, 'page_label': '1', 'source_file': 'UGCF-Flowchart.pdf', 'file_type': 'pdf'}, page_content='GE 2 \nCredit - 4\nAEC \nCredit - 2\nVAC 2 \nCredit - 2\nDSE \nCredit - 4\nSEC 2 \nCredit - 2\nIAPC \nCredit - 2\nOne from the Pool of GE Courses\nSEM - II\nIL - 1 / Environment Sc - 1\nOne from the Pool of SEC Courses\nOne from the Pool of VAC Courses\nFirst Year\nSecond Year\nUndergraduate Certificate \nin the field of Study/Discipline\nGE 4 \nCredit - 4\nAEC \nCredit - 2 VAC 4 \nCredit - 2\nDSE 2 \nCredit - 4\nSEC 4 \nCredit - 2\nIAPC \nCredit - 2\nOne from the Pool of GE Courses\nSEM - IV\nIL - 2/ Environment Sc - 2\nOne from the Pool of SEC Courses\nOne from the Pool of VAC Courses\nOR\nOne from the Pool of DSE Courses\nOR\nGE 5 \nCredit - 4\nAEC \nCredit - 2 VAC 1 \nCredit - 2\nDSE 3 \nCredit - 4\nSEC 5 \nCredit - 2\nIAPC

In [10]:
# Convert the text to embeddings 
texts = [doc.page_content for doc in chunks]
texts 

['GE 2 \nCredit - 4\nAEC \nCredit - 2\nVAC 2 \nCredit - 2\nDSE \nCredit - 4\nSEC 2 \nCredit - 2\nIAPC \nCredit - 2\nOne from the Pool of GE Courses\nSEM - II\nIL - 1 / Environment Sc - 1\nOne from the Pool of SEC Courses\nOne from the Pool of VAC Courses\nFirst Year\nSecond Year\nUndergraduate Certificate \nin the field of Study/Discipline\nGE 4 \nCredit - 4\nAEC \nCredit - 2 VAC 4 \nCredit - 2\nDSE 2 \nCredit - 4\nSEC 4 \nCredit - 2\nIAPC \nCredit - 2\nOne from the Pool of GE Courses\nSEM - IV\nIL - 2/ Environment Sc - 2\nOne from the Pool of SEC Courses\nOne from the Pool of VAC Courses\nOR\nOne from the Pool of DSE Courses\nOR\nGE 5 \nCredit - 4\nAEC \nCredit - 2 VAC 1 \nCredit - 2\nDSE 3 \nCredit - 4\nSEC 5 \nCredit - 2\nIAPC \nCredit - 2\nOne from the Pool of GE Courses\nSEM - V\nOne from the Pool of SEC Courses\nThird Year\nOne from the Pool of DSE Courses\nOR\nUndergraduate Diploma in \nthe field of Study/Discipline\nGE 6 \nCredit - 4\nAEC \nCredit - 2\nVAC 1 \nCredit - 2\nDSE 4

In [11]:
# Generate the Embeddings 
embedings = embedding_manager.generate_embeddings(texts)

# Store in the VectorDB 
vectorstore.add_documents(chunks, embedings)

Generating embeddings for 49 texts...


Batches: 100%|██████████| 2/2 [00:00<00:00,  4.77it/s]

Generated embeddings with shape: (49, 384)
Adding 49 documents to vector store...
Successfully added 49 documents to vector store
Total documents in collection: 49





# Retroever Pipeline from VectorStore 

In [19]:
class RAGRetriever: 
    # Handles query-based retrieval from the vector store 

    def __init__(self, vector_store: VectorStore, embedding_manager: EmbeddingManager): 
        """
        Initialize the retriever 
        Args: 
            vector_store: Vector store containing document embeddings 
            embedding_manager: Manager for generating query embeddings 
        """
        self.vector_store = vector_store 
        self.embedding_manager = embedding_manager 

    def retrieve(self, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]: 
        """
        Retrieve relevant documents for a query 
        Args: 
            query: The search query 
            top_k: Number of top results to return 
            score_threshold: Minimum similarity score threshold 

        Returns: 
            List of dictionaries containing retrieved documents and metadata 
        """
        print(f"Retrieving documents for query: `{query}`")
        print(f"Top K: {top_k}, Score threshold: {score_threshold}")

        # Generate query embedding 
        query_embedding = self.embedding_manager.generate_embeddings([query])[0]

        # Search in vector store 
        try: 
            results = self.vector_store.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=top_k
            )

            # Process results 
            retrieved_docs = [] 

            if results.get('documents') and results['documents'][0]: 
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]

                for i, (doc_id, document, metadata, distance) in enumerate(zip(ids, documents, metadatas, distances)): 
                    # Convert distance to bounded similarity score in [0, 1]
                    similarity_score = 1 / (1 + float(distance))

                    if similarity_score >= score_threshold: 
                        retrieved_docs.append({
                            'id': doc_id, 
                            'content': document, 
                            'metadata': metadata, 
                            'similarity_score': similarity_score, 
                            'distance': distance, 
                            'rank': i + 1 
                        })

            if retrieved_docs:
                print(f"Retrieved {len(retrieved_docs)} documents (after filtering)")
            else:
                print("No documents found")

            return retrieved_docs
        except Exception as e: 
            print(f"Error during retrieval: {e}")
            return [] 

rag_retriever = RAGRetriever(vectorstore,embedding_manager)


In [20]:
rag_retriever

<__main__.RAGRetriever at 0x7504ecf89ee0>

In [21]:
rag_retriever.retrieve("What are the three units in Ethics and values subject?")

Retrieving documents for query: `What are the three units in Ethics and values subject?`
Top K: 5, Score threshold: 0.0
Generating embeddings for 1 texts...


Batches: 100%|██████████| 1/1 [00:00<00:00, 33.41it/s]

Generated embeddings with shape: (1, 384)
Retrieved 5 documents (after filtering)





[{'id': 'doc_d5959923_42',
  'content': 'VAC 1: ETHICS AND VALUES IN ANCIENT INDIAN TRADITIONS \nCredit distribution, Eligibility and Pre-requisites of the Course \nCourse Credits Credit distribution of the course Eligibility Pre-requisite \ntitle & Lecture Tutorial Practical/ criteria of the course \nCode Practice \nEthics and 02 1 0 1 Pass in NIL \nValues in Class 12th \nAncient \nIndian \nTraditions \nLearning Objectives \nThe Learning Objectives of this course are as follows: \n• To understand the rich cultural traditions relating to discourses on life and its purpose, \ninstilling of values relating to ethical and moral propriety. \n• To make students more engaged with the past traditions of the country. \n• To introduce students to early epics: Puranic, Buddhist and other traditions. \nLearning outcomes \nThe Learning Outcomes of this course are as follows: \n• Students will develop an overview of indigenous philosophies . \n• Understanding the richness of Indian heritage leading