### Objective:
- Using the RAG system from HW1, create a simple web app using Gradio and capture the system log for debugging and create a MVE!
- Log interaction into a db and csv file!

In [1]:
import os
import gradio as gr
import fitz ## PyMuPDF
import sqlite3
from datetime import datetime
import uuid
from tqdm.notebook import tqdm
import traceback
from dotenv import load_dotenv
from typing import List, Optional, Dict, Any, Union
import logging
from rich import print
import json
import re
from pathlib import Path

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
os.getcwd()

'/Users/vigneshkannan/Documents/Projects/AI-SDLC-April25-Projects/workshop1/notebooks'

In [3]:
load_dotenv(dotenv_path="../../project_secrets.env")
load_dotenv(dotenv_path="../../../ai_sdlc_secrets.env")

True

In [4]:
root_dir = Path(os.environ.get("ROOT_DIR")) ## type: ignore
sys_path = root_dir.parent.parent / "scripts"

In [5]:
RESTRICTIONS = """
Don't hallucinate
Don't provide information that is not present in the context. Apologize and request more information if the context is not helpful.
Cross-question the user to get more information if the context is not helpful.
"""

PROMPT_TEMPLATE = """
You are a helpful assistant that can answer questions based on the provided context within the restrictions permitted
Context:
{context}

Question:
{query}

Restrictions:
{restrictions}

Answer:
"""

MODEL_CONFIG = {
    "model_name": "gpt-4o-mini",
    "model_provider": "openai",
    "model_parameters": {
        "max_tokens": 1000,
        "top_p": 1,
        "temperature": 0.3,}
}



### RAG without logging - For reference:

In [6]:
## 1. Document and Nodes
class Document:
    def __init__(self, text: str, metadata: Optional[Dict[str, Any]] = None):
        self.text = text
        self.metadata = metadata or {}

class Node:
    def __init__(
            self, 
            text: str, 
            metadata: Optional[Dict[str, Any]] = None,
            node_id: Optional[str] = None):
        self.text = text
        self.metadata = metadata or {}
        self.node_id = node_id or f"node_{id(self)}" ## Simple Unique ID if not provided

    def __repr__(self):
        return f"Node(id={self.node_id}, text={self.text[:50]}, metadata={self.metadata})"
    

class Response:
    def __init__(self, response: str, prompt: str) -> None:
        self.response = response
        self.prompt = prompt

    def __str__(self) -> str:
        return self.response
    
## 3. For Indexing      
class SimpleNodeParser:
    def __init__(self, chunk_size: int = 4096, chunk_overlap: int= 200) -> None:
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def split_text(self, text: str) -> List[str]:
        """Split text into chunks with overalap"""
        if len(text) <= self.chunk_size:
            return [text]
        
        chunks = []
        for i in range(0, len(text), self.chunk_size - self.chunk_overlap):
            chunk = text[i:i+self.chunk_size]
            if chunk:
                chunks.append(chunk)

        return chunks

    def get_nodes_from_documents(self, documents: List[Document]) -> List[Node]:
        """Convert documents to nodes by splitting text into chunks"""
        nodes = []
        for doc in documents:
            text_chunks = self.split_text(doc.text)
            for i, text_chunk in enumerate(text_chunks):
                ## Copy metadata and add chunk info:
                metadata = doc.metadata.copy()
                metadata.update({
                    "chunk_index": i,
                    "total_chunks": len(text_chunks),

                })
                nodes.append(Node(
                    text=text_chunk, metadata=metadata))
                
        return nodes
    
class SimpleDirectoryReader:
    """Read all text files from a directory and return a list of documents."""
    def __init__(self, directory_path: str) -> None:
        self.directory_path = directory_path

    def load_data(self) -> List['Document']:
        """ Load all text files from the directory. """
        documents = []
        for filename in os.listdir(self.directory_path):
            if filename.endswith('.txt'):
                with open(os.path.join(self.directory_path, filename), 'r', encoding='utf-8') as file:
                    text = file.read()
                documents.append(Document(text, metadata={"source": filename}))
        return documents

