RAG Pipelines - Data Ingestion to Vector DB Pipeline

In [1]:
import os 
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pathlib import Path
from langchain_huggingface import HuggingFaceEndpoint

  from .autonotebook import tqdm as notebook_tqdm


Read all the PDFs inside the directory

In [2]:
def process_all_pdfs(pdf_directory):
    """Load every PDF found (recursively) under a directory.

    Each file is read with PyPDFLoader and every resulting document is tagged
    with ``source_file`` (the PDF's file name) and ``file_type`` ("pdf").
    A file that fails to load is reported and skipped, so one bad PDF does
    not abort the whole run.

    Args:
        pdf_directory: Path (str or Path) of the directory to scan.

    Returns:
        List of loaded documents from all readable PDFs, in sorted file order.
        Empty when the directory is missing or holds no PDFs.
    """
    pdf_dir = Path(pdf_directory)
    if not pdf_dir.is_dir():
        # Make a mistyped path visible instead of only reporting "Found 0".
        print(f"Warning: PDF directory not found: {pdf_dir}")

    # Find all PDF files recursively. glob() yields files in filesystem order,
    # so sort to keep the document order identical from run to run.
    pdf_files = sorted(pdf_dir.glob("**/*.pdf"))

    print(f"Found {len(pdf_files)} PDF files to process")

    all_documents = []
    for pdf_file in pdf_files:
        print(f"\nProcessing: {pdf_file.name}")
        try:
            loader = PyPDFLoader(str(pdf_file))
            documents = loader.load()

            # Add source information to metadata
            for doc in documents:
                doc.metadata['source_file'] = pdf_file.name
                doc.metadata['file_type'] = 'pdf'

            all_documents.extend(documents)
            print(f"  ✓ Loaded {len(documents)} pages")

        except Exception as e:
            # Best effort: report the failure and continue with the next file.
            print(f"  ✗ Error: {e}")

    print(f"\nTotal documents loaded: {len(all_documents)}")
    return all_documents

# Process all PDFs in the data directory.
# Set the RAG_DATA_DIR environment variable to point at another folder; the
# original local path stays as the default so existing runs are unchanged.
PDF_DATA_DIR = os.environ.get("RAG_DATA_DIR", r"C:\Users\chand\Desktop\DATA SCIENTIST\RAG\data")
all_pdf_documents = process_all_pdfs(PDF_DATA_DIR)

Found 1 PDF files to process

Processing: The_GALE_ENCYCLOPEDIA_of_MEDICINE_SECOND.pdf
  ✓ Loaded 759 pages

Total documents loaded: 759


In [3]:
# Spot-check one loaded page; index 100 requires at least 101 loaded documents.
all_pdf_documents[100]

Document(metadata={'producer': 'GPL Ghostscript 9.10', 'creator': '', 'creationdate': '2017-05-01T10:37:35-07:00', 'moddate': '2017-05-01T10:37:35-07:00', 'title': '', 'author': '', 'subject': '', 'keywords': '', 'source': 'C:\\Users\\chand\\Desktop\\DATA SCIENTIST\\RAG\\data\\The_GALE_ENCYCLOPEDIA_of_MEDICINE_SECOND.pdf', 'total_pages': 759, 'page': 100, 'page_label': '101', 'source_file': 'The_GALE_ENCYCLOPEDIA_of_MEDICINE_SECOND.pdf', 'file_type': 'pdf'}, page_content='cancer, the risk factor begins to increase in the late teens.\nRates for carcinoma in situ peak between the ages of 20\nand 30. In the United States, the incidence of invasive\ncervical cancer increases rapidly with age for African\nAmerican women over the age of 25. The incidence rises\nmore slowly for Caucasian women. However, women\nover age 65 account for more than 25% of all cases of\ninvasive cervical cancer.\nThe incidence of cervical cancer is highest among\npoor women and among women in developing countries.\

Split the text into chunks

In [4]:
def split_documents(documents,chunk_size=1000,chunk_overlap=200):
    """Break documents into overlapping chunks sized for retrieval.

    Args:
        documents: Documents to split.
        chunk_size: Maximum chunk length, measured with ``len``.
        chunk_overlap: Amount of overlap between consecutive chunks.

    Returns:
        The list of chunk documents produced by the splitter.
    """
    # Separators are tried in this order: paragraph, line, word, character.
    splitter_options = dict(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""],
    )
    chunked = RecursiveCharacterTextSplitter(**splitter_options).split_documents(documents)
    print(f"Split {len(documents)} documents into {len(chunked)} chunks")
    return chunked

