### RAG Pipeline: Data Ingestion to Vector DB

In [None]:
import os
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pathlib import Path

In [None]:
def process_all_pdf(pdf_directory):
    """Load every PDF found (recursively) under a directory.

    Each document returned by ``PyMuPDFLoader.load()`` is tagged with two
    extra metadata fields: ``source_file`` (the PDF's file name) and
    ``file_type`` (``"pdf"``). Files that fail to load are reported and
    skipped, so one corrupt PDF does not abort the whole ingestion run.

    Args:
        pdf_directory: Directory (str or ``Path``) to scan recursively.

    Returns:
        list: All loaded documents, in sorted file-path order. Empty if the
        directory does not exist or contains no PDFs.
    """
    all_documents = []
    pdf_dir = Path(pdf_directory)

    if not pdf_dir.is_dir():
        print(f"PDF directory not found: {pdf_dir}")
        return all_documents

    # Sort for a deterministic ingestion order (raw glob order is
    # filesystem-dependent) and match the extension case-insensitively so
    # files such as "report.PDF" are not silently skipped. Directories whose
    # names end in ".pdf" are excluded.
    pdf_files = sorted(
        path
        for path in pdf_dir.rglob("*")
        if path.is_file() and path.suffix.lower() == ".pdf"
    )

    print(f"Found {len(pdf_files)} PDF files to process.")

    for pdf_file in pdf_files:
        print(f"\n Processing file: {pdf_file.name}")
        try:
            loader = PyMuPDFLoader(str(pdf_file))
            documents = loader.load()
        except Exception as e:
            # Best-effort ingestion: report the failure and move on.
            print(f"Error loading {pdf_file.name}: {e}")
            continue

        for doc in documents:
            doc.metadata["source_file"] = pdf_file.name
            doc.metadata["file_type"] = "pdf"

        all_documents.extend(documents)
        print(f"Loaded {len(documents)} pages from {pdf_file.name}")

    print(f"\nTotal documents loaded: {len(all_documents)}")
    return all_documents

# Load every PDF under ../data/pdf into LangChain documents.
all_pdf_documents = process_all_pdf(pdf_directory="../data/pdf")



In [None]:
def split_documents(documents, chunk_size=1000, chunk_overlap=200):
    """Break documents into overlapping chunks sized for retrieval.

    Prints the chunk count and, when at least one chunk exists, a preview of
    the first chunk's content and metadata.

    Args:
        documents: LangChain documents to split.
        chunk_size: Maximum chunk length, measured with ``len``.
        chunk_overlap: Length shared between consecutive chunks.

    Returns:
        list: The chunk documents produced by the splitter.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""],
    )
    chunked = splitter.split_documents(documents)
    print(f"Split {len(documents)} documents into {len(chunked)} chunks.")

    if not chunked:
        return chunked

    first = chunked[0]
    print("Sample chunk")
    print(f"Content: {first.page_content[:200]}....")
    print(f"Metadata: {first.metadata}")
    return chunked


In [None]:
# Chunk the loaded pages so they can be embedded.
chunks = split_documents(documents=all_pdf_documents)

### Embedding and Vector Store DB

In [None]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
class EmbeddingManager:
    """Handles document embedding generation using Sentence Transformers."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        Initialize the Embedding Manager and eagerly load the model.

        Args:
            model_name (str): HuggingFace model name for Sentence Embedding

        Raises:
            Exception: Whatever ``SentenceTransformer`` raised if loading fails
                (re-raised after being printed).
        """
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self):
        """Load the Sentence Transformers model into ``self.model``."""
        try:
            print(f"Loading Embedding model: {self.model_name}")
            self.model = SentenceTransformer(self.model_name)
            print(f"Model Loaded Successfully. Embedding dimensions: {self.model.get_sentence_embedding_dimension()}")
        except Exception as e:
            print(f"Error loading model {self.model_name}: {e}")
            raise

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Generate embeddings for a list of texts.

        Args:
            texts (List[str]): Texts to embed.

        Returns:
            np.ndarray: One embedding row per input text.

        Raises:
            ValueError: If no model has been loaded.
        """
        # Compare against None explicitly: model objects may define __len__ /
        # __bool__ (container-style modules), so truthiness is not a reliable
        # "is loaded" test.
        if self.model is None:
            raise ValueError("Model Not Loaded Properly Try Loading Model Again.")

        print(f"Generating embedding for {len(texts)} texts....")
        # np.asarray guarantees the annotated ndarray return type; it is a
        # no-op when encode() already returned an ndarray.
        embeddings = np.asarray(self.model.encode(texts, show_progress_bar=True))
        print(f"Generated embedding with shape : {embeddings.shape}")
        return embeddings
    