import openai
import torch
from torch.nn import functional as F

## Need to create a base embedding class that extensible for different embedding models: Cohere and HuggingFace.
class BaseEmbedding:
    def get_embedding(self, text: List[str]) -> List[float]:
        """Get embedding from a single text file using API."""
        raise NotImplementedError("Subclasses must implement this method.")

class OpenAIEmbedding(BaseEmbedding):
    def __init__(self, model_name: str = "text-embedding-ada-002") -> None:
        self.model_name = model_name

    def get_embedding(self, text: str ) -> List[float]:
        """Get embedding from a single text file using OpenAI API."""
        response = openai.embeddings.create(
            model=self.model_name,
            input=text,
            encoding_format="float",
        )
        return response.data[0].embedding
    
    def get_embeddings(self, texts:List[str]) -> List[List[float]]:
        """Get embeddings for a list of texts."""
        return [self.get_embedding([text]) for text in texts]
    
class LLMResponseSynthesizer:
    def __init__(self, restrictions: str = RESTRICTIONS, prompt_template: str = PROMPT_TEMPLATE, model_config: Dict[str, Any] = MODEL_CONFIG) -> None:
        self.client = openai.OpenAI() ## Need to update this to support different LLM providers.
        self.model_config = model_config

        self.restrictions = restrictions
        self.prompt_template = prompt_template

    def synthesize(self, query: str, nodes: List[Node]) -> Response:
        """Synthesize a response from the context and query using the initialized LLM.        
        """

        ## Build context from nodes:
        context = "\n\n".join([f"Document chunk: {node.text}" for node in nodes])

        ## Build the prompt with context and query:
        prompt = self.prompt_template.format(context=context, query=query, restrictions=self.restrictions)
        # print(prompt)

        ## Call OpenAI API:
        response = self.client.chat.completions.create(
            model=self.model_config['model_name'],
            max_tokens=self.model_config['model_parameters']['max_tokens'],
            top_p=self.model_config['model_parameters']['top_p'],
            temperature=self.model_config['model_parameters']['temperature'],
            messages=[{
                "role": "user",
                "content": prompt
            }],
        )

        return Response(response=response, prompt=prompt) # type: ignore


class SimpleVectorStore:
    """Simple Vector Store: Functionalities to add nodes, retrieve nodes based on similarity search (top_k using cosine similarity)"""
    def __init__(self) -> None:
        self.embeddings = []
        self.node_ids = []
        self.node_dict = {} ## Store actual node objects by ID

    def add_notes(self, nodes: List[Node], embeddings: List[List[float]]) -> None:
        for node, embedding in zip(nodes, embeddings):
            self.embeddings.append(embedding)
            self.node_ids.append(node.node_id)
            self.node_dict[node.node_id] = node

    def similarity_search(self, query_embedding: List[float], top_k: int = 2) -> List[Node]:
        """Find `top_k` most similar nodes to the query embedding using cosine similarity."""

        if not self.embeddings:
            logging.warning("No embeddings in the vector store.")
            return []
        
        ## Convert lists to tensor 
        query_tensor = torch.tensor(query_embedding, dtype=torch.float32)
        embeddings_tensor = torch.tensor(self.embeddings, dtype=torch.float32)

        ##Normalize the query embedding and c
        query_tensor = F.normalize(query_tensor, p=2, dim=0)
        embeddings_tensor = F.normalize(embeddings_tensor, p=2, dim=1)

        ## Compute cosine similarities:
        similarities = torch.matmul(query_tensor, embeddings_tensor.T)

        ##Get top_k indices:
        top_indices = torch.argsort(similarities, descending=True)[:top_k].tolist()

        ## Return the nodes corresponding to the top_k indices:
        return [self.node_dict[self.node_ids[idx]] for idx in top_indices]
    
