In [7]:
from src.agenticRAG.main import agenticRAGResponse

data = agenticRAGResponse(query="What is the capital of France?")

print(data)

ValueError: Missing required setting: GROQ_API_KEY

In [22]:
import os
from typing import Dict, Any
from dotenv import load_dotenv
load_dotenv()

class Settings:
    """Configuration settings for the AgenticRAG system"""
    
    # API Keys
    GROQ_API_KEY: str = os.getenv("GROQ_API_KEY", "")
    GOOGLE_API_KEY: str = os.getenv("GOOGLE_API_KEY", "")
    GOOGLE_CSE_ID: str = os.getenv("GOOGLE_CSE_ID", "")
    
    # Model Configuration
    GROQ_MODEL: str = "llama3-8b-8192"
    GROQ_TEMPERATURE: float = 0.1

    OPENAI_MODEL: str = "gpt-4.1-nano-2025-04-14"
    OPENAI_TEMPERATURE: float = 0.3
    OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")
    
    # Embedding Model
    EMBEDDING_MODEL: str = "sentence-transformers/all-MiniLM-L6-v2"
    OPENAI_EMBEDDING_MODEL = "text-embedding-3-large"
    # Vector Store
    VECTORSTORE_PATH: str = "data/vectorstore"
    
    # Search Configuration
    SEARCH_RESULTS_COUNT: int = 5

    SERPER_API_KEY: str = os.getenv("SERPER_API_KEY", "")
    TAVILY_API_KEY: str = os.getenv("TAVILY_API_KEY", "")
    
    # Query Enhancement
    MAX_QUERY_LENGTH: int = 200
    
    # Routing Configuration
    DEFAULT_ROUTE: str = "DIRECT"
    
    @classmethod
    def validate(cls) -> bool:
        """Validate required settings"""
        required_keys = ["GROQ_API_KEY"]
        for key in required_keys:
            if not getattr(cls, key):
                raise ValueError(f"Missing required setting: {key}")
        return True

settings = Settings()

In [4]:
import os
from typing import List, Union
from pathlib import Path

# LangChain imports
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import (
    PyPDFLoader,
    Docx2txtLoader,
    TextLoader,
    UnstructuredMarkdownLoader
)
from langchain.schema import Document

