In [1]:
import requests  # Used for making HTTP requests, essential for interacting with APIs and web services.
import json      # Provides methods for serializing and deserializing JSON data, often used for API request/response handling.
import os        # Provides a way to interact with the operating system, useful for file path management, environment variables, etc.
import time      # Includes functions for time-related operations, such as delays, sleep, and measuring time intervals.

import pandas as pd  # Imports pandas for data manipulation and analysis, often used to read, process, and transform data in tabular formats.

# Import a specific class for loading PDF documents, allowing easy extraction of text from PDF files.
from langchain.document_loaders import UnstructuredPDFLoader  

# Imports a text splitter for breaking down text documents into smaller, manageable chunks for processing, 
# especially useful in natural language processing or document retrieval tasks.
from langchain_text_splitters import RecursiveCharacterTextSplitter  

import chromadb  # Imports ChromaDB, a database specifically designed for managing embeddings, useful for retrieval-based AI applications.

# Imports a specific error class from ChromaDB to handle exceptions when working with collections that may not exist.
from chromadb.errors import InvalidCollectionException

In [2]:
# Function to initialize a ChromaDB client with a persistent storage directory
def get_chroma_client():
    # Define the directory where ChromaDB data will be stored
    persist_dir = os.path.join(os.getcwd(), "chromadb_data")  
    os.makedirs(persist_dir, exist_ok=True)  # Create the directory if it does not exist
    # Initialize and return a persistent ChromaDB client, with data saved to persist_dir
    return chromadb.PersistentClient(path=persist_dir)  

# Helper function to split lists into smaller chunks
def chunk_list(lst, chunk_size):
    """
    Split a list into smaller chunks of specified size.

    Parameters:
    - lst (list): The list to be split.
    - chunk_size (int): The maximum size of each chunk.

    Returns:
    - list: A list of chunks, each a sublist of the original list.
    """
    return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]

# EmbeddingAPI class to handle text embedding requests via an external API
class EmbeddingAPI:
    def __init__(self, url: str, model: str):
        """
        Initializes the EmbeddingAPI with the specified endpoint and model.

        Parameters:
        - url (str): The API endpoint URL for embedding requests.
        - model (str): The model identifier to specify which embedding model to use.
        """
        self.url = url  # URL of the embedding API
        self.model = model  # Embedding model name
        self.headers = {
            'Content-Type': 'application/json'  # Sets request headers for JSON payloads
        }
    
    def encode(self, input_text: str):
        """
        Sends a request to the embedding API to generate embeddings for the input text.

        Parameters:
        - input_text (str): The text to encode into embeddings.

        Returns:
        - tuple: A tuple containing the full result and the embeddings list.
        """
        # Construct the payload in JSON format
        payload = json.dumps({
            "model": self.model,
            "input": input_text
        })
        
        try:
            # Send a POST request to the API with headers and payload
            response = requests.post(self.url, headers=self.headers, data=payload)
            response.raise_for_status()  # Raises an error if the request was unsuccessful
            result = response.json()  # Parse JSON response
            return result, result["embeddings"]  # Return both the full response and embeddings
        except requests.exceptions.RequestException as e:
            print(f"An error occurred: {e}")
            return None, []  # Return empty embeddings on error

# ChatAssistant class to handle chat response generation using an external API
class ChatAssistant:
    def __init__(self, url: str, model: str):
        """
        Initializes the ChatAssistant with the specified chat model API endpoint.

        Parameters:
        - url (str): The endpoint URL for the chat API.
        - model (str): Model identifier for generating chat responses.
        """
        self.url = url  # API endpoint for chat model requests
        self.model = model  # Chat model name
        self.headers = {"Content-Type": "application/json"}

    def get_chat_response(self, query: str, retrieved_docs: str) -> str:
        """
        Generates a response from the chat model based on a user query and retrieved document context.

        Parameters:
        - query (str): The user's question.
        - retrieved_docs (str): Context or content from documents to aid in answering the query.

        Returns:
        - str: The model's response text, or None if an error occurs.
        """
        # Define the system prompt, instructing the assistant to use only the provided context
        system_prompt = f"""You are an intelligent assistant that helps users understand documents. 
        Base your responses solely on the provided context. If the context doesn't contain enough 
        information to answer the question, say so clearly. Keep responses concise and relevant.
        Do not generate information outside of the provided context.

        Context: {retrieved_docs}
        """
        
        # Construct the payload for the API request with system and user messages
        payload = {
            "model": self.model,
            "option": {"temperature": 0.7},  # Set temperature for controlled response variability
            "messages": [
                {
                    "role": "system",
                    "content": system_prompt
                },
                {
                    "role": "user",
                    "content": f"Answer the following query using only the given context. Query: '{query}'."
                }
            ]
        }

        try:
            # Send request to the chat API and parse response
            response = requests.post(self.url, json=payload, headers=self.headers)
            response.raise_for_status()  # Raises an error for unsuccessful requests
            return response.json()["choices"][0]["message"]["content"]  # Extract and return response content
        except requests.exceptions.RequestException as e:
            print(f"An error occurred: {e}")
            return None  # Return None if request fails