## Query Engine:
class QueryEngine:
    def __init__(
            self, 
            vector_store: SimpleVectorStore, 
            response_synthesizer: LLMResponseSynthesizer, 
            similarity_topk: int = 2,
            ) -> None:
        self.vector_store = vector_store
        self.response_synthesizer = response_synthesizer
        self.embedding_service = OpenAIEmbedding()
        self.similarity_topk = similarity_topk

    def query(self, query: str) -> Response:
        """Execute the query and return the response."""

        ## Get query embedding:
        query_embedding = self.embedding_service.get_embedding(query) # type: ignore ## Single statement so we use `get_embedding`

        ## Retrieve the relevant nodes using similarity search:
        retrieved_nodes = self.vector_store.similarity_search(
            query_embedding=query_embedding,
            top_k=self.similarity_topk,
        )

        ## Generate response
        response = self.response_synthesizer.synthesize(query=query, nodes=retrieved_nodes)

        return response
    
## Vector Store Index:
class VectorStoreIndex:
    """Vector Store Index: Manage nodes, embeddings, and vector store."""
    def __init__(self, nodes: List[Node], vector_store: SimpleVectorStore, similarity_topk: int = 2) -> None:
        self.nodes = nodes
        self.vector_store = vector_store
        self.similarity_topk = similarity_topk

    @classmethod
    def from_documents(
        cls, 
        documents: List[Document],
        embedding_service: OpenAIEmbedding,
        node_parser=None) -> None:

        """ Create index from documents"""
        ## Initialize the embedding service:
        embedding_service = embedding_service or OpenAIEmbedding()
        node_parser = node_parser or SimpleNodeParser()

        ## Create nodes from documents:
        nodes = node_parser.get_nodes_from_documents(documents=documents)

        ## Get embeddings for all nodes:
        texts = [node.text for node in nodes]
        embeddings = embedding_service.get_embeddings(texts=texts)

        ## Create and populate vector store:
        vector_store = SimpleVectorStore()
        vector_store.add_notes(nodes=nodes, embeddings=embeddings)

        return cls(nodes=nodes, vector_store=vector_store)
    
    def as_query_engine(self, response_synthesizer: LLMResponseSynthesizer, similarity_topk: int = 2) -> QueryEngine:
        """Create a query engine from this index"""
        response_synthesizer = response_synthesizer or LLMResponseSynthesizer() ## Default to a simple response synthesizer
        return QueryEngine(
            vector_store=self.vector_store,
            response_synthesizer=response_synthesizer,
            similarity_topk=similarity_topk,
        )


### Embedding and Vector Store:


In [7]:
## Configure logging to print to console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

In [8]:
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

In [9]:
root_dir = Path(os.environ.get("ROOT_DIR"))
log_dir = root_dir / "logs"
log_dir.mkdir(exist_ok=True)

### Modified RAG-System with logging capabilities

