# Multi Source RAG Pipeline

### Document Loaders

1. Pdf(Text Only)
2. Csv
3. Website

In [1]:
from langchain_community.document_loaders import PyMuPDFLoader,SitemapLoader,WebBaseLoader,RecursiveUrlLoader,AsyncHtmlLoader
from langchain_community.document_transformers import Html2TextTransformer
from langchain_community.document_loaders.csv_loader import CSVLoader 

from pathlib import Path
import datetime
import nest_asyncio
nest_asyncio.apply()


  from .autonotebook import tqdm as notebook_tqdm
USER_AGENT environment variable not set, consider setting it to identify your requests.
USER_AGENT environment variable not set, consider setting it to identify your requests.


In [2]:

# for plain text pdfs
def pdf_loader(pdf_dir):
    all_docs = []
    dir = Path(pdf_dir)

    pdf_files = list(dir.glob("**/*.pdf"))
    print("loaded documents")
    for p in pdf_files:
        loader = PyMuPDFLoader(
            file_path=p,
            mode="page"
        )

        documents = loader.load()
        for doc in documents:
            doc.metadata['creationdata'] = str(datetime.datetime.now())
            doc.metadata['source_file']=p.name
            doc.metadata['file_type'] = 'pdf'
            all_docs.append(doc)
    print("done")
    return all_docs


In [3]:

#csv loader
def csv_loader(csv_dir):
    all_documents = []
    dir = Path(csv_dir)

    csv_files = list(dir.glob("**/*.csv"))
    for c in csv_files:
        loader = CSVLoader(
        file_path=c,
        csv_args={
            "delimiter":",",
            }             
        )

        documents = loader.load()
        for doc in documents:
            doc.metadata['source_file']=c.name
            doc.metadata['creationdata'] = str(datetime.datetime.now())
            doc.metadata['format'] = "csv"

            all_documents.append(doc)

    return all_documents

In [4]:
from typing import List, Optional,Sequence
from langchain_classic.schema import Document


class WebLoader:

    def __init__(self):
        self.html_transformer = Html2TextTransformer()

    def _postprocess(self,docs:Sequence[Document],source_type:str)->List[Document]:
        """ Post-process documents by cleaning content and adding metadata. """
        processed_docs = []
        for d in docs:
            d.page_content = d.page_content.strip()
            d.metadata.update({
                "source_type": source_type,
                "ingested_at": str(datetime.datetime.now()),
                "content_length": len(d.page_content)
            })

            if d.page_content:
                processed_docs.append(d)
        
        return processed_docs


    def load_single_page(self,url:str)->List[Document]:
        """ Load a single Web Page """
        docs = WebBaseLoader(url).load()
        process =self._postprocess(docs, "web_page")
        return process


    def load_sitemap(self, sitemap_url: str, filter_urls: Optional[List[str]] = None) -> List[Document]:
        """ Load all pages from a sitemap """
        loader = SitemapLoader(sitemap_url, filter_urls=filter_urls)
        docs = loader.load()
        processed = self._postprocess(docs, "sitemap")

        return processed

    def load_recursive(self, base_url: str, max_depth: int = 2) -> List[Document]:
        """  Recursively crawl a website starting from a base URL. """
        docs = RecursiveUrlLoader(
            url=base_url,
            max_depth=max_depth
        ).load()
        processed = self._postprocess(docs, "recursive")
        
        return processed


    def load_async_urls(self,urls: List[str])->List[Document]:
        """ Load multiple URLs asynchronously for faster processing. """
        loader = AsyncHtmlLoader(urls)
        docs = list(loader.lazy_load())

        docs = self.html_transformer.transform_documents(docs)
        processed = self._postprocess(docs, "async")

        return processed


In [None]:
all_docs = []
web_loader = WebLoader()

single_page_url = "https://www.google.com"
sitemap_page_url = ""
recursive_docs_url = ""
async_urls = []

single_page = web_loader.load_single_page(single_page_url)
#sitemap_page = web_loader.load_sitemap(sitemap_page_url)
#recursive_page = web_loader.load_recursive(recursive_docs_url)
#async_docs = web_loader.load_async_urls(async_urls)

MissingSchema: Invalid URL '': No scheme supplied. Perhaps you meant https://?

In [None]:
single_page

[Document(metadata={'source': 'https://www.google.com', 'title': 'Google', 'language': 'en-IN', 'source_type': 'web_page', 'ingested_at': '2026-01-13 11:07:06.152257', 'content_length': 266}, page_content='GoogleSearch Images Maps Play YouTube News Gmail Drive More »Web History | Settings | Sign in\xa0Advanced searchGoogle offered in:  हिन्दी বাংলা తెలుగు मराठी தமிழ் ગુજરાતી ಕನ್ನಡ മലയാളം ਪੰਜਾਬੀ AdvertisingBusiness SolutionsAbout GoogleGoogle.co.in© 2026 - Privacy - Terms')]