class DocumentChunker:
    """
    A class to read various document types and chunk them using LangChain
    """
    
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        """
        Initialize the DocumentChunker
        
        Args:
            chunk_size (int): Size of each chunk in characters
            chunk_overlap (int): Number of characters to overlap between chunks
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )
    
    def read_pdf(self, file_path: str) -> List[Document]:
        """Read PDF file and return documents"""
        try:
            loader = PyPDFLoader(file_path)
            documents = loader.load()
            return documents
        except Exception as e:
            print(f"Error reading PDF file {file_path}: {e}")
            return []
    
    def read_docx(self, file_path: str) -> List[Document]:
        """Read DOCX file and return documents"""
        try:
            loader = Docx2txtLoader(file_path)
            documents = loader.load()
            return documents
        except Exception as e:
            print(f"Error reading DOCX file {file_path}: {e}")
            return []
    
    def read_txt(self, file_path: str) -> List[Document]:
        """Read TXT file and return documents"""
        try:
            loader = TextLoader(file_path, encoding='utf-8')
            documents = loader.load()
            return documents
        except Exception as e:
            print(f"Error reading TXT file {file_path}: {e}")
            return []
    
    def read_md(self, file_path: str) -> List[Document]:
        """Read Markdown file and return documents"""
        try:
            loader = UnstructuredMarkdownLoader(file_path)
            documents = loader.load()
            return documents
        except Exception as e:
            print(f"Error reading MD file {file_path}: {e}")
            return []
    
    def load_document(self, file_path: str) -> List[Document]:
        """
        Load document based on file extension
        
        Args:
            file_path (str): Path to the document file
            
        Returns:
            List[Document]: List of loaded documents
        """
        file_extension = Path(file_path).suffix.lower()
        
        if file_extension == '.pdf':
            return self.read_pdf(file_path)
        elif file_extension == '.docx':
            return self.read_docx(file_path)
        elif file_extension == '.txt':
            return self.read_txt(file_path)
        elif file_extension == '.md':
            return self.read_md(file_path)
        else:
            print(f"Unsupported file type: {file_extension}")
            return []
    
    def chunk_documents(self, documents: List[Document]) -> List[str]:
        """
        Chunk documents and return list of strings
        
        Args:
            documents (List[Document]): List of documents to chunk
            
        Returns:
            List[str]: List of chunked text strings
        """
        if not documents:
            return []
        
        # Split documents into chunks
        chunks = self.text_splitter.split_documents(documents)
        
        # Extract text content from chunks
        chunk_texts = [chunk.page_content for chunk in chunks]
        
        return chunk_texts
    
    def process_file(self, file_path: str) -> List[str]:
        """
        Process a single file: load and chunk it
        
        Args:
            file_path (str): Path to the file to process
            
        Returns:
            List[str]: List of chunked text strings
        """
        if not os.path.exists(file_path):
            print(f"File not found: {file_path}")
            return []
        
        # Load document
        documents = self.load_document(file_path)
        
        if not documents:
            print(f"No content loaded from {file_path}")
            return []
        
        # Chunk documents
        chunks = self.chunk_documents(documents)
        
        print(f"Successfully processed {file_path}: {len(chunks)} chunks created")
        return chunks
    
    def process_multiple_files(self, file_paths: List[str]) -> List[str]:
        """
        Process multiple files and return combined chunks
        
        Args:
            file_paths (List[str]): List of file paths to process
            
        Returns:
            List[str]: Combined list of chunked text strings
        """
        all_chunks = []
        
        for file_path in file_paths:
            chunks = self.process_file(file_path)
            all_chunks.extend(chunks)
        
        return all_chunks


# Example usage and utility functions
def main():
    """Example usage of the DocumentChunker class"""
    
    # Initialize chunker with custom parameters
    chunker = DocumentChunker(chunk_size=1000, chunk_overlap=150)
    
    # Example: Process a single file
    file_path = "/home/ubuntu/OMANI-Therapist-Voice-ChatBot/KnowledgebaseFile/SuicideGuard_An_NLP-Based_Chrome_Extension_for_Detecting_Suicidal_Thoughts_in_Bengali.pdf"  # Replace with your file path
    chunks = chunker.process_file(file_path)
    
    if chunks:
        print(f"Total chunks: {len(chunks)}")
        print("\nFirst chunk preview:")
        print(chunks[0][:200] + "..." if len(chunks[0]) > 200 else chunks[0])
    
    # Example: Process multiple files
    file_paths = [
        "document1.pdf",
        "document2.docx",
        "document3.txt",
        "document4.md"
    ]
    
    all_chunks = chunker.process_multiple_files(file_paths)
    print(f"\nTotal chunks from all files: {len(all_chunks)}")
    
    return all_chunks


def create_chunker_with_custom_settings(chunk_size: int = 1000, 
                                       chunk_overlap: int = 200) -> DocumentChunker:
    """
    Create a DocumentChunker with custom settings
    
    Args:
        chunk_size (int): Size of each chunk
        chunk_overlap (int): Overlap between chunks
        
    Returns:
        DocumentChunker: Configured chunker instance
    """
    return DocumentChunker(chunk_size=chunk_size, chunk_overlap=chunk_overlap)


if __name__ == "__main__":
    main()

Successfully processed /home/ubuntu/OMANI-Therapist-Voice-ChatBot/KnowledgebaseFile/SuicideGuard_An_NLP-Based_Chrome_Extension_for_Detecting_Suicidal_Thoughts_in_Bengali.pdf: 31 chunks created
Total chunks: 31

First chunk preview:
2024 27th International Conference on Computer and Information Technology (ICCIT)
20-22 December 2024, Cox’s Bazar, Bangladesh
SuicideGuard: An NLP-Based Chrome Extension
for Detecting Suicidal Though...
File not found: document1.pdf
File not found: document2.docx
File not found: document3.txt
File not found: document4.md

Total chunks from all files: 0


In [5]:
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_openai import OpenAIEmbeddings
from typing import Union, Literal

class EmbeddingFactory:
    """Factory for creating embedding instances"""
    
    _huggingface_instance = None
    _openai_instance = None
    
    @classmethod
    def get_embeddings(cls, provider: Literal["huggingface", "openai"] = "huggingface") -> Union[HuggingFaceEmbeddings, OpenAIEmbeddings]:
        """Get or create embeddings instance (singleton pattern)"""
        if provider == "huggingface":
            if cls._huggingface_instance is None:
                cls._huggingface_instance = HuggingFaceEmbeddings(
                    model_name=settings.EMBEDDING_MODEL
                )
            return cls._huggingface_instance
        elif provider == "openai":
            if cls._openai_instance is None:
                cls._openai_instance = OpenAIEmbeddings(
                    model=settings.OPENAI_EMBEDDING_MODEL,
                    openai_api_key=settings.OPENAI_API_KEY
                )
            return cls._openai_instance
        else:
            raise ValueError(f"Unsupported provider: {provider}")
    
    @classmethod
    def create_new_embeddings(cls, provider: Literal["huggingface", "openai"] = "huggingface", **kwargs) -> Union[HuggingFaceEmbeddings, OpenAIEmbeddings]:
        """Create a new embeddings instance with custom parameters"""
        if provider == "huggingface":
            return HuggingFaceEmbeddings(
                model_name=kwargs.get("model_name", settings.EMBEDDING_MODEL),
                **{k: v for k, v in kwargs.items() if k != "model_name"}
            )
        elif provider == "openai":
            return OpenAIEmbeddings(
                model=kwargs.get("model", settings.OPENAI_EMBEDDING_MODEL),
                openai_api_key=kwargs.get("api_key", settings.OPENAI_API_KEY),
                **{k: v for k, v in kwargs.items() if k not in ["model", "api_key"]}
            )
        else:
            raise ValueError(f"Unsupported provider: {provider}")
    
    @classmethod
    def get_huggingface_embeddings(cls) -> HuggingFaceEmbeddings:
        """Convenience method to get HuggingFace embeddings"""
        return cls.get_embeddings("huggingface")
    
    @classmethod
    def get_openai_embeddings(cls) -> OpenAIEmbeddings:
        """Convenience method to get OpenAI embeddings"""
        return cls.get_embeddings("openai")
    
    @classmethod
    def reset_instances(cls):
        """Reset singleton instances (useful for testing)"""
        cls._huggingface_instance = None
        cls._openai_instance = None

In [7]:
from langchain_groq import ChatGroq
from langchain_openai import ChatOpenAI

from typing import Union, Literal

class LLMFactory:
    """Factory for creating LLM instances"""
    
    _groq_instance = None
    _openai_instance = None
    
    @classmethod
    def get_llm(cls, provider: Literal["groq", "openai"] = "groq") -> Union[ChatGroq, ChatOpenAI]:
        """Get or create LLM instance (singleton pattern)"""
        if provider == "groq":
            if cls._groq_instance is None:
                cls._groq_instance = ChatGroq(
                    model=settings.GROQ_MODEL,
                    temperature=settings.GROQ_TEMPERATURE,
                    groq_api_key=settings.GROQ_API_KEY
                )
            return cls._groq_instance
        elif provider == "openai":
            if cls._openai_instance is None:
                cls._openai_instance = ChatOpenAI(
                    model=settings.OPENAI_MODEL,
                    temperature=settings.OPENAI_TEMPERATURE,
                    openai_api_key=settings.OPENAI_API_KEY
                )
            return cls._openai_instance
        else:
            raise ValueError(f"Unsupported provider: {provider}")
    
    @classmethod
    def create_new_llm(cls, provider: Literal["groq", "openai"] = "groq", **kwargs) -> Union[ChatGroq, ChatOpenAI]:
        """Create a new LLM instance with custom parameters"""
        if provider == "groq":
            return ChatGroq(
                model=kwargs.get("model", settings.GROQ_MODEL),
                temperature=kwargs.get("temperature", settings.GROQ_TEMPERATURE),
                groq_api_key=kwargs.get("api_key", settings.GROQ_API_KEY)
            )
        elif provider == "openai":
            return ChatOpenAI(
                model=kwargs.get("model", settings.OPENAI_MODEL),
                temperature=kwargs.get("temperature", settings.OPENAI_TEMPERATURE),
                openai_api_key=kwargs.get("api_key", settings.OPENAI_API_KEY)
            )
        else:
            raise ValueError(f"Unsupported provider: {provider}")
    
    @classmethod
    def get_groq_llm(cls) -> ChatGroq:
        """Convenience method to get Groq LLM"""
        return cls.get_llm("groq")
    
    @classmethod
    def get_openai_llm(cls) -> ChatOpenAI:
        """Convenience method to get OpenAI LLM"""
        return cls.get_llm("openai")
    
    @classmethod
    def reset_instances(cls):
        """Reset singleton instances (useful for testing)"""
        cls._groq_instance = None
        cls._openai_instance = None

In [None]:
from langchain_tavily import TavilySearch
from langchain_community.utilities import GoogleSerperAPIWrapper
from langchain_community.tools import GoogleSerperRun
from typing import Union, Literal

class SearchToolFactory:
    """Factory for creating search tools"""
    
    _tavily_instance = None
    _serper_instance = None
    
    @classmethod
    def get_search_tool(cls, provider: Literal["tavily", "serper"] = "tavily") -> Union[TavilySearch, GoogleSerperRun]:
        """Get or create search tool instance (singleton pattern)"""
        if provider == "tavily":
            if cls._tavily_instance is None:
                cls._tavily_instance = TavilySearch(
                    api_key=settings.TAVILY_API_KEY
                )
            return cls._tavily_instance
        elif provider == "serper":
            if cls._serper_instance is None:
                search_wrapper = GoogleSerperAPIWrapper(
                    serper_api_key=settings.SERPER_API_KEY
                )
                cls._serper_instance = GoogleSerperRun(api_wrapper=search_wrapper)
            return cls._serper_instance
        else:
            raise ValueError(f"Unsupported provider: {provider}")
    
    @classmethod
    def create_new_search_tool(cls, provider: Literal["tavily", "serper"] = "tavily", **kwargs) -> Union[TavilySearchResults, GoogleSerperRun]:
        """Create a new search tool instance with custom parameters"""
        if provider == "tavily":
            return TavilySearch(
                api_key=kwargs.get("api_key", settings.TAVILY_API_KEY),
                max_results=kwargs.get("max_results", settings.SEARCH_RESULTS_COUNT),
                search_depth=kwargs.get("search_depth", settings.TAVILY_SEARCH_DEPTH),
                include_answer=kwargs.get("include_answer", settings.TAVILY_INCLUDE_ANSWER),
                include_raw_content=kwargs.get("include_raw_content", settings.TAVILY_INCLUDE_RAW_CONTENT),
                **{k: v for k, v in kwargs.items() if k not in ["api_key", "max_results", "search_depth", "include_answer", "include_raw_content"]}
            )
        elif provider == "serper":
            search_wrapper = GoogleSerperAPIWrapper(
                serper_api_key=kwargs.get("api_key", settings.SERPER_API_KEY),
                k=kwargs.get("k", settings.SEARCH_RESULTS_COUNT),
                type=kwargs.get("type", settings.SERPER_SEARCH_TYPE),
                country=kwargs.get("country", settings.SERPER_COUNTRY),
                location=kwargs.get("location", settings.SERPER_LOCATION),
                **{k: v for k, v in kwargs.items() if k not in ["api_key", "k", "type", "country", "location"]}
            )
            return GoogleSerperRun(api_wrapper=search_wrapper)
        else:
            raise ValueError(f"Unsupported provider: {provider}")
    
    @classmethod
    def get_tavily_search(cls) -> TavilySearch:
        """Convenience method to get Tavily search tool"""
        return cls.get_search_tool("tavily")
    
    @classmethod
    def get_serper_search(cls) -> GoogleSerperRun:
        """Convenience method to get Serper search tool"""
        return cls.get_search_tool("serper")
    
    @classmethod
    def reset_instances(cls):
        """Reset singleton instances (useful for testing)"""
        cls._tavily_instance = None
        cls._serper_instance = None

In [9]:
from langchain_community.vectorstores import FAISS
from langchain_huggingface  import HuggingFaceEmbeddings
from typing import List, Optional
import os

class VectorStoreManager:
    """Manager for vector store operations"""
    
    def __init__(self):
        self.embeddings = EmbeddingFactory.get_embeddings()
        self.vectorstore = None
    
    def load_vectorstore(self, path: Optional[str] = None) -> bool:
        """Load vector store from path"""
        try:
            path = path or settings.VECTORSTORE_PATH
            if os.path.exists(path):
                self.vectorstore = FAISS.load_local(path, self.embeddings)
                return True
            return False
        except Exception as e:
            print(f"Error loading vectorstore: {e}")
            return False
    
    def search_documents(self, query: str, k: int = 3) -> List[str]:
        """Search for similar documents"""
        if not self.vectorstore:
            return []
        
        try:
            docs = self.vectorstore.similarity_search(query, k=k)
            return [doc.page_content for doc in docs]
        except Exception as e:
            print(f"Error searching documents: {e}")
            return []
    
    def add_documents(self, texts: List[str], metadatas: Optional[List[dict]] = None):
        """Add documents to vector store"""
        if not self.vectorstore:
            self.vectorstore = FAISS.from_texts(texts, self.embeddings, metadatas=metadatas)
        else:
            self.vectorstore.add_texts(texts, metadatas=metadatas)
    
    def save_vectorstore(self, path: Optional[str] = None):
        """Save vector store to path"""
        if self.vectorstore:
            path = path or settings.VECTORSTORE_PATH
            self.vectorstore.save_local(path)

In [30]:
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from typing import List, Optional, Dict, Any
import os
from pathlib import Path

class VectorStoreManager:
    """Manager for vector store operations"""
    
    def __init__(self):
        self.embeddings = EmbeddingFactory.get_embeddings()
        self.vectorstore = None
    
    def load_vectorstore(self, path: Optional[str] = None) -> bool:
        """Load vector store from path"""
        try:
            path = path or settings.VECTORSTORE_PATH
            if os.path.exists(path):
                self.vectorstore = FAISS.load_local(path, self.embeddings)
                return True
            return False
        except Exception as e:
            print(f"Error loading vectorstore: {e}")
            return False
    
    def search_documents(self, query: str, k: int = 3) -> List[str]:
        """Search for similar documents"""
        if not self.vectorstore:
            return []
        
        try:
            docs = self.vectorstore.similarity_search(query, k=k)
            return [doc.page_content for doc in docs]
        except Exception as e:
            print(f"Error searching documents: {e}")
            return []
    
    def add_documents(self, texts: List[str], metadatas: Optional[List[dict]] = None):
        """Add documents to vector store"""
        if not self.vectorstore:
            self.vectorstore = FAISS.from_texts(texts, self.embeddings, metadatas=metadatas)
        else:
            self.vectorstore.add_texts(texts, metadatas=metadatas)
    
    def save_vectorstore(self, path: Optional[str] = None):
        """Save vector store to path"""
        if self.vectorstore:
            path = path or settings.VECTORSTORE_PATH
            self.vectorstore.save_local(path)


def store_documents_in_vectorstore(
    file_paths: List[str],
    vectorstore_manager: Optional[VectorStoreManager] = None,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    save_path: Optional[str] = None,
    include_metadata: bool = True
) -> Dict[str, Any]:
    """
    Process documents and store them in vector store
    
    Args:
        file_paths (List[str]): List of file paths to process
        vectorstore_manager (VectorStoreManager, optional): Existing manager instance
        chunk_size (int): Size of each chunk
        chunk_overlap (int): Overlap between chunks
        save_path (str, optional): Path to save the vector store
        include_metadata (bool): Whether to include file metadata
        
    Returns:
        Dict[str, Any]: Processing results with statistics
    """
    # Initialize components
    if vectorstore_manager is None:
        vectorstore_manager = VectorStoreManager()
    
    chunker = DocumentChunker(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    
    # Load existing vectorstore if available
    vectorstore_manager.load_vectorstore(save_path)
    
    # Track processing statistics
    results = {
        "total_files": len(file_paths),
        "processed_files": 0,
        "failed_files": [],
        "total_chunks": 0,
        "chunks_by_file": {}
    }
    
    try:
        for file_path in file_paths:
            try:
                print(f"Processing file: {file_path}")
                
                # Process file into chunks
                chunks = chunker.process_file(file_path)
                
                if chunks:
                    # Prepare metadata if requested
                    metadatas = None
                    if include_metadata:
                        file_name = Path(file_path).name
                        file_extension = Path(file_path).suffix
                        metadatas = [
                            {
                                "source": file_path,
                                "file_name": file_name,
                                "file_extension": file_extension,
                                "chunk_index": i
                            }
                            for i in range(len(chunks))
                        ]
                    
                    # Add documents to vector store
                    vectorstore_manager.add_documents(chunks, metadatas)
                    
                    # Update statistics
                    results["processed_files"] += 1
                    results["total_chunks"] += len(chunks)
                    results["chunks_by_file"][file_path] = len(chunks)
                    
                    print(f"Successfully processed {file_path}: {len(chunks)} chunks")
                    
                else:
                    print(f"No chunks extracted from {file_path}")
                    results["failed_files"].append(file_path)
                    
            except Exception as e:
                print(f"Error processing file {file_path}: {e}")
                results["failed_files"].append(file_path)
        
        # Save the vector store
        if results["total_chunks"] > 0:
            vectorstore_manager.save_vectorstore(save_path)
            print(f"Vector store saved with {results['total_chunks']} total chunks")
        
        return results
        
    except Exception as e:
        print(f"Error in store_documents_in_vectorstore: {e}")
        results["error"] = str(e)
        return results


def store_single_document_in_vectorstore(
    file_path: str,
    vectorstore_manager: Optional[VectorStoreManager] = None,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    save_path: Optional[str] = None
) -> bool:
    """
    Process and store a single document in vector store
    
    Args:
        file_path (str): Path to the file to process
        vectorstore_manager (VectorStoreManager, optional): Existing manager instance
        chunk_size (int): Size of each chunk
        chunk_overlap (int): Overlap between chunks
        save_path (str, optional): Path to save the vector store
        
    Returns:
        bool: Success status
    """
    results = store_documents_in_vectorstore(
        file_paths=[file_path],
        vectorstore_manager=vectorstore_manager,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        save_path=save_path
    )
    
    return results["processed_files"] > 0


def batch_store_documents(
    directory_path: str,
    file_extensions: List[str] = [".pdf", ".docx", ".txt", ".md"],
    vectorstore_manager: Optional[VectorStoreManager] = None,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    save_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Process and store all documents from a directory
    
    Args:
        directory_path (str): Path to directory containing documents
        file_extensions (List[str]): List of file extensions to process
        vectorstore_manager (VectorStoreManager, optional): Existing manager instance
        chunk_size (int): Size of each chunk
        chunk_overlap (int): Overlap between chunks
        save_path (str, optional): Path to save the vector store
        
    Returns:
        Dict[str, Any]: Processing results
    """
    # Find all files with specified extensions
    directory = Path(directory_path)
    file_paths = []
    
    for extension in file_extensions:
        file_paths.extend(directory.glob(f"*{extension}"))
    
    # Convert to string paths
    file_paths = [str(path) for path in file_paths]
    
    if not file_paths:
        print(f"No files found in {directory_path} with extensions {file_extensions}")
        return {"total_files": 0, "processed_files": 0, "failed_files": [], "total_chunks": 0}
    
    print(f"Found {len(file_paths)} files to process")
    
    return store_documents_in_vectorstore(
        file_paths=file_paths,
        vectorstore_manager=vectorstore_manager,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        save_path=save_path
    )


