##### RAG Pipeline (Data Ingestion -> Chunking -> Embedding -> Vector DB)

In [1]:
import os
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pathlib import Path

In [2]:
# read all the pdf files in the current directory -> adds the metadata of source file to each document -> returns a list of all documents
def process_all_pdfs(pdf_directory):

    all_documents = []
    pdf_dir = Path(pdf_directory) # create a Path object for the directory {has attributes - .name, .parent, .stem etc. and methods - .glob(), etc.}
    pdf_files = list(pdf_dir.glob("**/*.pdf")) # list of all pdf files in the directory and subdirectories
    print(f"Found {len(pdf_files)} PDF files in directory: {pdf_directory}")
    for pdf_file in pdf_files:
        try:
            loader = PyPDFLoader(str(pdf_file)) # convert Path object to string
            documents = loader.load()

            for doc in documents:
                # adding source metadata to each document - adding the key value pair to the metadata dictionary
                doc.metadata["source_files"] = pdf_file.name
                doc.metadata["file_type"] = 'pdf'
 
            all_documents.extend(documents)
            print(f"Processed file: {pdf_file.name}")
            print(f"Loaded {len(documents)} pages")

        except Exception as e:
            print(f"Error! : {e}")

    print(f"Total documents loaded: {len(all_documents)}")
    return all_documents

# process all pdfs in the data directory

all_pdf_documents = process_all_pdfs("../data")

Found 5 PDF files in directory: ../data
Processed file: 1028-ArticleText-7147-1-10-202401291.pdf
Loaded 11 pages
Processed file: agriengineering-07-00043-v2.pdf
Loaded 23 pages
Processed file: Anomaly_Detection_in_Smart_Agriculture_Systems_on_.pdf
Loaded 31 pages
Processed file: arora2021.pdf
Loaded 6 pages
Processed file: sensors-20-06430-v2.pdf
Loaded 30 pages
Total documents loaded: 101


In [3]:
all_pdf_documents

