### RAG Pipeline - Data ingestion to vector db pipeline
#### 1. Data Ingestion
#### 2. Data Parsing - chunks
#### 3. Embeddings - text/vectors

In [22]:
import os
from langchain_community.document_loaders import PyMuPDFLoader, DirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pathlib import Path

In [23]:
def process_all_pdfs(pdf_directory):
    """
    process all the pdfs files in a directory
    """
    all_documents = []
    pdf_dir = Path(pdf_directory)

    # recursively find the files
    pdf_files = list(pdf_dir.glob("**/*.pdf"))

    print(f"found {len(pdf_files)} PDF files to process")

    for pdf_file in pdf_files:
        print(f"\nProcessing {pdf_file.name}")
        try:
            loader = PyMuPDFLoader(str(pdf_file))
            documents = loader.load()
            for doc in documents:
                print(f" - Loaded document with {len(doc.page_content)} characters")
                doc.metadata["source_file"] = pdf_file.name
                doc.metadata['file_type']='pdf'
            
            all_documents.extend(documents)
            print(f" pages loaded: {len(documents)}")

        except Exception as e:
            print(f"Error processing {pdf_file.name}: {e}")

    return all_documents

all_pdf_documents = process_all_pdfs("../data/pdf")

found 3 PDF files to process

Processing Assigntment 2.pdf
 - Loaded document with 139 characters
 - Loaded document with 1669 characters
 - Loaded document with 810 characters
 pages loaded: 3

Processing Assignment 1.pdf
 - Loaded document with 139 characters
 - Loaded document with 1928 characters
 - Loaded document with 1658 characters
 - Loaded document with 1712 characters
 - Loaded document with 678 characters
 pages loaded: 5

Processing assignment.pdf
 - Loaded document with 2093 characters
 pages loaded: 1


In [24]:
# text splitting into chunks
def split_documents(documents, chunk_size=1000, chunk_overlap=200):
    """
    Split documents into smaller chunks for better RAG performance
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n","\n"," ",""]
    )
    split_docs = text_splitter.split_documents(documents)
    print(f"split {len(documents)} into {len(split_docs)} chunks")

    if split_docs:
        print("\nExample chunks")
        print(f"content: {split_docs[0].page_content[:200]}")
        print(f"metadata: {split_docs[0].metadata}")

    return split_docs

chunks = split_documents(all_pdf_documents)

split 9 into 16 chunks

Example chunks
content: Indian Institute of Technology Jodhpur
Fundamentals of Distributed Systems
Assignment – 2
Total Marks:
20
Submission Deadline:
27 July 2025
metadata: {'producer': 'pdfTeX-1.40.26', 'creator': 'LaTeX with hyperref', 'creationdate': '2025-07-22T09:44:08+00:00', 'source': '../data/pdf/Assigntment 2.pdf', 'file_path': '../data/pdf/Assigntment 2.pdf', 'total_pages': 3, 'format': 'PDF 1.5', 'title': '', 'author': '', 'subject': '', 'keywords': '', 'moddate': '2025-07-22T09:44:08+00:00', 'trapped': '', 'modDate': 'D:20250722094408Z', 'creationDate': 'D:20250722094408Z', 'page': 0, 'source_file': 'Assigntment 2.pdf', 'file_type': 'pdf'}


### Embeddings and vectorStoreDB

In [25]:
import numpy as np
import chromadb
from sentence_transformers import SentenceTransformer
from chromadb.config import Settings
import uuid
from typing import List, Dict, Tuple, Any
from sklearn.metrics.pairwise import cosine_similarity

In [26]:
class EmbeddingManager:
    """
    Handles document embedding generation using SentenceTransformer
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        Initialize the EmbeddingManager with a specified model.

        :param model_name: Name of the SentenceTransformer model to use.
        """
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self):
        """
        Load the SentenceTransformer model.
        """
        try:
            print(f"Loading model: {self.model_name}")
            self.model = SentenceTransformer(self.model_name)
            print(f"Model loaded successfully. Embedding dimension: {self.model.get_sentence_embedding_dimension()}")
        except Exception as e:
            print(f"Error loading model {self.model_name}: {e}")
            raise

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Generate embeddings for a list of texts.

        :param texts: List of strings to embed.
        :return: Numpy array of embeddings.
        """
        if not self.model:
            raise ValueError("Model is not loaded.")
        
        try:
            embeddings = self.model.encode(texts, convert_to_numpy=True, show_progress_bar=True)
            print(f"Generated embeddings for {len(texts)} texts with shape {embeddings.shape}")
            return embeddings
        except Exception as e:
            print(f"Error generating embeddings: {e}")
            raise

    def get_embedding_dimension(self) -> int:
        """
        Get the dimension of the embeddings produced by the model.

        :return: Embedding dimension as an integer.
        """
        if not self.model:
            raise ValueError("Model is not loaded.")
        
        return self.model.get_sentence_embedding_dimension()
    