# Example usage
def main():
    """Example usage of the vector store functions"""
    
    # Initialize vector store manager
    vs_manager = VectorStoreManager()
    
    # Example 1: Store a single document
    print("=== Storing Single Document ===")
    file_path = "/home/ubuntu/OMANI-Therapist-Voice-ChatBot/KnowledgebaseFile/SuicideGuard_An_NLP-Based_Chrome_Extension_for_Detecting_Suicidal_Thoughts_in_Bengali.pdf"
    success = store_single_document_in_vectorstore(
        file_path=file_path,
        vectorstore_manager=vs_manager,
        chunk_size=1000,
        chunk_overlap=150
    )
    print(f"Single document processing: {'Success' if success else 'Failed'}")
    
    # # Example 2: Store multiple documents
    # print("\n=== Storing Multiple Documents ===")
    # file_paths = [
    #     "document1.pdf",
    #     "document2.docx",
    #     "document3.txt"
    # ]
    
    # results = store_documents_in_vectorstore(
    #     file_paths=file_paths,
    #     vectorstore_manager=vs_manager,
    #     chunk_size=1000,
    #     chunk_overlap=200
    # )
    
    # print(f"Processing Results:")
    # print(f"  Total files: {results['total_files']}")
    # print(f"  Processed files: {results['processed_files']}")
    # print(f"  Failed files: {results['failed_files']}")
    # print(f"  Total chunks: {results['total_chunks']}")
    
    # # Example 3: Batch process directory
    # print("\n=== Batch Processing Directory ===")
    # directory_path = "/home/ubuntu/OMANI-Therapist-Voice-ChatBot/KnowledgebaseFile/"
    
    # batch_results = batch_store_documents(
    #     directory_path=directory_path,
    #     file_extensions=[".pdf", ".docx", ".txt", ".md"],
    #     vectorstore_manager=vs_manager
    # )
    
    # print(f"Batch Processing Results:")
    # print(f"  Total files: {batch_results['total_files']}")
    # print(f"  Processed files: {batch_results['processed_files']}")
    # print(f"  Total chunks: {batch_results['total_chunks']}")
    
    # Example 4: Search the vector store
    print("\n=== Searching Vector Store ===")
    query = "suicide prevention techniques"
    search_results = vs_manager.search_documents(query, k=3)
    
    print(f"Search results for '{query}':")
    for i, result in enumerate(search_results):
        print(f"  Result {i+1}: {result[:200]}...")


