Data Ingestion Pipeline

In [2]:
import os
import glob
from langchain.document_loaders import PyMuPDFLoader,PyPDFLoader

In [4]:
### Process all PDF documents and convert them into LangChain Documents

def process_all_pdf(path: str="../data/"):
    """Load every PDF file directly inside ``path`` into LangChain Documents.

    Only the top level of ``path`` is scanned (no recursion). A PDF may yield several
    Documents (PyMuPDFLoader splits by page by default). Every returned Document gets two
    extra metadata keys: ``FileName`` (the PDF's base name) and ``FileType`` (``"pdf"``).

    Args:
        path: Directory containing the PDF files. A trailing separator is optional.

    Returns:
        A flat list of Documents from all PDFs, grouped by file in sorted file-name order.
    """
    # Sort for a deterministic load order (os.listdir order is arbitrary) and match the
    # extension case-insensitively so files such as "Report.PDF" are not silently skipped.
    pdf_files = sorted(
        name
        for name in os.listdir(path)
        if name.lower().endswith(".pdf") and os.path.isfile(os.path.join(path, name))
    )

    print(f"No of files {len(pdf_files)}")

    documents = []
    for pdf_file in pdf_files:
        print(f"\nProcessing: {pdf_file}")
        # os.path.join works whether or not `path` ends with a separator;
        # plain concatenation ("../data" + "a.pdf") produced a wrong path.
        pages = PyMuPDFLoader(os.path.join(path, pdf_file)).load()

        for page in pages:
            page.metadata["FileName"] = pdf_file
            page.metadata["FileType"] = "pdf"

        documents.extend(pages)

    return documents

In [None]:
# Load every PDF in ../data/ into LangChain Documents.
documents = process_all_pdf("../data/")
# Show a count and a single example instead of dumping every Document into the notebook output.
print(f"Loaded {len(documents)} documents")
documents[:1]

In [None]:
### Text splitter: split all the documents into chunks
from langchain.text_splitter import RecursiveCharacterTextSplitter

def splitDocumentToChunks(documents, chunk_size, chunk_overlap):
    """Split LangChain Documents into overlapping character-based chunks.

    Splitting prefers paragraph breaks, then line breaks, then spaces, and finally
    falls back to individual characters. When at least one chunk is produced, the
    first one is printed as a preview.

    Args:
        documents: Documents to split (metadata is carried over to each chunk).
        chunk_size: Maximum chunk length in characters.
        chunk_overlap: Characters shared between consecutive chunks.

    Returns:
        The list of chunk Documents returned by the splitter.
    """
    splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n", "\n", " ", ""],
        chunk_overlap=chunk_overlap,
        chunk_size=chunk_size,
    )
    pieces = splitter.split_documents(documents)

    if not pieces:
        return pieces

    first = pieces[0]
    print("\nExample Chunk:")
    print(f"Content: {first.page_content[:200]}")
    print(f"Metadata: {first.metadata}")
    return pieces

CHUNK_SIZE = 1000    # maximum characters per chunk
CHUNK_OVERLAP = 200  # characters repeated between neighbouring chunks so context is not cut mid-thought

chunks = splitDocumentToChunks(documents, CHUNK_SIZE, CHUNK_OVERLAP)
# Summarise instead of dumping every chunk into the notebook output.
print(f"Split {len(documents)} documents into {len(chunks)} chunks")
chunks[:3]

Embeddings And VectorStoreDB

In [9]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity

In [10]:
### Embedding Manager class defines methods to load the embedding model and then convert chunks into embeddings (which are vectors)

class EmbeddingManager:
    """Loads a SentenceTransformer model and converts text chunks into embedding vectors."""

    def __init__(self,model_name : str = "all-MiniLM-L6-v2"):
        """Create the manager and eagerly load the model.

        Args:
            model_name: SentenceTransformer model identifier.
        """
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self):
        """Instantiate the SentenceTransformer named by ``self.model_name``."""
        self.model = SentenceTransformer(self.model_name)

    def convertChunksToVector(self, chunks: List[str], batch_size: int = 64, normalize: bool = True) -> List[List[float]]:
        """Encode text chunks into embedding vectors.

        Args:
            chunks: list of strings (each chunk's text).
            batch_size: number of chunks the model encodes per batch.
            normalize: if True, the model L2-normalises each vector.

        Returns:
            list of embedding vectors (lists of floats), one per chunk, in input order.
        """
        # Nothing to encode: skip the model call (and its progress bar) entirely.
        if not chunks:
            return []

        embeddings = self.model.encode(
            chunks,
            batch_size=batch_size,
            convert_to_numpy=True,
            normalize_embeddings=normalize,
            show_progress_bar=True,
        )
        # Convert the numpy array into plain nested Python lists of floats.
        return embeddings.tolist()

In [11]:
### ChromaDB manager: adds ids, texts, embeddings and metadata to a Chroma collection and queries it