In [10]:
class RAGInteractionLogger:
    """Handles logging of RAG system interactions to both SQLite and CSV"""
    
    def __init__(self, db_path: str = "rag_interactions.db", csv_path: str = "rag_interactions.csv"):
        self.db_path = db_path
        self.csv_path = csv_path
        self._init_db()

    def _init_db(self) -> None:
        """Initialize SQLite database with required tables"""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()
        
        # Create interactions table with enhanced fields
        c.execute('''
            CREATE TABLE IF NOT EXISTS interactions (
                id TEXT PRIMARY KEY,
                timestamp TEXT,
                query TEXT,
                response TEXT,
                source_document TEXT,
                system_prompt TEXT,
                model_name TEXT,
                model_type TEXT,
                model_parameters TEXT,
                retrieved_context TEXT,
                processing_time REAL,
                metadata TEXT
                
            )
        ''')
        logger.info(f"Created interactions table in {self.db_path}")
        
        # Create system_logs table
        c.execute('''
            CREATE TABLE IF NOT EXISTS system_logs (
                id TEXT PRIMARY KEY,
                timestamp TEXT,
                level TEXT,
                message TEXT,
                module TEXT,
                function TEXT,
                traceback TEXT,
                llm_config TEXT
            )
        ''')
        logger.info(f"Created system_logs table in {self.db_path}")
        
        conn.commit()
        conn.close()
        return None
    
    def log_system_event(
            self, 
            level: str,
            message: str,
            module: str,
            function: str,
            traceback: str,
            llm_config: Optional[Dict[str, Any]] = None,
            ) -> None:
        
        """Log system events to database with LLM configuration when needed"""
        log_id = str(uuid.uuid4())
        timestamp = datetime.now().isoformat()
        llm_config_str = json.dumps(llm_config or {})
        
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()
        c.execute('''
            INSERT INTO syste_logs
            (id, timestamp, level, message, module, function, lin_number, llm_config)
            VALUES (? ? ?, ?, ?, ?, ?, ?, ?)
        ''', (log_id, timestamp, level, message, module, function, traceback, llm_config_str))
        conn.commit()
        conn.close()
        return None
    
    def log_interaction(self, 
                       query: str, 
                       response: str, 
                       source_document: str,
                       system_prompt: str,
                       model_name: str,
                       model_type: str,
                       model_parameters: Dict[str, Any],
                       retrieved_context: List[str],
                       metadata: Optional[Dict[str, Any]] = None,
                       processing_time: float = 0.0) -> None:
        
        """Log an interaction to both database and CSV with enhanced LLM details"""
        interaction_id = str(uuid.uuid4())
        timestamp = datetime.now().isoformat()
        metadata_str = json.dumps(metadata or {})
        model_params_str = json.dumps(model_parameters)
        context_str = json.dumps(retrieved_context)
        
        # Log to SQLite
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()
        c.execute('''
            INSERT INTO interactions 
            (id, timestamp, query, response, source_document, 
             system_prompt, model_name, model_type, model_parameters,
             retrieved_context, processing_time, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (interaction_id, timestamp, query, response, source_document,
              system_prompt, model_name, model_type, model_params_str,
              context_str, processing_time, metadata_str))
        conn.commit()
        conn.close()


In [11]:
class RAGwithLogging:
    def __init__(self, data_dir: str, model_config: Dict[str, Any], similarity_topk: int = 2, restrictions: str = RESTRICTIONS, prompt_template: str = PROMPT_TEMPLATE):
        self.data_dir = data_dir
        self.similarity_topk = similarity_topk
        self.model_config = model_config
        self.restrictions = restrictions
        self.prompt_template = prompt_template
        self.logger = RAGInteractionLogger()
        self.documents = self._load_documents()
        self.vector_index = self.create_vector_index()
        self.query_engine = self._create_query_engine() ## Fundctions not meant to be called directly.

    def _load_documents(self) -> List[Document]:
        """
        Load documents from directory and log the process.
        """
        try:
            documents = []
            for filename in os.listdir(self.data_dir)[:3]:
                if filename.endswith('.txt'):
                    file_path = os.path.join(self.data_dir, filename)
                    with open(file_path, 'r', encoding='utf-8') as file:
                        text = file.read()
                    documents.append(Document(text, metadata={"source": filename}))
            return documents
        except Exception as e: 
            self.logger.log_system_event(
                level="ERROR",
                message=f"Error loading documents: {str(e)}",
                module=__name__,
                function="_load_documents",
                traceback=traceback.format_exc()
            )
            raise Exception(f"Error loading documents: {str(e)}")
        
    def create_vector_index(self) -> VectorStoreIndex:
        """Create vector index from documents and log the process"""
        try:
            node_parser = SimpleNodeParser()
            embedding_service = OpenAIEmbedding()
            nodes = node_parser.get_nodes_from_documents(self.documents)
            texts = [node.text for node in nodes]
            embeddings = embedding_service.get_embeddings(texts)
            
            vector_store = SimpleVectorStore()
            vector_store.add_notes(nodes=nodes, embeddings=embeddings)

            return VectorStoreIndex(nodes=nodes, vector_store=vector_store, similarity_topk=self.similarity_topk)
        except Exception as e:
            self.logger.log_system_event(
                level="ERROR",
                message=f"Error creating vector index: {str(e)}",
                module=__name__,
                function="_create_vector_index",
                traceback=traceback.format_exc()
            )   
            raise Exception(f"Error creating vector index: {str(e)}")
        
    def _create_query_engine(self) -> QueryEngine:
        """Create query engine from vector index and log the process"""
        try:
            response_synthesizer = LLMResponseSynthesizer(model_config=self.model_config, restrictions=self.restrictions, prompt_template=self.prompt_template)
            return self.vector_index.as_query_engine(response_synthesizer=response_synthesizer, similarity_topk=self.similarity_topk)
        except Exception as e:
            self.logger.log_system_event(
                level="ERROR",
                message=f"Error creating query engine: {str(e)}",
                module=__name__,
                function="_create_query_engine",
                traceback=traceback.format_exc()
            )
            raise Exception(f"Error creating query engine: {str(e)}")
        
    def query(self, query: str, model_config: Dict[str, Any]) -> Union[str, Response]:
        """Execute query with enhanced logging"""
        start_time = datetime.now()
        try:
            # Get the response and context
            response = self.query_engine.query(query)
            query_embedding = self.query_engine.embedding_service.get_embedding(query)
            retrieved_nodes = self.query_engine.vector_store.similarity_search(
                query_embedding=query_embedding,
                top_k=self.query_engine.similarity_topk
            )
            retrieved_context = [node.text for node in retrieved_nodes]
            processing_time = (datetime.now() - start_time).total_seconds()

            self.logger.log_interaction(
                query=query,
                response=str(response.response),
                source_document=self.documents[0].metadata.get("source", "unknown"),
                system_prompt=response.prompt,
                model_name=self.model_config['model_name'],
                model_type=self.model_config['model_provider'],
                model_parameters=self.model_config['model_parameters'],
                retrieved_context=retrieved_context,
                metadata={"processing_time": processing_time},
                processing_time=processing_time
            )
            return response.response

            

        except Exception as e:
            self.logger.log_system_event(
                level="ERROR",
                message=f"Error executing query: {str(e)}",
                module=__name__,
                function="_query",
                traceback=traceback.format_exc()
            )
            

In [12]:
rag_sys = RAGwithLogging(data_dir="../apps/data", model_config=MODEL_CONFIG, similarity_topk=3)

2025-04-30 17:16:41,406 - __main__ - INFO - Created interactions table in rag_interactions.db
2025-04-30 17:16:41,407 - __main__ - INFO - Created system_logs table in rag_interactions.db
2025-04-30 17:16:42,064 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-04-30 17:16:42,435 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-04-30 17:16:42,898 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-04-30 17:16:43,127 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-04-30 17:16:43,715 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-04-30 17:16:44,063 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-04-30 17:16:44,328 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-04-

In [13]:
response = rag_sys.query(query="What is the name of the first person in the document?", model_config=MODEL_CONFIG)

2025-04-30 17:16:47,613 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-04-30 17:16:48,273 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-04-30 17:16:48,430 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"


ChatCompletion(id='chatcmpl-BSA1PpH8UMNZvWfqysr6Vaobm4e0l', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content='The name of the first person in the document is Chris Griffin.', refusal=None, role='assistant', annotations=[], audio=None, function_call=None, tool_calls=None))], created=1746051407, model='gpt-4o-mini-2024-07-18', object='chat.completion', service_tier='default', system_fingerprint='fp_0392822090', usage=CompletionUsage(completion_tokens=14, prompt_tokens=2298, total_tokens=2312, completion_tokens_details=CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), prompt_tokens_details=PromptTokensDetails(audio_tokens=0, cached_tokens=0)))

In [14]:
import pandas as pd
db_path = log_dir / "rag_interactions.db" 
conn = sqlite3.connect(db_path)
try:
    interactions_df = pd.read_sql_query("SELECT * FROM interactions", conn)
    display(interactions_df) # Use display() for better rendering in Jupyter
except pd.io.sql.DatabaseError as e:
    print(f"Could not query 'interactions' table: {e}")

In [None]:
log_dir = root_dir / "logs"
db_path = log_dir / "rag_interactions.db" 
conn = sqlite3.connect(db_path)
interactions_df = pd.read_sql_query("SELECT * FROM interactions", conn)

In [None]:
 interactions_df = pd.read_sql_query("SELECT * FROM interactions", conn)