# Policies Tool Migration to Cosmos DB

This notebook demonstrates migrating the policies tool from in-memory data to Azure Cosmos DB, accessed through its MongoDB API.

## Migration Goals
- Replace in-memory policy data with Cosmos DB storage
- Implement production-ready policy lookup functionality  
- Test policy retrieval scenarios
- Ensure data consistency and error handling

## Policy Data Structure
- Policy details (type, status, effective and expiration dates)
- Coverage information (limits and deductibles)
- Billing data (monthly and annual premiums)
- Claims history
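
The structure listed above can be written down as type hints. The sketch below is one possible shape, inferred from the sample records later in this notebook; the class names, `REQUIRED_FIELDS`, and `missing_fields` are hypothetical and not part of the project.

```python
from typing import Any, Dict, List, TypedDict


class Premium(TypedDict):
    monthly: float
    annual: float


class Claim(TypedDict):
    claim_id: str
    date: str  # ISO date string, e.g. "2023-06-15"
    type: str
    amount: float
    status: str


class PolicyDocument(TypedDict, total=False):
    # total=False because type-specific fields (vehicles, property,
    # beneficiaries, coverage_details) appear only on some policies.
    _id: str
    policy_id: str
    policyholder_name: str
    policy_type: str
    coverage: Dict[str, str]
    premium: Premium
    status: str
    effective_date: str
    expiration_date: str
    claims_history: List[Claim]
    coverage_details: Dict[str, Any]


# Hypothetical list of fields every policy is expected to carry.
REQUIRED_FIELDS = ("_id", "policy_id", "policyholder_name", "policy_type",
                   "coverage", "premium", "status", "claims_history")


def missing_fields(doc: Dict[str, Any]) -> List[str]:
    """Return the required field names that are absent from `doc`."""
    return [name for name in REQUIRED_FIELDS if name not in doc]
```

Calling `missing_fields(policy)` before an upsert gives a cheap guard against incomplete documents. `TypedDict` itself adds no runtime validation; it only helps static checkers and readers.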

## Setup and Imports

In [1]:
import os
import sys
import asyncio
import datetime
from typing import Optional, Dict, Any, List

# Add the project root to Python path
project_root = os.path.abspath(os.path.join(os.getcwd(), '..', '..', '..'))
sys.path.insert(0, project_root)

# Import Cosmos DB manager
from src.cosmosdb.manager import CosmosDBMongoCoreManager
from utils.ml_logging import get_logger

logger = get_logger("policies_migration")
print("Imports successful!")
print(f"Project root: {project_root}")

Imports successful!
Project root: /Users/jinle/Repos/_AIProjects/art-voice-agent-accelerator


## Initialize Cosmos DB Connection

In [2]:
# Initialize Cosmos DB for policies
try:
    cosmos = CosmosDBMongoCoreManager(
        database_name="voice_agent_db",
        collection_name="policies"  # Different collection for policy data
    )
    print("Cosmos DB connection established")
    print("Database: voice_agent_db")
    print("Collection: policies")
except Exception as e:
    print(f"Failed to connect to Cosmos DB: {e}")
    cosmos = None

Cosmos DB connection established
Database: voice_agent_db
Collection: policies


## Create Sample Policy Data

Define comprehensive policy data including Auto, Home, and Life insurance policies.
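
The sample records stamp `created_at` and `updated_at` with `datetime.datetime.utcnow()`, which is deprecated as of Python 3.12 and returns a naive datetime. A minimal alternative, assuming the same trailing-`Z` string format is wanted, is sketched here; `utc_timestamp` is a hypothetical helper name.

```python
import datetime


def utc_timestamp() -> str:
    """Return the current UTC time as an ISO 8601 string ending in "Z"."""
    now = datetime.datetime.now(datetime.timezone.utc)
    # isoformat() on an aware UTC datetime ends in "+00:00"; swap it for "Z"
    return now.isoformat().replace("+00:00", "Z")
```

Each `datetime.datetime.utcnow().isoformat() + "Z"` expression could then be replaced by `utc_timestamp()`.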

In [3]:
# Define comprehensive policy data
sample_policies = [
    {
        "_id": "POL-A10001",
        "policy_id": "POL-A10001",
        "policyholder_name": "Jane Smith",
        "policy_type": "Auto Insurance",
        "coverage": {
            "liability": "$100,000/$300,000",
            "collision": "$1,000 deductible",
            "comprehensive": "$500 deductible",
            "personal_injury_protection": "$50,000"
        },
        "vehicles": [
            {
                "year": 2019,
                "make": "Honda",
                "model": "Civic",
                "vin": "1HGBH41JXMN109186"
            }
        ],
        "premium": {
            "monthly": 125.50,
            "annual": 1506.00
        },
        "status": "active",
        "effective_date": "2023-01-15",
        "expiration_date": "2024-01-15",
        "claims_history": [
            {
                "claim_id": "CLM-A10001-9876",
                "date": "2023-06-15",
                "type": "collision",
                "amount": 3200.00,
                "status": "closed"
            }
        ],
        "created_at": datetime.datetime.utcnow().isoformat() + "Z",
        "updated_at": datetime.datetime.utcnow().isoformat() + "Z"
    },
    {
        "_id": "POL-A20002",
        "policy_id": "POL-A20002",
        "policyholder_name": "Alice Brown",
        "policy_type": "Home Insurance",
        "coverage": {
            "dwelling": "$350,000",
            "personal_property": "$175,000",
            "liability": "$300,000",
            "medical_payments": "$5,000",
            "loss_of_use": "$70,000"
        },
        "property": {
            "address": "123 Oak Street, Chicago, IL 60601",
            "type": "Single Family Home",
            "year_built": 1995,
            "square_feet": 2200
        },
        "premium": {
            "monthly": 183.33,
            "annual": 2200.00
        },
        "deductible": 1000,
        "status": "active",
        "effective_date": "2023-03-01",
        "expiration_date": "2024-03-01",
        "claims_history": [
            {
                "claim_id": "CLM-A20002-3344",
                "date": "2023-08-20",
                "type": "water damage",
                "amount": 5800.00,
                "status": "closed"
            }
        ],
        "coverage_details": {
            "rental_reimbursement": {
                "coverage": "$100/day for up to 24 months",
                "description": "Coverage for temporary housing expenses if your home becomes uninhabitable due to a covered loss"
            }
        },
        "created_at": datetime.datetime.utcnow().isoformat() + "Z",
        "updated_at": datetime.datetime.utcnow().isoformat() + "Z"
    },
    {
        "_id": "POL-A30003",
        "policy_id": "POL-A30003",
        "policyholder_name": "Alice Brown",
        "policy_type": "Life Insurance",
        "coverage": {
            "death_benefit": "$500,000",
            "type": "Term Life",
            "term_length": "20 years"
        },
        "premium": {
            "monthly": 45.00,
            "annual": 540.00
        },
        "beneficiaries": [
            {"name": "John Brown", "relationship": "Spouse", "percentage": 100}
        ],
        "status": "active",
        "effective_date": "2022-03-10",
        "expiration_date": "2042-03-10",
        "claims_history": [],
        "created_at": datetime.datetime.utcnow().isoformat() + "Z",
        "updated_at": datetime.datetime.utcnow().isoformat() + "Z"
    }
]

print("Sample policy data created:")
for policy in sample_policies:
    print(f"  - {policy['policy_id']}: {policy['policy_type']} for {policy['policyholder_name']}")

Sample policy data created:
  - POL-A10001: Auto Insurance for Jane Smith
  - POL-A20002: Home Insurance for Alice Brown
  - POL-A30003: Life Insurance for Alice Brown
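
Before writing the records to the database, a few consistency checks can catch data-entry mistakes. The sketch below is an illustration only: `check_policy` and its `tolerance` parameter are hypothetical, and the rule that the annual premium is roughly twelve times the monthly one is inferred from the sample data rather than stated by the project.

```python
import datetime
from typing import Any, Dict, List


def check_policy(policy: Dict[str, Any], tolerance: float = 0.05) -> List[str]:
    """Return a list of human-readable problems found in one policy record."""
    problems = []

    # The samples use the policy ID as the document _id
    if policy.get("_id") != policy.get("policy_id"):
        problems.append("_id and policy_id differ")

    # Dates are stored as ISO strings; parse them to compare properly
    start = datetime.date.fromisoformat(policy["effective_date"])
    end = datetime.date.fromisoformat(policy["expiration_date"])
    if end <= start:
        problems.append("expiration_date is not after effective_date")

    # Allow a few cents of rounding between monthly and annual premiums
    premium = policy["premium"]
    if abs(premium["monthly"] * 12 - premium["annual"]) > tolerance:
        problems.append("annual premium is not 12x monthly")

    return problems
```

Running `check_policy(p)` over `sample_policies` should return an empty list for each of the three records. The home policy's 183.33 × 12 = 2199.96 falls within the default tolerance of its 2200.00 annual premium.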


## Insert Policy Data into Cosmos DB

In [4]:
async def insert_policy_data():
    """Insert sample policy data into Cosmos DB."""
    if not cosmos:
        print("Cosmos DB not initialized")
        return False
    
    failed = []
    for policy in sample_policies:
        try:
            await asyncio.to_thread(
                cosmos.upsert_document,
                document=policy,
                query={"_id": policy["_id"]}
            )
            print(f"Upserted: {policy['policy_id']} ({policy['policyholder_name']})")
        except Exception as e:
            failed.append(policy["policy_id"])
            print(f"Failed to upsert {policy['policy_id']}: {e}")

    # Report success only if every policy was written
    if failed:
        print(f"{len(failed)} of {len(sample_policies)} policies failed: {failed}")
        return False

    print("Sample policy data inserted successfully!")
    return True

# Run the insertion
if cosmos:
    await insert_policy_data()
else:
    print("Cannot insert data - Cosmos DB not available")

Upserted: POL-A10001 (Jane Smith)
Upserted: POL-A20002 (Alice Brown)
Upserted: POL-A30003 (Alice Brown)
Sample policy data inserted successfully!
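
Because every run regenerates `created_at`, re-running the upsert overwrites the original creation time. One way around this in MongoDB is to pair `$set` with `$setOnInsert`. The sketch below only builds the update document; `build_upsert_update` is a hypothetical helper, and whether `CosmosDBMongoCoreManager.upsert_document` accepts update operators is not shown in this notebook, so treat the pymongo usage in the note as an assumption.

```python
from typing import Any, Dict


def build_upsert_update(document: Dict[str, Any], now: str) -> Dict[str, Dict[str, Any]]:
    """Build a MongoDB update that preserves created_at on existing documents."""
    # _id comes from the filter; the timestamps are handled separately below
    fields = {
        key: value
        for key, value in document.items()
        if key not in ("_id", "created_at", "updated_at")
    }
    fields["updated_at"] = now
    return {
        "$set": fields,                       # applied on insert and on update
        "$setOnInsert": {"created_at": now},  # applied only when inserting
    }
```

With a plain pymongo collection, this would be passed as `collection.update_one({"_id": policy["_id"]}, update, upsert=True)`.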


## Test Policy Retrieval

In [5]:
async def test_policy_retrieval():
    """Test policy retrieval scenarios from Cosmos DB."""
    if not cosmos:
        print("Cannot test retrieval - Cosmos DB not available")
        return
    
    print("Testing Policy Retrieval Scenarios")
    print("=" * 40)
    
    # Test 1: Retrieve by Policy ID
    print("\nTest 1: Retrieve by Policy ID")
    try:
        policy = await asyncio.to_thread(
            cosmos.read_document,
            query={"policy_id": "POL-A20002"}
        )
        if policy:
            print(f"Found: {policy['policy_id']} - {policy['policy_type']} for {policy['policyholder_name']}")
            print(f"   Status: {policy['status']}")
            print(f"   Premium: ${policy['premium']['monthly']}/month")
        else:
            print("Policy not found")
    except Exception as e:
        print(f"Error: {e}")
    
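    # Test 2: Retrieve by Policyholder Name
    # NOTE: in the run recorded below, Tests 2 and 3 fail because
    # CosmosDBMongoCoreManager has no `find_documents` method.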
    print("\nTest 2: Retrieve policies by Policyholder Name")
    try:
        policies = await asyncio.to_thread(
            cosmos.find_documents,
            query={"policyholder_name": "Alice Brown"}
        )
        print(f"Found {len(policies)} policies for Alice Brown:")
        for policy in policies:
            print(f"   - {policy['policy_id']}: {policy['policy_type']}")
    except Exception as e:
        print(f"Error: {e}")
    
    # Test 3: Retrieve by Policy Type
    print("\nTest 3: Retrieve by Policy Type")
    try:
        auto_policies = await asyncio.to_thread(
            cosmos.find_documents,
            query={"policy_type": "Auto Insurance"}
        )
        print(f"Found {len(auto_policies)} Auto Insurance policies:")
        for policy in auto_policies:
            print(f"   - {policy['policy_id']}: {policy['policyholder_name']}")
    except Exception as e:
        print(f"Error: {e}")
    
    # Test 4: Non-existent Policy
    print("\nTest 4: Non-existent Policy")
    try:
        policy = await asyncio.to_thread(
            cosmos.read_document,
            query={"policy_id": "POL-NONEXISTENT"}
        )
        if policy:
            print("Unexpected: Found non-existent policy")
        else:
            print("Correctly returned None for non-existent policy")
    except Exception as e:
        print(f"Error: {e}")
    
    print("\nPolicy retrieval testing completed!")

# Run the tests
if cosmos:
    await test_policy_retrieval()
else:
    print("Cannot run tests - Cosmos DB not available")

Testing Policy Retrieval Scenarios

Test 1: Retrieve by Policy ID
Found: POL-A20002 - Home Insurance for Alice Brown
   Status: active
   Premium: $183.33/month

Test 2: Retrieve policies by Policyholder Name
Error: 'CosmosDBMongoCoreManager' object has no attribute 'find_documents'

Test 3: Retrieve by Policy Type
Error: 'CosmosDBMongoCoreManager' object has no attribute 'find_documents'

Test 4: Non-existent Policy
No document found for the given query.
Correctly returned None for non-existent policy

Policy retrieval testing completed!
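
Tests 2 and 3 fail because the manager exposes no `find_documents` method, so multi-document queries need another route. The sketch below assumes access to a pymongo-style collection object with a `find(query)` method; how (or whether) `CosmosDBMongoCoreManager` exposes its underlying collection is not shown in this notebook. `find_policies` and `InMemoryCollection` are hypothetical names, and the in-memory class exists only so the example runs without a database.

```python
import asyncio
from typing import Any, Dict, Iterator, List


async def find_policies(collection: Any, query: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return every document matching `query` from a pymongo-style collection."""
    def _run() -> List[Dict[str, Any]]:
        # find() returns a lazy cursor; materialize it inside the worker thread
        return list(collection.find(query))

    return await asyncio.to_thread(_run)


class InMemoryCollection:
    """Hypothetical stand-in for a collection; supports equality filters only."""

    def __init__(self, docs: List[Dict[str, Any]]) -> None:
        self._docs = list(docs)

    def find(self, query: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
        return (
            doc for doc in self._docs
            if all(doc.get(key) == value for key, value in query.items())
        )
```

In the notebook this would be called as `await find_policies(collection, {"policyholder_name": "Alice Brown"})`, which for the sample data should return the home and life policies.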


## Implement Production Policy Tool

Create the production-ready policy lookup function.

In [6]:
async def _get_policy_from_cosmos(policy_id: str, cosmos_manager: CosmosDBMongoCoreManager) -> Optional[Dict[str, Any]]:
    """
    Retrieve policy information from Cosmos DB by policy ID.
    
    Args:
        policy_id: The policy identifier to lookup
        cosmos_manager: Cosmos DB manager instance
        
    Returns:
        Policy data dictionary or None if not found
    """
    try:
        # Query Cosmos DB for the policy
        policy = await asyncio.to_thread(
            cosmos_manager.read_document,
            query={"policy_id": policy_id}
        )
        return policy
        
    except Exception as e:
        logger.error(f"Error retrieving policy {policy_id} from Cosmos DB: {e}")
        return None

print("Production policy lookup function implemented!")

Production policy lookup function implemented!
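
The lookup above returns `None` both when a policy is missing and when the database call fails, and it waits indefinitely on a slow backend. A voice agent usually needs a bounded wait. The sketch below adds a time budget with `asyncio.wait_for`; `get_policy_with_timeout`, `PolicyLookupTimeout`, and `SlowManager` are hypothetical names, and the only manager call assumed is `read_document(query=...)` as used in this notebook.

```python
import asyncio
import time
from typing import Any, Dict, Optional


class PolicyLookupTimeout(Exception):
    """Raised when the database does not answer within the time budget."""


async def get_policy_with_timeout(
    policy_id: str, manager: Any, timeout_s: float = 5.0
) -> Optional[Dict[str, Any]]:
    """Look up a policy, raising PolicyLookupTimeout if the call is too slow."""
    try:
        return await asyncio.wait_for(
            asyncio.to_thread(manager.read_document, query={"policy_id": policy_id}),
            timeout=timeout_s,
        )
    except asyncio.TimeoutError as exc:
        # The worker thread keeps running; only the caller stops waiting.
        raise PolicyLookupTimeout(
            f"lookup of {policy_id} exceeded {timeout_s}s"
        ) from exc


class SlowManager:
    """Hypothetical stand-in that simulates a slow read_document call."""

    def __init__(self, delay_s: float) -> None:
        self.delay_s = delay_s

    def read_document(self, query: Dict[str, Any]) -> Dict[str, Any]:
        time.sleep(self.delay_s)
        return {"policy_id": query["policy_id"]}
```

Raising a dedicated exception lets the caller tell "policy not found" (`None`) apart from "database too slow", which the agent may want to phrase differently to the caller.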


## Test Production Policy Function

In [7]:
async def test_production_policy_function():
    """Test the production policy lookup function."""
    if not cosmos:
        print("Cannot test policy function - Cosmos DB not available")
        return
    
    test_policy_ids = [
        "POL-A10001",  # Jane's Auto Insurance
        "POL-A20002",  # Alice's Home Insurance  
        "POL-A30003",  # Alice's Life Insurance
        "POL-NONEXISTENT"  # Non-existent policy
    ]
    
    print("Testing Production Policy Function")
    print("=" * 40)
    
    for policy_id in test_policy_ids:
        print(f"\nTesting policy ID: {policy_id}")
        
        try:
            policy = await _get_policy_from_cosmos(policy_id, cosmos)
            
            if policy:
                print(f"Found: {policy['policy_type']} for {policy['policyholder_name']}")
                print(f"   Status: {policy['status']}")
                print(f"   Monthly Premium: ${policy['premium']['monthly']}")
                if policy.get('coverage_details', {}).get('rental_reimbursement'):
                    rental = policy['coverage_details']['rental_reimbursement']
                    print(f"   Rental Coverage: {rental['coverage']}")
            else:
                print("Policy not found")
                
        except Exception as e:
            print(f"Error: {e}")
    
    print("\nProduction policy function testing completed!")

# Run the test
if cosmos:
    await test_production_policy_function()
else:
    print("Cannot run test - Cosmos DB not available")

Testing Production Policy Function

Testing policy ID: POL-A10001
Found: Auto Insurance for Jane Smith
   Status: active
   Monthly Premium: $125.5

Testing policy ID: POL-A20002
Found: Home Insurance for Alice Brown
   Status: active
   Monthly Premium: $183.33
   Rental Coverage: $100/day for up to 24 months

Testing policy ID: POL-A30003
Found: Life Insurance for Alice Brown
   Status: active
   Monthly Premium: $45.0

Testing policy ID: POL-NONEXISTENT
No document found for the given query.
Policy not found

Production policy function testing completed!
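
The output above shows premiums as `$125.5` and `$45.0` because the floats are interpolated directly into the f-string. A small formatting helper, sketched here under the hypothetical name `format_usd`, keeps two decimal places and adds thousands separators.

```python
def format_usd(amount: float) -> str:
    """Format a number as US dollars with two decimals, e.g. 125.5 -> "$125.50"."""
    return f"${amount:,.2f}"
```

For example, `format_usd(policy['premium']['monthly'])` could replace the raw value in the print statements.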


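## Fix Authentication Data and Test End-to-End Flow

This section first re-runs the lookup function on a valid and an invalid policy ID, then corrects the authentication records in the `policyholders` collection, and finally walks through the complete voice agent flow.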

In [8]:
# Additional comprehensive testing scenarios
async def test_comprehensive_policy_scenarios():
    """Test comprehensive policy scenarios."""
    if not cosmos:
        print("Cosmos DB not available for testing")
        return
    
    print("Testing Comprehensive Policy Scenarios")
    print("=" * 50)
    
    # Test 1: Lookup by policy ID
    print("\nTest 1: Lookup by Policy ID")
    try:
        result = await _get_policy_from_cosmos("POL-A20002", cosmos)
        if result:
            print(f"Found policy: {result['policy_id']}")
            print(f"   Policyholder: {result['policyholder_name']}")
            print(f"   Type: {result['policy_type']}")
            print(f"   Premium: ${result['premium']['monthly']}/month")
        else:
            print("No policy found")
    except Exception as e:
        print(f"Error: {e}")
    
    # Test 2: Invalid policy ID
    print("\nTest 2: Invalid Policy ID")
    try:
        result = await _get_policy_from_cosmos("POL-INVALID", cosmos)
        if result:
            print("Unexpected: Found invalid policy")
        else:
            print("Correctly returned None for invalid policy ID")
    except Exception as e:
        print(f"Error: {e}")

# Run the comprehensive tests
if cosmos:
    await test_comprehensive_policy_scenarios()
else:
    print("Cannot run tests - Cosmos DB not available")

Testing Comprehensive Policy Scenarios

Test 1: Lookup by Policy ID
Found policy: POL-A20002
   Policyholder: Alice Brown
   Type: Home Insurance
   Premium: $183.33/month

Test 2: Invalid Policy ID
No document found for the given query.
Correctly returned None for invalid policy ID


In [9]:
# Fix Authentication Data - Ensure the last four SSN digits for Alice Brown are 1234
async def fix_authentication_data():
    """Fix the authentication data so Alice Brown's record has ssn4 "1234"."""
    
    # Connect to the policyholders collection (authentication data)
    auth_cosmos = CosmosDBMongoCoreManager(
        database_name="voice_agent_db",
        collection_name="policyholders"  # Authentication collection
    )
    
    print("Fixing Authentication Data")
    print("=" * 50)
    
    # Updated authentication records with correct SSNs
    corrected_auth_data = [
        {
            "_id": "jane_smith",
            "full_name": "Jane Smith",
            "zip": "60601",
            "ssn4": "5678",
            "policy4": "0001",
            "claim4": "9876",
            "phone4": "1078",
            "policy_id": "POL-A10001",
            "created_at": datetime.datetime.utcnow().isoformat() + "Z",
            "updated_at": datetime.datetime.utcnow().isoformat() + "Z"
        },
        {
            "_id": "alice_brown",
            "full_name": "Alice Brown", 
            "zip": "60601",
            "ssn4": "1234",  # Alice gets 1234 (voice agent expects this)
            "policy4": "0002",
            "claim4": "3344", 
            "phone4": "4555",
            "policy_id": "POL-A20002",
            "created_at": datetime.datetime.utcnow().isoformat() + "Z",
            "updated_at": datetime.datetime.utcnow().isoformat() + "Z"
        }
    ]
    
    # Update the authentication database
    for person in corrected_auth_data:
        try:
            result = await asyncio.to_thread(
                auth_cosmos.upsert_document,
                document=person,
                query={"_id": person["_id"]}
            )
            print(f"Updated {person['full_name']}: SSN={person['ssn4']}, Policy={person['policy_id']}")
        except Exception as e:
            print(f"Failed to update {person['full_name']}: {e}")
    
    # Verify the fix
    print("\nVerifying Alice Brown authentication data:")
    try:
        alice_record = await asyncio.to_thread(
            auth_cosmos.read_document,
            query={"full_name": "Alice Brown"}
        )
        if alice_record:
            print(f"   Name: {alice_record['full_name']}")
            print(f"   SSN4: {alice_record['ssn4']}")
            print(f"   ZIP: {alice_record['zip']}")
            print(f"   Policy: {alice_record['policy_id']}")
        else:
            print("   Alice Brown not found in authentication database")
    except Exception as e:
        print(f"   Error verifying: {e}")

# Run the fix
await fix_authentication_data()

Fixing Authentication Data
Updated Jane Smith: SSN=5678, Policy=POL-A10001
Updated Alice Brown: SSN=1234, Policy=POL-A20002

Verifying Alice Brown authentication data:
   Name: Alice Brown
   SSN4: 1234
   ZIP: 60601
   Policy: POL-A20002
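
The authentication records and the policy documents live in separate collections, so they can drift apart. The sketch below cross-checks one record against its policy. `check_auth_record` is a hypothetical helper, and the rule that `policy4` equals the last four characters of `policy_id` is an inference from the sample data, not a documented convention.

```python
import re
from typing import Any, Dict, List

FOUR_DIGITS = re.compile(r"\d{4}")


def check_auth_record(record: Dict[str, Any], policy: Dict[str, Any]) -> List[str]:
    """Return problems found when comparing an auth record with its policy."""
    problems = []

    # Keep the 4-digit fields as strings so leading zeros ("0001") survive
    for field in ("ssn4", "policy4", "claim4", "phone4"):
        value = record.get(field)
        if not (isinstance(value, str) and FOUR_DIGITS.fullmatch(value)):
            problems.append(f"{field} must be a 4-digit string")

    if record.get("policy_id") != policy.get("policy_id"):
        problems.append("policy_id does not match the policy document")
    elif not policy["policy_id"].endswith(str(record.get("policy4"))):
        problems.append("policy4 is not the suffix of policy_id")

    if record.get("full_name") != policy.get("policyholder_name"):
        problems.append("full_name does not match policyholder_name")

    return problems
```

Note that the sample data links Alice Brown's record to `POL-A20002` only, although she also holds `POL-A30003`; a single `policy_id` field cannot represent both.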


In [11]:
# Test complete voice agent flow
async def test_complete_voice_agent_flow():
    """Test the complete voice agent flow: authentication -> policy lookup -> response generation."""
    
    print("Testing Complete Voice Agent Flow")
    print("=" * 60)
    
    # Simulate incoming call from Alice Brown
    print("Simulating incoming call from Alice Brown (ZIP: 60601, SSN: 1234)")
    print("Caller asks: 'Do I have rental reimbursement coverage?'")
    print("-" * 60)
    
    # Step 1: Authentication
    print("Step 1: Authentication")
    auth_cosmos = CosmosDBMongoCoreManager(
        database_name="voice_agent_db",
        collection_name="policyholders"
    )
    
    try:
        auth_result = await asyncio.to_thread(
            auth_cosmos.read_document,
            query={"full_name": "Alice Brown", "zip": "60601", "ssn4": "1234"}
        )
        
        if auth_result:
            auth_user = auth_result['full_name']
            authenticated_policy = auth_result['policy_id']
            print(f"   Authentication SUCCESS: {auth_user} → {authenticated_policy}")
        else:
            print("   Authentication FAILED")
            return
    except Exception as e:
        print(f"   Authentication ERROR: {e}")
        return
    
    # Step 2: Policy Lookup
    print("Step 2: Policy Lookup")
    try:
        policy_data = await _get_policy_from_cosmos(authenticated_policy, cosmos)
        
        if policy_data:
            policy_owner = policy_data['policyholder_name']
            policy_type = policy_data['policy_type']
            print(f"   Policy Lookup SUCCESS: {authenticated_policy} → {policy_owner} ({policy_type})")
        else:
            print(f"   Policy Lookup FAILED: Policy {authenticated_policy} not found")
            return
    except Exception as e:
        print(f"   Policy Lookup ERROR: {e}")
        return
    
    # Step 3: Data Consistency Check
    print("Step 3: Data Consistency Check")
    if auth_user == policy_owner:
        print(f"   Data Consistency CHECK: {auth_user} = {policy_owner}")
        
        # Step 4: Check for rental reimbursement coverage
        print("Step 4: Coverage Analysis")
        has_rental = False
        if policy_data.get('coverage_details', {}).get('rental_reimbursement'):
            has_rental = True
            rental_info = policy_data['coverage_details']['rental_reimbursement']
            print(f"   Rental Coverage FOUND: {rental_info['coverage']}")
        else:
            print("   Rental Coverage NOT FOUND")
        
        # Step 5: Generate response
        print("Step 5: Response Generation")
        # Answer from the coverage data itself, whatever the policy type
        response = f"Hi {auth_user}, I've checked your {policy_type} policy {authenticated_policy}. "
        if has_rental:
            response += "Yes, you do have rental reimbursement coverage."
        else:
            response += "No, your policy does not include rental reimbursement coverage."
        
        print(f"Agent Response: {response}")
        
    else:
        print(f"Data Inconsistency: Auth user ({auth_user}) ≠ Policy owner ({policy_owner})")
        print("This would cause confusing responses to the caller!")
    
    print("\n" + "=" * 60)
    print("FLOW TEST COMPLETE")
    print("Expected Result: Alice Brown gets info about HER Home Insurance policy")

# Run the complete test
await test_complete_voice_agent_flow()

Testing Complete Voice Agent Flow
Simulating incoming call from Alice Brown (ZIP: 60601, SSN: 1234)
Caller asks: 'Do I have rental reimbursement coverage?'
------------------------------------------------------------
Step 1: Authentication
   Authentication SUCCESS: Alice Brown → POL-A20002
Step 2: Policy Lookup
   Policy Lookup SUCCESS: POL-A20002 → Alice Brown (Home Insurance)
Step 3: Data Consistency Check
   Data Consistency CHECK: Alice Brown = Alice Brown
Step 4: Coverage Analysis
   Rental Coverage FOUND: $100/day for up to 24 months
Step 5: Response Generation
Agent Response: Hi Alice Brown, I've checked your Home Insurance policy POL-A20002. Yes, you do have rental reimbursement coverage.

FLOW TEST COMPLETE
Expected Result: Alice Brown gets info about HER Home Insurance policy
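
The flow above mixes database calls, consistency checks, and response wording in one function, which makes it hard to test without a live database. As a sketch, the response step can be pulled out into a pure function that takes the two documents as plain dictionaries. `build_rental_response` is a hypothetical name, and unlike the cell above it also quotes the coverage amount.

```python
from typing import Any, Dict


def build_rental_response(auth_record: Dict[str, Any], policy: Dict[str, Any]) -> str:
    """Compose the agent's answer about rental reimbursement coverage."""
    name = auth_record["full_name"]

    # Refuse to answer from a policy that belongs to someone else
    if name != policy["policyholder_name"]:
        raise ValueError("authenticated caller does not own this policy")

    rental = policy.get("coverage_details", {}).get("rental_reimbursement")
    response = (
        f"Hi {name}, I've checked your {policy['policy_type']} "
        f"policy {policy['policy_id']}. "
    )
    if rental:
        return response + (
            f"Yes, you do have rental reimbursement coverage: {rental['coverage']}."
        )
    return response + "No, your policy does not include rental reimbursement coverage."
```

With this split, the database steps only fetch documents, and the wording can be checked with ordinary assertions against the sample records.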
