# ⚠️ Risk Analyser Agent with Azure AI Agents & CosmosDB

This notebook demonstrates how to create an intelligent risk analysis agent using Azure AI Agents Service with Semantic Kernel. The agent specializes in fraud detection and risk assessment for insurance claims by connecting to Azure Cosmos DB to analyze claimant history, claim patterns, and risk indicators.

## What You'll Learn:
- 🤖 How to create Azure AI Agents for risk analysis
- 🕵️ Building fraud detection and risk assessment logic
- 🗄️ Integrating Cosmos DB for claim history analysis  
- 📊 Implementing risk scoring algorithms
- 🔍 Identifying fraud indicators and suspicious patterns
- ⚖️ Providing evidence-based risk assessments


## 📋 Required Packages and Imports

This cell imports all necessary packages for Azure AI Agents with Semantic Kernel and checks the environment setup.

In [1]:
# Let's import everything we need and set up our environment
import asyncio
import os
from typing import Annotated

from azure.identity.aio import DefaultAzureCredential
from semantic_kernel.agents import AzureAIAgent, AzureAIAgentSettings, AzureAIAgentThread
from semantic_kernel.functions import kernel_function

print("📦 All packages imported successfully!")
print("\n🔧 Environment check:")

# Check if we're in a Jupyter environment for async handling
try:
    # Check if we're in Jupyter
    get_ipython()
    print("✅ Jupyter environment detected - async code will work properly")
except NameError:
    print("ℹ️  Running outside Jupyter - using asyncio.run() for async functions")

print("\n🎉 Setup complete! Ready to create your first agent.")

📦 All packages imported successfully!

🔧 Environment check:
✅ Jupyter environment detected - async code will work properly

🎉 Setup complete! Ready to create your first agent.


## 🔍 Environment Variables Check

This cell checks if all required Azure environment variables are set and displays their status.

In [2]:
required_vars = ['AZURE_OPENAI_ENDPOINT', 'AZURE_OPENAI_KEY', 'AZURE_OPENAI_DEPLOYMENT_NAME', 'AI_FOUNDRY_PROJECT_ENDPOINT', 'MODEL_DEPLOYMENT_NAME']
missing_vars = []

for var in required_vars:
    if var not in os.environ:
        missing_vars.append(var)
    else:
        print(f"✅ {var} is set")

if missing_vars:
    print(f"\n❌ Missing environment variables: {missing_vars}")
    print("\n🔧 Please set them using:")
    for var in missing_vars:
        print(f"   os.environ['{var}'] = 'your_value_here'")
else:
    print("\n🎉 All environment variables are properly configured!")

✅ AZURE_OPENAI_ENDPOINT is set
✅ AZURE_OPENAI_KEY is set
✅ AZURE_OPENAI_DEPLOYMENT_NAME is set
✅ AI_FOUNDRY_PROJECT_ENDPOINT is set
✅ MODEL_DEPLOYMENT_NAME is set

🎉 All environment variables are properly configured!


In [3]:
# Check if Azure CLI is logged in
import subprocess
import shutil
import json

def is_az_logged_in():
    az_path = shutil.which("az")
    if not az_path:
        print("❌ Azure CLI (az) not found in PATH.")
        return False

    try:
        result = subprocess.run(
            [az_path, "account", "show", "--output", "json"],
            capture_output=True,
            text=True,
            check=True
        )
        account_info = json.loads(result.stdout)
        print(f"✅ Logged in as: {account_info['user']['name']}")
        return True
    except subprocess.CalledProcessError:
        print("❌ Not logged in or error during az call. Please log in using 'az login' from the terminal.")
        return False

_ = is_az_logged_in()


✅ Logged in as: andrehe@microsoft.com


## 🗄️ CosmosDB Plugin Implementation

This cell creates a production-ready Cosmos DB plugin that connects to Azure Cosmos DB for retrieving insurance claim documents. Includes multiple kernel functions for querying by claim_id, document ID, custom SQL queries, field searches, and container management.

In [4]:
import json
from typing import List, Dict, Any, Optional

