RAG Pipelines- Data Ingestion to Vector DB Pipeline

In [54]:
import os
from pathlib import Path
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter


In [55]:
import os
from dotenv import load_dotenv
import google.generativeai as genai

# Load environment variables
load_dotenv()

# Configure Gemini API
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))

print("✅ Gemini API configured successfully")

✅ Gemini API configured successfully


In [56]:
### Read all the pdf's inside the directory
def process_all_pdfs(pdf_directory):
    """Process all PDF files in a directory"""
    all_documents = []
    pdf_dir = Path(pdf_directory)
    
    # Find all PDF files recursively
    pdf_files = list(pdf_dir.glob("**/*.pdf"))
    
    print(f"Found {len(pdf_files)} PDF files to process")
    
    for pdf_file in pdf_files:
        print(f"\nProcessing: {pdf_file.name}")
        try:
            loader = PyPDFLoader(str(pdf_file))
            documents = loader.load()
            
            # Add source information to metadata
            for doc in documents:
                doc.metadata['source_file'] = pdf_file.name
                doc.metadata['file_type'] = 'pdf'
            
            all_documents.extend(documents)
            print(f"  âœ“ Loaded {len(documents)} pages")
            
        except Exception as e:
            print(f"  âœ— Error: {e}")
    
    print(f"\nTotal documents loaded: {len(all_documents)}")
    return all_documents

# Process all PDFs in the data directory
all_pdf_documents = process_all_pdfs("../data")

Found 5 PDF files to process

Processing: 12-Month Roadmap to Become a Production-Ready AI Engineer (Agentic AI Specialization).pdf
  âœ“ Loaded 19 pages

Processing: 12037624.pdf
  âœ“ Loaded 30 pages

Processing: RAG_Research_Paper_arkiv.pdf
  âœ“ Loaded 21 pages

Processing: Resume_Latest.pdf
  âœ“ Loaded 1 pages

Processing: __The_Ultimate_AI_Engineering_Learning_Roadmap__Fr[1].pdf
  âœ“ Loaded 11 pages

Total documents loaded: 82


In [57]:
all_pdf_documents

