# Use Docling for Parsing

In [1]:
from dotenv import load_dotenv
load_dotenv()
import glob
import numpy as np
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from langchain.embeddings.base import Embeddings
from langchain_community.document_loaders.csv_loader import CSVLoader    
from langchain_community.document_loaders import PyPDFLoader, UnstructuredExcelLoader
from langchain.prompts import ChatPromptTemplate
import shutil
from langchain_chroma import Chroma
import os
import google.generativeai as genai
import ollama
import re
import json
import pandas as pd
from typing import Dict, Any, Union, Optional
import psutil
import time


In [2]:

API_KEY = os.environ.get("GEMINI_API_KEY")  
genai.configure(api_key=API_KEY)


In [3]:

CONTEXT_PROMPT = """
You are an AI assistant which provides detailed and long answers.

Answer the question based only on the given context.

Question : {question}

Conext : {context}

"""

FORMATTING_PROMPT = '''
Task: Extract and structure information from a query into a JSON object.

Instructions:
1. Analyze the query and extract only the following fields if they appear:
   - "ID"
   - "name"
   - "salary"
   - "vacations_remaining"
   - "join_date"
2. For any field that is not mentioned in the query, do not include it in the output.
3. Do not include any additional fields or extra text regarding assumptions.

Example 1:
Input: Update the current salary of employee ID:1011 to 2300?
Output: {{"ID": 1011, "salary": 2300}}

Example 2:
Input: "Change the join date of Muhid Qaiser to 1999."
Output: {{"name": "Muhid Qaiser", "join_date": 1999}}

Example 3:
Input: "Can ID:1041 take a vacation tomorrow?"
Output: {{ "ID": 1011, "vacations_remaining": 1 }}

Example 4:
Input: "Can John Smith's take 4 vacations?"
Output: {{ "name": "John Smith", "vacations_remaining": 4 }}

'''

PARAPHRASE_PROMPT = '''
Task: Rewrite the provided text using formal language.

Instructions: 
1. Please rephrase the text below in a formal tone, as if an HR representative were communicating with an employee.
2. Keep it concise and straight to the point.
3. No need to include extra text for context or explanation or Subject as in emails.


Text: {text}

'''

CHROMA_PATH = "chroma"
DATA_PATH = "documents\\"

WRITE_KEYWORDS = {'update', 'change', 'modify', 'delete', 'insert', 'set', 'assign'}
READ_KEYWORDS  = {'get', 'show', 'find', 'retrieve', 'list', 'display', 'fetch'}


In [4]:


class GeminiEmbeddings(Embeddings):
    def __init__(self, model: str = "models/embedding-001"):
        """Initialize with Gemini embedding model.
        
        Args:
            model: The model name to use (default: "models/embedding-001")
        """
        self.model = model
    
    def embed_query(self, text: str) -> list[float]:
        """Generate an embedding for a single text."""
        try:
            embedding = genai.embed_content(
                model=self.model,
                content=text,
                task_type="retrieval_query"
            )
            
            # Extract embedding values
            values = embedding["embedding"]
            
            # Normalize the embedding vector using L2 normalization
            norm = np.linalg.norm(values)
            if norm == 0:
                return values
            normalized_embedding = [float(x) / norm for x in values]
            return normalized_embedding
        except Exception as e:
            print(f"Error generating embedding: {e}")
            # Return a zero vector of appropriate dimension if there's an error
            # Typical Gemini embedding dimensions are 768 or 1024
            return [0.0] * 768

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        """Generate embeddings for a list of texts."""
        return [self.embed_query(text) for text in texts]

class OllamaEmbeddings(Embeddings):
    def __init__(self, model: str = "mxbai-embed-large"):
        self.model = model
        self.client = ollama.Client()
    
    def embed_query(self, text: str) -> list[float]:
        response = self.client.embeddings(model=self.model, prompt=text)
        embedding = response["embedding"]
        # Normalize the embedding vector using L2 normalization
        norm = np.linalg.norm(embedding)
        if norm == 0:
            return embedding
        normalized_embedding = [x / norm for x in embedding]
        return normalized_embedding

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        return [self.embed_query(text) for text in texts]