class CosmosDBPlugin:
    """
    A production-ready Cosmos DB plugin that connects to real Azure Cosmos DB.
    This plugin retrieves actual JSON documents from your database.
    """
    
    def __init__(self, endpoint: str = None, key: str = None, database_name: str = "MyDatabase", container_name: str = "MyContainer"):
        """
        Initialize the Cosmos DB plugin with connection details.
        For production, use environment variables or Azure Key Vault for credentials.
        """
        self.endpoint = endpoint or os.environ.get("COSMOS_ENDPOINT")
        self.key = key or os.environ.get("COSMOS_KEY") 
        self.database_name = "insurance_claims"
        self.container_name = "crash_reports"
        
        # Validate required connection parameters
        if not self.endpoint:
            print("⚠️ COSMOS_DB_ENDPOINT environment variable not set")
        if not self.key:
            print("⚠️ COSMOS_DB_KEY environment variable not set")
            print("💡 Consider using Managed Identity in production instead of keys")
    
    def _get_cosmos_client(self):
        """Create and return a Cosmos DB client."""
        try:
            from azure.cosmos import CosmosClient
            return CosmosClient(self.endpoint, self.key)
        except ImportError:
            raise ImportError("azure-cosmos package not installed. Run: pip install azure-cosmos")
        except Exception as e:
            raise Exception(f"Failed to create Cosmos DB client: {str(e)}")
    
    @kernel_function(description="Retrieve a document by claim_id from Cosmos DB using cross-partition query")
    def get_document_by_claim_id(
        self, 
        claim_id: Annotated[str, "The claim_id to retrieve (not the partition key)"]
    ) -> Annotated[str, "JSON document from Cosmos DB"]:
        """Retrieve a document by its claim_id using a cross-partition query."""
        try:
            client = self._get_cosmos_client()
            database = client.get_database_client(self.database_name)
            container = database.get_container_client(self.container_name)
            
            # Use SQL query to find document by claim_id across all partitions
            query = "SELECT * FROM c WHERE c.claim_id = @claim_id"
            parameters = [{"name": "@claim_id", "value": claim_id}]
            
            items = list(container.query_items(
                query=query,
                parameters=parameters,
                enable_cross_partition_query=True,
                max_item_count=1  # We expect only one document with this claim_id
            ))
            
            if not items:
                return f"❌ No document found with claim_id '{claim_id}' in container '{self.container_name}'"
            
            # Return the first (and should be only) matching document
            document = items[0]
            return json.dumps(document, indent=2, ensure_ascii=False)
            
        except Exception as e:
            error_msg = str(e)
            if "Unauthorized" in error_msg or "401" in error_msg:
                return f"❌ Authentication failed. Please check your Cosmos DB credentials."
            elif "Forbidden" in error_msg or "403" in error_msg:
                return f"❌ Access denied. Please check your Cosmos DB permissions."
            else:
                return f"❌ Error retrieving document by claim_id '{claim_id}': {error_msg}"
    
    @kernel_function(description="Retrieve a JSON document by partition key and document ID from Cosmos DB")
    def get_document_by_id(
        self, 
        document_id: Annotated[str, "The document ID to retrieve"],
        partition_key: Annotated[str, "The partition key value (optional, will use cross-partition query if not provided)"] = None
    ) -> Annotated[str, "JSON document from Cosmos DB"]:
        """Retrieve a specific document by its ID and optionally partition key from Cosmos DB."""
        try:
            client = self._get_cosmos_client()
            database = client.get_database_client(self.database_name)
            container = database.get_container_client(self.container_name)
            
            if partition_key:
                # Direct read using partition key - most efficient
                item = container.read_item(item=document_id, partition_key=partition_key)
                return json.dumps(item, indent=2, ensure_ascii=False)
            else:
                # Cross-partition query when partition key is unknown
                query = "SELECT * FROM c WHERE c.id = @document_id"
                parameters = [{"name": "@document_id", "value": document_id}]
                
                items = list(container.query_items(
                    query=query,
                    parameters=parameters,
                    enable_cross_partition_query=True,
                    max_item_count=1
                ))
                
                if not items:
                    return f"❌ Document with ID '{document_id}' not found in container '{self.container_name}'"
                
                return json.dumps(items[0], indent=2, ensure_ascii=False)
            
        except Exception as e:
            error_msg = str(e)
            if "NotFound" in error_msg or "404" in error_msg:
                return f"❌ Document with ID '{document_id}' not found in container '{self.container_name}'"
            elif "Unauthorized" in error_msg or "401" in error_msg:
                return f"❌ Authentication failed. Please check your Cosmos DB credentials."
            elif "Forbidden" in error_msg or "403" in error_msg:
                return f"❌ Access denied. Please check your Cosmos DB permissions."
            else:
                return f"❌ Error retrieving document: {error_msg}"
    
    @kernel_function(description="Query documents with a custom SQL query in Cosmos DB")
    def query_documents(
        self, 
        sql_query: Annotated[str, "SQL query to execute (e.g., 'SELECT * FROM c WHERE c.category = \"electronics\"')"]
    ) -> Annotated[str, "Query results as JSON"]:
        """Execute a custom SQL query against the Cosmos DB container."""
        try:
            client = self._get_cosmos_client()
            database = client.get_database_client(self.database_name)
            container = database.get_container_client(self.container_name)
            
            # Execute the query
            items = list(container.query_items(
                query=sql_query,
                enable_cross_partition_query=True  # Enable if your query spans partitions
            ))
            
            if not items:
                return f"🔍 No documents found matching query: {sql_query}"
            
            # Return results as formatted JSON
            result = {
                "query": sql_query,
                "count": len(items),
                "results": items
            }
            
            return json.dumps(result, indent=2, ensure_ascii=False)
            
        except Exception as e:
            error_msg = str(e)
            if "Syntax error" in error_msg:
                return f"❌ SQL syntax error in query: {sql_query}\nError: {error_msg}"
            else:
                return f"❌ Error executing query: {error_msg}"
    
    @kernel_function(description="Get container information and statistics")
    def get_container_info(self) -> Annotated[str, "Container information and statistics"]:
        """Get information about the Cosmos DB container."""
        try:
            client = self._get_cosmos_client()
            database = client.get_database_client(self.database_name)
            container = database.get_container_client(self.container_name)
            
            # Get container properties
            container_props = container.read()
            
            # Get approximate document count (this is an estimate)
            count_query = "SELECT VALUE COUNT(1) FROM c"
            count_items = list(container.query_items(
                query=count_query,
                enable_cross_partition_query=True
            ))
            document_count = count_items[0] if count_items else "Unknown"
            
            info = {
                "database": self.database_name,
                "container": self.container_name,
                "partition_key": container_props.get("partitionKey", {}).get("paths", ["Unknown"]),
                "approximate_document_count": document_count,
                "indexing_policy": container_props.get("indexingPolicy", {}).get("indexingMode", "Unknown")
            }
            
            return json.dumps(info, indent=2)
            
        except Exception as e:
            return f"❌ Error getting container info: {str(e)}"
    
    @kernel_function(description="List recent documents (up to 100) from Cosmos DB")
    def list_recent_documents(
        self, 
        limit: Annotated[int, "Maximum number of documents to return (default: 10, max: 100)"] = 10
    ) -> Annotated[str, "List of recent documents"]:
        """List recent documents from the container."""
        try:
            # Ensure limit is within bounds
            limit = max(1, min(limit, 100))
            
            client = self._get_cosmos_client()
            database = client.get_database_client(self.database_name)
            container = database.get_container_client(self.container_name)
            
            # Query for documents (ordered by _ts if available)
            query = f"SELECT TOP {limit} * FROM c ORDER BY c._ts DESC"
            
            items = list(container.query_items(
                query=query,
                enable_cross_partition_query=True
            ))
            
            if not items:
                return "📭 No documents found in the container"
            
            result = {
                "container": self.container_name,
                "count": len(items),
                "documents": items
            }
            
            return json.dumps(result, indent=2, ensure_ascii=False)
            
        except Exception as e:
            return f"❌ Error listing documents: {str(e)}"
    
    @kernel_function(description="Search documents by field value")
    def search_by_field(
        self, 
        field_name: Annotated[str, "The field name to search in (e.g., 'name', 'category', 'status')"],
        field_value: Annotated[str, "The value to search for"]
    ) -> Annotated[str, "Documents matching the search criteria"]:
        """Search for documents where a specific field matches a value."""
        try:
            client = self._get_cosmos_client()
            database = client.get_database_client(self.database_name)
            container = database.get_container_client(self.container_name)
            
            # Use parameterized query for better security and performance
            query = f"SELECT * FROM c WHERE c.{field_name} = @field_value"
            parameters = [{"name": "@field_value", "value": field_value}]
            
            items = list(container.query_items(
                query=query,
                parameters=parameters,
                enable_cross_partition_query=True
            ))
            
            if not items:
                return f"🔍 No documents found where {field_name} = '{field_value}'"
            
            result = {
                "search_criteria": f"{field_name} = '{field_value}'",
                "count": len(items),
                "documents": items
            }
            
            return json.dumps(result, indent=2, ensure_ascii=False)
            
        except Exception as e:
            return f"❌ Error searching documents: {str(e)}"

