### RAG pipeline: Data ingestion to vector db pipeline

In [None]:
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pathlib import Path

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
### Read all the pdf's inside the directory
def process_all_pdfs(pdf_directory):
    """Process all PDF files in a directory"""
    all_documents = []
    pdf_dir = Path(pdf_directory)
    
    # Find all PDF files recursively
    pdf_files = list(pdf_dir.glob("**/*.pdf"))
    
    print(f"Found {len(pdf_files)} PDF files to process")
    
    for pdf_file in pdf_files:
        print(f"\nProcessing: {pdf_file.name}")
        try:
            loader = PyPDFLoader(str(pdf_file))
            documents = loader.load()
            
            # Add source information to metadata
            for doc in documents:
                doc.metadata['source_file'] = pdf_file.name
                doc.metadata['file_type'] = 'pdf'
            
            all_documents.extend(documents)
            print(f"  ✓ Loaded {len(documents)} pages")
            
        except Exception as e:
            print(f"  ✗ Error: {e}")
    
    print(f"\nTotal documents loaded: {len(all_documents)}")
    return all_documents

# Process all PDFs in the data directory
all_pdf_documents = process_all_pdfs("../data/pdf/test")

In [None]:
all_pdf_documents

In [None]:
def split_documents(documents,chunk_size=1000,chunk_overlap=200):
    """Split documents into smaller chunks for better RAG performance"""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""]
    )
    split_docs = text_splitter.split_documents(documents)
    print(f"Split {len(documents)} documents into {len(split_docs)} chunks")
    
    # Show example of a chunk
    if split_docs:
        print(f"\nExample chunk:")
        print(f"Content: {split_docs[0].page_content[:200]}...")
        print(f"Metadata: {split_docs[0].metadata}")
    
    return split_docs

In [None]:
chunks=split_documents(all_pdf_documents)
chunks

In [None]:
from collections import Counter
for i, chunk in enumerate(chunks):
    counts = Counter(chunk.metadata.get("source_file", "unknown source") for chunk in chunks)
chunkCount = dict(counts)

for source, count in chunkCount.items():
    print(f"Source: {source}, Chunk Count: {count}")

### Embedding and vector store db

In [None]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity
import os

In [None]:
class EmbeddingManager:
  def __init__(self, model_name: str = "all-miniLM-L6-v2"):
    self.model_name = model_name
    self.model = None
    self._load_model()

  def _load_model(self):
    try:
      print(f"Loading embedding model: {self.model_name}")
      self.model = SentenceTransformer(self.model_name)
      embedding_dimension = self.get_embedding_dimension()
      print(f"model loaded successfully. embedding dimension: {embedding_dimension}")
    
    except Exception as e:
      print(f"Error loading model: {self.model_name}: {e}")
      raise

  def generate_embeddings(self, texts: List[str]) -> np.ndarray:
    if not self.model:
      raise ValueError("Model not loaded")

    print(f"Generating embedding for {len(texts)} texts")
    embeddings = self.model.encode(texts, show_progress_bar= True)
    print(f"Embedding completed shape {embeddings.shape}")
    return embeddings

  def get_embedding_dimension(self) -> int:
    if not self.model:
      raise ValueError("Model not loaded")
    
    return self.model.get_sentence_embedding_dimension()

    

embedding_manager = EmbeddingManager()
embedding_manager

In [None]:
class VectorStore:
  def __init__(self, collection_name: str = "pdf_document", persist_directory: str = "../data/vector_store"):
    self.collection_name = collection_name
    self.persist_directory = persist_directory
    self.client = None
    self.collection = None
    self._initialize_store()

  def _initialize_store(self):
    try:
      #Create persistant chromadb client
      os.makedirs(self.persist_directory, exist_ok = True)
      self.client = chromadb.PersistentClient(path=self.persist_directory)

      #Create or Get collection
      self.collection = self.client.get_or_create_collection(
        name = self.collection_name, 
        metadata={"description": "PDF document embedding for RAG"}
      )

      print(f"vector store initialized, collection name {self.collection_name}")
      print(f"number of collections {self.collection.count()}")

    except Exception as e:
      print(f"Error Initializing vector store {e}")
      raise

  def add_documents(self, documents: List[Any], embeddings: np.ndarray):
    """
    Add documents and their embeddings to the vector store
    
    Args:
        documents: List of LangChain documents
        embeddings: Corresponding embeddings for the documents
    """
    if len(documents) != len(embeddings):
        raise ValueError("Number of documents must match number of embeddings")
    
    print(f"Adding {len(documents)} documents to vector store...")
    
    # Prepare data for ChromaDB
    ids = []
    metadatas = []
    documents_text = []
    embeddings_list = []
        
    for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
      # Generate unique ID
      doc_id = f"doc_{uuid.uuid4().hex[:8]}_{i}"
      ids.append(doc_id)
      
      # Prepare metadata
      metadata = dict(doc.metadata)
      metadata['doc_index'] = i
      metadata['content_length'] = len(doc.page_content)
      metadatas.append(metadata)
      
      # Document content
      documents_text.append(doc.page_content)
      
      # Embedding
      embeddings_list.append(embedding.tolist())
    
    # Add to collection
    try:
      self.collection.add(
          ids=ids,
          embeddings=embeddings_list,
          metadatas=metadatas,
          documents=documents_text
      )
      print(f"Successfully added {len(documents)} documents to vector store")
      print(f"Total documents in collection: {self.collection.count()}")
        
    except Exception as e:
      print(f"Error adding documents to vector store: {e}")
      raise

