Data Ingestion Pipeline

In [2]:
from langchain_core.documents import Document

doc = Document(
    page_content = "This is the main content of creating a RAG application",
    metadata = {
        "source": "file.txt",
        "pages": 1,
        "author": "Akshita Khandelwal"
    }
)
doc

Document(metadata={'source': 'file.txt', 'pages': 1, 'author': 'Akshita Khandelwal'}, page_content='This is the main content of creating a RAG application')

In [None]:
# File Loader

from langchain_community.document_loaders import TextLoader

loader = TextLoader("../data/text_files/python.txt", encoding='utf-8')
loader.load()

[Document(metadata={'source': '../data/text_files/python.txt'}, page_content="Python Programming\n\nPython is a computer programming language often used to build websites and software, automate tasks, and analyse data. \nPython is a general-purpose language, not specialised for any specific problems, and used to create various programmes.\nPython's syntax is a lot closer to English and so it is easier to read and write, making it the simplest type of code to learn how to write and develop with.")]

In [12]:
# Directory Loader
from langchain_community.document_loaders import DirectoryLoader

dir_loader = DirectoryLoader(
    "../data/text_files",
    glob = "**/*.txt",
    loader_cls= TextLoader,
    loader_kwargs= {'encoding': 'utf-8'},
    show_progress=False
)

dir_loader.load()

