### RAG pipelines - Data Ingestion to VectorDB pipeline

In [1]:
import os
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader

In [2]:
### Read all the pdf inside the directory
from pathlib import Path

def process_all_pdf(pdf_directory):
    all_documents = []
    pdf_dir = Path(pdf_directory)

    pdf_files=list(pdf_dir.glob("**/*.pdf"))
    print(f"Found {len(pdf_files)} PDF files.")
    for pdf_file in pdf_files:
        print(f"\nProcessing file: {pdf_file}")
        try:
            loader = PyMuPDFLoader(str(pdf_file))
            documents = loader.load()
            for doc in documents:
                doc.metadata['source'] = pdf_file.name
                doc.metadata['file_type'] = 'pdf'

            all_documents.extend(documents)
            print(f"Loaded {len(documents)} pages from {pdf_file.name}.")
        except Exception as e:  
            print(f"Error processing {pdf_file}: {e}")

    print(f"\nTotal documents loaded: {len(all_documents)}")
    return all_documents
           

In [3]:
# Process all PDF in the data directory
pdf_directory = "./pdf_files"
all_pdf_documents = process_all_pdf(pdf_directory)

Found 2 PDF files.

Processing file: pdf_files\document1.pdf
Loaded 1 pages from document1.pdf.

Processing file: pdf_files\document2.pdf
Loaded 1 pages from document2.pdf.

Total documents loaded: 2


In [4]:
### Text splitting get into chunks
from langchain_text_splitters import RecursiveCharacterTextSplitter

def split_documents(documents, chunk_size=1000, chunk_overlap=200):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", " ", ""]
    )

    split_docs = []
    for doc in documents:
        splits = text_splitter.split_text(doc.page_content)
        for i, split in enumerate(splits):
            new_doc = doc.copy()
            new_doc.page_content = split
            new_doc.metadata['chunk_index'] = i
            split_docs.append(new_doc)

    print(f"Total documents after splitting: {len(split_docs)}")
    return split_docs

In [5]:
chunks=split_documents(all_pdf_documents)


Total documents after splitting: 2


C:\Users\Acer\AppData\Local\Temp\ipykernel_41040\2632767502.py:15: PydanticDeprecatedSince20: The `copy` method is deprecated; use `model_copy` instead. See the docstring of `BaseModel.copy` for details about how to handle `include` and `exclude`. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.12/migration/
  new_doc = doc.copy()


### Embedding and VectorStoreDB

In [6]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity

In [7]:
class EmbeddingManager:
    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self):
        try:
            print(f"Loading embedding model: {self.model_name}...")
            self.model = SentenceTransformer(self.model_name)
            print(f"Model loaded successfully. \nLoaded embedding model: {self.model_name}. Embedding dimension: {self.model.get_sentence_embedding_dimension()}")
        except Exception as e:
            print(f"Error loading model {self.model_name}: {e}")
            raise
    
    def generate_embedding(self, texts: List[str]) -> np.ndarray:
        if not self.model:
            raise ValueError("Embedding model is not loaded.")
        print(f"Generating embeddings for {len(texts)} texts...")
        try:
            embeddings = self.model.encode(texts, convert_to_numpy=True)
            return embeddings
        except Exception as e:
            print(f"Error generating embeddings: {e}")
            raise

    def get_embedding_dimension(self) -> int:
        if not self.model:
            raise ValueError("Embedding model is not loaded.")
        return self.model.get_sentence_embedding_dimension()
    

In [8]:
### Initialize Embedding Manager
embedding_manager = EmbeddingManager()
embedding_manager

Loading embedding model: all-MiniLM-L6-v2...


Loading weights: 100%|██████████| 103/103 [00:00<00:00, 1222.66it/s, Materializing param=pooler.dense.weight]                             
BertModel LOAD REPORT from: sentence-transformers/all-MiniLM-L6-v2
Key                     | Status     |  | 
------------------------+------------+--+-
embeddings.position_ids | UNEXPECTED |  | 

Notes:
- UNEXPECTED	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.


Model loaded successfully. 
Loaded embedding model: all-MiniLM-L6-v2. Embedding dimension: 384


<__main__.EmbeddingManager at 0x1f05d7ce660>

### VectorStore


