In [1]:
import os
from dotenv import load_dotenv
from llama_index.core import Settings
load_dotenv()

True

In [2]:
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding

# for Azure OpenAI model
api_key = os.getenv('AZURE_OPENAI_API_KEY')
azure_endpoint = os.getenv('AZURE_OPENAI_ENDPOINT')
gpt_api_version = os.getenv('AZURE_GPT_API_VERSION')
embedding_api_version = os.getenv('AZURE_EMBEDDING_API_VERSION')

llm = AzureOpenAI(
    model="gpt-4o",
    deployment_name="gpt-4o",
    api_key=api_key,
    azure_endpoint=azure_endpoint,
    api_version=gpt_api_version,
)
embed_model = AzureOpenAIEmbedding(
    model="text-embedding-3-small",
    deployment_name="text-embedding-3-small",
    api_key=api_key,
    azure_endpoint=azure_endpoint,
    api_version=embedding_api_version,
)   

Settings.llm = llm
Settings.embed_model = embed_model

In [3]:
from llama_index.core.workflow import Context
import json
    
async def read_dun_and_bradstreet() -> str:
    
    """Read existing dun and bradstreet sample data in JSON format and return it as text."""
    try:
        with open("../data/stage3/dun&bradstreet.json", "r") as file:
            data = json.load(file)
        # Print the JSON structure for debugging
        print(f"Found {len(data)} submissions in the database")
                
        # If the data is a list of submissions instead of a dictionary
        if isinstance(data, list):
            formatted_data = {}
            for i, submission in enumerate(data):
                formatted_data[f"Submission_{i+1}"] = submission
            data = formatted_data

            # Convert JSON data to formatted text
            result = []
            for key, value in data.items():
                if isinstance(value, dict):
                    result.append(f"{key}:")
                    for sub_key, sub_value in value.items():
                        result.append(f"  {sub_key}: {sub_value}")
                else:
                    result.append(f"{key}: {value}")
            
            return "\n".join(result)
    except Exception as e:
        return f"Error reading JSON file: {str(e)}"
    
async def read_internal_company_check() -> str:
    
    """Read internal company check sample data in JSON format and return it as text."""
    try:
        with open("../data/stage3/internalcompanycheck.json", "r") as file:
            data = json.load(file)
        # Print the JSON structure for debugging
        print(f"Found {len(data)} submissions in the database")
                
        # If the data is a list of submissions instead of a dictionary
        if isinstance(data, list):
            formatted_data = {}
            for i, submission in enumerate(data):
                formatted_data[f"Submission_{i+1}"] = submission
            data = formatted_data

            # Convert JSON data to formatted text
            result = []
            for key, value in data.items():
                if isinstance(value, dict):
                    result.append(f"{key}:")
                    for sub_key, sub_value in value.items():
                        result.append(f"  {sub_key}: {sub_value}")
                else:
                    result.append(f"{key}: {value}")
            
            return "\n".join(result)
    except Exception as e:
        return f"Error reading JSON file: {str(e)}"
    
async def read_companies_house() -> str:
    
    """Read companies house sample data in JSON format and return it as text."""
    try:
        with open("../data/stage3/companyhouse.json", "r") as file:
            data = json.load(file)
        # Print the JSON structure for debugging
        print(f"Found {len(data)} submissions in the database")
                
        # If the data is a list of submissions instead of a dictionary
        if isinstance(data, list):
            formatted_data = {}
            for i, submission in enumerate(data):
                formatted_data[f"Submission_{i+1}"] = submission
            data = formatted_data

            # Convert JSON data to formatted text
            result = []
            for key, value in data.items():
                if isinstance(value, dict):
                    result.append(f"{key}:")
                    for sub_key, sub_value in value.items():
                        result.append(f"  {sub_key}: {sub_value}")
                else:
                    result.append(f"{key}: {value}")
            
            return "\n".join(result)
    except Exception as e:
        return f"Error reading JSON file: {str(e)}"
    
