In [None]:
import json
import logging
import os
import shutil
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Dict, List

import pypdf
from agno.agent import Agent
from agno.memory.v2.db.sqlite import SqliteMemoryDb
from agno.memory.v2.memory import Memory
from agno.tools.reasoning import ReasoningTools
from docx import Document as DocxDocument
from langchain.docstore.document import Document
from langchain.memory import ConversationBufferMemory
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings

# Configure root logging (INFO level, timestamped lines), then take a logger
# named after this module for use by every cell below.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)



In [None]:
# Azure OpenAI Configuration
# The key and endpoint are read from the environment so that credentials are
# never typed into (or committed with) the notebook. Both fall back to an empty
# string, which is what the notebook previously hard-coded.
AZURE_CONFIG = {
    "api_key": os.environ.get("AZURE_OPENAI_API_KEY", ""),
    "endpoint": os.environ.get("AZURE_OPENAI_ENDPOINT", ""),
    "api_version": "2024-12-01-preview",
    "embedding_deployment": "text-embedding-ada-002",
    "gpt_deployment": "gpt-4o"
}

# Surface a missing credential here rather than as an opaque failure on the
# first LLM or embedding call.
_missing_azure_settings = [key for key in ("api_key", "endpoint") if not AZURE_CONFIG[key]]
if _missing_azure_settings:
    logger.warning(
        "Azure OpenAI settings not provided: %s "
        "(set AZURE_OPENAI_API_KEY / AZURE_OPENAI_ENDPOINT)",
        ", ".join(_missing_azure_settings),
    )

# Initialize Azure OpenAI clients
# Chat model shared by the summary tool and the multi-agent system.
llm = AzureChatOpenAI(
    openai_api_key=AZURE_CONFIG["api_key"],
    azure_endpoint=AZURE_CONFIG["endpoint"],
    api_version=AZURE_CONFIG["api_version"],
    deployment_name=AZURE_CONFIG["gpt_deployment"],
    temperature=0.7
)

# Embedding model used when documents are written to and queried from Chroma.
embeddings = AzureOpenAIEmbeddings(
    openai_api_key=AZURE_CONFIG["api_key"],
    azure_endpoint=AZURE_CONFIG["endpoint"],
    api_version=AZURE_CONFIG["api_version"],
    deployment=AZURE_CONFIG["embedding_deployment"]
)

In [None]:


# Custom tool for document summarization
class DocumentSummaryTool:
    """Wraps the module-level ``llm`` to condense a piece of document text."""

    name = "document_summary"
    description = "Summarizes a document chunk for concise understanding."

    def run(self, document_text: str) -> str:
        """Ask the LLM for a 2-3 sentence summary of ``document_text``.

        Returns the model's reply text, or a fixed fallback sentence when
        anything goes wrong (the error is logged, not raised).
        """
        try:
            request = f"Summarize the following document text in 2-3 sentences:\n{document_text}"
            reply = llm.invoke(request)
            condensed = reply.content
            logger.info("Generated document summary")
            return condensed
        except Exception as e:
            logger.error(f"Error summarizing document: {e}")
        return "Unable to summarize document."