# Create an instance of the real Cosmos DB plugin
cosmos_plugin = CosmosDBPlugin()

## 🚀 Test Claim Reviewer Agent

This cell creates and tests an Azure AI Agent configured as a claim assessor that connects to Cosmos DB to retrieve claim information and perform detailed damage evaluation and cost assessment with validation capabilities.

In [5]:
async def test_corrected_cosmosdb_agent():
    """Test the agent with the corrected Cosmos DB plugin that handles claim_id properly."""
    
    print("🚀 Testing Agent with Corrected Cosmos DB Plugin")
    print("=" * 60)
    
    try:
        # Create client for the agent
        client = AzureAIAgent.create_client(
            credential=DefaultAzureCredential(), 
            endpoint=os.environ.get("AI_FOUNDRY_PROJECT_ENDPOINT")
        )
        
        # Create agent definition with updated instructions
        # Fixed: Agent name must match pattern '^[0-9A-Za-z_-]+$' (no spaces allowed)
        agent_definition = await client.agents.create_agent(
            model=os.environ.get("MODEL_DEPLOYMENT_NAME"),
            name="risk-analyser",  # Changed from "Cosmos DB Claim Assistant"
            instructions="""You are a risk analyst specializing in fraud detection and risk assessment for insurance claims.

        Your responsibilities:
        - Analyze claimant history and claim frequency patterns.
        - Identify potential fraud indicators.
        - Assess risk factors based on incident details.
        - Evaluate supporting documentation quality.
        - Provide risk scoring and recommendations.

        Potencial risk and fraud indicators:
        - High frequency of claims from the same claimant.

        Focus on objective risk factors and provide evidence-based assessments.
        End your assessment with risk level: LOW_RISK, MEDIUM_RISK, or HIGH_RISK."""
        )
        
        # Create agent with the corrected Cosmos DB plugin
        claim_agent = AzureAIAgent(
            client=client,
            definition=agent_definition,
            plugins=[cosmos_plugin]  # Use the corrected plugin instance
        )
        
        print(f"✅ Created agent: {claim_agent.name}")
        
        # Test questions specifically for claim retrieval
        test_questions = [
            "Retrieve information for CL001"
        ]
        
        print(f"\n💬 Testing with {len(test_questions)} questions...")
        print("=" * 60)
        
        claim_thread = None
        
        for i, question in enumerate(test_questions, 1):
            print(f"\n{i}. 👤 User: {question}")
            
            try:
                response = await claim_agent.get_response(
                    messages=question,
                    thread=claim_thread
                )
                
                print(f"   🤖 Agent: {response}")
                claim_thread = response.thread
                print("-" * 60)
                
            except Exception as e:
                print(f"   ❌ Error with question {i}: {str(e)}")
                print("-" * 60)
        
        print(f"\n✅ Agent test completed!")
        
        agent = await client.agents.get_agent(claim_agent.id)
        print(f"Agent ID: {agent.id}")
        print(f"Agent Name: {agent.name}")
        # Ensure we await the async delete to avoid RuntimeWarning
        await client.agents.delete_agent(agent.id)

    except Exception as e:
        print(f"❌ Error testing agent: {str(e)}")
        import traceback
        traceback.print_exc()