#  Initilizing the embedding manager
embedding_manager = EmbeddingManager()
embedding_manager

Loading model: all-MiniLM-L6-v2
Model loaded successfully. Embedding dimension: 384


<__main__.EmbeddingManager at 0x169ff3230>

### VectorStore

In [27]:
class VectorStore:
    """
    Manages document embeddings in a chromadb vector store.
    """

    def __init__(self, collection_name: str = "pdf_documents", persist_directory: str = "../data/vector_store"):
        """
        Initialize the VectorStore with a specified collection name and persistence directory.

        :param collection_name: Name of the chromadb collection.
        :param persist_directory: Directory to persist the vector store data.
        """
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        """
        Initialize the chromadb client and collection.
        """
        try:
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path=self.persist_directory)

            self.collection = self.client.get_or_create_collection(
                name = self.collection_name,
                metadata={
                    "description": "PDF documents embeddings for RAG",
                }
            )

            print(f"Vector store initialized with collection: {self.collection_name}")
            print(f"Existing number of documents in store: {self.collection.count()}")

        except Exception as e:
            print(f"Error initializing vector store: {e}")
            raise

    def add_documents(self, documents: List[Any], embeddings: np.ndarray):
        """
        Add documents and their embeddings to the vector store.

        :param documents: List of document objects with metadata.
        :param embeddings: Numpy array of embeddings corresponding to the documents.
        """
        if len(documents) != len(embeddings):
            raise ValueError("Number of documents and embeddings must match.")
        
        print(f"Adding {len(documents)} documents to the vector store.")

        # prepare data for metadata
        ids = []
        metadatas = []
        document_text = []
        embedding_list = []

        for i, (doc, embeddding) in enumerate(zip(documents, embeddings)):
            doc_id = f"doc_{uuid.uuid4().hex[:8]}_{i}"
            ids.append(doc_id)

            metadata = dict(doc.metadata)  # ensure metadata is a dict
            metadata['doc_index'] = i
            metadata['content_length'] = len(doc.page_content)
            metadatas.append(metadata)

            document_text.append(doc.page_content)
            embedding_list.append(embeddding.tolist())

        # add to collection
        try:
            self.collection.add(
                ids=ids,
                embeddings=embedding_list,
                metadatas=metadatas,
                documents=document_text
            )
            print(f"Successfully added {len(documents)} documents to the vector store.")
            print(f"Total documents in store now: {self.collection.count()}")
        
        except Exception as e:
            print(f"Error adding documents to vector store: {e}")
            raise

vectorstore = VectorStore()
vectorstore

Vector store initialized with collection: pdf_documents
Existing number of documents in store: 32


<__main__.VectorStore at 0x169ff30e0>

In [28]:
chunks

