### Data Ingestion Pipeline

In [1]:
import os
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pathlib import Path

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
## Read PDF file
def process_pdfs(pdf_directory):
    all_documents = []
    pdf_dir = Path(pdf_directory)

    pdf_files = list(pdf_dir.glob('**/*.pdf'))


    for pdf_file in pdf_files:
        print(f"Processing: {pdf_file}")
        try:
            loader = PyMuPDFLoader(str(pdf_file))
            documents = loader.load()

            for doc in documents:
                doc.metadata['source_file'] = pdf_file.name
                doc.metadata['file_type' ] = "pdf"
                
            all_documents.extend(documents)
        except Exception as e:
            print(f"Error processing {pdf_file}: {e}")

    return all_documents

process_pdfs('../data')


Processing: ..\data\pdf_files\Django.pdf
Processing: ..\data\pdf_files\Python.pdf
Processing: ..\data\pdf_files\RAG.pdf


[Document(metadata={'producer': 'Skia/PDF m147 Google Docs Renderer', 'creator': '', 'creationdate': '', 'source': '..\\data\\pdf_files\\Django.pdf', 'file_path': '..\\data\\pdf_files\\Django.pdf', 'total_pages': 1, 'format': 'PDF 1.4', 'title': 'Django', 'author': '', 'subject': '', 'keywords': '', 'moddate': '', 'trapped': '', 'modDate': '', 'creationDate': '', 'page': 0, 'source_file': 'Django.pdf', 'file_type': 'pdf'}, page_content='Django is a high-level Python web framework designed to help developers build secure, \nscalable, and maintainable web applications quickly. Created in 2005, Django follows the \nphilosophy of “batteries included,” meaning it provides many built-in features out of the box, \nsuch as authentication, admin panels, ORM (Object-Relational Mapping), and security \nprotections. This allows developers to focus more on application logic rather than reinventing \ncommon components. \nOne of Django’s core architectural patterns is the MVT (Model–View–Template) de

In [3]:
## Text into chunks

def split_documents(documents, chunk_size=1000, chunk_overlap=200):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size= chunk_size, 
        chunk_overlap= chunk_overlap,
        length_function= len,
        separators=["\n\n", "\n", " ", ""]
        )
    split_documents =text_splitter.split_documents(documents)
    print(f"Total chunks created: {len(split_documents)}")


    if split_documents:
        print(f'\nExample chunk:')
        print(f"Content: {split_documents[0].page_content[:100]}...")
        print(f"Metadata: {split_documents[0].metadata}")

    return split_documents

In [4]:
all_pdf_documents = process_pdfs('../data/pdf_files')
chunks = split_documents(all_pdf_documents)
chunks

Processing: ..\data\pdf_files\Django.pdf
Processing: ..\data\pdf_files\Python.pdf
Processing: ..\data\pdf_files\RAG.pdf
Total chunks created: 10

Example chunk:
Content: Django is a high-level Python web framework designed to help developers build secure, 
scalable, and...
Metadata: {'producer': 'Skia/PDF m147 Google Docs Renderer', 'creator': '', 'creationdate': '', 'source': '..\\data\\pdf_files\\Django.pdf', 'file_path': '..\\data\\pdf_files\\Django.pdf', 'total_pages': 1, 'format': 'PDF 1.4', 'title': 'Django', 'author': '', 'subject': '', 'keywords': '', 'moddate': '', 'trapped': '', 'modDate': '', 'creationDate': '', 'page': 0, 'source_file': 'Django.pdf', 'file_type': 'pdf'}


[Document(metadata={'producer': 'Skia/PDF m147 Google Docs Renderer', 'creator': '', 'creationdate': '', 'source': '..\\data\\pdf_files\\Django.pdf', 'file_path': '..\\data\\pdf_files\\Django.pdf', 'total_pages': 1, 'format': 'PDF 1.4', 'title': 'Django', 'author': '', 'subject': '', 'keywords': '', 'moddate': '', 'trapped': '', 'modDate': '', 'creationDate': '', 'page': 0, 'source_file': 'Django.pdf', 'file_type': 'pdf'}, page_content='Django is a high-level Python web framework designed to help developers build secure, \nscalable, and maintainable web applications quickly. Created in 2005, Django follows the \nphilosophy of “batteries included,” meaning it provides many built-in features out of the box, \nsuch as authentication, admin panels, ORM (Object-Relational Mapping), and security \nprotections. This allows developers to focus more on application logic rather than reinventing \ncommon components. \nOne of Django’s core architectural patterns is the MVT (Model–View–Template) de

### Embedding and VectorStore DB

In [5]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity

In [6]:
class EmbeddingManager:
    def __init__(self, model_name = 'all-MiniLM-L6-v2'):
        self.model_name = model_name
        self.model = None
        self._load_model(model_name)

    def _load_model(self, model_name):
        try:
            self.model = SentenceTransformer(self.model_name)
        except Exception as e:
            print(f"Error loading model '{model_name}': {e}")
            raise

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        if not self.model:
            raise ValueError("Model not loaded.")
        print(f"Generating embeddings for {len(texts)} texts...")
        try:
            embeddings = self.model.encode(texts, show_progress_bar=True)
            return embeddings
        except Exception as e:
            print(f"Error generating embeddings: {e}")
            raise

    def get_embedding_dimensions(self):
        if not self.model:
            raise ValueError("Model not loaded.")
        return self.model.get_sentence_embedding_dimension()
    

embedding_manager = EmbeddingManager()
embedding_manager

Loading weights: 100%|██████████| 103/103 [00:00<00:00, 221.71it/s, Materializing param=pooler.dense.weight]                             
[1mBertModel LOAD REPORT[0m from: sentence-transformers/all-MiniLM-L6-v2
Key                     | Status     |  | 
------------------------+------------+--+-
embeddings.position_ids | UNEXPECTED |  | 

[3mNotes:
- UNEXPECTED[3m	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.[0m


<__main__.EmbeddingManager at 0x1a19264fcb0>

### VectorStore

In [7]:
class VectorStore:
    def __init__(self, collection_name='all_pdf_documents', persist_directory='../data/vector_store/pdf'):
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize_chromadb()

    def _initialize_chromadb(self):
        try:
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path= self.persist_directory)
        
            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF document chunks with embeddings"}
            )

            print(f'Vector store initialized with collection: {self.collection_name}')
            
        
        except Exception as e:
            print(f"Error initializing ChromaDB: {e}")
            raise

    def add_documents(self, documents: List[Dict[str, Any]], embeddings: np.ndarray):

        if len(documents) != len(embeddings):
            raise ValueError("Number of documents and embeddings must match.")
        

        # Prepare the chromadb entries

        ids = []
        metadata_list = []
        documents_text = []
        embeddings_list = []

        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            # Generate a unique ID for each document
            doc_id = f'doc_{uuid.uuid4().hex[:8]}_{i}'
            ids.append(doc_id)

            # Prepare metadata
            metadata = dict(doc.metadata)
            metadata['doc_index'] = i
            metadata['content_length'] = len(doc.page_content)
            metadata_list.append(metadata)
              
            
            documents_text.append(doc.page_content)
            embeddings_list.append(embedding)


        try:
            self.collection.add(
                ids=ids,
                documents=documents_text,
                embeddings=embeddings_list,
                metadatas=metadata_list
            )

            print(f'Successfully added {len(documents)} documents to the vector store.')
        except Exception as e:
            print(f"Error adding documents to ChromaDB: {e}")
            raise
            # Ensure it's a standard dictionary


vector_store = VectorStore()
vector_store

Vector store initialized with collection: all_pdf_documents


<__main__.VectorStore at 0x1a1823597f0>

In [8]:
texts = [chunk.page_content for chunk in chunks]
# texts
embeddings = embedding_manager.generate_embeddings(texts)
# embeddings
vector_store.add_documents(chunks, embeddings)


Generating embeddings for 10 texts...


Batches: 100%|██████████| 1/1 [00:01<00:00,  1.86s/it]

Successfully added 10 documents to the vector store.





### Retriever pipeline from vector store

In [9]:
class RAGRetriever:
    """Handles the retrieval of relevant document chunks based on a query."""

    def __init__(self, vector_store: VectorStore, embedding_manager: EmbeddingManager):
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    def retrieve(self, query: str, top_k: int = 2, score_threshold: float = 0.5):
        query_embedding = self.embedding_manager.generate_embeddings([query])[0]
        try:
            # Perform the query to get relevant documents
            results = self.vector_store.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=top_k,
                include=['documents', 'metadatas', 'embeddings', 'distances']
            )

            retrieved_docs = []
            documents = results.get('documents', [])[0]
            metadatas = results.get('metadatas', [])[0]
            distances = results.get('distances', [])[0]
            ids = results.get('ids', [])[0]

            # Check if the query returned any results
            if not documents:
                print("No documents found")
                return []

            # Process the results and filter based on similarity score
            for i, (doc_id, document, metadata, distance) in enumerate(zip(ids, documents, metadatas, distances)):
                similarity_score = 1.0 - (distance / 2.0) # Higher similarity means a smaller distance
                
                if similarity_score >= score_threshold:
                    retrieved_docs.append({
                        'id': doc_id,
                        'content': document,
                        'metadata': metadata,
                        'similarity_score': similarity_score,
                        'distance': distance,
                        'rank': i + 1
                    })

            if not retrieved_docs:
                print(f"No documents meet the score threshold of {score_threshold}")
            
            return retrieved_docs

        except Exception as e:
            print(f"Error during retrieval: {e}")
            raise


# Make sure vector_store and embedding_manager are instantiated before passing them to the RAGRetriever
vector_store = VectorStore()  # Assuming you have this class set up
embedding_manager = EmbeddingManager()  # Assuming you have this class set up


Vector store initialized with collection: all_pdf_documents


Loading weights: 100%|██████████| 103/103 [00:00<00:00, 220.47it/s, Materializing param=pooler.dense.weight]                             
[1mBertModel LOAD REPORT[0m from: sentence-transformers/all-MiniLM-L6-v2
Key                     | Status     |  | 
------------------------+------------+--+-
embeddings.position_ids | UNEXPECTED |  | 

[3mNotes:
- UNEXPECTED[3m	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.[0m


In [10]:
rag_retriever = RAGRetriever(vector_store, embedding_manager)
rag_retriever.retrieve("What is Django?", top_k=2, score_threshold=0.5)


Generating embeddings for 1 texts...


Batches: 100%|██████████| 1/1 [00:00<00:00, 12.17it/s]


[{'id': 'doc_b9299715_0',
  'content': 'Django is a high-level Python web framework designed to help developers build secure, \nscalable, and maintainable web applications quickly. Created in 2005, Django follows the \nphilosophy of “batteries included,” meaning it provides many built-in features out of the box, \nsuch as authentication, admin panels, ORM (Object-Relational Mapping), and security \nprotections. This allows developers to focus more on application logic rather than reinventing \ncommon components. \nOne of Django’s core architectural patterns is the MVT (Model–View–Template) design. \nThe Model represents the database structure and business logic, the View handles the \nrequest-response logic, and the Template manages the presentation layer. This separation \nof concerns promotes clean, organized code and makes large projects easier to manage \nand scale. \nDjango’s ORM is one of its strongest features. It allows developers to interact with',
  'metadata': {'file_path': 

In [17]:
from google import genai
import os
from dotenv import load_dotenv

load_dotenv() 

api_key = os.getenv("GOOGLE_API_KEY")

class simple_rag_retriever:
    def __init__(self, api_key):
        self.client = genai.Client(api_key=api_key)
        self.model_name = "gemini-2.5-flash-lite" 
        
    def generate_response(self, query, retrieved_docs):
        if not retrieved_docs:
            return "Error: The retriever returned 0 chunks. The context is empty!"

        # Context Injection
        context_chunks = [doc['content'] for doc in retrieved_docs]
        context_string = "\n\n---\n\n".join(context_chunks)

        # Constructing the prompt
        prompt = f"""You are a helpful assistant answering questions based on the provided documents.
Using only the text below, answer the question. If the answer is not contained in the context, say "I do not have enough information to answer that."
        
Context:
{context_string}

Question: {query}

Answer:"""
        
        try:
            # THE FIX: Explicitly stating model= and contents=
            response = self.client.models.generate_content(
                model=self.model_name,
                contents=prompt
            )
            return response.text.strip()
        except Exception as e:
            print(f"Error generating response: {e}")
            raise

#Query
query = "best features of python?"

# 1. Retrieve the documents
retrieved_docs = rag_retriever.retrieve(query, top_k=2, score_threshold=0.2)

# 2. Initialize generator
generator = simple_rag_retriever(api_key)

# 3. Generate
final_answer = generator.generate_response(query, retrieved_docs)

print("\n=== FINAL RAG ANSWER ===")
print(final_answer)

Generating embeddings for 1 texts...


Batches: 100%|██████████| 1/1 [00:00<00:00,  4.82it/s]


Error generating response: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429, 'message': 'You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. To monitor your current usage, head to: https://ai.dev/rate-limit. \n* Quota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_requests, limit: 20, model: gemini-2.5-flash-lite\nPlease retry in 26.919787562s.', 'status': 'RESOURCE_EXHAUSTED', 'details': [{'@type': 'type.googleapis.com/google.rpc.Help', 'links': [{'description': 'Learn more about Gemini API quotas', 'url': 'https://ai.google.dev/gemini-api/docs/rate-limits'}]}, {'@type': 'type.googleapis.com/google.rpc.QuotaFailure', 'violations': [{'quotaMetric': 'generativelanguage.googleapis.com/generate_content_free_tier_requests', 'quotaId': 'GenerateRequestsPerDayPerProjectPerModel-FreeTier', 'quotaDimensions': {'location': 'global', 'model': '

ClientError: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429, 'message': 'You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. To monitor your current usage, head to: https://ai.dev/rate-limit. \n* Quota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_requests, limit: 20, model: gemini-2.5-flash-lite\nPlease retry in 26.919787562s.', 'status': 'RESOURCE_EXHAUSTED', 'details': [{'@type': 'type.googleapis.com/google.rpc.Help', 'links': [{'description': 'Learn more about Gemini API quotas', 'url': 'https://ai.google.dev/gemini-api/docs/rate-limits'}]}, {'@type': 'type.googleapis.com/google.rpc.QuotaFailure', 'violations': [{'quotaMetric': 'generativelanguage.googleapis.com/generate_content_free_tier_requests', 'quotaId': 'GenerateRequestsPerDayPerProjectPerModel-FreeTier', 'quotaDimensions': {'location': 'global', 'model': 'gemini-2.5-flash-lite'}, 'quotaValue': '20'}]}, {'@type': 'type.googleapis.com/google.rpc.RetryInfo', 'retryDelay': '26s'}]}}