In [9]:
class VectorStore:
    """Persistent directory means whatever is vector store files will be saved there."""
    def __init__(self, collection_name: str = "document_embeddings", persistent_directory: str = "./vector_store"):
        self.collection_name = collection_name
        self.persistent_directory = persistent_directory
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        try:
            # Create persistent directory if not exists
            print(f"Initializing ChromaDB client with persistent directory: {self.persistent_directory}...")
            os.makedirs(self.persistent_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path=self.persistent_directory)
            
            # Get or create collection
            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description" : "PDF document embeddings for RAG"}
            )
            print(f"Vector store initialized at {self.persistent_directory}.")
            print(f"Existing collections: {self.collection.count()}")
            print(f"ChromaDB collection '{self.collection_name}' initialized successfully.")
        except Exception as e:
            print(f"Error initializing ChromaDB client: {e}")
            raise
    
    def add_documents(self, documents: List[Any], embeddings: np.ndarray):
        """Add documents and their embeddings to the vector store."""

        if len(documents) != len(embeddings):
            raise ValueError("Number of documents and embeddings must match.")
        
        print(f"Adding {len(documents)} documents to the vector store...")

        #data preparation for chromadb
        ids=[]
        metadatas=[]
        documents_texts=[]
        embeddings_list=[]

        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            # Generate a unique ID for each document
            doc_id=f"doc_{uuid.uuid4().hex[:8]}_{i}"
            ids.append(doc_id)

            #prepare metadata and 
            metadata = dict(doc.metadata)  # Copy existing metadata
            metadata['doc_index'] = i 
            metadata['content_length'] = len(doc.page_content)
            metadatas.append(metadata)

            documents_texts.append(doc.page_content)
            embeddings_list.append(embedding.tolist())

        #add to collection
        try:
            self.collection.add(
                ids=ids,
                metadatas=metadatas,
                documents=documents_texts,
                embeddings=embeddings_list
            )
            print(f"Successfully added {len(documents)} documents to the vector store.")
            print(f"Total documents in collection after addition: {self.collection.count()}")

        except Exception as e:
            print(f"Error adding documents to vector store: {e}")
            raise

In [10]:
vectorstore=VectorStore()
vectorstore

Initializing ChromaDB client with persistent directory: ./vector_store...
Vector store initialized at ./vector_store.
Existing collections: 10
ChromaDB collection 'document_embeddings' initialized successfully.


<__main__.VectorStore at 0x1f02fb43230>

In [11]:
chunks