In [6]:
p = pdf_loader("../data")
c = csv_loader("../data")

print(p[0])
print(c[0].metadata)

loaded documents
done
page_content='Government of India
2023-24' metadata={'producer': 'Adobe PDF Library 16.0.7', 'creator': 'Adobe InDesign 17.4 (Windows)', 'creationdate': '2024-07-20T20:21:03+05:30', 'source': '..\\data\\pdf\\ecsurvey.pdf', 'file_path': '..\\data\\pdf\\ecsurvey.pdf', 'total_pages': 524, 'format': 'PDF 1.6', 'title': '', 'author': '', 'subject': '', 'keywords': '', 'moddate': '2024-07-22T11:48:47+05:30', 'trapped': '', 'modDate': "D:20240722114847+05'30'", 'creationDate': "D:20240720202103+05'30'", 'page': 0, 'creationdata': '2026-01-20 10:52:49.232913', 'source_file': 'ecsurvey.pdf', 'file_type': 'pdf'}
{'source': '..\\data\\csv\\cereal.csv', 'row': 0, 'source_file': 'cereal.csv', 'creationdata': '2026-01-20 10:52:49.256256', 'format': 'csv'}


In [7]:
# semantic chunking

from langchain_text_splitters import RecursiveCharacterTextSplitter

In [8]:
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=800,
    chunk_overlap=100,
    length_function = len,
    is_separator_regex = False)

chunked_document = text_splitter.split_documents(p)

In [9]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid


In [10]:
from typing import List


class EmbeddingManager:

    def __init__(self,model_name="all-MiniLM-L6-v2"):
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self):
            try:
                self.model=SentenceTransformer(self.model_name)
                print("Loading model",{self.model_name})
                print(self.model.get_sentence_embedding_dimension())
            except Exception as e:
                raise ValueError("error in load_model")
    
    def generate_embedding(self,texts:List[str]) -> np.ndarray:
            if not self.model:
                raise ValueError("model not initiated")
            
            embeddings = self.model.encode(texts,show_progress_bar=True)
            return embeddings
        

embedding_manager = EmbeddingManager()

Loading model {'all-MiniLM-L6-v2'}
384


In [11]:
import os
from typing import Any


class VectorStore:

    def __init__(self,collection_name:str="pdf_documents",persist_dir:str = "../data/vector_store"):
        self.collection_name = collection_name
        self.persis_dir = persist_dir
        self.client = None
        self.collection = None
        self._initialize_store()
    
    def _initialize_store(self):
        try:
            os.makedirs(self.persis_dir,exist_ok = True)
            self.client = chromadb.PersistentClient(path=self.persis_dir)

            self.collection=self.client.get_or_create_collection(
                name = self.collection_name,
                metadata={"description":"PDF embeddings for RAG"})
            print("Vector store has been Initialized for Collection:",{self.collection_name})

        except Exception as e:
            raise ValueError("error in Vector Store initializing",e)
        
    def add_documents(self,documents:List[Any],embeddings:np.ndarray):

        if(len(documents)!=len(embeddings)):
            raise ValueError("Error")
        
        ids = []
        metadatas=[]
        document_text = []
        embedding_list = []

        for i,(doc,embed) in enumerate(zip(documents,embeddings)):
            doc_id = f"doc_{uuid.uuid4().hex[:8]}_{i}"
            ids.append(doc_id)

            metadata = dict(doc.metadata)
            metadata['doc_index'] = i
            metadata['content_length'] = len(doc.page_content)

            metadatas.append(metadata)
            document_text.append(doc.page_content)
            
            embedding_list.append(embed.tolist())

        if not documents or not embedding_list:
            raise ValueError("Documents or embeddings are empty")

        try:
            self.collection.add(
                ids = ids,
                metadatas=metadatas,
                documents=document_text,
                embeddings=embedding_list

            )
        except Exception as e:
            raise ValueError(e)
    
vector_store = VectorStore()
vector_store



Vector store has been Initialized for Collection: {'pdf_documents'}


<__main__.VectorStore at 0x26d291aaba0>

In [12]:
chunk_text = [doc.page_content for doc in chunked_document]

embeddings = embedding_manager.generate_embedding(chunk_text)
print(embeddings)


Batches: 100%|██████████| 70/70 [01:29<00:00,  1.28s/it]

[[-0.09751279 -0.01514472 -0.02372612 ... -0.05802242  0.00421078
   0.02525925]
 [-0.05923247 -0.05849332 -0.07195604 ... -0.08649798 -0.08849315
   0.01236684]
 [ 0.02185366 -0.02050512 -0.07425625 ... -0.07585315  0.04138426
  -0.01694065]
 ...
 [ 0.02820305  0.08696805  0.01746436 ...  0.0338297  -0.04200668
   0.00789222]
 [-0.01964607  0.03022924  0.03099425 ... -0.0782057  -0.1084028
  -0.02260338]
 [-0.08511829  0.05696283 -0.08482104 ... -0.05781307  0.05345576
   0.00556867]]