async def read_companies_house() -> str:
    
    """Read companies house sample data in JSON format and return it as text."""
    try:
        with open("../data/stage3/companyhouse.json", "r") as file:
            data = json.load(file)
        # Print the JSON structure for debugging
        print(f"Found {len(data)} submissions in the database")
                
        # If the data is a list of submissions instead of a dictionary
        if isinstance(data, list):
            formatted_data = {}
            for i, submission in enumerate(data):
                formatted_data[f"Submission_{i+1}"] = submission
            data = formatted_data

            # Convert JSON data to formatted text
            result = []
            for key, value in data.items():
                if isinstance(value, dict):
                    result.append(f"{key}:")
                    for sub_key, sub_value in value.items():
                        result.append(f"  {sub_key}: {sub_value}")
                else:
                    result.append(f"{key}: {value}")
            
            return "\n".join(result)
    except Exception as e:
        return f"Error reading JSON file: {str(e)}"
    
async def read_company_database() -> str:
    
    """Read companies house sample data in JSON format and return it as text."""
    try:
        with open("../data/stage3/companydatabase.json", "r") as file:
            data = json.load(file)
        # Print the JSON structure for debugging
        print(f"Found {len(data)} submissions in the database")
                
        # If the data is a list of submissions instead of a dictionary
        if isinstance(data, list):
            formatted_data = {}
            for i, submission in enumerate(data):
                formatted_data[f"Submission_{i+1}"] = submission
            data = formatted_data

            # Convert JSON data to formatted text
            result = []
            for key, value in data.items():
                if isinstance(value, dict):
                    result.append(f"{key}:")
                    for sub_key, sub_value in value.items():
                        result.append(f"  {sub_key}: {sub_value}")
                else:
                    result.append(f"{key}: {value}")
            
            return "\n".join(result)
    except Exception as e:
        return f"Error reading JSON file: {str(e)}"

async def write_report(ctx: Context, report_content: str, report_section: str) -> str:
    """Useful for writing and updating a report. Your input should be a markdown formatted report section."""
    current_state = await ctx.get("state")
    if "report_content" not in current_state:
        current_state["report_content"] = {}
    # current_state["report_content"] = report_content
    current_state["report_content"][report_section] = report_content
    await ctx.set("state", current_state)
    return "Report updated."

async def write_email(ctx: Context, email: str) -> str:
    """Useful for writing and updating a report. Your input should be a markdown formatted report section."""
    current_state = await ctx.get("state")
    current_state["customer_email"] = email
    await ctx.set("state", current_state)
    print (f"\n Email content: {email}")
    return "email sent."

async def move_to_next_stage() -> str:
    """Useful for writing and updating a report. Your input should be a markdown formatted report section."""
    print("All good -> moving to data duplication check stage")
    return "Moving to next stage."

In [4]:
from llama_index.core.agent.workflow import FunctionAgent, ReActAgent

dnb_check_agent = FunctionAgent(
    name="DunAndBradStreetAgent",
    description="Useful for doing compliance checks on the submission using Dun and Bradstreet data.",
    system_prompt=(
        """"
        You are a Dun & Bradstreet Compliance Check Agent specialized in verifying insurance submission data against D&B records.

        Your responsibilities:
        1. Thoroughly analyze the broker quote submission
        2. Use the read_dun_and_bradstreet tool to retrieve D&B data for the company in question
        3. Conduct a comprehensive compliance check including:
            - Verify the company exists in D&B records
            - Check company name and address match with D&B data
            - Evaluate business size, revenue, and structure information
            - Assess risk ratings if available
            - Check for any red flags in the D&B report

        4. Document your findings using the write_report tool:
            - Create a section titled "Dun and Bradstreet Compliance Check"
            - Include all verification steps performed
            - Document any discrepancies found
            - Provide a clear compliance determination (Pass/Fail/Needs Additional Information)
            - Include specific references to D&B data points reviewed

        5. After completing your analysis, hand over control to the SanctionCheckAgent for further compliance verification

        Do a detailed compliance check of the submission and ensure you are thorough in your review before writing report section.
        After completing your D&B compliance report section, hand over to the SanctionCheckAgent with your findings for further processing and client communication.
        Always handover to the SanctionCheckAgent. The SanctionCheckAgent will handle the next steps.
        """),
    llm=llm,
    tools=[read_dun_and_bradstreet, write_report],
    can_handoff_to=["SanctionCheckAgent"],
)