[Document(metadata={'producer': 'pdfTeX-1.40.26', 'creator': 'LaTeX with hyperref', 'creationdate': '2025-07-22T09:44:08+00:00', 'source': '../data/pdf/Assigntment 2.pdf', 'file_path': '../data/pdf/Assigntment 2.pdf', 'total_pages': 3, 'format': 'PDF 1.5', 'title': '', 'author': '', 'subject': '', 'keywords': '', 'moddate': '2025-07-22T09:44:08+00:00', 'trapped': '', 'modDate': 'D:20250722094408Z', 'creationDate': 'D:20250722094408Z', 'page': 0, 'source_file': 'Assigntment 2.pdf', 'file_type': 'pdf'}, page_content='Indian Institute of Technology Jodhpur\nFundamentals of Distributed Systems\nAssignment – 2\nTotal Marks:\n20\nSubmission Deadline:\n27 July 2025'),
 Document(metadata={'producer': 'pdfTeX-1.40.26', 'creator': 'LaTeX with hyperref', 'creationdate': '2025-07-22T09:44:08+00:00', 'source': '../data/pdf/Assigntment 2.pdf', 'file_path': '../data/pdf/Assigntment 2.pdf', 'total_pages': 3, 'format': 'PDF 1.5', 'title': '', 'author': '', 'subject': '', 'keywords': '', 'moddate': '202

In [29]:
# convert the text to embeddings
texts = [doc.page_content for doc in chunks]
texts

['Indian Institute of Technology Jodhpur\nFundamentals of Distributed Systems\nAssignment – 2\nTotal Marks:\n20\nSubmission Deadline:\n27 July 2025',
 'Datasets\n• Cruise data: Cruise CSV file (click here to download)\n• Customer churn data: Customer Churn CSV file (click here to download)\n• E-commerce customer data: E-commerce Customer CSV file (click here to down-\nload)\nInstructions\n• Implement all MapReduce jobs using the mrjob library and Hadoop in Google Colab.\n• At the top of your notebook, install dependencies and setup hadoop.\n• Load each CSV directly from the URLs above using wget or curl command into the\nGoogle Colab.\n• For each question:\n1. Write mapper, reducer (and combiners or multi-step definitions) as mrjob classes.\n2. Include a brief docstring explaining your design in Google Colab using markdown\nfeature for each question and cell of Colab.\n3. Demonstrate correctness on a small inline example.\n• Name your notebook Assignment2-(Roll No of Yours)-(Name of yo

In [30]:
# generate the embeddings
embeddings = embedding_manager.generate_embeddings(texts)

# store in vector database
vectorstore.add_documents(chunks, embeddings)

Batches: 100%|██████████| 1/1 [00:00<00:00, 10.91it/s]

Generated embeddings for 16 texts with shape (16, 384)
Adding 16 documents to the vector store.
Successfully added 16 documents to the vector store.
Total documents in store now: 48





### Retriever Pipeline from vector store

In [31]:
class RagRetriever:
    """
    Retrieves relevant documents from the vector store based on query similarity.
    """
    def __init__(self, vector_store: VectorStore, embedding_manager: EmbeddingManager):
        """
        Initialize the Retriever with a vector store and embedding manager.
        :param vector_store: Instance of VectorStore to retrieve documents from.
        :param embedding_manager: Instance of EmbeddingManager to manage embeddings.
        """
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager
        
    def retrieve(self, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
        """
        Retrieve relevant documents based on the query.
        :param query: The input query string.
        :param top_k: Number of top relevant documents to retrieve.
        :param score_threshold: Minimum score threshold for filtering documents.
        :return: List of relevant documents with metadata.
        """
        print(f"Retrieving documents for query: {query}")
        print(f"top K: {top_k}, score threshold: {score_threshold}")

        # generate query embeddings
        query_embedding = self.embedding_manager.generate_embeddings([query])[0]

        # search in vector store
        try:
            results = self.vector_store.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=top_k,
            )

            # process results
            retrieved_docs = []

            if results['documents'] and len(results['documents']) > 0:
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]

                for i, (doc, meta, dist, doc_id) in enumerate(zip(documents, metadatas, distances, ids)):
                    similarity = 1 - dist  # convert distance to similarity
                    if similarity >= score_threshold:
                        retrieved_docs.append(
                            {
                                'id': doc_id,
                                'content': doc,
                                'metadata': meta,
                                'similarity_score': similarity,
                                'rank': i + 1
                            }
                        )
                print(f"Retrieved {len(retrieved_docs)} documents after applying score threshold.")
            else:
                print("No documents retrieved from the vector store.")

            return retrieved_docs
        
        except Exception as e:
            print(f"Error retrieving documents: {e}")
            raise

rag_retriever = RagRetriever(vectorstore, embedding_manager)
rag_retriever

<__main__.RagRetriever at 0x169ff2120>

In [32]:
rag_retriever.retrieve("What do I have to do in Vector Clocks and Causal Ordering assignment?")

Retrieving documents for query: What do I have to do in Vector Clocks and Causal Ordering assignment?
top K: 5, score threshold: 0.0


Batches: 100%|██████████| 1/1 [00:00<00:00, 83.35it/s]

Generated embeddings for 1 texts with shape (1, 384)
Retrieved 3 documents after applying score threshold.





[{'id': 'doc_4b5456c3_5',
  'content': '1. Vector Clocks and Causal Ordering\n[10 Marks]\nObjective\nTo move beyond simple event ordering by implementing Vector Clocks to capture the causal\nrelationships between events in a distributed system. You will apply this to build a causally\nconsistent, multi-node key-value store.\nProblem Description\nYou will build a distributed key-value store with three or more nodes. The key challenge is\nto ensure that writes to the store are causally ordered. If event B is causally dependent on\nevent A (e.g., a value is read and then updated), all nodes must process event A before they\nprocess event B. Simple Lamport clocks are insufficient for this, so you will use Vector Clocks.\nTechnology Constraints\n• Programming Language: The entire application logic for the nodes and client must\nbe written exclusively in Python.\n• Containerization: The system must be containerized and orchestrated solely using\nDocker and Docker Compose.\nTasks\nYour implem

### Integration Vector db context pipeline with LLM output

In [None]:
# simple RAG pipeline with groq LLM
from langchain_groq import ChatGroq
from dotenv import load_dotenv
load_dotenv()

# 1. Initialize the ChatGroq LLM with the API key from environment variables
chat_groq = ChatGroq(api_key=os.getenv("CHAT_GROQ_API_KEY"), model_name="llama-3.1-8b-instant" , temperature=0.1, max_tokens=4096)
# chat_genai = ChatGoogleGenerativeAI(api_key=os.getenv("CHAT_GOOGLE_API_KEY"), model="gemini-2.5-flash", temperature=0.1, max_tokens=1024)

# 2. Define a function to perform RAG with Groq LLM
def rag_simple(query:str, retriever: RagRetriever = rag_retriever, llm: ChatGroq = chat_groq, top_k:int=5) -> str:
    """
    Perform a simple RAG operation using the provided retriever and LLM.
    
    :param query: The input query string.
    :param retriever: Instance of RagRetriever to fetch relevant documents.
    :param llm: Instance of ChatGroq LLM to generate the answer.
    :param top_k: Number of top relevant documents to retrieve.
    :return: Generated answer from the LLM.
    """
    # Step 1: Retrieve relevant documents
    relevant_docs = retriever.retrieve(query, top_k=top_k)
    
    # Step 2: Prepare context for LLM
    context = "\n\n".join([doc['content'] for doc in relevant_docs]) if relevant_docs else ""
    if not context:
        return "No relevant documents found to answer the query."
    
    prompt = f"Using the following context answer the question precisely Context:\n\n{context}\n\nQuestion: {query}\nAnswer:"
    
    # Step 3: Generate answer using LLM
    response = llm.invoke([prompt.format(context=context, query=query)])
    
    return response.content

In [41]:
import groq
client = groq.Groq(api_key=os.getenv("CHAT_GROQ_API_KEY"))
models = client.models.list()
for m in models.data:
    print(m.id)

meta-llama/llama-guard-4-12b
openai/gpt-oss-120b
whisper-large-v3
openai/gpt-oss-20b
meta-llama/llama-4-maverick-17b-128e-instruct
qwen/qwen3-32b
moonshotai/kimi-k2-instruct-0905
llama-3.3-70b-versatile
llama-3.1-8b-instant
groq/compound
moonshotai/kimi-k2-instruct
meta-llama/llama-prompt-guard-2-22m
groq/compound-mini
whisper-large-v3-turbo
playai-tts-arabic
meta-llama/llama-prompt-guard-2-86m
allam-2-7b
openai/gpt-oss-safeguard-20b
playai-tts
meta-llama/llama-4-scout-17b-16e-instruct


In [56]:
answer = rag_simple("please provide the code for the assignment Dynamic Load Balancing for a Smart Grid?", rag_retriever, chat_groq, top_k=5)
print(answer)

Retrieving documents for query: please provide the code for the assignment Dynamic Load Balancing for a Smart Grid?
top K: 5, score threshold: 0.0


Batches: 100%|██████████| 1/1 [00:00<00:00, 29.01it/s]

Generated embeddings for 1 texts with shape (1, 384)
Retrieved 3 documents after applying score threshold.





I can provide a high-level design and some sample code for the Dynamic Load Balancing for a Smart Grid assignment. However, please note that this is a simplified example and may not cover all the requirements of the assignment.

**System Design:**

The system will consist of the following components:

1. **EV Charging Simulator:** This component will simulate the Electric Vehicle (EV) charging requests and send them to the Load Balancer.
2. **Load Balancer:** This component will receive the EV charging requests, determine the least loaded substation, and forward the request to that substation.
3. **Substation:** This component will receive the EV charging requests from the Load Balancer, process them, and update the load status.
4. **Monitoring Stack:** This component will collect and visualize key performance indicators (KPIs) such as load, request latency, and substation utilization.

**Sample Code:**

Here's a simplified example using Python and Flask for the Load Balancer and Subst