# Initialize the embedding manager
embedding_manager = EmbeddingManager(model_name="all-MiniLM-L6-v2")
embedding_manager

In [None]:
class VectorStore:
    """Manages document embeddings in a persistent ChromaDB collection."""

    def __init__(self, collection_name: str = "pdf_documents", persist_directory: str = "../data/vector_store"):
        """
        Initialize the Vector Store with ChromaDB.

        Args:
            collection_name (str): Name of the ChromaDB collection.
            persist_directory (str): Directory where Chroma persists its data.
        """
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        """Create the persist directory, open the client, and get/create the collection."""
        try:
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path=self.persist_directory)

            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF Documents Embedding for RAG"},
            )
            print(f"Vector Store Initialized. Collection: {self.collection_name}")
            print(f"Existing Documents in Collection: {self.collection.count()}")

        except Exception as e:
            print(f"Error Initializing Vector Store: {e}")
            raise

    def add_documents(self, documents: List[Any], embedding: np.ndarray, batch_size: int = 5000):
        """
        Add documents and their embeddings to the vector store.

        Each document gets a random id. A copy of its metadata is extended with
        ``doc_index`` (its position within this call) and ``content_length``.
        The caller's metadata dicts are not mutated.

        Args:
            documents (List[Any]): LangChain documents (``page_content`` and
                ``metadata`` are read).
            embedding (np.ndarray): One embedding row per document, in the
                same order as ``documents``. Array-likes such as nested lists
                are accepted.
            batch_size (int): Maximum records per ``collection.add`` call.
                Chroma rejects add() batches above its maximum batch size.
                If a later batch fails, earlier batches remain stored.

        Raises:
            ValueError: If ``batch_size`` is not positive or the document and
                embedding counts differ.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer.")
        if len(documents) != len(embedding):
            raise ValueError(
                f"Number of documents ({len(documents)}) must match the number of embeddings ({len(embedding)})."
            )
        if len(documents) == 0:
            # Nothing to insert; Chroma would reject an empty add() call.
            print("No documents to add to the vector store.")
            return

        print(f"Adding {len(documents)} documents to vector Store...")

        ids = []
        metadatas = []
        documents_text = []
        embeddings_list = []

        # Use a distinct loop name so the `embedding` argument is not shadowed.
        for i, (doc, vector) in enumerate(zip(documents, embedding)):
            ids.append(f"doc_{uuid.uuid4().hex[:8]}_{i}")

            metadata = dict(doc.metadata)
            metadata["doc_index"] = i
            metadata["content_length"] = len(doc.page_content)
            metadatas.append(metadata)

            documents_text.append(doc.page_content)
            embeddings_list.append(np.asarray(vector).tolist())

        try:
            for start in range(0, len(ids), batch_size):
                end = start + batch_size
                self.collection.add(
                    ids=ids[start:end],
                    embeddings=embeddings_list[start:end],
                    metadatas=metadatas[start:end],
                    documents=documents_text[start:end],
                )
            print(f"Successfully added {len(documents)} documents to the vector store")
            print(f"Total documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f"Error adding documents to the vector store {e}")
            raise

# Open (or create) the persistent Chroma collection used below.
vector_store = VectorStore(collection_name="pdf_documents", persist_directory="../data/vector_store")
vector_store
            

In [None]:
# Convert the chunks to embeddings and store both in the vector store.
texts = [chunk.page_content for chunk in chunks]
embeddings = embedding_manager.generate_embeddings(texts)
vector_store.add_documents(chunks, embeddings)


### Retriever Pipeline from the Vector Store

In [None]:
class RAGRetrieval:
    """Handles query-based retrieval from the Vector Store."""

    # Annotations are quoted so this class does not require VectorStore /
    # EmbeddingManager to be defined when it is created.
    def __init__(self, vector_store: "VectorStore", embedding_manager: "EmbeddingManager"):
        """
        Initialize the retriever.

        Args:
            vector_store: Store whose ``collection`` is queried.
            embedding_manager: Manager used to embed the query text.
        """
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    @staticmethod
    def _distance_to_similarity(distance: float, space: str) -> float:
        """Convert a Chroma distance into a similarity score (higher = closer).

        Chroma's distance depends on the collection's ``hnsw:space``:
        - ``cosine``: ``d = 1 - cos``, so similarity is ``1 - d``.
        - ``ip``: ``d = 1 - <q, x>``, so similarity is ``1 - d``.
        - ``l2`` (Chroma's default): squared Euclidean distance. For unit
          vectors ``||q - x||^2 = 2 - 2*cos``, so cosine similarity is
          ``1 - d / 2``. NOTE(review): this assumes the stored and query
          embeddings are L2-normalized; confirm for the configured model.
        """
        if space == "l2":
            return 1.0 - distance / 2.0
        return 1.0 - distance

    def retrieve(self, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
        """
        Retrieve the documents most relevant to a query.

        Args:
            query: The search query.
            top_k: Maximum number of results to fetch from the store.
            score_threshold: Minimum similarity score a result must reach.

        Returns:
            List of dicts with ``id``, ``content``, ``metadata``,
            ``similarity_score`` and ``distance``, in the order Chroma returns
            them. The list is empty when nothing passes the threshold, when
            the collection is empty, or when the query fails (the error is
            printed).
        """
        print(f"Retrieving documents for query: '{query}'")
        print(f"Top K: {top_k}, Score Threshold: {score_threshold}")

        if top_k < 1:
            print("No documents retrieved from the vector store.")
            return []

        query_embedding = self.embedding_manager.generate_embeddings([query])[0]

        try:
            collection = self.vector_store.collection

            # Clamp n_results to the collection size. Some Chroma versions
            # raise when asked for more results than are stored.
            available = collection.count()
            if available == 0:
                print("No documents retrieved from the vector store.")
                return []

            results = collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=min(top_k, available),
            )

            # The distance function is configured per collection; Chroma
            # defaults to "l2" when no hnsw:space metadata was set.
            space = (collection.metadata or {}).get("hnsw:space", "l2")

            retrieved_docs = []

            if results["documents"] and results["documents"][0]:
                rows = zip(
                    results["ids"][0],
                    results["documents"][0],
                    results["metadatas"][0],
                    results["distances"][0],
                )
                for doc_id, document, metadata, distance in rows:
                    similarity_score = self._distance_to_similarity(distance, space)

                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            'id': doc_id,
                            'content': document,
                            'metadata': metadata,
                            'similarity_score': similarity_score,
                            'distance': distance,
                        })
                print(f"Retrieved {len(retrieved_docs)} documents after applying score threshold.")
            else:
                print("No documents retrieved from the vector store.")

            return retrieved_docs

        except Exception as e:
            # Best-effort retrieval: report and degrade to "no results".
            print(f"Error during retrieval: {e}")
            return []
        
# Wire the retriever to the shared vector store and embedding manager.
rag_retriever = RAGRetrieval(vector_store=vector_store, embedding_manager=embedding_manager)
rag_retriever

In [None]:
# Quick smoke test of retrieval on its own.
rag_retriever.retrieve(query="Give me all about agents?")

In [None]:
from rag_config import OPENAI_API_KEY
from langchain_openai import ChatOpenAI

openai_api_key = OPENAI_API_KEY

# Chat model client pointed at the OpenRouter endpoint.
llm = ChatOpenAI(
    api_key=openai_api_key,
    base_url="https://openrouter.ai/api/v1",
    model_name="gpt-4o-mini",
    temperature=0.1,
    max_tokens=1024,
)

def rag_simple(query, retriever, llm, top_k=3):
    """Answer a query with a single retrieve-then-generate pass.

    Args:
        query: The user question.
        retriever: Object whose ``retrieve(query, top_k=...)`` returns a list
            of dicts with a ``'content'`` key.
        llm: Chat model whose ``invoke`` result exposes ``.content``.
        top_k: Number of chunks to retrieve.

    Returns:
        str: The model's answer, or a fixed message when no context is found.
    """
    results = retriever.retrieve(query, top_k=top_k)
    context = "\n\n".join(doc['content'] for doc in results) if results else ""
    if not context:
        return "No relevant context found to answer the question."

    # The f-string already interpolates context and query. Do NOT call
    # .format() on it again: retrieved text containing "{" or "}" (code, JSON,
    # LaTeX) would make that raise or silently substitute placeholders.
    prompt = f"""
            Use the following context to answer the question concisely. 
            Context: {context}

            Question: {query}

            Answer: Answer the Question based on the above context.         
            """

    response = llm.invoke([prompt])
    return response.content

In [None]:
answer = rag_simple(
    query="Explain me about langchain agents",
    retriever=rag_retriever,
    llm=llm,
    top_k=3,
)
print(answer)

In [None]:
def rag_advanced(query, retriever, llm, top_k=3, min_score=0.2, return_context=False):
    """
    RAG pipeline that also reports sources and a confidence score.

    Args:
        query: The user question.
        retriever: Object whose ``retrieve(query, top_k=..., score_threshold=...)``
            returns dicts with ``content``, ``metadata`` and ``similarity_score``.
        llm: Chat model whose ``invoke`` result exposes ``.content``.
        top_k: Number of chunks to retrieve.
        min_score: Minimum similarity score passed to the retriever.
        return_context: If True, include the joined context in the result.

    Returns:
        dict with ``answer``, ``sources`` (source/page/similarity_score/preview
        per chunk) and ``confidence_score`` (highest chunk similarity).
        ``context`` is included when ``return_context`` is True, and always
        (as "") when nothing was retrieved.
    """
    results = retriever.retrieve(query, top_k=top_k, score_threshold=min_score)
    if not results:
        return {
            "answer": "No relevant context found to answer the question.",
            "sources": [],
            "confidence_score": 0.0,
            "context": ""
        }

    context = "\n\n".join(doc["content"] for doc in results)

    sources = []
    for doc in results:
        # Guard against results stored without metadata.
        metadata = doc.get("metadata") or {}
        sources.append({
            'source': metadata.get('source_file', metadata.get('source', 'unknown')),
            'page': metadata.get('page', 'unknown'),
            'similarity_score': doc['similarity_score'],
            'preview': doc['content'][:120] + "....",
        })
    confidence_score = max(doc['similarity_score'] for doc in results)

    # The f-string is already complete. Calling .format() on it again would
    # raise on any "{" / "}" in the retrieved context or the query.
    prompt = f"""Use the following context to answer the question concisely.
            Context: {context}

            Question: {query}

            Answer: Answer the Question based on the above context.
            """

    response = llm.invoke([prompt])

    output = {
        "answer": response.content,
        "sources": sources,
        "confidence_score": confidence_score
    }
    if return_context:
        output["context"] = context
    return output

result = rag_advanced(
    "What are Langchain agents",
    rag_retriever,
    llm,
    top_k=3,
    min_score=0.2,
    return_context=True,
)

# Inspect every field of the advanced RAG result.
print("Answer:", result['answer'])
print("Sources:", result['sources'])
print("Confidence Score:", result['confidence_score'])
print("Context:", result['context'])

In [None]:
from typing import List, Dict, Any
import time

class AdvancedRAGPipeline:
    """Retrieval-augmented QA with citations, optional summary and query history."""

    def __init__(self, retriever, llm):
        """
        Args:
            retriever: Object with ``retrieve(question, top_k=..., score_threshold=...)``
                returning dicts with ``content``, ``metadata`` and
                ``similarity_score``.
            llm: Chat model whose ``invoke`` result exposes ``.content``.
        """
        self.retriever = retriever
        self.llm = llm
        self.history = []  # One entry per query() call, oldest first.

    def query(self, question: str, top_k: int = 5, min_score: float = 0.2, stream: bool = False, summarize: bool = False) -> Dict[str, Any]:
        """
        Answer a question from retrieved context.

        Args:
            question: The user question.
            top_k: Number of chunks to retrieve.
            min_score: Minimum similarity score passed to the retriever.
            stream: If True, print the generated answer in 80-character
                pieces to simulate streaming output.
            summarize: If True and context was found, ask the LLM for a
                two-sentence summary of the answer.

        Returns:
            dict with ``question``, ``answer`` (with a numbered citation list
            appended when sources exist), ``sources``, ``summary`` (or None)
            and ``history`` (this pipeline's live history list).
        """
        results = self.retriever.retrieve(question, top_k=top_k, score_threshold=min_score)
        if not results:
            answer = "No relevant context found."
            sources = []
        else:
            context = "\n\n".join(doc['content'] for doc in results)
            sources = []
            for doc in results:
                # Guard against results stored without metadata.
                metadata = doc.get('metadata') or {}
                sources.append({
                    'source': metadata.get('source_file', metadata.get('source', 'unknown')),
                    'page': metadata.get('page', 'unknown'),
                    'score': doc['similarity_score'],
                    'preview': doc['content'][:120] + '...',
                })

            # The f-string is already complete; a second .format() call would
            # raise on braces in the retrieved context or the question.
            prompt = f"""Use the following context to answer the question concisely.\nContext:\n{context}\n\nQuestion: {question}\n\nAnswer:"""
            answer = self.llm.invoke([prompt]).content

            if stream:
                # Simulated streaming of the generated answer (not the prompt).
                print("Streaming answer:")
                for start in range(0, len(answer), 80):
                    print(answer[start:start + 80], end='', flush=True)
                    time.sleep(0.05)
                print()

        # Append numbered citations when there are sources.
        citations = [f"[{i}] {src['source']} (page {src['page']})" for i, src in enumerate(sources, start=1)]
        if citations:
            answer_with_citations = answer + "\n\nCitations:\n" + "\n".join(citations)
        else:
            answer_with_citations = answer

        # Only summarize a real generated answer; summarizing the canned
        # "no context" message would waste an LLM call.
        summary = None
        if summarize and results and answer:
            summary_prompt = f"Summarize the following answer in 2 sentences:\n{answer}"
            summary = self.llm.invoke([summary_prompt]).content

        self.history.append({
            'question': question,
            'answer': answer,
            'sources': sources,
            'summary': summary
        })

        return {
            'question': question,
            'answer': answer_with_citations,
            'sources': sources,
            'summary': summary,
            'history': self.history
        }

# Example usage: run one query end to end with streaming and a summary.
adv_rag = AdvancedRAGPipeline(retriever=rag_retriever, llm=llm)
result = adv_rag.query(
    "What are Langchain agents",
    top_k=3,
    min_score=0.1,
    stream=True,
    summarize=True,
)
print("\nFinal Answer:", result['answer'])
print("Summary:", result['summary'])
print("History:", result['history'][-1])