sanction_check_agent = FunctionAgent(
    name="SanctionCheckAgent",
    description="Useful for doing sanction compliance checks on the submission using internal company check data.",
    system_prompt=(
        """"
        You are a Sanctions Compliance Check Agent specialized in verifying if insurance submissions have any ties to sanctioned countries or entities.

        Your responsibilities:
        1. Thoroughly analyze the broker quote submission for any potential sanctions compliance issues
        2. Use the read_internal_company_check tool to retrieve data about sanctioned countries and entities
        3. Conduct a comprehensive sanctions compliance check including:
            - Check if the company has operations in sanctioned countries
            - Verify if any company directors or beneficial owners are from sanctioned countries
            - Examine if there are any financial transactions with sanctioned entities
            - Assess any business relationships with sanctioned countries
            - Check for any red flags that might indicate sanctions evasion

        4. Review the existing report created by the Dun & Bradstreet agent
        5. Document your findings using the write_report tool:
            - Create a section titled "Sanctions Compliance Check"
            - Include all verification steps performed
            - Document any potential sanctions violations found
            - Provide a clear compliance determination (Pass/Fail/Needs Additional Information)
            - Include specific references to the sanctioned countries list reviewed

        6. After completing your analysis, hand over control to the CompaniesHouseCheckAgent for further verification

        Ensure your sanctions check is thorough and rigorous as this is a critical compliance requirement. Always document your reasoning clearly, particularly for any borderline cases. After completing your sanctions compliance report section, hand over to the CompaniesHouseCheckAgent with your findings for further processing.
        """),
    llm=llm,
    tools=[read_internal_company_check, write_report],
    can_handoff_to=["CompaniesHouseCheckAgent"],
)

companies_house_check_agent = FunctionAgent(
    name="CompaniesHouseCheckAgent",
    description="Useful for doing compliance checks on the submission using Companies House data.",
    system_prompt=(
        """"
        You are a Companies House Compliance Check Agent specialized in verifying insurance submission data against Companies House records.

        Your responsibilities:
        1. Thoroughly analyze the broker quote submission for any potential compliance issues
        2. Use the read_companies_house tool to retrieve Companies House data for the company in question
        3. Conduct a comprehensive compliance check including:
            - Verify the company exists in Companies House records
            - Check company name and address match with Companies House data
            - Evaluate business size, revenue, and structure information
            - Assess risk ratings if available
            - Check for any red flags in the Companies House report

        4. Document your findings using the write_report tool:
            - Create a section titled "Companies House Compliance Check"
            - Include all verification steps performed
            - Document any discrepancies found
            - Provide a clear compliance determination (Pass/Fail/Needs Additional Information)
            - Include specific references to Companies House data points reviewed

        5. After completing your analysis, hand over control to the CompanyDatabaseCheckAgent for further verification

        Ensure your compliance check is thorough and rigorous as this is a critical requirement. Always document your reasoning clearly, particularly for any borderline cases. After completing your Companies House compliance report section, hand over to the CompanyDatabaseCheckAgent with your findings for further processing.
        """),
    llm=llm,
    tools=[ read_companies_house, write_report],
    can_handoff_to=["CompanyDatabaseCheckAgent"],
)