def load_documents():
    documents = []
    # Load CSV files
    for csv_file in glob.glob(os.path.join(DATA_PATH, "*.csv")):
        loader = CSVLoader(file_path=csv_file)
        docs = loader.load_and_split()
        documents.extend(docs)
    # Load XLSX files
    for xlsx_file in glob.glob(os.path.join(DATA_PATH, "*.xlsx")):
        loader = UnstructuredExcelLoader(file_path=xlsx_file)
        docs = loader.load_and_split()
        documents.extend(docs)
    # Load PDF files
    for pdf_file in glob.glob(os.path.join(DATA_PATH, "*.pdf")):
        loader = PyPDFLoader(pdf_file)
        docs = loader.load_and_split()
        documents.extend(docs)
    # Load TXT files
    for txt_file in glob.glob(os.path.join(DATA_PATH, "*.txt")):
        with open(txt_file, 'r', encoding='utf-8') as file:
            content = file.read()
            document = Document(page_content=content, metadata={'source': txt_file})
            documents.append(document)

    return documents

def split_text(documents: list[Document]):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=512,
        chunk_overlap=50,
        length_function=len,
        add_start_index=True,
    )
    chunks = text_splitter.split_documents(documents)
    print(f"Split {len(documents)} documents into {len(chunks)} chunks.")
    return chunks

def save_to_chroma(chunks: list[Document]):
    # Use GeminiEmbeddings instead of OllamaEmbeddings
    embeddings = GeminiEmbeddings()
    # embeddings = OllamaEmbeddings()
    db = Chroma(collection_name="foo", embedding_function=embeddings, persist_directory=CHROMA_PATH)
    
    # Add chunks in batches
    batch_size = 100  # Adjust this batch size as needed
    for i in range(0, len(chunks), batch_size):
        batch_chunks = chunks[i:i + batch_size]
        db.add_documents(batch_chunks)
    
    print(f"Saved {len(chunks)} chunks to {CHROMA_PATH}.")
    return db

def generate_data_store():
    documents = load_documents()
    chunks = split_text(documents)
    return save_to_chroma(chunks)


In [None]:

def force_delete_folder(folder_path, retries=3, delay=2):

    # terminate_chromadb_server()

    # Identify and terminate processes using the folder
    for proc in psutil.process_iter(['pid', 'name']):
        try:
            for item in proc.open_files():
                if item.path.startswith(folder_path):
                    proc.terminate()  # or proc.kill() for a more forceful termination
                    proc.wait()  # wait for the process to terminate
                    print(f"Terminated process {proc.pid} that was using the file {item.path}")
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    
    # Retry deletion with better error handling
    for attempt in range(retries):
        try:
            shutil.rmtree(folder_path)
            print(f"Deleted folder: {folder_path}")
            break
        except FileNotFoundError:
            print(f"Folder not found: {folder_path}")
            break
        except PermissionError:
            print(f"Permission denied: {folder_path}, retrying in {delay} seconds...")
            time.sleep(delay)  # wait before retrying
        except Exception as e:
            print(f"Error deleting folder: {str(e)}")
            break
    else:
        print(f"Failed to delete folder {folder_path} after {retries} attempts")

def rule_based_classify(query: str) -> str:
    query_lower = query.lower()

    if ('vacation' in query_lower or 'holiday' in query_lower or "sick-leave" in query_lower or "sick leave" in query_lower or "day off" in query_lower or "day-off" in query_lower) and ('can' in query_lower or 'want' in query_lower):
        return "write", 1

    found_write = {kw for kw in WRITE_KEYWORDS if kw in query_lower}
    found_read = {kw for kw in READ_KEYWORDS if kw in query_lower}
    
    if found_write and not found_read:
        return "write", 0
    if found_read and not found_write:
        return "read", 0
    return "ambiguous", 0

def ml_based_classify(query: str) -> str:
    prompt = '''
    Task: Identify if the query is a read or write request.

    Instructions: Answer only in "read" or "write" 
    Do not include anything else.

    query: {question}
    '''
    client = ollama.Client()
    model = 'mistral:latest'
    response = client.generate(model, prompt.format(question=query))
    return response['response'], 0