if __name__ == "__main__":
    main()

=== Storing Single Document ===
Processing file: /home/ubuntu/OMANI-Therapist-Voice-ChatBot/KnowledgebaseFile/SuicideGuard_An_NLP-Based_Chrome_Extension_for_Detecting_Suicidal_Thoughts_in_Bengali.pdf
Successfully processed /home/ubuntu/OMANI-Therapist-Voice-ChatBot/KnowledgebaseFile/SuicideGuard_An_NLP-Based_Chrome_Extension_for_Detecting_Suicidal_Thoughts_in_Bengali.pdf: 31 chunks created
Successfully processed /home/ubuntu/OMANI-Therapist-Voice-ChatBot/KnowledgebaseFile/SuicideGuard_An_NLP-Based_Chrome_Extension_for_Detecting_Suicidal_Thoughts_in_Bengali.pdf: 31 chunks
Vector store saved with 31 total chunks
Single document processing: Success

=== Searching Vector Store ===
Search results for 'suicide prevention techniques':
  Result 1: by mining out depressions from their posts, as it has been trained
on 2,590 Bangla data. This immensely useful system has been
trained with the BERT model with 92% accuracy after analysing
models like...
  Result 2: Department of Computer Science and

In [10]:
from typing import List, Dict, Any, Optional
from pydantic import BaseModel