In [13]:
vector_store.add_documents(chunked_document,embeddings)

In [14]:
print(len(embeddings))

2232


In [15]:
from sklearn.metrics.pairwise import cosine_similarity
from typing import Dict

In [16]:
from typing import Dict

class RAGRetriever:

    def __init__(self,vector_store:VectorStore,embedding_manager:EmbeddingManager):
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    def retrieve(self,query:str,top_k:int = 5,score_threshold:float = 0.25)->List[Dict[str,Any]]:
        """
        Retrieve relevant documents for a query
        
        Args:
            query: The search query
            top_k: Number of top results to return
            score_threshold: Minimum similarity score threshold
            
        Returns:
            List of dictionaries containing retrieved documents and metadata
        """

        query_embeddings = self.embedding_manager.generate_embedding([query])
        try:
            results = self.vector_store.collection.query(
                query_embeddings=query_embeddings,
                n_results=top_k)
            
            retrieved_docs = []
            if results['documents'] and results['documents'][0]:
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]


                for i,(doc_id,distance,metadata,document) in enumerate(zip(ids,distances,metadatas,documents)):
                    similarity_score = 1-distance
                    
                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            'id': doc_id,
                            'content': document,
                            'metadata': metadata,
                            'similarity_score': similarity_score,
                            'distance': distance,
                            'rank': i + 1
                        })
                
                print(f"Retrieved {len(retrieved_docs)} documents (after filtering)")
            else:
                print("No documents found")
            
            return retrieved_docs
            
        except Exception as e:
            print(f"Error during retrieval: {e}")
            return []
        


rag = RAGRetriever(vector_store,embedding_manager)




In [33]:
query = "What is the growth outlook for India in 2024?"
docs = rag.retrieve(query)

if not docs:
    print("The documents do not contain enough information to answer this question.")

Batches: 100%|██████████| 1/1 [00:00<00:00, 59.96it/s]

Retrieved 5 documents (after filtering)





In [34]:
docs