def classify_query(query: str) -> str:
    rule_result = rule_based_classify(query)
    if rule_result != "ambiguous":
        return rule_result
    return ml_based_classify(query)

def extract_json_from_text(text):
    """
    Extracts valid JSON from text by finding content within outermost curly braces.
    Returns the parsed JSON object or None if no valid JSON is found.
    """
    try:
        # Find content between outermost curly braces using regex
        # This handles nested braces and multiline JSON
        pattern = r'\{(?:[^{}]|(?:\{(?:[^{}]|(?:\{(?:[^{}]|(?:\{[^{}]*\}))*\}))*\}))*\}'
        matches = re.findall(pattern, text)
        
        if not matches:
            return None
        
        # Try to parse each match as JSON and return the first valid one
        for match in matches:
            try:
                return json.loads(match)
            except json.JSONDecodeError:
                continue
                
        return None
    except Exception as e:
        print(f"Error extracting JSON: {e}")
        return None

def update_csv_from_json(json_data: Union[Dict, str], csv_path: str, backup: bool = True) -> bool:
    """
    Update a CSV file based on the provided JSON data.
    
    Args:
        json_data: JSON data as dictionary or string, containing ID or name for record lookup
                  and additional fields to update
        csv_path: Path to the CSV file
        backup: Whether to create a backup of the original file before updating
        
    Returns:
        bool: True if update was successful, False otherwise
        
    Example JSON formats:
        {"ID": 1011, "salary": 2300}
        {"name": "John Smith", "vacations_remaining": 5}
    """
    try:
        # Parse JSON if it's a string
        if isinstance(json_data, str):
            json_data = json.loads(json_data)
        
        # Validate JSON data
        if not isinstance(json_data, dict):
            print("Error: JSON data must be a dictionary")
            return False
            
        # Check if we have either ID or name to locate the record
        has_id = "ID" in json_data
        has_name = "name" in json_data
        
        if not (has_id or has_name):
            print("Error: JSON must contain either 'ID' or 'name' to locate a record")
            return False
        
        # Create a backup if requested
        if backup and os.path.exists(csv_path):
            backup_path = f"{csv_path}.bak"
            pd.read_csv(csv_path).to_csv(backup_path, index=False)
            print(f"Backup created at {backup_path}")
        
        # Load CSV into DataFrame
        df = pd.read_csv(csv_path)
        
        # Locate the record
        record_found = False
        if has_id:
            id_value = json_data["ID"]
            mask = df["ID"] == id_value
            record_found = mask.any()
            if not record_found:
                print(f"No record found with ID: {id_value}")
                return False
        else:  # has_name
            name_value = json_data["name"]
            mask = df["name"] == name_value
            record_found = mask.any()
            if not record_found:
                print(f"No record found with name: {name_value}")
                return False
        
        # Update the record with each field in the JSON
        update_count = 0
        for key, value in json_data.items():
            # Skip the identifier fields
            if key in ["ID", "name"]:
                continue
                
            # Check if the column exists
            if key not in df.columns:
                print(f"Warning: Column '{key}' not found in CSV, skipping")
                continue
                
            # Update the value
            df.loc[mask, key] = value
            update_count += 1
        
        if update_count == 0:
            print("No fields to update")
            return False
            
        # Save the updated DataFrame back to CSV
        df.to_csv(csv_path, index=False)
        print(f"Successfully updated {update_count} fields for record")
        return True
        
    except Exception as e:
        print(f"Error updating CSV: {str(e)}")
        return False