class QueryRequest(BaseModel):
    """Request schema for query processing"""
    query: str
    session_id: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None

class QueryResponse(BaseModel):
    """Response schema for query processing"""
    query: str
    upgraded_query: str
    route_taken: str
    response: str
    metadata: Dict[str, Any]
    processing_time: float

class ProcessingMetadata(BaseModel):
    """Metadata for processing steps"""
    upgrade_success: bool = False
    routing_success: bool = False
    path_success: bool = False
    errors: List[str] = []
    processing_time: float = 0.0

In [11]:
from typing import Dict, List, Any
from pydantic import BaseModel, Field

class AgentState(BaseModel):
    """State schema for the AgenticRAG workflow"""
    
    user_query: str = Field(description="Original user query")
    upgraded_query: str = Field(default="", description="Enhanced query")
    route_decision: str = Field(default="", description="Routing decision")
    retrieved_docs: List[str] = Field(default_factory=list, description="Retrieved documents")
    search_results: List[str] = Field(default_factory=list, description="Web search results")
    final_response: str = Field(default="", description="Final response")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
    
    class Config:
        """Pydantic configuration"""
        arbitrary_types_allowed = True

In [12]:
from langchain_core.prompts import ChatPromptTemplate

class Prompts:
    """Centralized prompt templates"""
    
    QUERY_UPGRADER = ChatPromptTemplate.from_messages([
        ("system", """You are a query enhancement specialist. Your task is to improve user queries for better information retrieval.
        
        Enhancement guidelines:
        1. Add relevant keywords and synonyms
        2. Clarify ambiguous terms
        3. Expand abbreviations and acronyms
        4. Add context when missing
        5. Maintain original intent
        6. Keep enhanced query concise (under 200 characters)
        
        Return only the enhanced query, nothing else."""),
        ("human", "Original query: {query}")
    ])
    
    QUERY_ROUTER = ChatPromptTemplate.from_messages([
        ("system", """You are a query router. Analyze the query and decide which path to take:

        PATHS:
        1. "RAG" - For queries about specific knowledge base content, documents, or domain expertise
        2. "WEB" - For current events, real-time information, recent news, or trending topics
        3. "DIRECT" - For general conversation, creative tasks, opinions, or reasoning without specific facts

        DECISION CRITERIA:
        - RAG: Domain-specific questions, technical documentation, specific facts from knowledge base
        - WEB: Questions with temporal keywords (latest, current, recent, today), current events, real-time data
        - DIRECT: General chat, creative writing, opinions, mathematical reasoning, casual conversation

        Respond with only one word: RAG, WEB, or DIRECT"""),
        ("human", "Query: {query}")
    ])
    
    RAG_RESPONSE = ChatPromptTemplate.from_messages([
        ("system", """You are a helpful assistant. Answer the user's question based on the provided context from the knowledge base.
        
        Context: {context}
        
        If the context doesn't contain relevant information, say so clearly."""),
        ("human", "{query}")
    ])
    
    WEB_RESPONSE = ChatPromptTemplate.from_messages([
        ("system", """You are a helpful assistant. Answer the user's question based on the provided web search results.
        
        Search Results: {search_results}
        
        Provide a comprehensive answer based on the search results. If the results don't contain relevant information, say so clearly."""),
        ("human", "{query}")
    ])
    
    DIRECT_RESPONSE = ChatPromptTemplate.from_messages([
        ("system", """You are a helpful AI assistant. Answer the user's question directly using your knowledge and reasoning capabilities.
        
        Be conversational, accurate, and helpful. If you're unsure about something, acknowledge the uncertainty."""),
        ("human", "{query}")
    ])