In [5]:
# Chunk every loaded page using split_documents' default size and overlap.
chunks=split_documents(all_pdf_documents)

Split 759 documents into 4110 chunks


In [6]:
# Confirm the container type returned by split_documents.
type(chunks)

list

In [7]:
# Spot-check one chunk; index 100 requires at least 101 chunks.
chunks[100]

Document(metadata={'producer': 'GPL Ghostscript 9.10', 'creator': '', 'creationdate': '2017-05-01T10:37:35-07:00', 'moddate': '2017-05-01T10:37:35-07:00', 'title': '', 'author': '', 'subject': '', 'keywords': '', 'source': 'C:\\Users\\chand\\Desktop\\DATA SCIENTIST\\RAG\\data\\The_GALE_ENCYCLOPEDIA_of_MEDICINE_SECOND.pdf', 'total_pages': 759, 'page': 25, 'page_label': '26', 'source_file': 'The_GALE_ENCYCLOPEDIA_of_MEDICINE_SECOND.pdf', 'file_type': 'pdf'}, page_content='of endometrial, breast, and colon cancer.\nCertain drugs, which are currently being used for\ntreatment, could also be suitable for prevention. For\nexample, the drug tamoxifen (Nolvadex), which has been\nvery effective against breast cancer, is currently being\ntested by the National Cancer Institute for its ability to\nprevent cancer. Similarly, retinoids derived from vitamin\nA are being tested for their ability to slow the progression\nor prevent head and neck cancers. Certain studies have\nsuggested that cancer inc

Embedding and Vector Store DB

In [8]:
import os

# Point all three Hugging Face cache variables at the same local folder.
# NOTE(review): some libraries may read these variables when first imported;
# an earlier cell already imports langchain_huggingface, so confirm the
# settings take effect this late in the notebook.
os.environ.update({
    "HF_HOME": r"C:\Users\chand\hf_cache",
    "TRANSFORMERS_CACHE": r"C:\Users\chand\hf_cache",
    "SENTENCE_TRANSFORMERS_HOME": r"C:\Users\chand\hf_cache",
})

In [9]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity 

In [10]:
class EmbeddingManager:
    """Handles document embedding generation using SentenceTransformer"""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        Initialize the embedding manager and load the model right away.

        Args:
            model_name: HuggingFace model name for sentence embeddings
        """
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self):
        """
        Load the SentenceTransformer model named by ``self.model_name``.

        Raises:
            Exception: any error raised while loading is printed and re-raised.
        """
        try:
            print(f"Loading embedding model: {self.model_name}")
            self.model = SentenceTransformer(self.model_name)
            print(f"Model loaded successfully. Embedding dimension: {self.model.get_sentence_embedding_dimension()}")
        except Exception as e:
            print(f"Error loading model {self.model_name}: {e}")
            raise

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Generate embeddings for a list of texts

        Args:
            texts: List of text strings to embed

        Returns:
            numpy array of embeddings with shape (len(texts), embedding_dim)

        Raises:
            ValueError: if no model has been loaded.
        """
        # Compare with None explicitly. A truthiness test would consult the
        # model object's __bool__/__len__, so a loaded model that reports a
        # length of zero would wrongly be treated as "not loaded".
        if self.model is None:
            raise ValueError("Model not loaded")

        print(f"Generating embeddings for {len(texts)} texts...")
        embeddings = self.model.encode(texts, show_progress_bar=True)
        print(f"Generated embeddings with shape: {embeddings.shape}")
        return embeddings


## Initialize the embedding manager (the constructor loads the model immediately)

embedding_manager=EmbeddingManager()
embedding_manager

Loading embedding model: all-MiniLM-L6-v2
Model loaded successfully. Embedding dimension: 384


<__main__.EmbeddingManager at 0x1cd8b7b4c10>

VectorStore

In [11]:
class VectorStore:
    """Manages document embeddings in a ChromaDB vector store"""

    def __init__(self, collection_name: str = "pdf_documents", persist_directory: str = "../data/vector_store"):
        """
        Initialize the vector store

        Args:
            collection_name: Name of the ChromaDB collection
            persist_directory: Directory to persist the vector store
        """
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        """
        Initialize ChromaDB client and collection

        Creates ``persist_directory`` if needed, opens a persistent client on
        it and gets (or creates) the named collection.

        Raises:
            Exception: any initialization error is printed and re-raised.
        """
        try:
            # Create persistent ChromaDB client
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path=self.persist_directory)

            # Get or create collection
            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF document embeddings for RAG"}
            )
            print(f"Vector store initialized. Collection: {self.collection_name}")
            print(f"Existing documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f"Error initializing vector store: {e}")
            raise

    @staticmethod
    def _stable_doc_id(metadata: Dict[str, Any], content: str) -> str:
        """Derive a repeatable ID from a chunk's source file, page and text."""
        source = metadata.get('source_file', metadata.get('source', ''))
        key = "\x1f".join((str(source), str(metadata.get('page', '')), content))
        return f"doc_{uuid.uuid5(uuid.NAMESPACE_URL, key).hex[:16]}"

    def add_documents(self, documents: List[Any], embeddings: np.ndarray):
        """
        Add documents and their embeddings to the vector store

        IDs are derived from each chunk's source file, page and text, and rows
        are written with ``upsert``. Storing the same chunks again therefore
        overwrites the existing rows instead of appending duplicates. Rows
        written earlier under other IDs are not touched.

        Args:
            documents: List of LangChain documents
            embeddings: Corresponding embeddings for the documents

        Raises:
            ValueError: if the number of documents and embeddings differ.
            Exception: any error from the collection write is printed and re-raised.
        """
        if len(documents) != len(embeddings):
            raise ValueError("Number of documents must match number of embeddings")

        if len(documents) == 0:
            # Nothing to write; avoid calling the collection with empty lists.
            print("No documents to add to vector store")
            return

        print(f"Adding {len(documents)} documents to vector store...")

        # Prepare data for ChromaDB
        ids = []
        metadatas = []
        documents_text = []
        embeddings_list = []
        id_occurrences = {}

        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            # Prepare metadata (copy so the caller's document is not mutated)
            metadata = dict(doc.metadata)

            # Deterministic ID; identical chunks within one batch get a stable
            # numeric suffix so every ID in the batch stays unique.
            base_id = self._stable_doc_id(metadata, doc.page_content)
            repeat = id_occurrences.get(base_id, 0)
            id_occurrences[base_id] = repeat + 1
            ids.append(base_id if repeat == 0 else f"{base_id}_{repeat}")

            metadata['doc_index'] = i
            metadata['content_length'] = len(doc.page_content)
            metadatas.append(metadata)

            # Document content
            documents_text.append(doc.page_content)

            # Embedding (accepts numpy rows as well as plain sequences)
            embeddings_list.append(np.asarray(embedding).tolist())

        # Write to collection
        try:
            self.collection.upsert(
                ids=ids,
                embeddings=embeddings_list,
                metadatas=metadatas,
                documents=documents_text
            )
            print(f"Successfully added {len(documents)} documents to vector store")
            print(f"Total documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f"Error adding documents to vector store: {e}")
            raise

# Open (or create) the persistent collection using the constructor defaults.
vectorstore=VectorStore()
vectorstore

Vector store initialized. Collection: pdf_documents
Existing documents in collection: 20550


<__main__.VectorStore at 0x1cdb91f4250>

In [12]:
### Collect the text of every chunk
texts=[doc.page_content for doc in chunks]

## Generate the embeddings

embeddings=embedding_manager.generate_embeddings(texts)

## Store in the vector database
vectorstore.add_documents(chunks,embeddings)

Generating embeddings for 4110 texts...


Batches: 100%|██████████| 129/129 [02:52<00:00,  1.34s/it]


Generated embeddings with shape: (4110, 384)
Adding 4110 documents to vector store...
Successfully added 4110 documents to vector store
Total documents in collection: 24660


Retriever Pipeline From VectorStore

In [13]:
class RAGRetriever:
    """Handles query-based retrieval from the vector store"""

    # Annotations are forward references so this class can be defined without
    # the sibling classes already being in scope.
    def __init__(self, vector_store: "VectorStore", embedding_manager: "EmbeddingManager"):
        """
        Initialize the retriever

        Args:
            vector_store: Vector store containing document embeddings
            embedding_manager: Manager for generating query embeddings
        """
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    def _collection_space(self) -> str:
        """Return the collection's distance space, defaulting to "l2".

        Reads the ``hnsw:space`` key of the collection metadata; when it is
        absent the Chroma default (squared L2) is assumed.
        NOTE(review): collections configured through another mechanism than
        the ``hnsw:space`` metadata key are not detected here - confirm.
        """
        metadata = getattr(self.vector_store.collection, "metadata", None) or {}
        return str(metadata.get("hnsw:space", "l2")).lower()

    @staticmethod
    def _to_similarity(distance: float, space: str) -> float:
        """Convert a Chroma distance into a similarity score.

        - "cosine": distance is 1 - cos, so similarity = 1 - distance.
        - "ip": distance is 1 - dot product, so similarity = 1 - distance.
        - "l2": distance is the squared Euclidean distance. For unit-length
          vectors ||a - b||^2 = 2 - 2*cos, so similarity = 1 - distance / 2.
          NOTE(review): assumes the embedding model returns unit-normalized
          vectors - confirm for the model in use.
        """
        if space == "l2":
            return 1.0 - distance / 2.0
        return 1.0 - distance

    def retrieve(self, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
        """
        Retrieve relevant documents for a query

        Args:
            query: The search query
            top_k: Number of top results to return
            score_threshold: Minimum similarity score threshold

        Returns:
            List of dictionaries containing retrieved documents and metadata.
            Each entry has the keys 'id', 'content', 'metadata',
            'similarity_score', 'distance' and 'rank' (1-based position in the
            raw result list). An empty list is returned when nothing matches
            or when the vector store query fails.
        """
        print(f"Retrieving documents for query: '{query}'")
        print(f"Top K: {top_k}, Score threshold: {score_threshold}")

        # Generate query embedding
        query_embedding = self.embedding_manager.generate_embeddings([query])[0]

        # Search in vector store
        try:
            space = self._collection_space()
            results = self.vector_store.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=top_k
            )

            # Process results
            retrieved_docs = []

            if results['documents'] and results['documents'][0]:
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]

                for i, (doc_id, document, metadata, distance) in enumerate(zip(ids, documents, metadatas, distances)):
                    # The distance's meaning depends on the collection's
                    # space, so convert with the matching formula.
                    similarity_score = self._to_similarity(distance, space)

                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            'id': doc_id,
                            'content': document,
                            'metadata': metadata,
                            'similarity_score': similarity_score,
                            'distance': distance,
                            'rank': i + 1
                        })

                print(f"Retrieved {len(retrieved_docs)} documents (after filtering)")
            else:
                print("No documents found")

            return retrieved_docs

        except Exception as e:
            # Best effort: report the failure and return no results.
            print(f"Error during retrieval: {e}")
            return []