[Document(metadata={'producer': 'www.ilovepdf.com', 'creator': 'Microsoft® Word 2016', 'creationdate': '2024-01-10T07:07:09+00:00', 'author': 'USER', 'moddate': '2024-01-10T07:07:10+00:00', 'rgid': 'PB:377844972_AS:11431281221210787@1706725446319', 'source': '..\\data\\pdf_files\\1028-ArticleText-7147-1-10-202401291.pdf', 'total_pages': 11, 'page': 0, 'page_label': '1', 'source_files': '1028-ArticleText-7147-1-10-202401291.pdf', 'file_type': 'pdf'}, page_content='See discussions, stats, and author profiles for this publication at: https://www.researchgate.net/publication/377844972\nAnomaly Detection In IoT Sensor Data Using Machine Learning Techniques\nFor Predictive Maintenance In Smart Grids\nArticle\xa0\xa0in \xa0\xa0International Journal Of Science Technology & Management · January 2024\nDOI: 10.46729/ijstm.v5i1.1028\nCITATIONS\n24\nREADS\n1,497\n4 authors:\nEdwin Omol\nKCA University\n24 PUBLICATIONS\xa0\xa0\xa0297 CITATIONS\xa0\xa0\xa0\nSEE PROFILE\nLucy Mburu\nKCA University\n34

 #### Chunking

In [4]:
def chunk_documents(documents, chunk_size = 1000, chunk_overlap = 200):
    # initialize the text splitter - chunking for better RAG performance
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", " ", ""]
    )
    split_docs = text_splitter.split_documents(documents) # call the split_documents method of the text splitter class
    print(f"Split {len(documents)} documents into {len(split_docs)} chunks")
   
    # example of a chunked document
    if split_docs:
        print("\nExample chunked document:")
        print(f"\nContent: {split_docs[0].page_content[:300]}...") # print first 300 characters
        print(f"\nMetadata: {split_docs[0].metadata}")

    return split_docs

In [5]:
chunks = chunk_documents(all_pdf_documents)
chunks

Split 101 documents into 515 chunks

Example chunked document:

Content: See discussions, stats, and author profiles for this publication at: https://www.researchgate.net/publication/377844972
Anomaly Detection In IoT Sensor Data Using Machine Learning Techniques
For Predictive Maintenance In Smart Grids
Article  in   International Journal Of Science Technology & Managem...

Metadata: {'producer': 'www.ilovepdf.com', 'creator': 'Microsoft® Word 2016', 'creationdate': '2024-01-10T07:07:09+00:00', 'author': 'USER', 'moddate': '2024-01-10T07:07:10+00:00', 'rgid': 'PB:377844972_AS:11431281221210787@1706725446319', 'source': '..\\data\\pdf_files\\1028-ArticleText-7147-1-10-202401291.pdf', 'total_pages': 11, 'page': 0, 'page_label': '1', 'source_files': '1028-ArticleText-7147-1-10-202401291.pdf', 'file_type': 'pdf'}


[Document(metadata={'producer': 'www.ilovepdf.com', 'creator': 'Microsoft® Word 2016', 'creationdate': '2024-01-10T07:07:09+00:00', 'author': 'USER', 'moddate': '2024-01-10T07:07:10+00:00', 'rgid': 'PB:377844972_AS:11431281221210787@1706725446319', 'source': '..\\data\\pdf_files\\1028-ArticleText-7147-1-10-202401291.pdf', 'total_pages': 11, 'page': 0, 'page_label': '1', 'source_files': '1028-ArticleText-7147-1-10-202401291.pdf', 'file_type': 'pdf'}, page_content='See discussions, stats, and author profiles for this publication at: https://www.researchgate.net/publication/377844972\nAnomaly Detection In IoT Sensor Data Using Machine Learning Techniques\nFor Predictive Maintenance In Smart Grids\nArticle\xa0\xa0in \xa0\xa0International Journal Of Science Technology & Management · January 2024\nDOI: 10.46729/ijstm.v5i1.1028\nCITATIONS\n24\nREADS\n1,497\n4 authors:\nEdwin Omol\nKCA University\n24 PUBLICATIONS\xa0\xa0\xa0297 CITATIONS\xa0\xa0\xa0\nSEE PROFILE\nLucy Mburu\nKCA University\n34

#### Embedding

In [6]:
# creating class for the whole embedding process

from typing import List
import numpy as np
from sentence_transformers import SentenceTransformer

class EmbeddingManager:
    """ Manages embedding models and their configurations."""
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        Initialize the embedding manager
        Args:
            model_name (str): HuggingFace model name for embeddings
        """
        self.model_name = model_name
        self.model = None
        self._load_model() # private method to load the model - it loads the model when the object is created
    
    def _load_model(self):
        """Load the sentence transformer model"""
        try:
            print(f"Loading embedding model: {self.model_name}")
            self.model = SentenceTransformer(self.model_name)
            print(f"Model loaded successfully. Embedding dimension: {self.model.get_sentence_embedding_dimension()}")
        except Exception as e:
            print(f"Error loading model: {e}")
            raise

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Generate embeddings for a list of texts
        Args:
            texts (List[str]): List of texts to embed
        Returns:
            np.ndarray: Array of embeddings
        """
        if not self.model:
            raise ValueError("Model not loaded.")
        
        print(f"Generating embeddings for {len(texts)} texts.")
        embeddings = self.model.encode(texts, show_progress_bar=False) # encoding using the sentence transformer model (encode is a method of SentenceTransformer class)
        print(f"Generated embeddings with shape: {embeddings.shape}")
        return embeddings
    
# initialize the embedding manager with default model

embedding_manager = EmbeddingManager()
embedding_manager # initialized embedding manager
        


Loading embedding model: all-MiniLM-L6-v2
Model loaded successfully. Embedding dimension: 384


<__main__.EmbeddingManager at 0x1ece8b4c590>

#### VectorStore DB

In [7]:
import os
import uuid
from typing import Any, List
import numpy as np
import chromadb

""" 
chromadb (library)
│
├── PersistentClient (class)
│       ↓
│   client = PersistentClient()
│
└── Collection (internal class)
        ↓
    collection = client.get_or_create_collection()
"""
# class for vector store management

class VectorStoreManager:
    """ Manages vector store operations such as adding and retrieving embeddings.
    """
    def __init__(self, collection_name: str = "pdf_documents", persist_directory: str = "../data/vector_store"):
        """
        Initialize the vector store

        Args:
            collection_name (str): Name of the vector store collection
            persist_directory (str): Directory to persist the vector store
        """
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.collection = None # placeholder for the vector store collection
        self.client = None # placeholder for the vector store client (e.g., Chroma client)
        self._initialize_store()

    # collection is basically a table in the vector store where we store the embeddings and documents
    def _initialize_store(self):
        """Initialize chromadb client and collection"""
        try:
            # create the persist directory if it doesn't exist
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path=self.persist_directory)

            # get or create the collection
            self.collection = self.client.get_or_create_collection(name=self.collection_name, metadata={"description": "PDF Document Embeddings"}) # calling get_or_create_collection method of the persistent client class
            print(f"Vector store initialized. Collection: {self.collection_name}")
            print(f"Existing deocuments in collection: {self.collection.count()}")

        except Exception as e:
            print(f"Error initializing vector store: {e}")
            raise

    # adding documents and embeddings to the collection
    def add_documents(self, documents: List[Any], embeddings: np.ndarray): # embeddings is linked to previously created embeddings using the embedding manager function
        """
        Add documents and their embeddings to the vector store
        
        Args:
            documents (List[Any]): List of documents to add
            embeddings (np.ndarray): Corresponding embeddings for the documents
        """    
        if len(documents) != embeddings.shape[0]:
            raise ValueError("Number of documents and embeddings must match.")
        
        print(f"Adding {len(documents)} documents to the vector store.") # here documents is a list - which is basically the chunks that we created earlier
        
        # prepare data for chromadb
        ids = []
        metadatas = []
        documents_text = []
        embeddings_list = []

        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            # generate unique ID
            doc_id = f"doc_{uuid.uuid4().hex[:8]}_{i}"  # it generates a unique id for each document using uuid module - 8 character hex string
            ids.append(doc_id)

            # prepare metadata
            metadata = dict(doc.metadata)  # convert metadata to dictionary
            metadata['doc_index'] = i # adding document index to metadata
            metadata['content_length'] = len(doc.page_content)  # adding content length to metadata
            metadatas.append(metadata)

            # document content
            documents_text.append(doc.page_content) # adding the actual text content of the document to the list

            # embeddings
            embeddings_list.append(embedding.tolist()) # convert numpy array to list and add to embeddings list

        # add to chromadb collection
        try:
            self.collection.add(   # calling add method of the Collection class
                ids=ids,
                metadatas=metadatas,
                documents=documents_text,
                embeddings=embeddings_list
            )
            print(f"Successfully added {len(documents)} documents to the vector store.")
            print(f"Total documents in collection after addition: {self.collection.count()}")

        except Exception as e:
            print(f"Error adding documents to vector store: {e}")
            raise

vectorstore = VectorStoreManager()
vectorstore

Vector store initialized. Collection: pdf_documents
Existing deocuments in collection: 515


<__main__.VectorStoreManager at 0x1ece8a23290>

In [8]:
chunks

[Document(metadata={'producer': 'www.ilovepdf.com', 'creator': 'Microsoft® Word 2016', 'creationdate': '2024-01-10T07:07:09+00:00', 'author': 'USER', 'moddate': '2024-01-10T07:07:10+00:00', 'rgid': 'PB:377844972_AS:11431281221210787@1706725446319', 'source': '..\\data\\pdf_files\\1028-ArticleText-7147-1-10-202401291.pdf', 'total_pages': 11, 'page': 0, 'page_label': '1', 'source_files': '1028-ArticleText-7147-1-10-202401291.pdf', 'file_type': 'pdf'}, page_content='See discussions, stats, and author profiles for this publication at: https://www.researchgate.net/publication/377844972\nAnomaly Detection In IoT Sensor Data Using Machine Learning Techniques\nFor Predictive Maintenance In Smart Grids\nArticle\xa0\xa0in \xa0\xa0International Journal Of Science Technology & Management · January 2024\nDOI: 10.46729/ijstm.v5i1.1028\nCITATIONS\n24\nREADS\n1,497\n4 authors:\nEdwin Omol\nKCA University\n24 PUBLICATIONS\xa0\xa0\xa0297 CITATIONS\xa0\xa0\xa0\nSEE PROFILE\nLucy Mburu\nKCA University\n34

#### Convert the text chunks to embeddings

In [9]:
# list comprehension to extract texts from the chunked documents
texts = [doc.page_content for doc in chunks]

# pass the texts to the embedding manager to get embeddings
embeddings = embedding_manager.generate_embeddings(texts) # calling the generate_embeddings method of the EmbeddingManager class

# store the chunks and their embeddings in the vector store
vectorstore.add_documents(chunks, embeddings)

Generating embeddings for 515 texts.
Generated embeddings with shape: (515, 384)
Adding 515 documents to the vector store.
Successfully added 515 documents to the vector store.
Total documents in collection after addition: 1030


#### ------ End of Data Ingestion Pipeline ------

#### RAG retrieval pipeline (User Query -> Embedding -> Vectorstore)

In [10]:
from typing import List, Dict, Any

class Retriever:
    """ Handles retrieval of documents from the vector store based on user queries.
    """
    
    def __init__(self, vector_store: VectorStoreManager, embedding_manager: EmbeddingManager):  # to give retriever access to the vector store and embedding manager
        """
        Initializes the retriever with a vector store and embedding manager.
        Args:
            vector_store (VectorStoreManager): The vector store manager instance
            embedding_manager (EmbeddingManager): The embedding manager instance
        """
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    # method to retrieve relevant documents based on user query
    def retrieve(self, query:str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
        """
        Retrieve relevant documents based on the user query.
        Args:
            query (str): The user query
            top_k (int): Number of top documents to retrieve
            score_threshold (float): Minimum score threshold for retrieved documents
        Returns:
            List[Dict[str, Any]]: List of retrieved documents with metadata
        """
        # generate embedding for the query
        query_embedding = self.embedding_manager.generate_embeddings([query])[0]  # calling generate_embeddings method of the embedding manager
        
        # search the vector store
        try:
            results = self.vector_store.collection.query(  # calling query method of the Collection class
                query_embeddings=[query_embedding],
                n_results=top_k
            ) # results is a dictionary with keys - 'ids', 'distances', 'metadatas', 'documents' - which is packaged by query method of Collection class
            
            # process results
            retrieved_docs = []

            if results['documents'] and results['documents'][0]:
                documents = results['documents'][0]
                metadatas = results['metadatas'][0] # [0] is because the values are list of lists --> we need only the first list
                distances = results['distances'][0]
                ids = results['ids'][0]

                for i, (doc_id, document, metadata, distance) in enumerate(zip(ids, documents, metadatas, distances)): # iterate through the documents
                    # converting distance to similarity score
                    similarity_score = 1 - distance

                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            "id": doc_id,
                            "content": document,
                            "metadata": metadata,
                            "similarity_score": similarity_score,
                            "distance": distance,
                            "rank": i + 1
                        })

                print(f"Retrieved {len(retrieved_docs)} documents for the query: '{query}'")
            else:
                print(f"No documents retrieved for the query: '{query}'")
            return retrieved_docs

        except Exception as e:
            print(f"Error during retrieval: {e}")
            return []
        
rag_retriever = Retriever(vectorstore, embedding_manager)
rag_retriever
        

<__main__.Retriever at 0x1eceb32a8a0>

In [11]:
rag_retriever.retrieve("What is the role of anomaly detection in agriculture?")

Generating embeddings for 1 texts.
Generated embeddings with shape: (1, 384)
Retrieved 5 documents for the query: 'What is the role of anomaly detection in agriculture?'


[{'id': 'doc_dd27ec2d_183',
  'content': 'approach, achieving an impressive anomaly detection accuracy of 99.7%. This research contrib- 18 \nutes significantly to the development of robust and efficient attack and anomaly detection tech- 19 \nniques for smart agriculture systems at the network edge, ultimately enhancing the reliability 20 \nand sustainability of agricultural practices.      21 \nKeywords: Anomaly detection , Smart agriculture , Network edge , Deep learning , Internet of 22 \nThings (IoT) , Zero-day attacks , Distributed Denial of Service (DDoS) , Sensor data analysis , 23 \nCNN-LSTM. 24 \nIntroduction 25 \nAgriculture is one of the most important elements of life, as it is one of the main 26 \nsources through which a person obtains the nutrients that he feeds on, and it also has an 27 \nimpact on the economy in some countries that seek to provide the necessary food and 28 \nachieve self-sufficiency in crops. The traditional farming system suffers from many factors. 29'

#### Integrating Context pipeline with LLM output

In [16]:
from langchain_groq import ChatGroq
import os
from dotenv import load_dotenv
load_dotenv()  # load environment variables from .env file

# initialize the Groq llm
groq_api_key = os.getenv("GROQ_API_KEY")

llm = ChatGroq(groq_api_key=groq_api_key, model="llama-3.1-8b-instant", temperature=0.1, max_tokens=1024)

# RAG function to retrieve the context and generate the response using the LLM
def rag_llm(query, retriever, llm, top_k=5):
    # retrieve relevant documents
    retrieved_docs = retriever.retrieve(query, top_k=top_k)
    # concatenate the content of the retrieved documents to form the context
    context = "\n\n".join([doc['content'] for doc in retrieved_docs]) if retrieved_docs else ""
    if not context:
        return "No relevant documents found to answer the query."
    
    # create the prompt for the LLM
    prompt = f""" Use the following context to answer the question:\n\nContext:\n{context}\n\nQuestion: {query}\n\nAnswer:"""

    # generate the response using the LLM
    response = llm.invoke(prompt) # invoke is a method of the ChatGroq class
    return response.content # content is an attribute of the response object - it returns the generated text


In [17]:
answer = rag_llm("Explain the significance of anomaly detection in agriculture.", rag_retriever, llm)
print(answer)

Generating embeddings for 1 texts.
Generated embeddings with shape: (1, 384)
Retrieved 5 documents for the query: 'Explain the significance of anomaly detection in agriculture.'
Anomaly detection in agriculture is a crucial aspect of ensuring the reliability and sustainability of agricultural practices. It involves identifying unusual patterns or outliers in sensor data that may indicate potential issues or threats to crop health, such as crop damage, disease, or pests.

The significance of anomaly detection in agriculture can be understood in several ways:

1. **Early Detection of Crop Damage**: Anomaly detection can help identify crop damage or disease at an early stage, allowing farmers to take corrective action and minimize losses.
2. **Prevention of Zero-Day Attacks**: In the context of smart agriculture systems, anomaly detection can help prevent zero-day attacks, which are cyber threats that exploit previously unknown vulnerabilities.
3. **Distributed Denial of Service (DDoS) Pr