In [13]:
class DirectLLMNode:
    """Node for direct LLM processing"""
    
    def __init__(self):
        self.llm = LLMFactory.get_llm()
        self.prompt = Prompts.DIRECT_RESPONSE
    
    def process_direct_llm(self, state: AgentState) -> AgentState:
        """Process direct LLM path"""
        
        try:
            chain = self.prompt | self.llm
            
            response = chain.invoke({"query": state.upgraded_query})
            state.final_response = response.content
            state.metadata["direct_llm_success"] = True
            
        except Exception as e:
            state.final_response = "Sorry, I couldn't process your request at the moment."
            state.metadata["direct_llm_success"] = False
            state.metadata["direct_llm_error"] = str(e)
        
        return state

# Node function for LangGraph
def direct_llm_node(state: AgentState) -> AgentState:
    """Node function for direct LLM processing"""
    direct_processor = DirectLLMNode()
    return direct_processor.process_direct_llm(state)

In [14]:
class QueryRouter:
    """Node for routing queries to appropriate paths"""
    
    def __init__(self):
        self.llm = LLMFactory.get_llm()
        self.prompt = Prompts.QUERY_ROUTER
    
    def route_query(self, state: AgentState) -> AgentState:
        """Route query to appropriate path"""
        
        chain = self.prompt | self.llm
        
        try:
            response = chain.invoke({"query": state.upgraded_query})
            route_decision = response.content.strip().upper()
            
            # Validate route decision
            if route_decision not in ["RAG", "WEB", "DIRECT"]:
                route_decision = settings.DEFAULT_ROUTE
                
            state.route_decision = route_decision
            state.metadata["routing_success"] = True
            
        except Exception as e:
            state.route_decision = settings.DEFAULT_ROUTE
            state.metadata["routing_success"] = False
            state.metadata["routing_error"] = str(e)
        
        return state

# Node function for LangGraph
def query_router_node(state: AgentState) -> AgentState:
    """Node function for query routing"""
    router = QueryRouter()
    return router.route_query(state)

In [15]:
class QueryUpgrader:
    """Node for upgrading user queries"""
    
    def __init__(self):
        self.llm = LLMFactory.get_llm()
        self.prompt = Prompts.QUERY_UPGRADER
    
    def upgrade_query(self, state: AgentState) -> AgentState:
        """Upgrade/enhance the user query"""
        
        chain = self.prompt | self.llm
        
        try:
            response = chain.invoke({"query": state.user_query})
            upgraded_query = response.content.strip()
            
            # Fallback to original if upgrade fails
            if not upgraded_query or len(upgraded_query) > settings.MAX_QUERY_LENGTH:
                upgraded_query = state.user_query
                
            state.upgraded_query = upgraded_query
            state.metadata["upgrade_success"] = True
            
        except Exception as e:
            state.upgraded_query = state.user_query
            state.metadata["upgrade_success"] = False
            state.metadata["upgrade_error"] = str(e)
        
        return state

# Node function for LangGraph
def query_upgrader_node(state: AgentState) -> AgentState:
    """Node function for query upgrading"""
    upgrader = QueryUpgrader()
    return upgrader.upgrade_query(state)

In [16]:
class RAGNode:
    """Node for RAG processing"""
    
    def __init__(self):
        self.llm = LLMFactory.get_llm()
        self.vectorstore_manager = VectorStoreManager()
        self.prompt = Prompts.RAG_RESPONSE
        
        # Load vectorstore
        self.vectorstore_manager.load_vectorstore()
    
    def process_rag(self, state: AgentState) -> AgentState:
        """Process RAG path - retrieve from knowledge base"""
        
        try:
            # Retrieve documents
            docs = self.vectorstore_manager.search_documents(state.upgraded_query, k=3)
            state.retrieved_docs = docs
            
            # Generate response with retrieved context
            chain = self.prompt | self.llm
            
            context = "\n".join(docs) if docs else "No relevant documents found."
            response = chain.invoke({
                "query": state.upgraded_query,
                "context": context
            })
            
            state.final_response = response.content
            state.metadata["rag_success"] = True
            
        except Exception as e:
            state.final_response = "Sorry, I couldn't retrieve information from the knowledge base."
            state.metadata["rag_success"] = False
            state.metadata["rag_error"] = str(e)
        
        return state

# Node function for LangGraph
def rag_node(state: AgentState) -> AgentState:
    """Node function for RAG processing"""
    rag_processor = RAGNode()
    return rag_processor.process_rag(state)

In [17]:
class WebSearchNode:
    """Node for web search processing"""
    
    def __init__(self):
        self.llm = LLMFactory.get_llm()
        self.search_tool = SearchToolFactory.get_search_tool()
        self.prompt = Prompts.WEB_RESPONSE
    
    def process_web_search(self, state: AgentState) -> AgentState:
        """Process web search path"""
        
        try:
            # Perform web search
            search_results = self.search_tool.run(state.upgraded_query)
            state.search_results = [search_results]
            
            # Generate response with search results
            chain = self.prompt | self.llm
            
            response = chain.invoke({
                "query": state.upgraded_query,
                "search_results": search_results
            })
            
            state.final_response = response.content
            state.metadata["web_search_success"] = True
            
        except Exception as e:
            state.final_response = "Sorry, I couldn't perform web search at the moment."
            state.metadata["web_search_success"] = False
            state.metadata["web_search_error"] = str(e)
        
        return state

# Node function for LangGraph
def web_search_node(state: AgentState) -> AgentState:
    """Node function for web search processing"""
    web_processor = WebSearchNode()
    return web_processor.process_web_search(state)

In [18]:
def route_query(state: AgentState) -> Literal["rag_path", "web_search", "direct_llm"]:
    """Route to appropriate path based on decision"""
    route_map = {
        "RAG": "rag_path",
        "WEB": "web_search", 
        "DIRECT": "direct_llm"
    }
    return route_map.get(state.route_decision, "direct_llm")

In [19]:
from langgraph.graph import StateGraph, END