# Wire the retriever to the vector store and embedding manager created above.
rag_retriever=RAGRetriever(vectorstore,embedding_manager)

In [14]:
# Display the retriever object to confirm it was created.
rag_retriever

<__main__.RAGRetriever at 0x1cdb91f4b80>

In [23]:
# Sample query using the default top_k and score_threshold.
rag_retriever.retrieve("What is rash")

Retrieving documents for query: 'What is rash'
Top K: 5, Score threshold: 0.0
Generating embeddings for 1 texts...


Batches: 100%|██████████| 1/1 [00:00<00:00,  2.81it/s]


Generated embeddings with shape: (1, 384)
Retrieved 5 documents (after filtering)


[{'id': 'doc_cd767c2f_2433',
  'content': '10016. (800) 622-9010. <http://www.kidney.org>.\nUnited States Renal Data System (USRDS). The University of\nMichigan, 315 W. Huron, Suite 240, Ann Arbor, MI 48103.\n(734) 998-6611. <http://www.med.umich.edu/usrds>.\nPaula Anne Ford-Martin\nDiaper rash\nDefinition\nDermatitis of the buttocks, genitals, lower abdomen,\nor thigh folds of an infant or toddler is commonly referred\nto as diaper rash.\nDescription\nThe outside layer of skin normally forms a protec-\ntive barrier that prevents infection. One of the primary\ncauses of dermatitis in the diaper area is prolonged skin\ncontact with wetness. Under these circumstances, natural\noils are stripped away, the outer layer of skin is damaged,\nand there is increased susceptibility to infection by bacte-\nria or yeast.\nDiaper rash is a term that covers a broad variety of\nskin conditions that occur on the same area of the body.\nSome babies are more prone to diaper rash than others.\nCauses and


Integrating the Vector DB Context Pipeline with LLM Output

In [19]:
import os
from huggingface_hub import InferenceClient
from dotenv import load_dotenv

# Load variables from a local .env file, if present, into the environment.
load_dotenv()
# os.getenv returns None when HUGGINGFACEHUB_API_TOKEN is not set.
HF_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN")

# Initialize HF InferenceClient
MODEL_ID = "google/gemma-2-2b-it"
client = InferenceClient(model=MODEL_ID, token=HF_TOKEN)

# Simple RAG function using HF chat model
def rag_simple(query, retriever, client, top_k=3):
    """Answer a question from retrieved context using a chat model.

    Args:
        query: The user's question.
        retriever: Object exposing ``retrieve(query, top_k=...)`` that returns
            a list of dicts, each with a 'content' entry.
        client: Object exposing ``chat_completion(messages=..., ...)``.
        top_k: Number of chunks to request from the retriever.

    Returns:
        The model's answer text. A fixed message is returned when no usable
        context was retrieved, and an "Error generating response: ..." string
        when the model call fails.
    """
    # 1. Retrieve context
    results = retriever.retrieve(query, top_k=top_k)

    # Drop empty chunks and exact repeats (first occurrence wins, order kept)
    # so the prompt is not padded with several copies of the same passage.
    unique_chunks = list(dict.fromkeys(
        doc['content']
        for doc in (results or [])
        if doc.get('content') and doc['content'].strip()
    ))
    context = "\n\n".join(unique_chunks)
    if not context:
        return "No relevant context found to answer the question."

    # 2. Prepare chat messages
    messages = [
        {"role": "system", "content": "You are an experienced professional Physician. Use the context to answer questions concisely asked by patients."},
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"}
    ]

    # 3. Generate response using HF chat model
    try:
        resp = client.chat_completion(
            messages=messages,
            max_tokens=400,        # adjust as needed
            temperature=0.1,
            top_p=0.9,
            stop=None
        )
        answer = resp.choices[0].message["content"]
    except Exception as e:
        # Surface the failure as the answer text instead of raising.
        answer = f"Error generating response: {e}"

    return answer


In [28]:
# End-to-end example: retrieve context for the question and print the model's answer.
answer=rag_simple("what to do if i have a headache around my temples",rag_retriever,client)
print(answer)

Retrieving documents for query: 'what to do if i have a headache around my temples'
Top K: 3, Score threshold: 0.0
Generating embeddings for 1 texts...


Batches: 100%|██████████| 1/1 [00:00<00:00, 50.00it/s]

Generated embeddings with shape: (1, 384)
Retrieved 3 documents (after filtering)





A headache around your temples can be caused by various factors.  It's important to see a doctor to determine the cause and get appropriate treatment. 

**In the meantime, you can try:**

* **Resting in a quiet, dark room.**
* **Applying a cold compress or ice pack to your forehead or temples.**
* **Staying hydrated by drinking plenty of water.** 

**If the headache is severe or doesn't improve with these measures, seek medical attention immediately.** 