[Document(metadata={'producer': 'WeasyPrint 65.1', 'creator': 'ChatGPT', 'creationdate': '', 'title': '12-Month Roadmap to Become a Production-Ready AI Engineer (Agentic AI Specialization)', 'author': 'ChatGPT Deep Research', 'source': '..\\data\\pdf\\12-Month Roadmap to Become a Production-Ready AI Engineer (Agentic AI Specialization).pdf', 'total_pages': 19, 'page': 0, 'page_label': '1', 'source_file': '12-Month Roadmap to Become a Production-Ready AI Engineer (Agentic AI Specialization).pdf', 'file_type': 'pdf'}, page_content='12-Month Roadmap to Become a Production-Ready\nAI Engineer (Agentic AI Specialization)\nOverview: This roadmap is tailored for Yash – a 4th-year ECE student with basic Python, math, and ML\nknowledge – to transform into a production-ready AI Engineer specialized in Agentic AI over 12 months.\nYash  will  dedicate  ~8  hours  daily.  The  plan  is  divided  into  monthly  phases  with  clear  goals,  hands-on\nprojects,  and  curated  resources.  To  minimize  

In [58]:
### Text Splitting get into chunks

def split_documents(documents,chunk_size=1000,chunk_overlap=200):
    """Split Resumes into smaller chunks for better performance"""

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n","\n"," ",""]

    )

    split_docs = text_splitter.split_documents(documents)
    print(f"Split {len(documents)} documents into {len(split_docs)} chunks")

    if split_docs:
        print(f"\nExample chunk:")
        print(f"Content : {split_docs[0].page_content[:200]}...")
        print(f"Metadata: {split_docs[0].metadata}")
    return split_docs


In [59]:
chunks=split_documents(all_pdf_documents)
chunks


Split 82 documents into 359 chunks

Example chunk:
Content : 12-Month Roadmap to Become a Production-Ready
AI Engineer (Agentic AI Specialization)
Overview: This roadmap is tailored for Yash – a 4th-year ECE student with basic Python, math, and ML
knowledge – t...
Metadata: {'producer': 'WeasyPrint 65.1', 'creator': 'ChatGPT', 'creationdate': '', 'title': '12-Month Roadmap to Become a Production-Ready AI Engineer (Agentic AI Specialization)', 'author': 'ChatGPT Deep Research', 'source': '..\\data\\pdf\\12-Month Roadmap to Become a Production-Ready AI Engineer (Agentic AI Specialization).pdf', 'total_pages': 19, 'page': 0, 'page_label': '1', 'source_file': '12-Month Roadmap to Become a Production-Ready AI Engineer (Agentic AI Specialization).pdf', 'file_type': 'pdf'}


[Document(metadata={'producer': 'WeasyPrint 65.1', 'creator': 'ChatGPT', 'creationdate': '', 'title': '12-Month Roadmap to Become a Production-Ready AI Engineer (Agentic AI Specialization)', 'author': 'ChatGPT Deep Research', 'source': '..\\data\\pdf\\12-Month Roadmap to Become a Production-Ready AI Engineer (Agentic AI Specialization).pdf', 'total_pages': 19, 'page': 0, 'page_label': '1', 'source_file': '12-Month Roadmap to Become a Production-Ready AI Engineer (Agentic AI Specialization).pdf', 'file_type': 'pdf'}, page_content='12-Month Roadmap to Become a Production-Ready\nAI Engineer (Agentic AI Specialization)\nOverview: This roadmap is tailored for Yash – a 4th-year ECE student with basic Python, math, and ML\nknowledge – to transform into a production-ready AI Engineer specialized in Agentic AI over 12 months.\nYash  will  dedicate  ~8  hours  daily.  The  plan  is  divided  into  monthly  phases  with  clear  goals,  hands-on\nprojects,  and  curated  resources.  To  minimize  

### Embeddings and VectorDB

In [60]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict , Any , Tuple
from sklearn.metrics.pairwise import cosine_similarity


In [61]:
from typing import List
class EmbeddingManager:
    """Handles document embedding generation using SentenceTransformer"""
    
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        Initialize the embedding manager
        
        Args:
            model_name: HuggingFace model name for sentence embeddings
        """
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self):
        """Load the SentenceTransformer model"""
        try:
            print(f"Loading embedding model: {self.model_name}")
            self.model = SentenceTransformer(self.model_name)
            print(f"Model loaded successfully. Embedding dimension: {self.model.get_sentence_embedding_dimension()}")
        except Exception as e:
            print(f"Error loading model {self.model_name}: {e}")
            raise

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Generate embeddings for a list of texts
        
        Args:
            texts: List of text strings to embed
            
        Returns:
            numpy array of embeddings with shape (len(texts), embedding_dim)
        """
        if not self.model:
            raise ValueError("Model not loaded")
        
        print(f"Generating embeddings for {len(texts)} texts...")
        embeddings = self.model.encode(texts, show_progress_bar=True)
        print(f"Generated embeddings with shape: {embeddings.shape}")
        return embeddings


## initialize the embedding manager

embedding_manager=EmbeddingManager()
embedding_manager

Loading embedding model: all-MiniLM-L6-v2
Model loaded successfully. Embedding dimension: 384


<__main__.EmbeddingManager at 0x1e09e96ec90>

### VectorStore


In [62]:
class VectorStore:
    """Manages document embeddings in a ChromaDB vector store"""
    
    def __init__(self, collection_name: str = "pdf_documents", persist_directory: str = "../data/vector_store"):
        """
        Initialize the vector store
        
        Args:
            collection_name: Name of the ChromaDB collection
            persist_directory: Directory to persist the vector store
        """
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        """Initialize ChromaDB client and collection"""
        try:
            # Create persistent ChromaDB client
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path=self.persist_directory)
            
            # Get or create collection
            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF document embeddings for RAG"}
            )
            print(f"Vector store initialized. Collection: {self.collection_name}")
            print(f"Existing documents in collection: {self.collection.count()}")
            
        except Exception as e:
            print(f"Error initializing vector store: {e}")
            raise

    def add_documents(self, documents: List[Any], embeddings: np.ndarray):
        """
        Add documents and their embeddings to the vector store

        Args:
            documents: List of LangChain documents
            embeddings: Corresponding embeddings for the documents
        """
        if len(documents) != len(embeddings):
            raise ValueError("Number of documents must match number of embeddings")
        
        print(f"Adding {len(documents)} documents to vector store...")

        # Prepare data for ChromaDB
        ids = []
        metadatas = []
        documents_text = []
        embeddings_list = []

        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            # Generate unique ID
            doc_id = f"doc_{uuid.uuid4().hex[:8]}_{i}"
            ids.append(doc_id)

            # Prepare metadata
            metadata = dict(doc.metadata)
            metadata['doc_index'] = i
            metadata['content_length'] = len(doc.page_content)
            metadatas.append(metadata)

            # Document content
            documents_text.append(doc.page_content)

            # Embedding
            embeddings_list.append(embedding.tolist())

        # Add to collection
        try:
            self.collection.add(
                ids=ids,
                embeddings=embeddings_list,
                metadatas=metadatas,
                documents=documents_text
            )
            print(f"Successfully added {len(documents)} documents to vector store")
            print(f"Total documents in collection: {self.collection.count()}")
        
        except Exception as e:
            print(f"Error adding documents to vector store: {e}")
            raise
            
vectorstore=VectorStore()
vectorstore

Vector store initialized. Collection: pdf_documents
Existing documents in collection: 2417


<__main__.VectorStore at 0x1e09ecfce10>

In [63]:
chunks

[Document(metadata={'producer': 'WeasyPrint 65.1', 'creator': 'ChatGPT', 'creationdate': '', 'title': '12-Month Roadmap to Become a Production-Ready AI Engineer (Agentic AI Specialization)', 'author': 'ChatGPT Deep Research', 'source': '..\\data\\pdf\\12-Month Roadmap to Become a Production-Ready AI Engineer (Agentic AI Specialization).pdf', 'total_pages': 19, 'page': 0, 'page_label': '1', 'source_file': '12-Month Roadmap to Become a Production-Ready AI Engineer (Agentic AI Specialization).pdf', 'file_type': 'pdf'}, page_content='12-Month Roadmap to Become a Production-Ready\nAI Engineer (Agentic AI Specialization)\nOverview: This roadmap is tailored for Yash – a 4th-year ECE student with basic Python, math, and ML\nknowledge – to transform into a production-ready AI Engineer specialized in Agentic AI over 12 months.\nYash  will  dedicate  ~8  hours  daily.  The  plan  is  divided  into  monthly  phases  with  clear  goals,  hands-on\nprojects,  and  curated  resources.  To  minimize  

In [64]:
### convert the text to embeddings
texts=[doc.page_content for doc in chunks]

##Generate the embeddings

embeddings = embedding_manager.generate_embeddings(texts)

##Store in the vector database

vectorstore.add_documents(chunks,embeddings)

Generating embeddings for 359 texts...


Batches: 100%|██████████| 12/12 [00:08<00:00,  1.42it/s]


Generated embeddings with shape: (359, 384)
Adding 359 documents to vector store...
Successfully added 359 documents to vector store
Total documents in collection: 2776


### retriever pipeline from vector store


In [65]:
class RAGRetriever:
    """Handles query-based retrieval from the vector store"""
    
    def __init__(self, vector_store: VectorStore, embedding_manager: EmbeddingManager):
        """
        Initialize the retriever
        
        Args:
            vector_store: Vector store containing document embeddings
            embedding_manager: Manager for generating query embeddings
        """
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    def retrieve(self, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
        """
        Retrieve relevant documents for a query
        
        Args:
            query: The search query
            top_k: Number of top results to return
            score_threshold: Minimum similarity score threshold
            
        Returns:
            List of dictionaries containing retrieved documents and metadata
        """
        print(f"Retrieving documents for query: '{query}'")
        print(f"Top K: {top_k}, Score threshold: {score_threshold}")

         # Generate query embedding
        query_embedding = self.embedding_manager.generate_embeddings([query])[0]
        
        # Search in vector store
        try:
            results = self.vector_store.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=top_k
            )
            
            # Process results
            retrieved_docs = []
            
            if results['documents'] and results['documents'][0]:
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]
                
                for i, (doc_id, document, metadata, distance) in enumerate(zip(ids, documents, metadatas, distances)):
                    # Convert distance to similarity score (ChromaDB uses cosine distance)
                    similarity_score = 1 - distance
                    
                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            'id': doc_id,
                            'content': document,
                            'metadata': metadata,
                            'similarity_score': similarity_score,
                            'distance': distance,
                            'rank': i + 1
                        })
                
                print(f"Retrieved {len(retrieved_docs)} documents (after filtering)")
            else:
                print("No documents found")
            
            return retrieved_docs
            
        except Exception as e:
            print(f"Error during retrieval: {e}")
            return []