# InsuranceRAGSystem: Manages document ingestion, embedding, and retrieval
class InsuranceRAGSystem:
    """Load insurance documents from a folder, embed them into Chroma, and search them.

    Constructing an instance immediately ingests everything under ``data_dir``
    (a sample document is created when the folder is missing or empty) and
    stores the embeddings under ``db_dir``.

    Args:
        data_dir: Folder scanned for ``.pdf``, ``.txt`` and ``.docx`` files.
        db_dir: Folder handed to Chroma as its persist directory.
        chunk_size: Forwarded to ``RecursiveCharacterTextSplitter``.
        chunk_overlap: Forwarded to ``RecursiveCharacterTextSplitter``.
    """

    # File extensions that load_documents() will try to read.
    SUPPORTED_SUFFIXES = (".pdf", ".txt", ".docx")

    def __init__(self, data_dir="data", db_dir="chroma_db", chunk_size=1000, chunk_overlap=200):
        self.data_dir = Path(data_dir)
        self.db_dir = Path(db_dir)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size, chunk_overlap=chunk_overlap
        )
        self.vectorstore = None
        logger.info("Initializing InsuranceRAGSystem")
        self.embed_documents()

    def initialize_data_dir(self):
        """Create data directory and sample document if empty."""
        # parents/exist_ok: a nested data_dir such as "data/insurance" must work,
        # and an already existing folder is not an error.
        self.data_dir.mkdir(parents=True, exist_ok=True)
        if not any(self.data_dir.iterdir()):
            self.create_sample_document()
        logger.info(f"Data directory initialized at {self.data_dir}")

    def create_sample_document(self):
        """Create a sample insurance document if none exist."""
        sample_content = """
        # Insurance Basics
        ## Auto Insurance Types
        - **Liability Coverage**: Covers damages to others if you're at fault in an accident.
        - **Collision Coverage**: Covers damage to your car from a collision.
        - **Comprehensive Coverage**: Covers non-collision damage (e.g., theft, natural disasters).
        - **Personal Injury Protection (PIP)**: Covers medical expenses for you and your passengers.
        ## Home Insurance Claims
        - **Step 1**: Contact your insurance provider immediately.
        - **Step 2**: Document the damage with photos and videos.
        - **Step 3**: Submit a claim form with detailed descriptions.
        """
        # Written as UTF-8 because read_file() reads .txt files back as UTF-8.
        with open(self.data_dir / "sample_insurance_info.txt", "w", encoding="utf-8") as f:
            f.write(sample_content)
        logger.info("Created sample document")

    def load_documents(self):
        """Load and process documents from data directory.

        Returns:
            A list of ``Document`` objects, one per readable supported file,
            each carrying its path in ``metadata["source"]``.
        """
        documents = []
        # Sorted so that ingestion order does not depend on the filesystem.
        for file_path in sorted(self.data_dir.iterdir()):
            if file_path.suffix.lower() not in self.SUPPORTED_SUFFIXES:
                continue
            content = self.read_file(file_path)
            if content:
                documents.append(
                    Document(page_content=content, metadata={"source": str(file_path)})
                )
                logger.info(f"Loaded document: {file_path}")
        return documents

    def read_file(self, file_path):
        """Read content from PDF, text, or Word files.

        Returns:
            The extracted text, or ``""`` for an unsupported suffix or when
            reading fails (the failure is logged).
        """
        try:
            suffix = file_path.suffix.lower()
            if suffix == ".pdf":
                with open(file_path, "rb") as f:
                    pdf = pypdf.PdfReader(f)
                    # Extract each page once; skip pages that yield no text.
                    page_texts = [page.extract_text() for page in pdf.pages]
                    # Newline between pages so the last word of one page is not
                    # glued to the first word of the next.
                    return "\n".join(text for text in page_texts if text)
            if suffix == ".txt":
                with open(file_path, "r", encoding="utf-8") as f:
                    return f.read()
            if suffix == ".docx":
                doc = DocxDocument(file_path)
                return "\n".join(paragraph.text for paragraph in doc.paragraphs if paragraph.text)
            return ""
        except Exception as e:
            logger.error(f"Error reading {file_path}: {e}")
            return ""

    @staticmethod
    def _chunk_ids(chunks):
        """Return one stable id per chunk: ``"<source>#<position within that source>"``.

        The same files split the same way always produce the same ids, which
        lets re-embedding overwrite earlier entries instead of adding copies.
        """
        next_index = {}
        ids = []
        for chunk in chunks:
            source = chunk.metadata.get("source", "unknown")
            position = next_index.get(source, 0)
            next_index[source] = position + 1
            ids.append(f"{source}#{position}")
        return ids

    def embed_documents(self):
        """Embed all documents and store in ChromaDB.

        Raises:
            Exception: any error from loading, splitting or embedding is
                logged and re-raised.
        """
        try:
            self.initialize_data_dir()
            documents = self.load_documents()
            if not documents:
                # The folder held only unreadable/unsupported files.
                self.create_sample_document()
                documents = self.load_documents()

            chunks = self.text_splitter.split_documents(documents)
            # Deterministic ids keep "reload" (and notebook re-runs against an
            # existing db_dir) from piling up duplicates of every chunk.
            # NOTE(review): relies on the Chroma wrapper upserting/ignoring
            # existing ids - confirm against the installed version. Chunks of a
            # document that became shorter are not removed; use reset() for that.
            self.vectorstore = Chroma.from_documents(
                documents=chunks,
                embedding=embeddings,
                ids=self._chunk_ids(chunks),
                persist_directory=str(self.db_dir)
            )
            self.vectorstore.persist()
            logger.info(f"Embedded {len(chunks)} document chunks into {self.db_dir}")
        except Exception as e:
            logger.error(f"Error embedding documents: {e}")
            raise

    def search(self, query, k=3):
        """Search for relevant documents based on query.

        Args:
            query: Text to search for.
            k: Maximum number of chunks to return.

        Returns:
            Matching documents, or ``[]`` when the store is not initialized or
            the search fails.
        """
        if not self.vectorstore:
            logger.warning("Vectorstore not initialized")
            return []
        try:
            docs = self.vectorstore.similarity_search(query, k=k)
            logger.info(f"Retrieved {len(docs)} documents for query: {query}")
            return docs
        except Exception as e:
            logger.error(f"Error searching documents: {e}")
            return []

    def stats(self):
        """Return statistics about stored documents."""
        if not self.vectorstore:
            return "No documents loaded."
        # Reads the count from the wrapper's underlying collection object.
        count = self.vectorstore._collection.count()
        logger.info(f"Vectorstore stats: {count} documents")
        return f"Number of documents: {count}"

    def reset(self):
        """Reset the vectorstore.

        Deletes ``db_dir`` and re-embeds everything. Errors are logged, not
        raised; after a failed reset ``self.vectorstore`` is ``None``.
        """
        try:
            # Drop our handle first so a failed re-embed cannot leave us
            # pointing at storage that was just deleted.
            self.vectorstore = None
            if self.db_dir.exists():
                # rmtree handles any nesting depth, unlike a manual two-level walk.
                shutil.rmtree(self.db_dir)
            self.embed_documents()
            logger.info("Vectorstore reset and re-embedded")
        except Exception as e:
            logger.error(f"Error resetting vectorstore: {e}")