# RAG (Retrieval-Augmented Generation) class to manage document retrieval and response generation
class RAG:
    def __init__(self):
        """
        Initializes the RAG class, setting up clients for embedding and chat models
        and initializing a vector database collection.
        """
        self.vector_db_client = get_chroma_client()  # Initialize ChromaDB client for vector storage
        self.embedding_model = EmbeddingAPI(url="http://localhost:11434/api/embed", model="llama3.2:1b")  # Embedding model
        self.chat_assistant = ChatAssistant(url="http://localhost:11434/v1/chat/completions/", model="llama3.2:1b")  # Chat assistant
        self.init_collection()  # Initialize or retrieve a vector collection in ChromaDB

    def init_collection(self):
        """Initializes a collection in ChromaDB if it doesn't already exist."""
        try:
            self.collection = self.vector_db_client.get_collection(name="rag")  # Retrieve the collection
        except InvalidCollectionException:
            # Create collection if it does not exist, setting metadata for similarity search
            self.collection = self.vector_db_client.create_collection(
                name="rag",
                metadata={"hnsw:space": "cosine"}
            )

    def validate_xlsx(self, df):
        """
        Validates that the DataFrame has the required columns for processing.

        Parameters:
        - df (DataFrame): The DataFrame to validate.

        Returns:
        - bool: True if valid, False otherwise.
        """
        required_columns = {'question', 'answer'}
        return all(col in df.columns for col in required_columns)  # Check for necessary columns

    def load_and_process_xlsx(self, filepath):
        """Loads and processes an Excel file containing question-answer pairs for embedding."""
        try:
            df = pd.read_excel(filepath)  # Load Excel file
            
            if not self.validate_xlsx(df):  # Validate columns
                print("Excel file must contain 'question' and 'answer' columns")
                return None
            
            # Process each row into a dictionary containing QA pairs
            chunks = []
            for idx, row in df.iterrows():
                qa_text = f"Question: {row['question']}\nAnswer: {row['answer']}"
                chunks.append({
                    "source": "xlsx",
                    "page": idx,
                    "chunk_idx": 0,
                    "text": qa_text,
                    "question": row['question'],
                    "answer": row['answer']
                })
            
            return chunks  # Return processed chunks
        except Exception as e:
            print(f"Error processing Excel file: {str(e)}")
            return None

    def load_and_process_pdf(self, pdf_path):
        """Loads and processes text content from a PDF, returning document chunks for embedding."""
        try:
            loader = UnstructuredPDFLoader(file_path=pdf_path)  # Load PDF
            documents = loader.load()
            
            # Process each page into chunks
            chunks = []
            for i, doc in enumerate(documents):
                doc_chunks = self.chunk_text(doc.page_content)  # Split page content into chunks
                for chunk_idx, chunk in enumerate(doc_chunks):
                    chunks.append({
                        "source": doc.metadata.get('source', 'unknown'),
                        "page": i,
                        "chunk_idx": chunk_idx,
                        "text": chunk
                    })
            
            return chunks  # Return document chunks
        except Exception as e:
            print(f"Error processing PDF: {str(e)}")
            return None

    def chunk_text(self, content):
        """
        Splits a long text into smaller overlapping chunks using RecursiveCharacterTextSplitter.

        Parameters:
        - content (str): The text content to split.

        Returns:
        - list: A list of text chunks.
        """
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,  # Size of each chunk
            chunk_overlap=200,  # Overlap between chunks to preserve context
            length_function=len,
            is_separator_regex=False
        )
        return text_splitter.split_text(content)  # Split and return chunks

    def get_embeddings_in_chunks(self, texts, chunk_size=20):
        """
        Encodes a list of texts into embeddings in batches.

        Parameters:
        - texts (list): The list of texts to encode.
        - chunk_size (int): Batch size for embedding requests.

        Returns:
        - list: A list of embeddings.
        """
        try:
            embeds = []
            for chunk in chunk_list(texts, chunk_size):  # Process texts in smaller batches
                _, embeddings = self.embedding_model.encode(chunk)  # Get embeddings for the batch
                embeds.extend(embeddings)  # Add embeddings to the list
            return embeds
        except Exception as e:
            print(f"Error generating embeddings: {str(e)}")
            return None

    def add_to_chromadb(self, chunks):
        """
        Adds processed chunks to the ChromaDB collection with embeddings.

        Parameters:
        - chunks (list): A list of document chunks, each containing text and metadata.

        Returns:
        - bool: True if successful, False otherwise.
        """
        try:
            batch_size = 100  # Set the batch size for ChromaDB operations
            for i in range(0, len(chunks), batch_size):
                batch = chunks[i:i + batch_size]
                
                # Generate IDs and texts, and retrieve embeddings
                ids = [f"{chunk['page']}_{chunk['chunk_idx']}" for chunk in batch]
                texts = [chunk["text"] for chunk in batch]
                embeddings = self.get_embeddings_in_chunks(texts)
                
                if embeddings:
                    # Add documents, metadata, and embeddings to the collection
                    self.collection.add(
                        documents=texts,
                        metadatas=batch,
                        embeddings=embeddings,
                        ids=ids
                    )
            return True
        except Exception as e:
            print(f"Error adding to ChromaDB: {str(e)}")
            return False

    def search_document(self, query):
        """
        Searches for the most relevant documents based on the query embedding.

        Parameters:
        - query (str): The search query string.

        Returns:
        - list or None: A list of matching results, or None if an error occurs.
        """
        try:
            _, embedding = self.embedding_model.encode(query)  # Generate embedding for query
            results = self.collection.query(
                query_embeddings=embedding,
                n_results=5,  # Number of top results to return
                include=["documents", "metadatas", "distances"]
            )
            return results  # Return search results
        except Exception as e:
            print(f"Error searching ChromaDB: {str(e)}")
            return None

    def delete_collection(self):
        """Deletes the 'rag' collection from ChromaDB to clean up or reset."""
        self.vector_db_client.delete_collection("rag")  # Delete the collection if it exists