class ChromaVectoreStore:
    """Thin wrapper around one Chroma collection: add records, count them, and query by text."""

    def __init__(
            self,
            persist_dir: str="../data",
            collection: str = "rag_corpus",
            space: str="cosine"):
        """Open ``collection`` on a Chroma client, creating it if it does not exist yet.

        Args:
            persist_dir: passed to Chroma as ``Settings(persist_directory=...)``.
            collection: name of the collection to open or create.
            space: distance metric stored as ``hnsw:space`` metadata when the collection is
                created (presumably has no effect on an already existing collection's index —
                confirm for the installed chromadb version).
        """
        # NOTE(review): on chromadb >= 0.4, Client(Settings(persist_directory=...)) is an
        # in-memory client unless is_persistent=True is also set. Use
        # chromadb.PersistentClient(path=persist_dir) if the store must survive kernel
        # restarts — and switch to deterministic ids first, or re-runs will add duplicates.
        self.client = chromadb.Client(Settings(persist_directory=persist_dir))

        # Replaces the old try/bare-except around get_collection, which also swallowed
        # unrelated errors (including KeyboardInterrupt).
        self.col = self.client.get_or_create_collection(
            name=collection,
            metadata={"hnsw:space": space},
        )

    def insert(self, ids: List[str], documents: List[str], embeddings: List[List[float]], metadatas):
        """Add records to the collection; the four lists are aligned by index."""
        self.col.add(
            ids=ids,
            documents=documents,
            embeddings=embeddings,
            metadatas=metadatas,
        )

    def count(self):
        """Return the number of records stored in the collection."""
        return self.col.count()

    def query(self, query_text: str, k: int=5):
        """Return Chroma's raw query result for the ``k`` records nearest to ``query_text``.

        The text is embedded by the collection's own embedding function (no embedding
        function is configured here, so Chroma's default is used), not by EmbeddingManager.
        """
        # n_results used to be hard-coded to 5, silently ignoring ``k``.
        return self.col.query(query_texts=[query_text], n_results=k)

In [14]:
### Extract each chunk's text (page_content), id and metadata, embed the texts, and insert them into ChromaDB

# Build three index-aligned lists: the text to embed/store, a random unique id, and the chunk's metadata.
docs, ids, metadetas = [], [], []
for chunk in chunks:
    docs.append(chunk.page_content)
    ids.append(str(uuid.uuid4()))
    metadetas.append(chunk.metadata)

# Embed every chunk text with the default sentence-transformer model.
embedding_manager = EmbeddingManager()
vectors = embedding_manager.convertChunksToVector(docs)

# Store ids, texts, vectors and metadata together, then report how many records the collection holds.
store = ChromaVectoreStore(persist_dir="../chroma_store", collection="rag_corpus")
store.insert(ids=ids, documents=docs, embeddings=vectors, metadatas=metadetas)
store.count()

Batches: 100%|██████████| 9/9 [00:02<00:00,  3.85it/s]


573

In [15]:
### Retrieval: embed the query, search the vector DB, and return the closest chunks with similarity scores

class RAGRetriever:
    """Embeds a query and searches a Chroma-backed vector store for the most similar chunks."""

    def __init__(self, vector_store: "ChromaVectoreStore", embedding_manager: "EmbeddingManager"):
        """Keep references to the store (its ``col`` is queried) and the query embedder."""
        self.vectoreStore = vector_store
        self.embeddingManager = embedding_manager

    def retrieve(self, query: str, top_k: int=5, score_threshold: float=0.0) -> List[Dict[str,Any]]:
        """Return the stored chunks most similar to ``query``.

        Args:
            query: free-text question, embedded with ``self.embeddingManager``.
            top_k: number of nearest neighbours requested from Chroma.
            score_threshold: chunks whose similarity is not strictly greater than this are dropped.

        Returns:
            Dicts with keys 'id', 'content', 'metadeta' (sic — callers read this spelling),
            'similarity_score', 'distance' and 'rank' (1-based position in Chroma's result
            list, counted before filtering), in Chroma's result order.
        """
        query_embedding = self.embeddingManager.convertChunksToVector([query])[0]

        # Chroma expects a list of query embeddings; wrap the single vector explicitly.
        results = self.vectoreStore.col.query(query_embeddings=[query_embedding], n_results=top_k)

        retrieved_docs = []
        if not (results['documents'] and results['distances'][0]):
            print("No documents found")
            return retrieved_docs

        candidates = zip(
            results['ids'][0],
            results['documents'][0],
            results['metadatas'][0],
            results['distances'][0],
        )
        for rank, (doc_id, document, metadata, distance) in enumerate(candidates, start=1):
            # Assumes the collection uses hnsw:space="cosine", where Chroma reports
            # distance = 1 - cosine similarity.
            similarity_score = 1 - distance
            if similarity_score > score_threshold:
                retrieved_docs.append({
                    'id': doc_id,
                    'content': document,
                    'metadeta': metadata,
                    'similarity_score': similarity_score,
                    'distance': distance,
                    'rank': rank,
                })

        # Report once after filtering (this used to sit inside the loop and print per candidate).
        print(f"Retrieved {len(retrieved_docs)} documents (after filtering)")
        return retrieved_docs