company_database_check_agent = FunctionAgent(
    name="CompanyDatabaseCheckAgent",
    description="Useful for doing compliance checks on the submission using internal company database data.",
    system_prompt=(
        """
        You are a Company Database Compliance Check Agent specialized in verifying insurance submission data against internal company database records.

        Your responsibilities:
        1. Thoroughly analyze the broker quote submission for any potential compliance issues
        2. Use the read_company_database tool to retrieve internal company database data for the company in question
        3. Conduct a comprehensive compliance check including:
            - Verify the company exists in internal database records
            - Check company name and address match with internal database data
            - Evaluate business size, revenue, and structure information
            - Assess risk ratings if available
            - Check for any red flags in the internal database report

        4. Document your findings using the write_report tool:
            - Create a section titled "Internal Company Database Compliance Check"
            - Include all verification steps performed
            - Document any discrepancies found
            - Provide a clear compliance determination (Pass/Fail/Needs Additional Information)
            - Include specific references to internal database data points reviewed
        
        5. If the company is not found in the internal database, you should approve this submission as it is a new company.

        6. After completing your analysis, hand over control to the EmailAgent for further communication with the broker

        Ensure your compliance check is thorough and rigorous as this is a critical requirement. Always document your reasoning clearly, particularly for any borderline cases. After completing your internal company database compliance report section, hand over to the EmailAgent with your findings for further processing.
        """),
    llm=llm,
    tools=[ read_company_database, write_report],
    can_handoff_to=["EmailAgent"],
)

email_agent = FunctionAgent(
    name="EmailAgent",
    description="Useful for drafting and sending emails to brokers regarding the compliance check results.",
    system_prompt=(
        """
        You are an Email Communication Agent specialized in drafting compliance-related communications to insurance brokers.

        Your responsibilities:
        1. Thoroughly review the complete compliance report generated by previous agents in the workflow
        2. Evaluate compliance status from all sections of the report:
            - Dun & Bradstreet Compliance Check
            - Sanctions Compliance Check
            - Companies House Compliance Check
            - Internal Company Database Compliance Check

        3. Determine overall compliance outcome:
            - If ANY compliance section has a "Fail" status or critical issues:
              * Draft a professional email to the broker using the write_email tool
              * Clearly identify the specific compliance issues found
              * Request additional information or documentation needed to resolve issues
              * Provide clear next steps for resubmission
              * Write in the persona of Yoda, with his distinctive speech pattern and wisdom
            
            - If ALL compliance checks are "Pass" or have only minor issues:
              * Use the move_to_next_stage tool to advance the submission to binding phase

        4. When writing emails as Yoda:
            - Use Yoda's distinctive inverted syntax (e.g., "Concerned about sanctions issues, I am.")
            - Include Yoda's wisdom and philosophical tone
            - Maintain professionalism despite the character persona
            - End with encouraging guidance in Yoda's style

        Remember that your communication represents the company officially, so while adopting Yoda's speech patterns, ensure all information is accurate, compliant with regulations, and provides clear next steps for the broker.
       """
    ),
    llm=llm,
    tools=[write_email, move_to_next_stage]
)

In [5]:
from llama_index.core.agent.workflow import AgentWorkflow

agent_workflow = AgentWorkflow(
    agents=[dnb_check_agent , sanction_check_agent, companies_house_check_agent, company_database_check_agent, email_agent],
    root_agent=dnb_check_agent.name,
    initial_state={
        "report_content": {},
        "customer_email": "not drafted yet."
    },
)

In [6]:
from llama_index.core.agent.workflow import (
    AgentInput,
    AgentOutput,
    ToolCall,
    ToolCallResult,
    AgentStream,
)