vectorstore=VectorStore()
vectorstore


In [None]:
texts = [doc.page_content for doc in chunks]
embeddings = embedding_manager.generate_embeddings(texts)
vector_store = vectorstore.add_documents(chunks, embeddings)
vector_store

In [None]:
class RAGRetriever:
    """Handles query-based retrieval from the vector store"""
    
    def __init__(self, vector_store: VectorStore, embedding_manager: EmbeddingManager):
        """
        Initialize the retriever
        
        Args:
            vector_store: Vector store containing document embeddings
            embedding_manager: Manager for generating query embeddings
        """
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    def retrieve(self, query: str, top_k: int = 10, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
        """
        Retrieve relevant documents for a query
        
        Args:
            query: The search query
            top_k: Number of top results to return
            score_threshold: Minimum similarity score threshold
            
        Returns:
            List of dictionaries containing retrieved documents and metadata
        """
        print(f"Retrieving documents for query: '{query}'")
        print(f"Top K: {top_k}, Score threshold: {score_threshold}")
        
        # Generate query embedding
        query_embedding = self.embedding_manager.generate_embeddings([query])[0]
        
        # Search in vector store
        try:
            results = self.vector_store.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=top_k
            )
            
            # Process results
            retrieved_docs = []
            
            if results['documents'] and results['documents'][0]:
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]
                
                for i, (doc_id, document, metadata, distance) in enumerate(zip(ids, documents, metadatas, distances)):
                    # Convert distance to similarity score (ChromaDB uses cosine distance)
                    similarity_score = 1 - distance
                    
                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            'id': doc_id,
                            'content': document,
                            'metadata': metadata,
                            'similarity_score': similarity_score,
                            'distance': distance,
                            'rank': i + 1
                        })
                
                print(f"Retrieved {len(retrieved_docs)} documents (after filtering)")
            else:
                print("No documents found")
            
            return retrieved_docs
            
        except Exception as e:
            print(f"Error during retrieval: {e}")
            return []

rag_retriever=RAGRetriever(vectorstore,embedding_manager)
rag_retriever


In [None]:
# 1. First, check if the sentence exists in your chunks
search_text = "who is the author of the paper A Review Paper about Deep Learning for Medical Image Analysis"
matching_chunks = [
    (i, chunk.page_content) 
    for i, chunk in enumerate(chunks) 
    if search_text.lower() in chunk.page_content.lower()
]

print(f"Found {len(matching_chunks)} chunks containing the text:")
for idx, content in matching_chunks:
    print(f"\nChunk {idx}:\n{content}...")

In [None]:
retrieved_docs = rag_retriever.retrieve("who is the author of the paper A Review Paper about Deep Learning for Medical Image Analysis", top_k=5, score_threshold=0.1)
print(f"Retrieved {retrieved_docs[0]} documents from RAG retriever:")
informations = []
print(f"length of retrieved docs {len(retrieved_docs)}")
for doc in retrieved_docs:
    print(f"metadata: {doc['metadata']}")
    content = doc['content']
    author = doc['metadata']['author'] if 'author' in doc['metadata'] else 'Unknown'
    page_number = doc['metadata']['page_number'] if 'page_number' in doc['metadata'] else 'Unknown'
    title = doc['metadata']['title'] if 'title' in doc['metadata'] else 'Unknown'
    informations.append(f"title: {title}, author: {author}, page number: {page_number}, content: {content}")

In [None]:
from langchain_ollama import ChatOllama
from langchain_core.prompts import ChatPromptTemplate

class ollamaRAG:
    def __init__(self, model_name: str = "llama3.2:3b", temperature: float = 0.3):
        self.model_name = model_name
        self.temperature = temperature
        self.llm = ChatOllama(model=self.model_name, temperature=self.temperature)
        self.prompt_template = ChatPromptTemplate.from_messages([
            ("system", """You are a document assistant. You must STRICTLY follow these rules:
1. Answer ONLY using information from the provided context
2. Do NOT use your general knowledge or training data
3. If the answer is not in the context, respond EXACTLY with: "I don't know - this information is not in the provided documents."
4. Do NOT make assumptions or infer information not explicitly stated in the context
5. Quote relevant parts from the context when answering"""),
            ("user", """Context:
{context}

Question: {question}

Answer based ONLY on the context above:""")
        ])

    def ask(self, question: str, retrieved_chunks: List[str]) -> str:
        context = "\n\n---\n\n".join(retrieved_chunks)
        print(f"Constructed context for question:\n{context[:500]}...")  # Show a preview of the context

        res = self.prompt_template | self.llm
        response = res.invoke({
            "context": context,
            "question": question
        })

        return response.content




rag = ollamaRAG()
answer = rag.ask("who is the author of the paper A Review Paper about Deep Learning for Medical Image Analysis?", [doc for doc in informations])
print(answer)


In [None]:
print(answer)