# InsuranceMultiAgentSystem: Manages collaborative agents with enhanced Agno features
class InsuranceMultiAgentSystem:
    """Answer insurance questions by combining retrieval with several role prompts.

    Five Agno ``Agent`` objects are constructed, but ``answer_question`` does not
    run them: it reads the first instruction of each agent and sends one prompt
    per role directly through ``self.llm.invoke``. The final, "lead" prompt asks
    for a JSON object with ``answer``, ``sources`` and ``confidence``.

    Args:
        rag_system: Object exposing ``search(question)`` that returns documents
            with ``page_content`` and ``metadata['source']``.
        llm: Chat model exposing ``invoke(prompt).content``.
        db_file: SQLite file used for both the Agno memory backend and the
            plain question/answer log.
    """

    def __init__(self, rag_system, llm, db_file="tmp/agent.db"):
        self.rag_system = rag_system
        self.llm = llm
        self.db_file = str(db_file)
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        try:
            # sqlite3.connect cannot create missing parent folders, so make
            # sure e.g. "tmp/" exists before anything opens the file.
            Path(self.db_file).parent.mkdir(parents=True, exist_ok=True)
            self.sqlite_db = SqliteMemoryDb(table_name="insurance_memories", db_file=self.db_file)
            logger.info(f"Initialized SQLite memory at {self.db_file}")
        except Exception as e:
            logger.error(f"Error initializing SQLite memory: {e}")
            raise
        self.initialize_agents()

    def initialize_agents(self):
        """Initialize the team of agents with Agno features."""
        # Define agent instructions
        knowledge_retriever_instructions = [
            "You are a KnowledgeRetriever. Use the provided context to answer the user's question.",
            "Focus on retrieving accurate information from insurance documents.",
            "Use reasoning to ensure relevance and accuracy.",
            "Summarize document content if it is lengthy."
        ]
        claims_specialist_instructions = [
            "You are a ClaimsSpecialist. Provide detailed guidance on handling insurance claims.",
            "Explain steps clearly, including any documentation or processes required.",
            "Use reasoning to address edge cases and potential user concerns."
        ]
        policy_advisor_instructions = [
            "You are a PolicyAdvisor. Offer expert advice on insurance policy options.",
            "Explain policy types, coverage details, and considerations for choosing them.",
            "Use reasoning to tailor advice to the user's query."
        ]
        customer_service_instructions = [
            "You are a CustomerService agent. Greet the user warmly and coordinate responses.",
            "Ensure the user's question is clearly understood before passing it to other agents.",
            "Use memory to personalize responses based on past interactions."
        ]
        lead_agent_instructions = [
            "You are a LeadAgent coordinating a team of specialized insurance agents.",
            "Collect responses from the KnowledgeRetriever, ClaimsSpecialist, PolicyAdvisor, and CustomerService agents.",
            "Use reasoning to summarize and refine their inputs into a clear, concise, and accurate response.",
            "Return the response in JSON format with fields: 'answer' (string), 'sources' (list of strings), and 'confidence' (float between 0 and 1)."
        ]

        # Initialize sub-agents with ReasoningTools and DocumentSummaryTool
        self.knowledge_retriever = Agent(
            name="KnowledgeRetriever",
            model=self.llm,
            instructions=knowledge_retriever_instructions,
            tools=[ReasoningTools(add_instructions=True), DocumentSummaryTool()],
            memory=Memory(db=self.sqlite_db),
            enable_agentic_memory=True,
            description="Retrieves and summarizes information from insurance documents.",
            markdown=True
        )
        self.claims_specialist = Agent(
            name="ClaimsSpecialist",
            model=self.llm,
            instructions=claims_specialist_instructions,
            tools=[ReasoningTools(add_instructions=True)],
            memory=Memory(db=self.sqlite_db),
            enable_agentic_memory=True,
            description="Specializes in insurance claim processes.",
            markdown=True
        )
        self.policy_advisor = Agent(
            name="PolicyAdvisor",
            model=self.llm,
            instructions=policy_advisor_instructions,
            tools=[ReasoningTools(add_instructions=True)],
            memory=Memory(db=self.sqlite_db),
            enable_agentic_memory=True,
            description="Advises on insurance policy options.",
            markdown=True
        )
        self.customer_service = Agent(
            name="CustomerService",
            model=self.llm,
            instructions=customer_service_instructions,
            tools=[ReasoningTools(add_instructions=True)],
            memory=Memory(db=self.sqlite_db),
            enable_agentic_memory=True,
            description="Coordinates responses and greets users.",
            markdown=True
        )

        # Initialize lead agent with team
        self.lead_agent = Agent(
            name="LeadAgent",
            model=self.llm,
            team=[self.knowledge_retriever, self.claims_specialist, self.policy_advisor, self.customer_service],
            instructions=lead_agent_instructions,
            tools=[ReasoningTools(add_instructions=True)],
            memory=Memory(db=self.sqlite_db),
            enable_agentic_memory=True,
            description="Coordinates the insurance agent team and provides JSON responses.",
            markdown=True
        )
        logger.info("Agents initialized with ReasoningTools, DocumentSummaryTool, and SQLite memory")

    def _open_chat_log(self):
        """Open the SQLite file and make sure the question/answer log table exists.

        The caller owns the returned connection and must close it.
        """
        conn = sqlite3.connect(self.db_file)
        try:
            # The log gets its own table name. "insurance_memories" is also the
            # table_name given to SqliteMemoryDb for the same file; if that
            # backend creates it with its own columns, CREATE TABLE IF NOT EXISTS
            # would be a no-op and the INSERT of message/response would fail.
            # NOTE(review): confirm the backend's schema if the names are ever merged.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS insurance_chat_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    message TEXT,
                    response TEXT,
                    timestamp TEXT
                )
            """)
        except Exception:
            conn.close()
            raise
        return conn

    def _store_message(self, message: str, response: str):
        """Store a message and response in SQLite database.

        Failures are logged and swallowed so that a logging problem never
        prevents an answer from being returned.
        """
        conn = None
        try:
            conn = self._open_chat_log()
            conn.execute(
                "INSERT INTO insurance_chat_log (message, response, timestamp) VALUES (?, ?, ?)",
                (message, response, datetime.now().isoformat())
            )
            conn.commit()
            logger.info("Stored message in SQLite database")
        except Exception as e:
            logger.error(f"Error storing message: {e}")
        finally:
            # Close even when a statement raised, so the file handle is not leaked.
            if conn is not None:
                conn.close()

    def _retrieve_messages(self, limit: int = 3) -> List[Dict[str, str]]:
        """Retrieve recent messages from SQLite database.

        Returns:
            Up to ``limit`` ``{"message", "response"}`` dicts, newest first;
            ``[]`` if the database cannot be read.
        """
        conn = None
        try:
            conn = self._open_chat_log()
            # id is AUTOINCREMENT, so descending id is insertion order reversed.
            rows = conn.execute(
                "SELECT message, response FROM insurance_chat_log ORDER BY id DESC LIMIT ?",
                (limit,)
            ).fetchall()
            messages = [{"message": row[0], "response": row[1]} for row in rows]
            logger.info(f"Retrieved {len(messages)} messages from SQLite database")
            return messages
        except Exception as e:
            logger.warning(f"Error retrieving messages: {e}")
            return []
        finally:
            if conn is not None:
                conn.close()

    @staticmethod
    def _parse_lead_response(raw, sources, fallback_confidence=0.9):
        """Turn the lead prompt's reply into an ``answer``/``sources``/``confidence`` dict.

        The outermost ``{...}`` span is parsed, so a JSON object wrapped in a
        Markdown code fence or surrounded by prose is still recognised. When no
        usable object with an ``answer`` key is found, the raw reply becomes
        the answer. Missing ``sources``/``confidence`` keys are filled in so
        callers can index all three fields unconditionally.
        """
        fallback = {
            'answer': raw,
            'sources': sources,
            'confidence': fallback_confidence
        }
        text = raw.strip() if isinstance(raw, str) else ""
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end <= start:
            return fallback
        try:
            parsed = json.loads(text[start:end + 1])
        except json.JSONDecodeError:
            return fallback
        if not isinstance(parsed, dict) or 'answer' not in parsed:
            return fallback
        if not isinstance(parsed['answer'], str):
            # The answer is stored as TEXT and printed; keep it a string.
            parsed['answer'] = str(parsed['answer'])
        parsed.setdefault('sources', sources)
        parsed.setdefault('confidence', fallback_confidence)
        return parsed

    def answer_question(self, question):
        """Process a question through the agent team.

        Returns:
            ``(response_dict, docs)`` where ``response_dict`` always has
            ``answer``, ``sources`` and ``confidence`` keys and ``docs`` are
            the retrieved documents. On any error a fixed apology dict and an
            empty list are returned instead of raising.
        """
        try:
            # Retrieve recent memory messages
            past_memories = self._retrieve_messages(limit=3)
            memory_context = "\n".join(
                [f"Past Q: {m['message']}\nPast A: {m['response']}" for m in past_memories]
            )
            if memory_context:
                logger.info("Retrieved recent past memories for personalization")

            # Retrieve relevant documents
            docs = self.rag_system.search(question)
            context = "\n".join([doc.page_content for doc in docs])
            # Several chunks usually come from the same file; list each source once,
            # in first-seen order.
            sources = list(dict.fromkeys(doc.metadata['source'] for doc in docs))

            # CustomerService initiates the conversation
            cs_prompt = (
                f"Greetings! I'm here to help with your insurance question: {question}\n"
                f"Past interactions:\n{memory_context}\n"
                f"Instructions: {self.customer_service.instructions[0]}"
            )
            cs_response = self.llm.invoke(cs_prompt).content

            # KnowledgeRetriever uses document context and summarizes if needed
            doc_summary = DocumentSummaryTool().run(context) if context else "No relevant documents found."
            kr_prompt = (
                f"Question: {question}\n"
                f"Document Summary: {doc_summary}\n"
                f"Raw Context: {context}\n"
                f"Instructions: {self.knowledge_retriever.instructions[0]}"
            )
            kr_response = self.llm.invoke(kr_prompt).content

            # ClaimsSpecialist and PolicyAdvisor provide specialized input
            cspecial_prompt = f"Question: {question}\nInstructions: {self.claims_specialist.instructions[0]}"
            cspecial_response = self.llm.invoke(cspecial_prompt).content

            pa_prompt = f"Question: {question}\nInstructions: {self.policy_advisor.instructions[0]}"
            pa_response = self.llm.invoke(pa_prompt).content

            # LeadAgent summarizes with JSON output
            inputs = (
                f"CustomerService: {cs_response}\n"
                f"KnowledgeRetriever: {kr_response}\n"
                f"ClaimsSpecialist: {cspecial_response}\n"
                f"PolicyAdvisor: {pa_response}"
            )
            json_response = self.llm.invoke(
                f"Question: {question}\nInputs from team:\n{inputs}\nInstructions: {self.lead_agent.instructions[0]}\n"
                "Return the response in JSON format with fields: 'answer' (string), 'sources' (list of strings), and 'confidence' (float)."
            ).content

            response_dict = self._parse_lead_response(json_response, sources)

            # Update LangChain and SQLite memory
            self.memory.save_context({"input": question}, {"output": response_dict['answer']})
            self._store_message(question, response_dict['answer'])

            logger.info(f"Processed question: {question}")
            return response_dict, docs
        except Exception as e:
            logger.error(f"Error processing question: {e}")
            return {"answer": "An error occurred while processing your question.", "sources": [], "confidence": 0.0}, []



In [None]:
# Main program
def _print_documents(docs):
    """Print one line per document: its source and the first 100 characters."""
    for doc in docs:
        print(f"- {doc.metadata['source']}: {doc.page_content[:100]}...")


def _print_answer(response, docs):
    """Print the answer, sources, confidence and the documents that backed them."""
    print(f"Answer: {response['answer']}")
    print(f"Sources: {response['sources']}")
    print(f"Confidence: {response['confidence']}")
    print("Relevant Documents:")
    _print_documents(docs)


def main():
    """Build the RAG and agent systems, run two sample questions, then start a REPL.

    Interactive commands: ``stats``, ``search <query>``, ``reload``, ``reset``
    and ``quit``; any other non-empty input is treated as a question.
    """
    # Initialize systems
    rag_system = InsuranceRAGSystem()
    multi_agent_system = InsuranceMultiAgentSystem(rag_system, llm)

    # Test with sample questions
    sample_questions = [
        "What types of auto insurance should I consider?",
        "How do I file a home insurance claim?"
    ]

    print("Testing with sample questions:")
    for question in sample_questions:
        print(f"\nQuestion: {question}")
        response, docs = multi_agent_system.answer_question(question)
        _print_answer(response, docs)

    # Interactive mode
    print("\nInteractive Mode - Type your question or use commands: stats, search [query], reload, reset, quit")
    while True:
        try:
            user_input = input("\nYour input: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl-C / kernel interrupt: leave the loop quietly.
            print()
            break
        if not user_input:
            # Do not spend LLM calls on a blank line.
            continue

        command = user_input.lower()
        if command == "quit":
            break
        elif command == "stats":
            print(rag_system.stats())
        elif command.startswith("search "):
            query = user_input[7:].strip()
            docs = rag_system.search(query)
            print("Search Results:")
            _print_documents(docs)
        elif command == "reload":
            # embed_documents re-raises on failure; report it and keep the session alive.
            try:
                rag_system.embed_documents()
            except Exception as e:
                print(f"Reload failed: {e}")
            else:
                print("Documents reloaded.")
        elif command == "reset":
            rag_system.reset()
            print("Vectorstore reset.")
        else:
            response, docs = multi_agent_system.answer_question(user_input)
            _print_answer(response, docs)

if __name__ == "__main__":
    main()