[Document(metadata={'source': '../data/text_files/python.txt'}, page_content="Python Programming Intro\n\nPython is a computer programming language often used to build websites and software, automate tasks, and analyse data. \nPython is a general-purpose language, not specialised for any specific problems, and used to create various programmes.\nPython's syntax is a lot closer to English and so it is easier to read and write, making it the simplest type of code to learn how to write and develop with."),
 Document(metadata={'source': '../data/text_files/ml.txt'}, page_content='Machine Learning Intro\n\nMachine learning (ML) is a subset of artificial intelligence (AI) and computer science that focuses on using data and algorithms to enable AI systems to learn, improve, and make predictions without being explicitly programmed. \nIt involves training models to identify patterns in data to perform tasks like classification, regression, or generation. \n\nKey Aspects of Machine Learning:\n\n

In [2]:
# Directory Loader for PDF
from langchain_community.document_loaders import DirectoryLoader, PyMuPDFLoader

dir_loader_pdf = DirectoryLoader(
    "../data/pdf_files",
    glob = "**/*.pdf",
    loader_cls= PyMuPDFLoader,
    show_progress=False
)

dir_loader_pdf.load()

[Document(metadata={'producer': 'Canva', 'creator': 'Canva', 'creationdate': '2025-10-30T10:53:00+00:00', 'source': '../data/pdf_files/Meena Jain PythonDjango  (4).pdf', 'file_path': '../data/pdf_files/Meena Jain PythonDjango  (4).pdf', 'total_pages': 2, 'format': 'PDF 1.4', 'title': 'Meena Jain Python/Django', 'author': 'Renu Fulmali', 'subject': '', 'keywords': 'DAGouASs830,BAFs0rJREMo,0', 'moddate': '2025-10-30T10:52:58+00:00', 'trapped': '', 'modDate': "D:20251030105258+00'00'", 'creationDate': "D:20251030105300+00'00'", 'page': 0}, page_content='SUMMARY\nI’m a skilled Python/Django developer with over 5+ years of professional experience in back-end web\ndevelopment. My core strength lies in building efficient, secure, and scalable web applications using Django\nand related technologies. I have hands-on experience with Flask, JavaScript, PostgreSQL, MySQL, SQLite,\nDocker, and Git, TDD and I follow Agile methodologies and best practices in API development using RESTful\nservices. I

In [16]:
# Directory Loader for Excel
from langchain_community.document_loaders import DirectoryLoader, CSVLoader

dir_loader = DirectoryLoader(
    "../data/google_sheets",
    glob = "**/*.csv",
    loader_cls= CSVLoader,
    show_progress=False
)

dir_loader.load()

[Document(metadata={'source': '../data/google_sheets/Untitled spreadsheet - Sheet4.csv', 'row': 0}, page_content='Q1. Get all books priced less than 500: Q2. Get all books priced less than or equal to 500\nBook.objects.filter(price__lt=500): Book.objects.filter(price__lte=500)'),
 Document(metadata={'source': '../data/google_sheets/Untitled spreadsheet - Sheet4.csv', 'row': 1}, page_content='Q1. Get all books priced less than 500: Q3. Get all books priced greater than 1000\nBook.objects.filter(price__lt=500): Book.objects.filter(price__gt=1000)'),
 Document(metadata={'source': '../data/google_sheets/Untitled spreadsheet - Sheet4.csv', 'row': 2}, page_content='Q1. Get all books priced less than 500: Q4. Get all books with rating greater than or equal to 4.5\nBook.objects.filter(price__lt=500): Book.objects.filter(rating__gte=4.5)'),
 Document(metadata={'source': '../data/google_sheets/Untitled spreadsheet - Sheet4.csv', 'row': 3}, page_content='Q1. Get all books priced less than 500: Q5

Embedding

In [10]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
from typing import List, Tuple, Any, Dict
import uuid 
from sklearn.metrics.pairwise import cosine_similarity

In [11]:
class EmbeddingManager:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self):
        try:
            self.model = SentenceTransformer(self.model_name)
            print("Model loaded succesfully")

        except Exception as e:
            print(f"Error loading model {self.model_name}: {e}")  

    
    def generate_embeddings(self, texts: List[str]):
        if not self.model:
            raise ValueError("Model not loaded")
        
        print("Generating embeddings")
        embeddings = self.model.encode(texts, show_progress_bar=True)
        print("Generated embeddings with shape:", embeddings.shape)
        
        return embeddings
    

embed = EmbeddingManager()
embed

Loading weights: 100%|██████████| 103/103 [00:00<00:00, 332.00it/s, Materializing param=pooler.dense.weight]                             
[1mBertModel LOAD REPORT[0m from: sentence-transformers/all-MiniLM-L6-v2
Key                     | Status     |  | 
------------------------+------------+--+-
embeddings.position_ids | UNEXPECTED |  | 

[3mNotes:
- UNEXPECTED[3m	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.[0m


Model loaded succesfully


<__main__.EmbeddingManager at 0x7ec4b4c67fe0>

VectorDB

In [14]:
import os
import uuid

class VectorStore:
    def __init__(self, collection_name: str = 'pdf_documents', persist_directory: str = '../data/vector_store'):
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self.initialize_store()

    def initialize_store(self):
        try:
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path=self.persist_directory)

            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF document embeddings for RAG"}
            )
            print("Vector Store initialized. Collection =", self.collection_name, self.collection.count())

        except Exception as e:
            print(f"Error loading model {self.model_name}: {e}")   


    def add_documents(self, documents: List[Any], embeddings: np.ndarray):
        if len(documents) != len(embeddings):
            raise ValueError("Number of documents do not match")
        
        ids = []
        metadatas = []
        document_text = []
        embeddings_list = []

        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            doc_id = f"doc_{uuid.uuid4().hex[:8]}_{i}"
            ids.append(doc_id)

            metadata = dict(doc.metadata)
            metadata['doc_index'] = i
            metadata['content_length'] = len(doc.page_content)
            metadatas.append(metadata)

            document_text.append(doc.page_content)

            embeddings_list.append(embedding.tolist())

            try:
                self.collection.add(
                    ids=ids,
                    embeddings=embeddings_list,
                    metadatas=metadatas,
                    documents=document_text
                )

                print(f"Succesfully added {len(documents)} documents to vector store")

            except Exception as e:
                print(f"Error loading model {self.model_name}: {e}")  
            

vector_store = VectorStore()
vector_store

Vector Store initialized. Collection = pdf_documents 0


<__main__.VectorStore at 0x7ec4b162ad20>

Chunking

In [15]:
from typing import List, Any
from langchain_text_splitters import RecursiveCharacterTextSplitter

def split_documents(documents: List[Any], chunk_size: int = 1000, chunk_overlap: int = 200):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""]
    )

    split_docs = text_splitter.split_documents(documents)
    print(f"Split {len(documents)} documents into {len(split_docs)} chunks")

    if split_docs:
        print(f"Content: {split_docs[0].page_content[:50]}...")

    return split_docs


all_pdfs = dir_loader_pdf.load()
chunks = split_documents(all_pdfs)


Split 4 documents into 13 chunks
Content: SUMMARY
I’m a skilled Python/Django developer with...


In [16]:
# Convert text to embeddings

text_var = [doc.page_content for doc in chunks]

embeddings = embed.generate_embeddings(text_var)
vector_store.add_documents(chunks, embeddings)


Generating embeddings


Batches: 100%|██████████| 1/1 [00:00<00:00,  1.94it/s]


Generated embeddings with shape: (13, 384)
Succesfully added 13 documents to vector store
Succesfully added 13 documents to vector store
Succesfully added 13 documents to vector store
Succesfully added 13 documents to vector store
Succesfully added 13 documents to vector store
Succesfully added 13 documents to vector store
Succesfully added 13 documents to vector store
Succesfully added 13 documents to vector store
Succesfully added 13 documents to vector store
Succesfully added 13 documents to vector store
Succesfully added 13 documents to vector store
Succesfully added 13 documents to vector store
Succesfully added 13 documents to vector store


Data Retrieval Pipeline

In [43]:
class RAGRetreiver:
    def __init__(self, embedding_manager: EmbeddingManager, vector_store: VectorStore):
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    def retrieve(self, query: str, top_k: int=3, score_threshold: float=0.0) -> List[Dict[str, Any]]:
        print("Retrieving documents for", query)

        query_embedding = self.embedding_manager.generate_embeddings([query])[0]
        try:
            results = self.vector_store.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=top_k
            )
            # print(results['documents'][0])

            retrieved_docs = []
            if results['documents'] and results['documents'][0]:
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]

                for i, (doc_id, document, metadata, distance) in enumerate(zip(ids, documents, metadatas, distances)):
                    similarity_score = 1 - distance / 2
                    print(similarity_score)
                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            'id': doc_id,
                            'content': document,
                            'metadata': metadata,
                            'distance': distance,
                            'similarity_score': similarity_score,
                            'rank': i+1
                        })

                print(f"Retrieved {len(retrieved_docs)} documents")
            else:
                print("No docs found")

            return retrieved_docs
        
        except Exception as e:
            print("Exception:", e)
    


rag_retriever = RAGRetreiver(embed, vector_store)
query = "What is summary?"
rag_retriever.retrieve(query)



Retrieving documents for What is summary?
Generating embeddings


Batches: 100%|██████████| 1/1 [00:00<00:00, 93.27it/s]

Generated embeddings with shape: (1, 384)
0.17406773567199707
0.1401815414428711
0.11842691898345947
Retrieved 3 documents





[{'id': 'doc_cfbac0a8_9',
  'content': 'Cloud-based analytics and Reporting system\nIt is a  centralized platform designed to ingest, process, store, and visualize data from multiple sources\n(e.g., transactional systems, APIs, 3rd party tools), hosted in a cloud environment AWS. Users interact with\ndashboards, reports, or embedded analytics to make data-driven decisions.\nSkills: Django, Jasper Tool, PostgreSQL, MySql, ELK, Docker, Celery\nRoles and Responsibilities: \nDeveloped a comprehensive Cloud-based Analytics and Reporting System that allows users to create\n        dynamic reports using JasperTool.\nThe entire system is containerized using Docker for seamless deployment and scalability. \nUsed Celery and Redis for asynchronous tasks Explore and integrate emerging technologies such as\nconsensus protocols to enhance project functionality. \nContribute to all phases of the software development lifecycle, including requirement analysis, design,\ncoding, testing, and deployment.'