rag_retriever=RAGRetriever(vectorstore,embedding_manager)


In [73]:
rag_retriever.retrieve("What are news values")

Retrieving documents for query: 'What are news values'
Top K: 5, Score threshold: 0.0
Generating embeddings for 1 texts...


Batches: 100%|██████████| 1/1 [00:00<00:00, 166.71it/s]

Generated embeddings with shape: (1, 384)
Retrieved 5 documents (after filtering)





[{'id': 'doc_80158d1d_112',
  'content': '7 \n \nWhat is News Values?  Newsworthiness  or news values  is a term discussed by many linguists, \nsociologists and, mainly,  researchers dealing, directly or indirectly, with the field of media \nstudies. Stuart Hall states that:  \nThe media do not simply and transparently  report events which are \nnaturally newsworthy in themselves. News is the end product of a \ncomplex process, which begins with systematic sorting and selecting \nof events and topics according to a socially constructed set of \ncategories.  \n       Stuart Hall ( in Fowler1991:12) \n  Philo goes on even further to maintain that news is not found or even gathered. It is a \ncreation of a journalistic process, an artefact, and a commodity ( in Fowler 1991). The last \nstatement, as discussed above, is hardly applicable in the contemporary era of mass \ncommunication, where, with the existence of the internet, everybody can have access to \npractically any nugget of infor

### RAG with Gemini LLM - Answer Generation

In [74]:
class GeminiLLM:
    """Wrapper for Google Gemini 2.5 Flash API optimized for RAG"""
    
    def __init__(
        self, 
        model_name: str = "gemini-2.5-flash",
        temperature: float = 0.1,
        max_output_tokens: int = 500,
        top_p: float = 0.95,
        top_k: int = 40
    ):
        self.model_name = model_name
        
        # Generation config optimized for factual Q&A
        self.generation_config = {
            "temperature": temperature,
            "top_p": top_p,
            "top_k": top_k,
            "max_output_tokens": max_output_tokens,
        }
        
        # Safety settings (adjust as needed)
        self.safety_settings = [
            {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
        ]
        
        # Initialize model
        self.model = genai.GenerativeModel(
            model_name=self.model_name,
            generation_config=self.generation_config,
            safety_settings=self.safety_settings
        )
        
        print(f"✅ GeminiLLM initialized with {self.model_name}")
        print(f"   Temperature: {temperature}, Max tokens: {max_output_tokens}")
    
    def generate(self, prompt: str, max_retries: int = 3) -> str:
        """
        Generate response with retry logic
        
        Args:
            prompt: The input prompt
            max_retries: Number of retry attempts on failure
            
        Returns:
            Generated text response
        """
        for attempt in range(max_retries):
            try:
                response = self.model.generate_content(prompt)
                return response.text.strip()
            
            except Exception as e:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt  # Exponential backoff
                    print(f"⚠️  Attempt {attempt + 1} failed: {e}")
                    print(f"   Retrying in {wait_time}s...")
                    import time
                    time.sleep(wait_time)
                else:
                    print(f"❌ All {max_retries} attempts failed")
                    raise

# Initialize Gemini LLM
gemini_llm = GeminiLLM(
    temperature=0.1,      # Low temperature for factual responses
    max_output_tokens=500,
    top_p=0.95,
    top_k=40
)

✅ GeminiLLM initialized with gemini-2.5-flash
   Temperature: 0.1, Max tokens: 500


In [75]:
# List available models to find the correct one
for model in genai.list_models():
    if 'generateContent' in model.supported_generation_methods:
        print(f"Model: {model.name}")
        print(f"  Display name: {model.display_name}")
        print(f"  Supported methods: {model.supported_generation_methods}")
        print()

Model: models/gemini-2.5-pro-preview-03-25
  Display name: Gemini 2.5 Pro Preview 03-25
  Supported methods: ['generateContent', 'countTokens', 'createCachedContent', 'batchGenerateContent']

Model: models/gemini-2.5-flash-preview-05-20
  Display name: Gemini 2.5 Flash Preview 05-20
  Supported methods: ['generateContent', 'countTokens', 'createCachedContent', 'batchGenerateContent']

Model: models/gemini-2.5-flash
  Display name: Gemini 2.5 Flash
  Supported methods: ['generateContent', 'countTokens', 'createCachedContent', 'batchGenerateContent']

Model: models/gemini-2.5-flash-lite-preview-06-17
  Display name: Gemini 2.5 Flash-Lite Preview 06-17
  Supported methods: ['generateContent', 'countTokens', 'createCachedContent', 'batchGenerateContent']

Model: models/gemini-2.5-pro-preview-05-06
  Display name: Gemini 2.5 Pro Preview 05-06
  Supported methods: ['generateContent', 'countTokens', 'createCachedContent', 'batchGenerateContent']

Model: models/gemini-2.5-pro-preview-06-05
  D

In [76]:
def rag_answer(query: str, top_k: int = 3) -> dict:
    """
    Complete RAG pipeline: Retrieve relevant context + Generate answer

    Args:
        query: User's question
        top_k: Number of relevant chunks to retrieve

    Returns:
        Dictionary with answer, sources, and metadata
    """
    print(f"📝 Query: {query}")
    print(f"🔍 Retrieving top {top_k} relevant chunks...\n")

    # Step 1: Retrieve relevant documents (returns list of dicts)
    results = rag_retriever.retrieve(query, top_k=top_k)

    if not results:
        return {
            "answer": "I couldn't find relevant information to answer this question.",
            "sources": [],
            "context_used": ""
        }

    # Step 2: Build context from retrieved chunks
    context_parts = []
    for i, doc_dict in enumerate(results, 1):
        content = doc_dict['content']
        metadata = doc_dict['metadata']
        source = metadata.get('source', 'Unknown')
        page = metadata.get('page', 'N/A')
        similarity = doc_dict['similarity_score']

        context_parts.append(f"[Source {i}: {source}, Page {page}]\n{content}")
        print(f"📄 Source {i}: {source} (Page {page}) - Similarity: {similarity:.1%}")

    context = "\n\n".join(context_parts)

    # Step 3: Build RAG prompt
    prompt = f"""You are a helpful assistant that answers questions based on the provided context.

Context:
{context}

Question: {query}

Instructions:
- Answer based ONLY on the information in the context above
- If the context doesn't contain enough information, say so
- Be concise but complete
- Cite which source(s) you used

Answer:"""

    # Step 4: Generate answer
    print(f"\n🤖 Generating answer with Gemini...\n")
    answer = gemini_llm.generate(prompt)

    return {
        "answer": answer,
        "sources": [
            {
                "source": doc_dict['metadata'].get('source', 'Unknown'),
                "page": doc_dict['metadata'].get('page', 'N/A'),
                "similarity": doc_dict['similarity_score'],
                "content": doc_dict['content'][:200] + "..."  # Preview
            }
            for doc_dict in results
        ],
        "context_used": context
    }

In [77]:
# Test the complete RAG pipeline
result = rag_answer("What are news values?", top_k=3)

# Display answer
print("=" * 80)
print("ANSWER:")
print("=" * 80)
print(result['answer'])
print("\n" + "=" * 80)
print("SOURCES USED:")
print("=" * 80)
for i, source in enumerate(result['sources'], 1):
    print(f"\n{i}. {source['source']} (Page {source['page']}) - Similarity: {source['similarity']:.1%}")
    print(f"   Preview: {source['content']}")

📝 Query: What are news values?
🔍 Retrieving top 3 relevant chunks...

Retrieving documents for query: 'What are news values?'
Top K: 3, Score threshold: 0.0
Generating embeddings for 1 texts...


Batches: 100%|██████████| 1/1 [00:00<00:00, 156.47it/s]

Generated embeddings with shape: (1, 384)
Retrieved 3 documents (after filtering)
📄 Source 1: ..\data\pdf\12037624.pdf (Page 6) - Similarity: 60.9%
📄 Source 2: ..\data\pdf\12037624.pdf (Page 6) - Similarity: 60.9%
📄 Source 3: ..\data\pdf\12037624.pdf (Page 6) - Similarity: 60.9%

🤖 Generating answer with Gemini...






ANSWER:
News values, also known as newsworthiness, is a term discussed by many linguists, sociologists, and researchers involved in media studies. (Source 1, Source 2, Source 3)

SOURCES USED:

1. ..\data\pdf\12037624.pdf (Page 6) - Similarity: 60.9%
   Preview: 7 
 
What is News Values?  Newsworthiness  or news values  is a term discussed by many linguists, 
sociologists and, mainly,  researchers dealing, directly or indirectly, with the field of media 
stud...

2. ..\data\pdf\12037624.pdf (Page 6) - Similarity: 60.9%
   Preview: 7 
 
What is News Values?  Newsworthiness  or news values  is a term discussed by many linguists, 
sociologists and, mainly,  researchers dealing, directly or indirectly, with the field of media 
stud...

3. ..\data\pdf\12037624.pdf (Page 6) - Similarity: 60.9%
   Preview: 7 
 
What is News Values?  Newsworthiness  or news values  is a term discussed by many linguists, 
sociologists and, mainly,  researchers dealing, directly or indirectly, with the field of 