handler = agent_workflow.run(
    user_msg=(
        """
        Please triage the following property insurance quote submission from a broker:
        {
        "insurance_broker": "Prime Insurance Brokers",
        "date": "24 April 2025",
        "insurance_company": "Al Ameen Insurance",
        "address": "Office 801, Saffar Tower, Valiasr Street, Tehran, Iran",
        "recipient": "Mr. David Thompson",
        "subject": "Request for Property Insurance Quote for Parsian Evin Hotel Ltd.",
        "client": "Parsian Evin Hotel Ltd.",
        "property_information": {
        "location": "No. 45, Evin Street, Tehran, Iran",
        "type": "Hotel",
        "construction": "Modern design, reinforced concrete and steel, built in 2010, no recent renovations",
        "surface_area": "11,500 m²",
        "occupancy": "150-room hotel, luxury restaurant, and conference facilities"
        },
        "coverage_requirements": {
        "desired_coverage_amount": "IRR 800,000,000,000",
        "coverage_type": ["Fire", "theft", "guest property"],
        "deductibles": "IRR 500,000,000 per incident",
        "additional_coverage": [
            "Business interruption",
            "loss of revenue due to closure",
            "third-party liability"
        ]
        },
        "risk_assessment": {
        "fire_hazards": [
            "Fire alarm and sprinkler system in all rooms",
            "fire exits clearly marked"
        ],
        "natural_disasters": [
            "Low flood risk",
            "not located in an earthquake-prone area",
            "occasional sandstorms"
        ],
        "security_measures": [
            "CCTV surveillance",
            "24/7 security personnel",
            "secure entry systems"
        ]
        },
        "financial_information": {
        "property_value": "IRR 1,000,000,000,000",
        "business_revenue": "IRR 300,000,000,000 annually"
        },
        "contact_person": {
        "name": "Oliver Green",
        "email": "oliver.green@primeinsurance.com",
        "phone": "+971 4 234 5678"
        }
    }
        Please analyze this submission and determine if it meets all compliance check.
        """
    )
)

current_agent = None
current_tool_calls = ""

try:
    async for event in handler.stream_events():
        if (
            hasattr(event, "current_agent_name")
            and event.current_agent_name != current_agent
        ):
            current_agent = event.current_agent_name
            print(f"\n{'='*50}")
            print(f" 🤖 Agent: {current_agent}")
            print(f"{'='*50}\n")

        if isinstance(event, AgentStream):
            if event.delta:
                print(event.delta, end="", flush=True)
        elif isinstance(event, AgentInput):
            print("\n 📥 Input:", event.input)

        elif isinstance(event, AgentOutput):
            if event.response.content:
                print("\n 📤 Output:", event.response.content)
            if event.tool_calls:
                print(
                    "\n 🛠️  Planning to use tools:",
                    [call.tool_name for call in event.tool_calls],
                )
        elif isinstance(event, ToolCallResult):
            print(f" 🔧 Tool Result ({event.tool_name}):")
            print(f"  Arguments: {event.tool_kwargs}")
            print(f"  Output: {event.tool_output}")
        elif isinstance(event, ToolCall):
            print(f" 🔨 Calling Tool: {event.tool_name}")
            print(f"  With arguments: {event.tool_kwargs}")

except Exception as e:
    print(f"An error occurred: {e}")

response = await handler


 🤖 Agent: DunAndBradStreetAgent


 📥 Input: [ChatMessage(role=<MessageRole.SYSTEM: 'system'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='"\n        You are a Dun & Bradstreet Compliance Check Agent specialized in verifying insurance submission data against D&B records.\n\n        Your responsibilities:\n        1. Thoroughly analyze the broker quote submission\n        2. Use the read_dun_and_bradstreet tool to retrieve D&B data for the company in question\n        3. Conduct a comprehensive compliance check including:\n            - Verify the company exists in D&B records\n            - Check company name and address match with D&B data\n            - Evaluate business size, revenue, and structure information\n            - Assess risk ratings if available\n            - Check for any red flags in the D&B report\n\n        4. Document your findings using the write_report tool:\n            - Create a section titled "Dun and Bradstreet Compliance Check"\n       

****************************************************************************************************************************
# Email
Email content: Subject: Compliance Issues Identified for Property Insurance Submission

Dear Mr. Green,

Concerned about compliance issues, I am. Review your submission for "Parsian Evin Hotel Ltd," I have. Failed the sanctions compliance check, it has. Operates in Iran, a sanctioned country, the company does. Financial transactions involving Iran, compliance risks they pose.

Failed also, the Companies House compliance check has. Registered in Companies House, the company is not. Critical these issues are, for proceeding further.

Provide additional documentation or clarification, you must. Evidence of compliance with sanctions regulations, submit. Proof of registration in Companies House, include. Resolve these issues, only then can we proceed.

Guidance, I offer: consult with your compliance team, you should. Ensure all documentation aligns with regulatory requirements, you must. Resubmit the application, when ready you are.

May the path to resolution be clear for you.

Warm regards,

Yoda