### Data Ingestion to Vector DB pipeline

In [1]:
import os
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pathlib import Path

In [4]:
### Read all the pdfs inside the directory

def process_all_pdfs(pdf_directory):
    all_documents = []
    pdf_dir = Path(pdf_directory)

    # Find all PDF files recursively
    pdf_files = list(pdf_dir.glob("**/*.pdf"))

    print(f"Found {len(pdf_files)} PDF files to process")

    for pdf_file in pdf_files:
        print("Processing: {pdf_file.name}")
        try:
            loader = PyPDFLoader(str(pdf_file))
            documents = loader.load()

            # Add source information to metadata
            for doc in documents:
                doc.metadata['source_file'] = pdf_file.name
                doc.metadata['file_type'] = 'pdf'

            all_documents.extend(documents)
            print(f"✅ Loaded {len(documents)} pages")
        except Exception as e:
            print(f"Error {e}")

    print(f"\nTotal documents loaded: {len(all_documents)}")
    return all_documents

# Process all PDFs in the data directory
all_pdf_documents = process_all_pdfs("../data")


Found 5 PDF files to process
Processing: {pdf_file.name}
✅ Loaded 1 pages
Processing: {pdf_file.name}
✅ Loaded 1 pages
Processing: {pdf_file.name}
✅ Loaded 2 pages
Processing: {pdf_file.name}
✅ Loaded 1 pages
Processing: {pdf_file.name}
✅ Loaded 1 pages

Total documents loaded: 6


In [5]:
all_pdf_documents

[Document(metadata={'producer': 'ReportLab PDF Library - www.reportlab.com', 'creator': '(unspecified)', 'creationdate': '2026-02-07T11:26:34+00:00', 'author': '(anonymous)', 'keywords': '', 'moddate': '2026-02-07T11:26:34+00:00', 'subject': '(unspecified)', 'title': '(anonymous)', 'trapped': '/False', 'source': '..\\data\\pdf\\data_engineering_for_ai_dummy.pdf', 'total_pages': 1, 'page': 0, 'page_label': '1', 'source_file': 'data_engineering_for_ai_dummy.pdf', 'file_type': 'pdf'}, page_content='Data Engineering for AI Systems\nData Pipelines\nAutomated flows that ingest, clean, and transform data.\nETL vs ELT\nETL transforms before loading; ELT transforms after loading.\nData Quality\nAccuracy, completeness, and consistency are critical for ML systems.\nStorage Systems\nData lakes, warehouses, and object storage support AI workloads.\nImportance for RAG\nReliable pipelines ensure high-quality documents for retrieval.'),
 Document(metadata={'producer': 'ReportLab PDF Library - www.repo

In [7]:
### Text splitting get into chunks

def split_documents(documents, chunk_size=1000, chunk_overlap=200):
    """Split documents into smaller chunks for better RAG performs"""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""]
    )

    split_docs = text_splitter.split_documents(documents)
    print(f"Split {len(documents)} documents into {len(split_docs)} chunks")

    if split_docs:
        print(f"\nExample chunk:")
        print(f"Content: {split_docs[0].page_content[:200]}")
        print(f"Metadata: {split_docs[0].metadata}")

    return split_docs


In [9]:
chunks = split_documents(all_pdf_documents)
chunks

Split 6 documents into 7 chunks

Example chunk:
Content: Data Engineering for AI Systems
Data Pipelines
Automated flows that ingest, clean, and transform data.
ETL vs ELT
ETL transforms before loading; ELT transforms after loading.
Data Quality
Accuracy, co
Metadata: {'producer': 'ReportLab PDF Library - www.reportlab.com', 'creator': '(unspecified)', 'creationdate': '2026-02-07T11:26:34+00:00', 'author': '(anonymous)', 'keywords': '', 'moddate': '2026-02-07T11:26:34+00:00', 'subject': '(unspecified)', 'title': '(anonymous)', 'trapped': '/False', 'source': '..\\data\\pdf\\data_engineering_for_ai_dummy.pdf', 'total_pages': 1, 'page': 0, 'page_label': '1', 'source_file': 'data_engineering_for_ai_dummy.pdf', 'file_type': 'pdf'}


[Document(metadata={'producer': 'ReportLab PDF Library - www.reportlab.com', 'creator': '(unspecified)', 'creationdate': '2026-02-07T11:26:34+00:00', 'author': '(anonymous)', 'keywords': '', 'moddate': '2026-02-07T11:26:34+00:00', 'subject': '(unspecified)', 'title': '(anonymous)', 'trapped': '/False', 'source': '..\\data\\pdf\\data_engineering_for_ai_dummy.pdf', 'total_pages': 1, 'page': 0, 'page_label': '1', 'source_file': 'data_engineering_for_ai_dummy.pdf', 'file_type': 'pdf'}, page_content='Data Engineering for AI Systems\nData Pipelines\nAutomated flows that ingest, clean, and transform data.\nETL vs ELT\nETL transforms before loading; ELT transforms after loading.\nData Quality\nAccuracy, completeness, and consistency are critical for ML systems.\nStorage Systems\nData lakes, warehouses, and object storage support AI workloads.\nImportance for RAG\nReliable pipelines ensure high-quality documents for retrieval.'),
 Document(metadata={'producer': 'ReportLab PDF Library - www.repo

### Embedding and VectorStoreDB

In [None]:
import numpy as np
from sentence_transformers import SentenceTransformer # Embedding model
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity 

In [12]:
class EmbeddingManager:
    """Handles document embedding generation using SentenceTransformer"""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model_name = model_name # initializing the model name
        self.model = None  
        self._load_model()  # loading the specialized model 

    def _load_model(self):
        """Load the sentenceTransformer model"""
        try:
            print(f"Loading embedding model: {self.model_name}")
            self.model = SentenceTransformer(self.model_name)  # loading the model
            print(f"Model loaded sucessfully. Embedding dimension: {self.model.get_sentence_embedding_dimension()}")   # getting the embedding dimension
        except Exception as e:
            print(f"Error loading model {self.model_name}: {e}")
            raise

    def generate_emeddings(self, texts: List[str]) -> np.ndarray:
        """
            Generate embeddings for a list of texts

            Args:
                texts: List of text strings to embed

            returns:
                numpy array of embeddings with shape (len(texts), embedding_dim)
        """ 
        if not self.model:
            raise ValueError("MOdel not loaded")
    
        print(f"Generating embedding for {len(texts)} texts...")
        embeddings = self.model.encode(texts, show_progress_bar=True)
        print(f"Generated embeddings with shape: {embeddings.shape}")
        return embeddings
    

## Initializing the embedding manager

embedding_manager = EmbeddingManager()
embedding_manager




Loading embedding model: all-MiniLM-L6-v2


Loading weights: 100%|██████████| 103/103 [00:00<00:00, 887.81it/s, Materializing param=pooler.dense.weight]                             
[1mBertModel LOAD REPORT[0m from: sentence-transformers/all-MiniLM-L6-v2
Key                     | Status     |  | 
------------------------+------------+--+-
embeddings.position_ids | UNEXPECTED |  | 

[3mNotes:
- UNEXPECTED[3m	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.[0m


Model loaded sucessfully. Embedding dimension: 384


<__main__.EmbeddingManager at 0x1f9a6af5430>