In [3]:
rag_agent = RAG()

In [4]:
pdf_path = "../docs/Transformers.pdf"
# Load and process the PDF
documents = rag_agent.load_and_process_pdf(pdf_path)

In [5]:
rag_agent.add_to_chromadb(documents)

Add of existing embedding ID: 0_0
Add of existing embedding ID: 0_1
Add of existing embedding ID: 0_2
Add of existing embedding ID: 0_3
Add of existing embedding ID: 0_4
Add of existing embedding ID: 0_5
Add of existing embedding ID: 0_6
Add of existing embedding ID: 0_7
Add of existing embedding ID: 0_8
Add of existing embedding ID: 0_9
Add of existing embedding ID: 0_10
Add of existing embedding ID: 0_11
Add of existing embedding ID: 0_12
Add of existing embedding ID: 0_13
Add of existing embedding ID: 0_14
Add of existing embedding ID: 0_15
Add of existing embedding ID: 0_16
Add of existing embedding ID: 0_17
Add of existing embedding ID: 0_18
Add of existing embedding ID: 0_19
Add of existing embedding ID: 0_20
Add of existing embedding ID: 0_21
Add of existing embedding ID: 0_22
Add of existing embedding ID: 0_23
Add of existing embedding ID: 0_24
Add of existing embedding ID: 0_25
Add of existing embedding ID: 0_26
Add of existing embedding ID: 0_27
Add of existing embedding ID: 

True

In [6]:
query = "whats is Transformers?"

retrieved_docs = rag_agent.search_document(query)

In [7]:
context = "\n\n".join(retrieved_docs["documents"][0])

In [8]:
rag_agent.chat_assistant.get_chat_response(query, retrieved_docs=context)

'According to the context, a "Transformers" is mentioned as the first transduction model that relies entirely on self-attention to compute representations of its input and output without using sequence-aligned RNNs or convolution.'

In [9]:
# !rm -rvf "./chromadb_data/"

# rag_agent.delete_collection()

In [10]:
documents = rag_agent.load_and_process_xlsx("../docs/FAQ_sample.xlsx")

In [11]:
documents

[{'source': 'xlsx',
  'page': 0,
  'chunk_idx': 0,
  'text': "Question: How can I regroup elements on a slide after ungrouping?\nAnswer: Currently, once elements are ungrouped, they cannot be regrouped. This feature is on our roadmap, but we don't have a specific timeline yet.",
  'question': 'How can I regroup elements on a slide after ungrouping?',
  'answer': "Currently, once elements are ungrouped, they cannot be regrouped. This feature is on our roadmap, but we don't have a specific timeline yet."},
 {'source': 'xlsx',
  'page': 1,
  'chunk_idx': 0,
  'text': 'Question: How can I add my company logo to all slides?\nAnswer: You can add your company logo to the footer via the footer settings in the right sidebar. Please note that not all slide layouts support footers due to aesthetic considerations.',
  'question': 'How can I add my company logo to all slides?',
  'answer': 'You can add your company logo to the footer via the footer settings in the right sidebar. Please note that no