print("🎯 Ready to test! Run the cell below to execute the test:")
print("await test_corrected_cosmosdb_agent()")

🎯 Ready to test! Run the cell below to execute the test:
await test_corrected_cosmosdb_agent()


In [7]:
# Execute the corrected agent test
await test_corrected_cosmosdb_agent()

🚀 Testing Agent with Corrected Cosmos DB Plugin
✅ Created agent: risk-analyser

💬 Testing with 1 questions...

1. 👤 User: Retrieve information for CL001
✅ Created agent: risk-analyser

💬 Testing with 1 questions...

1. 👤 User: Retrieve information for CL001
   🤖 Agent: Claim ID: CL001 involves two reported vehicle incidents associated with claimant John Peterson.

Incident 1 (July 17, 2025):
- Vehicle: 2004 Honda Accord, Silver
- Location: Parking lot, 2325 Main Street, Springfield, OH
- Incident: Vehicle legally parked; struck on front and driver's side by gray pickup.
- Damage: Front bumper detached, severe dents and scraping on driver’s side front quarter; damaged front wheel and tire; paint transfer and deformities on driver's door; headlamp and grille damage; possible hidden mechanical damage.
- Witness: Melissa Grant
- Police report filed with Springfield Police Dept, report no. 25-52814.
- Repair shop: Springfield Auto Body.
- Attachments include photos, police report, and repai