def calculate_remaining_vacations(json_data: Union[Dict, str], csv_path: str) -> Optional[int]:
    """
    Calculate the remaining vacation days after subtracting the requested amount.
    
    Args:
        json_data: JSON data as dictionary or string, containing ID or name for record lookup
                  and vacations_remaining field representing the requested vacation days
        csv_path: Path to the CSV file
        
    Returns:
        int: The number of vacation days that would remain after the request, or None if error
        
    Example JSON formats:
        {"ID": 10, "vacations_remaining": 5}
        {"name": "James Smith", "vacations_remaining": 2}
    """
    try:
        # Parse JSON if it's a string
        if isinstance(json_data, str):
            json_data = json.loads(json_data)
        
        # Validate JSON data
        if not isinstance(json_data, dict):
            print("Error: JSON data must be a dictionary")
            return None
            
        # Check if we have vacations_remaining field
        if "vacations_remaining" not in json_data:
            print("Error: JSON must contain 'vacations_remaining' field")
            return None
            
        # Check if we have either ID or name to locate the record
        has_id = "ID" in json_data
        has_name = "name" in json_data
        
        if not (has_id or has_name):
            print("Error: JSON must contain either 'ID' or 'name' to locate a record")
            return None
        
        # Load CSV into DataFrame
        df = pd.read_csv(csv_path)
        
        # Locate the record
        if has_id:
            id_value = json_data["ID"]
            mask = df["ID"] == id_value
            if not mask.any():
                print(f"No record found with ID: {id_value}")
                return None
        else:  # has_name
            name_value = json_data["name"]
            mask = df["name"] == name_value
            if not mask.any():
                print(f"No record found with name: {name_value}")
                return None
        
        # Get the current vacations_remaining value from CSV
        current_vacations = df.loc[mask, "vacations_remaining"].values[0]
        
        # Get the requested vacations from JSON
        requested_vacations = json_data["vacations_remaining"]
        
        # Calculate the remaining vacations
        remaining_vacations = current_vacations - requested_vacations
        
        return remaining_vacations
        
    except Exception as e:
        print(f"Error calculating remaining vacations: {str(e)}")
        return None

def get_record_from_json(json_data: Union[Dict, str], csv_path: str) -> Optional[Dict]:
    """
    Retrieve a record from a CSV file based on ID or name in the provided JSON.
    
    Args:
        json_data: JSON data as dictionary or string containing ID or name for lookup
        csv_path: Path to the CSV file
        
    Returns:
        Dict: The complete record as a dictionary, or None if not found
        
    Example JSON formats:
        {"ID": 6}
        {"name": "James Smith"}
    """
    try:
        # Parse JSON if it's a string
        if isinstance(json_data, str):
            json_data = json.loads(json_data)
        
        # Validate JSON data
        if not isinstance(json_data, dict):
            print("Error: JSON data must be a dictionary")
            return None
            
        # Check if we have either ID or name to locate the record
        has_id = "ID" in json_data
        has_name = "name" in json_data
        
        if not (has_id or has_name):
            print("Error: JSON must contain either 'ID' or 'name' to locate a record")
            return None
        
        # Load CSV into DataFrame
        df = pd.read_csv(csv_path)
        
        # Locate the record
        if has_id:
            id_value = json_data["ID"]
            mask = df["ID"] == id_value
            if not mask.any():
                print(f"No record found with ID: {id_value}")
                return None
        else:  # has_name
            name_value = json_data["name"]
            mask = df["name"] == name_value
            if not mask.any():
                print(f"No record found with name: {name_value}")
                return None
        
        # Convert the record to a dictionary
        record = df.loc[mask].iloc[0].to_dict()
        
        # Ensure numeric fields are properly typed (not numpy types)
        for key, value in record.items():
            if hasattr(value, 'item'):  # Check if it's a numpy type
                record[key] = value.item()  # Convert numpy type to native Python type
        
        return record
        
    except Exception as e:
        print(f"Error retrieving record: {str(e)}")
        return None