class GraphBuilder:
    """Builder for the AgenticRAG graph"""
    
    @staticmethod
    def create_graph():
        """Create the LangGraph workflow"""
        
        # Initialize graph
        workflow = StateGraph(AgentState)
        
        # Add nodes
        workflow.add_node("query_upgrader", query_upgrader_node)
        workflow.add_node("query_router", query_router_node)
        workflow.add_node("rag_path", rag_node)
        workflow.add_node("web_search", web_search_node)
        workflow.add_node("direct_llm", direct_llm_node)
        
        # Set entry point
        workflow.set_entry_point("query_upgrader")
        
        # Add edges
        workflow.add_edge("query_upgrader", "query_router")
        
        # Add conditional edges based on routing decision
        workflow.add_conditional_edges(
            "query_router",
            route_query,
            {
                "rag_path": "rag_path",
                "web_search": "web_search",
                "direct_llm": "direct_llm"
            }
        )
        
        # All paths end at END
        workflow.add_edge("rag_path", END)
        workflow.add_edge("web_search", END)
        workflow.add_edge("direct_llm", END)
        
        # Compile the graph
        return workflow.compile()

In [29]:
import time
from typing import List
from loguru import logger

class AgenticRAGSystem:
    """Main AgenticRAG system"""
    
    def __init__(self):
        # Validate settings
        settings.validate()
        
        # Create graph
        self.app = GraphBuilder.create_graph()
        
        logger.info("AgenticRAG system initialized successfully")
    
    def process_query(self, query: str) -> QueryResponse:
        """Process a single query"""
        
        start_time = time.time()
        logger.info(f"Processing query: {query}")

        try:
            # Initialize state
            initial_state = AgentState(user_query=query)
            logger.info(f"Initial state created: {initial_state}")
            
            # Run the graph
            final_state = self.app.invoke(initial_state)
            logger.info(f"Final state after processing: {final_state}")
            
            # Calculate processing time
            processing_time = time.time() - start_time
            
            # Create response
            response = QueryResponse(
                query=final_state['user_query'],
                upgraded_query=final_state['upgraded_query'],
                route_taken=final_state['route_decision'],
                response=final_state['final_response'],
                metadata=final_state['metadata'],
                processing_time=processing_time
            )
            
            logger.info(f"Query processed successfully in {processing_time:.2f}s")
            return response
            
        except Exception as e:
            logger.error(f"Error processing query: {e}")
            raise
    
    def process_batch(self, queries: List[str]) -> List[QueryResponse]:
        """Process multiple queries"""
        
        responses = []
        for query in queries:
            try:
                response = self.process_query(query)
                responses.append(response)
            except Exception as e:
                logger.error(f"Error processing query '{query}': {e}")
        
        return responses

def main():
    """Main function"""
    
    # Initialize system
    system = AgenticRAGSystem()
    
    # Test queries
    test_queries = [
        "What is machine learning?",
        "Latest news about AI",
        "Write a poem about spring"
    ]
    
    # Process queries
    for query in test_queries:
        print(f"\n{'='*50}")
        print(f"Query: {query}")
        print(f"{'='*50}")
        
        try:
            response = system.process_query(query)
            
            print(f"Original Query: {response.query}")
            print(f"Upgraded Query: {response.upgraded_query}")
            print(f"Route Taken: {response.route_taken}")
            print(f"Response: {response.response}")
            print(f"Processing Time: {response.processing_time:.2f}s")
            print(f"Metadata: {response.metadata}")
            
        except Exception as e:
            print(f"Error: {e}")

if __name__ == "__main__":
    main()