[{'id': 'doc_b2be1551_911',
  'content': "citizens while ensuring that the growth momentum continued to be sustained through a wide \nrange of structural reforms. India’s strength has always been its institutions, and, many a time, \nthe institutional strength has enabled the country to wade through multiple challenges. \n5.33.\t The structural reforms undertaken by the Government of India over the course of the \nlast decade have put the economy firmly on a growth path, thanks to which India is soon set \nto become the third largest economy in the world, following the US and China. In its April \n2024 World Economic Outlook, the IMF has raised India's growth forecast for 2024-25 to\xa06.8 \nper cent\xa0from 6.5 per cent on the back of strong domestic demand and a rising working-age",
  'metadata': {'trapped': '',
   'creator': 'Adobe InDesign 17.4 (Windows)',
   'doc_index': 911,
   'author': '',
   'creationdate': '2024-07-20T20:21:03+05:30',
   'source': '..\\data\\pdf\\ecsurvey.pdf

## RAG Pipeline Vector DB to LLM

In [77]:
import os
from dotenv import load_dotenv
from langchain_groq import ChatGroq
from langchain_core.prompts import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate
)

from langchain.messages import HumanMessage,SystemMessage

load_dotenv()

GROQ_API_KEY = os.getenv("GROQ_API_KEY")

In [89]:
class LLM:
    def __init__(self,model_name:str="meta-llama/llama-4-maverick-17b-128e-instruct"):
        self.model_name = model_name
        self.api_keys = GROQ_API_KEY

        if not self.api_keys:
            raise ValueError("API Key Not Found")
        
        self.llm=ChatGroq(
            model = self.model_name,
            temperature=0.3,
            max_tokens=2048,
        )
        
        # self.prompt = ChatPromptTemplate.from_messages([
        #     ("system", "You are a helpful AI assistant. Answer ONLY using the provided context. If the context does not contain the answer, say so clearly."),
        #     ("human", "Context:\n{context}\n\nQuestion:\n{question}")
        # ])

        self.prompt = ChatPromptTemplate.from_messages([
   
             SystemMessage(
                    content="You are a helpful AI assistant. Answer ONLY using the provided context."
            ),
    

            HumanMessagePromptTemplate.from_template(
                    "Context:\n{context}\n\nQuestion:\n{question}"
             )
        ])

    def invoke(self,query:str)->str:
        try:
            response = self.llm.invoke(query)
            return str(response.content)
        
        except Exception as e:
            raise ValueError(e)
        
        
    def generate_response(self,query:str,context:str,max_length:int = 1000)->str:
        try:
            chain = self.prompt | self.llm
            
            response = chain.invoke({
                "question": query,
                "context": context
            })


            print("Generating LLM Response from query and context")
            return str(response.content)

        except Exception as e:
            return f"Error generating response: {e}"



In [90]:
try:
    groq_llm = LLM()
    print("Groq LLM initialized successfully!")
except ValueError as e:
    print(f"Warning: {e}")
    groq_llm = None

Groq LLM initialized successfully!


In [91]:
def rag_retrive(query,rag,llm,top_k=2):
    result = rag.retrieve(query,top_k)
    context = "\n\n".join([doc['content'] for doc in result]) if result else ""
    
    if not context:
        return "No Answer"
    
    response = llm.generate_response(
        query=query,
        context=context
    )

    return response


In [92]:
query = "What is the growth outlook for India in 2024?"
ans = rag_retrive(query,rag,llm=groq_llm)
print(ans)


Batches: 100%|██████████| 1/1 [00:00<00:00, 82.04it/s]


Retrieved 2 documents (after filtering)
Generating LLM Response from query and context
According to the text, the IMF has raised India's growth forecast for 2024-25 to 6.8 per cent from 6.5 per cent. This indicates a positive growth outlook for India in 2024, driven by strong domestic demand and a rising working-age population.


In [None]:
from typing import Dict, Any
import time

class AdvancedRAGPipeline:
    def __init__(self, rag, llm):
        self.retriever = rag
        self.llm = llm
        self.history = [] 

    def query(self, question: str, top_k: int = 5, min_score: float = 0.2, stream: bool = False, summarize: bool = False) -> Dict[str, Any]:
    
        results = self.retriever.retrieve(question, top_k=top_k, score_threshold=min_score)
        
        if not results:
            answer = "No relevant context found."
            sources = []
            context = ""
        else:
            context = "\n\n".join([doc['content'] for doc in results])
            sources = [{
                'source': doc['metadata'].get('source_file', doc['metadata'].get('source', 'unknown')),
                'page': doc['metadata'].get('page', 'unknown'),
                'score': doc['similarity_score'],
                'preview': doc['content'][:120] + '...'
            } for doc in results]
            # Streaming answer simulation
            prompt = f"""Use the following context to answer the question concisely.\nContext:\n{context}\n\nQuestion: {question}\n\nAnswer:"""
            if stream:
                print("Streaming answer:")
                for i in range(0, len(prompt), 80):
                    print(prompt[i:i+80], end='', flush=True)
                    time.sleep(0.05)
                print()
            response = self.llm.generate_response(context=context, query=question)
            answer = response

        # Add citations to answer
        citations = [f"[{i+1}] {src['source']} (page {src['page']})" for i, src in enumerate(sources)]
        answer_with_citations = answer + "\n\nCitations:\n" + "\n".join(citations) if citations else answer

        # Optionally summarize answer
        summary = None
        if summarize and answer:
            summary_prompt = f"Summarize the following answer in 2 sentences:\n{answer}"
            summary_resp = self.llm.invoke([summary_prompt])
            summary = summary_resp

        # Store query history
        self.history.append({
            'question': question,
            'answer': answer,
            'sources': sources,
            'summary': summary
        })

        return {
            'question': question,
            'answer': answer_with_citations,
            'sources': sources,
            'summary': summary,
            'history': self.history
        }


In [96]:
adv_RAG = AdvancedRAGPipeline(rag=rag,llm=groq_llm)
ans = adv_RAG.query(query)
print(ans)

Batches: 100%|██████████| 1/1 [00:00<00:00, 82.33it/s]


Retrieved 5 documents (after filtering)
Generating LLM Response from query and context
{'question': 'What is the growth outlook for India in 2024?', 'answer': "The IMF has raised India's growth forecast for 2024-25 to 6.8 per cent from 6.5 per cent on the back of strong domestic demand and a rising working-age population.\n\nCitations:\n[1] ecsurvey.pdf (page 222)\n[2] ecsurvey.pdf (page 222)\n[3] ecsurvey.pdf (page 222)\n[4] ecsurvey.pdf (page 201)\n[5] ecsurvey.pdf (page 201)", 'sources': [{'source': 'ecsurvey.pdf', 'page': 222, 'score': 0.4310893416404724, 'preview': 'citizens while ensuring that the growth momentum continued to be sustained through a wide \nrange of structural reforms. ...'}, {'source': 'ecsurvey.pdf', 'page': 222, 'score': 0.4310893416404724, 'preview': 'citizens while ensuring that the growth momentum continued to be sustained through a wide \nrange of structural reforms. ...'}, {'source': 'ecsurvey.pdf', 'page': 222, 'score': 0.4310893416404724, 'preview': 'citi