[Document(metadata={'producer': 'ReportLab PDF Library - (opensource)', 'creator': 'anonymous', 'creationdate': '2026-01-30T17:23:53+05:00', 'source': 'document1.pdf', 'file_path': 'pdf_files\\document1.pdf', 'total_pages': 1, 'format': 'PDF 1.3', 'title': 'untitled', 'author': 'anonymous', 'subject': 'unspecified', 'keywords': '', 'moddate': '2026-01-30T17:23:53+05:00', 'trapped': '', 'modDate': "D:20260130172353+05'00'", 'creationDate': "D:20260130172353+05'00'", 'page': 0, 'file_type': 'pdf', 'chunk_index': 0}, page_content='Machine Learning Fundamentals\nThis is the first dummy PDF document.\nIt contains information about machine learning basics.\nMachine learning algorithms learn patterns from data.\nCommon types: Supervised, Unsupervised, Reinforcement Learning.\nApplications include image recognition, NLP, and recommendations.\nDeep learning uses neural networks for complex tasks.'),
 Document(metadata={'producer': 'ReportLab PDF Library - (opensource)', 'creator': 'anonymous', 

In [12]:
#Extract texts from chunks and generate embeddings
texts = [doc.page_content for doc in chunks]

embeddings = embedding_manager.generate_embedding(texts)
vectorstore.add_documents(chunks, embeddings)
embeddings.shape

Generating embeddings for 2 texts...
Adding 2 documents to the vector store...
Successfully added 2 documents to the vector store.
Total documents in collection after addition: 12


(2, 384)

### Retrieval Pipeline

In [13]:
class RAGRetriever:
    def __init__(self, vector_store: VectorStore, embedding_manager: EmbeddingManager):
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    def retrieve(self, query: str,  top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
        print(f"Generating embedding for the query: {query}")
        query_embedding = self.embedding_manager.generate_embedding([query])[0]

        print(f"Query embedding generated. Retrieving top {top_k} similar documents...")
        results = self.vector_store.collection.query(
            query_embeddings=[query_embedding.tolist()],
            n_results=top_k
        )

        retrieved_docs = []
        try:
            if results['documents'] and results['documents'][0]:
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]

                for i, (doc_id, document, metadata, distance) in enumerate(zip(ids,documents,metadatas,distances)):
                    similarity_score=1-distance

                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            'id': doc_id,
                            'document': document,
                            'metadata': metadata,
                            'similarity_score': similarity_score,
                            'distance': distance,
                            'rank': i+1
                        })
                    print(f"Retrieved Document {i+1}: ID={doc_id}, Similarity Score={similarity_score:.4f}")
                else:
                    print(f"No documents found above the similarity threshold of {score_threshold}.")
                return retrieved_docs
        
        except Exception as e:
            print(f"Error during retrieval: {e}")
            return []

In [14]:
rag_retriever = RAGRetriever(vectorstore, embedding_manager)
print("RAG Retriever initialized.")
print(rag_retriever)

RAG Retriever initialized.
<__main__.RAGRetriever object at 0x000001F030D65940>


In [15]:
# rag_retriever.retrieve("What is the purpose of this Machine Learning?")
rag_retriever.retrieve("What is the Advanced Data Science")


Generating embedding for the query: What is the Advanced Data Science
Generating embeddings for 1 texts...
Query embedding generated. Retrieving top 5 similar documents...
Retrieved Document 1: ID=doc_c34413e9_1, Similarity Score=0.0044
Retrieved Document 2: ID=doc_69e0bd2a_1, Similarity Score=0.0044
Retrieved Document 3: ID=doc_e088d721_1, Similarity Score=0.0044
Retrieved Document 4: ID=doc_c5cc78fe_1, Similarity Score=0.0044
Retrieved Document 5: ID=doc_0b14d56b_1, Similarity Score=0.0044
No documents found above the similarity threshold of 0.0.


[{'id': 'doc_c34413e9_1',
  'document': 'Advanced Data Science Techniques\nThis is the second dummy PDF document.\nIt covers advanced topics in data science.\nFeature engineering improves model performance.\nCross-validation prevents overfitting.\nHyperparameter tuning optimizes model accuracy.\nEnsemble methods combine multiple models.',
  'metadata': {'trapped': '',
   'modDate': "D:20260130172353+05'00'",
   'moddate': '2026-01-30T17:23:53+05:00',
   'author': 'anonymous',
   'file_path': 'pdf_files\\document2.pdf',
   'total_pages': 1,
   'format': 'PDF 1.3',
   'title': 'untitled',
   'doc_index': 1,
   'chunk_index': 0,
   'page': 0,
   'creator': 'anonymous',
   'creationDate': "D:20260130172353+05'00'",
   'keywords': '',
   'producer': 'ReportLab PDF Library - (opensource)',
   'content_length': 291,
   'subject': 'unspecified',
   'file_type': 'pdf',
   'creationdate': '2026-01-30T17:23:53+05:00',
   'source': 'document2.pdf'},
  'similarity_score': 0.004385411739349365,
  'd

### Integeraion VectorDB context pipeline with LLM output

In [16]:
### Simple Rag pipeline with Groq LLM
from langchain_groq import ChatGroq
import os
from dotenv import load_dotenv
load_dotenv()

groq_api_key = os.getenv("API_KEY")

llm=ChatGroq(api_key=groq_api_key, model="llama-3.1-8b-instant", temperature=0.1, max_tokens=512)


## Rag Function
def rag_simple(query,retriever,llm,top_k=3):
    results = retriever.retrieve(query, top_k=top_k)
    context="\n\n".join([doc['document'] for doc in results]) if results else ""

    if not context:
        return "No relevant documents found."
    
    ##generate the answer using Groq LLM
    prompt=f"""Use the following context to answer the question:\n\nContext:\n{context}\n\nQuestion: {query}\n\nAnswer:"""
    # print(f"Context for LLM:\n{context}\n")
    response=llm.invoke([prompt.format(context=context, query=query)])
    return response.content



ModuleNotFoundError: No module named 'langchain_groq'

In [None]:
answer = rag_simple("What is Advanced Data Science?", rag_retriever,llm)
print(answer)

In [None]:
# Enhanced Rag PipeLine Features

def rag_advanced(query,retriever,llm,top_k=3,min_score=0.2,return_context=False):
    results = retriever.retrieve(query, top_k=top_k, score_threshold=min_score)
    context="\n\n".join([doc['document'] for doc in results]) if results else ""
    
    if not context:
        output = {
            'answer': "No relevant documents found.",
            'sources': [],
            'confidence': 0.0,
            'context': ""
        }
        return output
    
    sources = [{
        'source': doc['metadata'].get('source_file',doc['metadata'].get('source','unknown')),
        'page': doc['metadata'].get('page', 'unknown'),
        'score': doc['similarity_score'],
        "preview": doc['document'][:100] + '...'
        } for doc in results]
    
    confidence = max([doc['similarity_score'] for doc in results])
    ##generate the answer using Groq LLM
    prompt=f"""Use the following context to answer the question:\n\nContext:\n{context}\n\nQuestion: {query}\n\nAnswer:"""
    response=llm.invoke([prompt.format(context=context, query=query)])
    
    output = {
        'answer': response.content,
        'sources': sources,
        'confidence': confidence,
        'context': context
    }
    return output
#Example 
result = rag_advanced("Explain the concept of machine learning.", rag_retriever, llm, top_k=5, min_score=0.3, return_context=True)
print("Answer:", result['answer'])
print("Sources:", result['sources'])
print("Confidence:", result['confidence'])
print("Context Preview:", result['context'][:200])