def predict(model, query_text, db, csv_file, combined=False):
    results = db.similarity_search_with_relevance_scores(query_text, k=4)
    
    if len(results) == 0 or results[0][1] < 0.2:
        return "Unable to find matching results."

    query_context = model.generate_content([
                {"role": "user", "parts": [FORMATTING_PROMPT]},
                {"role": "user", "parts": [query_text]}
            ]).text
    extracted_json = extract_json_from_text(query_context)
    record = get_record_from_json(extracted_json, csv_file)

    if record is None:
        context_text = "\n---\n".join([doc.page_content for doc, _score in results])
    elif record is not None and combined:
        context_text = "\n---\n".join([doc.page_content for doc, _score in results])
        context_text += "\n---\n" + str(record)
    else:
        context_text = record
        
    prompt_template = ChatPromptTemplate.from_template(CONTEXT_PROMPT)
    prompt = prompt_template.format(context=context_text, question=query_text)
    
    answer = model.generate_content([
                    {"role": "user", "parts": [prompt]}
                ]).text

    sources = [doc.metadata['source'] for doc, _score in results]

    return answer, sources

def update_db(db, retries=1, delay=0):
    print(f"\nUpdating the Database....")
    db.delete_collection()
    force_delete_folder(folder_path=CHROMA_PATH, retries=retries, delay=delay)
    db = generate_data_store()
    print(f"\nDatabase Updated.")
    return db

def rag_chatbot(model, db, csv_file, formalize_flag=False):

    print("Chatbot: Hello! How can I help you today? (Type 'exit' to end the chat)")
    while True:
        query_text = input("You: ")
        if query_text.lower() == "exit":
            print("Chatbot: Goodbye!")
            break

        classification, vaca_flag = classify_query(query_text)

        if classification == "write":
            output = model.generate_content([
                {"role": "user", "parts": [FORMATTING_PROMPT]},
                {"role": "user", "parts": [query_text]}
            ]).text
            extracted_json = extract_json_from_text(output)

            if extracted_json != {}:
                if vaca_flag:
                    remaining = calculate_remaining_vacations(extracted_json, csv_file)
                    if remaining is not None:
                        print(f"Remaining vacations after request: {remaining}")
                        if remaining < 0:
                            response = f"Not enough vacation days available! \nNo vacation granted. You will have {remaining} days remaining."
                        else:
                            response = f"Yes, You can take the vacation. You will have {remaining} days remaining."

                else:
                    result = update_csv_from_json(extracted_json, csv_file, backup=False)

                    if result:
                        response = "Successfully updated the record."
                        sources = [csv_file]
                    else:
                        response = "Failed to update the record."
                        sources = []

                    db = update_db(db, retries=1, delay=0)

            else:
                response = "Unable to extract Information from text. Please try phrasing the text in a more clear and structured manner."
                sources = []

            if formalize_flag:
                response = model.generate_content([
                    {"role": "user", "parts": [PARAPHRASE_PROMPT.format(text=response)]},
                ]).text
            
        else:
            response, sources = predict(model, query_text, db, csv_file)

        print(f"Chatbot: {response}")
        print(f"Sources: {sources}")



In [None]:

# Initialize the generative model
model = genai.GenerativeModel('gemini-2.0-flash')

# Delete the chromadb folder if it exists
chromadb_path = 'chroma'
if os.path.exists(chromadb_path):
    shutil.rmtree(chromadb_path)

db = generate_data_store()
csv_file = "documents\\employees.csv"

Split 99 documents into 99 chunks.
Saved 99 chunks to chroma.


In [None]:

rag_chatbot(model, db, csv_file, formalize_flag=True)


Chatbot: Hello! How can I help you today? (Type 'exit' to end the chat)
Chatbot: Rebecca Schwartz is an employee with an annual salary of $124,076. As of the current context, she has 13 vacation days remaining. Her join date, indicating the start of her employment, is August 15, 2017.

Sources: ['documents\\employees.csv', 'documents\\employees.csv', 'documents\\employees.csv', 'documents\\employees.csv']
Chatbot: Based on the provided context, Rebecca Schwartz has 13 vacations remaining.

Sources: ['documents\\employees.csv', 'documents\\employees.csv', 'documents\\employees.csv', 'documents\\employees.csv']
Remaining vacations after request: 10

Updating the Database....
Permission denied: chroma, retrying in 0 seconds...
Failed to delete folder chroma after 1 attempts
Split 99 documents into 99 chunks.
Saved 99 chunks to chroma.

Database Updated.
Chatbot: Your vacation request has been approved. You will have a remaining balance of 10 vacation days.

Sources: ['documents\\employees