In [None]:
# Wire the query-side embedder to the populated store and try a sample question.
rag_retriever = RAGRetriever(vector_store=store, embedding_manager=embedding_manager)
rag_retriever.retrieve("What is jdbc Template", top_k=5)

In [None]:
### Simple RAG pipeline with an OpenAI LLM

from langchain_openai import ChatOpenAI
import os

import getpass

# Never hard-code the key: keep one already present in the environment, otherwise prompt for it.
# (Assigning "" here used to overwrite any real key, so every LLM call failed authentication.)
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("OPENAI_API_KEY: ")

llm = ChatOpenAI(model_name="gpt-3.5-turbo")

def generate_answer(llm : "ChatOpenAI", ragRetriever : "RAGRetriever", query: str, top_k: int) -> str:
    """Answer ``query`` with the LLM, grounded in the top-``top_k`` retrieved chunks.

    Args:
        llm: chat model; only ``invoke(prompt)`` and the reply's ``.content`` are used.
        ragRetriever: retriever whose ``retrieve(query, top_k=...)`` returns dicts with a 'content' key.
        query: the user question.
        top_k: number of chunks requested from the retriever.

    Returns:
        The text of the LLM's reply.
    """
    retrieved_docs = ragRetriever.retrieve(query, top_k=top_k)

    # Retrieved chunks are joined with blank lines to form the context block.
    context = "\n\n".join(doc['content'] for doc in retrieved_docs)

    prompt = f"Use the following context to answer the question:\n\nContext:\n{context}\n\nQuestion: {query}\n\nAnswer:"

    # The f-string already interpolated context and query. The old extra prompt.format(...)
    # re-parsed any "{...}" inside retrieved text (e.g. Java code) and raised KeyError/ValueError.
    response = llm.invoke(prompt)

    return response.content

answer = generate_answer(
    llm=llm,
    ragRetriever=rag_retriever,
    query="Explain JDBC Template in Java",
    top_k=3,
)
answer

Enhanced RAG Pipeline Features

In [None]:
def rag_advanced(query, rag_retriever, llm, top_k=5,min_score=0.2, return_context=False):
    """Retrieve context for ``query``, ask the LLM, and report sources and a confidence score.

    Args:
        query: the user question.
        rag_retriever: object whose ``retrieve(query, top_k=..., score_threshold=...)`` returns
            dicts with 'content', 'metadeta' and 'similarity_score' keys.
        llm: chat model; only ``invoke(prompt)`` and the reply's ``.content`` are used.
        top_k: number of chunks requested from the retriever.
        min_score: similarity threshold forwarded to the retriever.
        return_context: if True, the joined context string is included under 'context'.

    Returns:
        dict with 'answer' (str), 'confidence' (highest similarity among the kept chunks, 0.0 when
        none survive), 'sources' (one dict per chunk with 'sources', 'page', 'score', 'preview'),
        and 'context' when ``return_context`` is True.
    """
    retrieved_docs = rag_retriever.retrieve(query, top_k=top_k, score_threshold=min_score)

    if not retrieved_docs:
        # max() over an empty list used to raise ValueError here; with no context there is
        # nothing to ground an answer on, so skip the LLM call.
        output = {'answer': 'No relevant context found.', 'confidence': 0.0, 'sources': []}
        if return_context:
            output['context'] = ''
        return output

    context = "\n\n".join(doc['content'] for doc in retrieved_docs)

    sources = []
    for doc in retrieved_docs:
        # 'metadeta' (sic) is the key the retriever in this notebook emits.
        meta = doc['metadeta']
        content = doc['content']
        sources.append({
            'sources': meta.get('source_file', meta.get('source', 'unknown')),
            'page': meta.get('page', 'unknown'),
            'score': doc['similarity_score'],
            # Only mark the preview as truncated when it actually was.
            'preview': content[:300] + ('...' if len(content) > 300 else ''),
        })

    confidence = max(doc['similarity_score'] for doc in retrieved_docs)

    prompt = f"Use the following context to answer the question:\n\nContext:\n{context}\n\nQuestion: {query}\n\nAnswer:"

    # The f-string already interpolated everything; calling prompt.format(...) again (as before)
    # broke on any "{...}" inside the retrieved text, e.g. Java code.
    response = llm.invoke(prompt)

    output = {
        'answer': response.content,
        'confidence': confidence,
        'sources': sources,
    }
    if return_context:
        output['context'] = context
    return output
    
result = rag_advanced(
    "What is JDBC Template in Java?",
    rag_retriever,
    llm,
    top_k=3,
    min_score=0.1,
    return_context=True,
)

# Summary of the advanced RAG call; the context is truncated to its first 300 characters.
print("Answer:", result["answer"])
print("Confidence:", result["confidence"])
print("Sources:", result["sources"])
print("Context:", result["context"][:300])