[32m2025-07-12 19:44:25.409[0m | [1mINFO    [0m | [36m__main__[0m:[36m__init__[0m:[36m15[0m - [1mAgenticRAG system initialized successfully[0m
[32m2025-07-12 19:44:25.410[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_query[0m:[36m21[0m - [1mProcessing query: What is machine learning?[0m
[32m2025-07-12 19:44:25.411[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_query[0m:[36m26[0m - [1mInitial state created: user_query='What is machine learning?' upgraded_query='' route_decision='' retrieved_docs=[] search_results=[] final_response='' metadata={}[0m



Query: What is machine learning?


[32m2025-07-12 19:44:26.493[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_query[0m:[36m30[0m - [1mFinal state after processing: {'user_query': 'What is machine learning?', 'upgraded_query': 'What is artificial intelligence machine learning algorithm?', 'route_decision': 'RAG', 'retrieved_docs': [], 'search_results': [], 'final_response': "I apologize, but since there are no relevant documents found in my knowledge base, I don't have any information on what artificial intelligence machine learning algorithm is. If you're looking for information on this topic, I suggest searching online or consulting a reliable source.", 'metadata': {'upgrade_success': True, 'routing_success': True, 'rag_success': True}}[0m
[32m2025-07-12 19:44:26.493[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_query[0m:[36m45[0m - [1mQuery processed successfully in 1.08s[0m
[32m2025-07-12 19:44:26.495[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_query[0m:[36m21[0m - [1mP

Original Query: What is machine learning?
Upgraded Query: What is artificial intelligence machine learning algorithm?
Route Taken: RAG
Response: I apologize, but since there are no relevant documents found in my knowledge base, I don't have any information on what artificial intelligence machine learning algorithm is. If you're looking for information on this topic, I suggest searching online or consulting a reliable source.
Processing Time: 1.08s
Metadata: {'upgrade_success': True, 'routing_success': True, 'rag_success': True}

Query: Latest news about AI


[32m2025-07-12 19:44:28.914[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_query[0m:[36m30[0m - [1mFinal state after processing: {'user_query': 'Latest news about AI', 'upgraded_query': 'Enhanced query: Latest artificial intelligence news updates', 'route_decision': 'WEB', 'retrieved_docs': [], 'search_results': [{'query': 'Enhanced query: Latest artificial intelligence news updates', 'follow_up_questions': None, 'answer': None, 'images': [], 'results': [{'url': 'https://www.artificialintelligence-news.com/', 'title': 'AI News | Latest AI News, Analysis & Events', 'content': 'AI News reports on the latest artificial intelligence news and insights. Explore industry trends from the frontline of AI.', 'score': 0.5806618, 'raw_content': None}, {'url': 'https://www.sciencedaily.com/news/computers_math/artificial_intelligence/', 'title': 'Artificial Intelligence News - ScienceDaily', 'content': 'Artificial Intelligence News. Everything on AI including futuristic robots with art

Original Query: Latest news about AI
Upgraded Query: Enhanced query: Latest artificial intelligence news updates
Route Taken: WEB
Response: Based on the search results, here's a comprehensive answer to your query:

The latest artificial intelligence news updates can be found on various websites and blogs that report on the latest developments and breakthroughs in the field of AI. Some of the top sources for AI news include AI News, ScienceDaily, Crescendo AI, and Google's official blog.

According to AI News, the latest AI news and insights can be explored on their website, which reports on industry trends from the frontline of AI. ScienceDaily also provides a comprehensive section on artificial intelligence news, covering topics such as futuristic robots with AI, computer models of human intelligence, and more.

Crescendo AI recently published a blog post highlighting the latest AI breakthroughs and news from May, June, and July 2025. The post covers various developments in the field,

[32m2025-07-12 19:44:30.044[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_query[0m:[36m30[0m - [1mFinal state after processing: {'user_query': 'Write a poem about spring', 'upgraded_query': 'Enhanced query: Write a poem about the arrival of spring season', 'route_decision': 'DIRECT', 'retrieved_docs': [], 'search_results': [], 'final_response': "As winter's chill begins to fade,\nThe earth awakens from its shade,\nThe sun shines bright with warmer rays,\nAnd spring's sweet symphony sways.\n\nThe trees, once bare and branch-like gray,\nNow don a cloak of green array,\nTheir leaves unfurl, a vibrant hue,\nAs nature's palette is anew.\n\nThe air is filled with scents so sweet,\nOf blooming flowers, fresh and neat,\nThe hum of bees, a gentle thrum,\nAs they collect nectar, one by one.\n\nThe world, once frozen, now comes alive,\nWith chirping birds, a joyful jive,\nTheir songs, a chorus, loud and clear,\nAs spring's arrival banishes all fear.\n\nThe snow, that once lay deep 

Original Query: Write a poem about spring
Upgraded Query: Enhanced query: Write a poem about the arrival of spring season
Route Taken: DIRECT
Response: As winter's chill begins to fade,
The earth awakens from its shade,
The sun shines bright with warmer rays,
And spring's sweet symphony sways.

The trees, once bare and branch-like gray,
Now don a cloak of green array,
Their leaves unfurl, a vibrant hue,
As nature's palette is anew.

The air is filled with scents so sweet,
Of blooming flowers, fresh and neat,
The hum of bees, a gentle thrum,
As they collect nectar, one by one.

The world, once frozen, now comes alive,
With chirping birds, a joyful jive,
Their songs, a chorus, loud and clear,
As spring's arrival banishes all fear.

The snow, that once lay deep and wide,
Now melts away, a gentle tide,
Revealing hidden streams and brooks,
That babble, gurgle, and gentle crooks.

The earth, once barren, now is green,
A canvas, vibrant, fresh, and serene,
As spring's arrival brings new birth

In [1]:
from langchain_core.prompts import ChatPromptTemplate
import json

class Prompts:
    """Centralized prompt templates"""

    @staticmethod
    def load_kb_description():
        """Dynamically load knowledge base descriptions"""
        try:
            with open("knowledge_base_metadata.json", 'r', encoding='utf-8') as f:
                data = json.load(f)
            description = ""
            for item in data:
                description += f"- {item.get('description', '').strip()}\n"
            return description.strip()
        except FileNotFoundError:
            return "No knowledge base found."
        except json.JSONDecodeError as e:
            return f"Error decoding knowledge base: {e}"

    @classmethod
    def query_router(cls):
        """Return QUERY_ROUTER with dynamic KB info"""
        kb_description = cls.load_kb_description()
        return ChatPromptTemplate.from_messages([
            ("system", f"""You are a query router. Analyze the query and decide which path to take:

PATHS:
1. "RAG" - For queries about specific knowledge base content, documents, or domain expertise
2. "WEB" - For current events, real-time information, recent news, or trending topics
3. "DIRECT" - For general conversation, creative tasks, opinions, or reasoning without specific facts

DECISION CRITERIA:
- RAG: Domain-specific questions, technical documentation, specific facts from knowledge base
- WEB: Questions with temporal keywords (latest, current, recent, today), current events, real-time data
- DIRECT: General chat, creative writing, opinions, mathematical reasoning, casual conversation

Knowledge Base contains:
{kb_description}

Respond with only one word: RAG, WEB, or DIRECT"""),
            ("human", "Query: {{query}}")
        ])

    # Other prompts stay unchanged
    QUERY_UPGRADER = ChatPromptTemplate.from_messages([
        ("system", """You are a query enhancement specialist..."""),  # shortened for brevity
        ("human", "Original query: {query}")
    ])

    RAG_RESPONSE = ChatPromptTemplate.from_messages([
        ("system", """You are a helpful assistant. Answer the user's question based on the provided context from the knowledge base.
        
        Context: {context}
        
        If the context doesn't contain relevant information, say so clearly."""),
        ("human", "{query}")
    ])

    WEB_RESPONSE = ChatPromptTemplate.from_messages([
        ("system", """You are a helpful assistant. Answer the user's question based on the provided web search results.
        
        Search Results: {search_results}
        
        Provide a comprehensive answer..."""),
        ("human", "{query}")
    ])

    DIRECT_RESPONSE = ChatPromptTemplate.from_messages([
        ("system", """You are a helpful AI assistant. Answer the user's question directly using your knowledge and reasoning capabilities."""),
        ("human", "{query}")
    ])

if __name__ == "__main__":
    prompt = Prompts.query_router()
    print(prompt.format(query="What is the architecture of the Omani AI system?"))


System: You are a query router. Analyze the query and decide which path to take:

PATHS:
1. "RAG" - For queries about specific knowledge base content, documents, or domain expertise
2. "WEB" - For current events, real-time information, recent news, or trending topics
3. "DIRECT" - For general conversation, creative tasks, opinions, or reasoning without specific facts

DECISION CRITERIA:
- RAG: Domain-specific questions, technical documentation, specific facts from knowledge base
- WEB: Questions with temporal keywords (latest, current, recent, today), current events, real-time data
- DIRECT: General chat, creative writing, opinions, mathematical reasoning, casual conversation

Knowledge Base contains:
- resume of an AI developer

Respond with only one word: RAG, WEB, or DIRECT
Human: Query: {query}
