# Operational guardrails

This notebook demonstrates several types of operational guardrails. These include implementing a limit on the number of steps and tool calls an agent can make and using fine-grained access control.

## A note on rate limiting and token limits

If you want to also impose rate limits (e.g., limiting the number of times an agent is invoked or how many times the agent calls an LLM) or token limits, you can use a standard LLM Gateway in combination with an API Gateway. These gateways offer these features out of the box in many cases.

## A note on an agent circuit breaker

You may wish to use a circuit breaker so that you can interrupt an agent that is misbehaving. The most reliable way to do this is at the infrastructure runtime layer. For example, if you run your agents in kubernetes, you can use standard kubernetes commands to terminate a misbehaving pod.

## Limiting the number of steps an agent takes

We may wish to limit how many steps an agent can take or how many tool calls it can invoke. This puts some safety limits in the system to guard against malicious or accidental cases where the agent would take too long to run and consume too many tokens.

Implementing these limits can be done in the agent software layer. In this example, we'll use Strands hooks to check whether we've exceeded the defined limits.

There is no direct cost to this technique, as it happens entirely in the agent software layer.

### Helper classes

In [1]:
import json
import os
from dataclasses import dataclass
from typing import Optional

from strands import Agent, tool
from strands.hooks import HookProvider, HookRegistry
from strands.hooks.events import BeforeInvocationEvent, AfterInvocationEvent
from strands.experimental.hooks.events import (
    BeforeToolInvocationEvent,
    AfterToolInvocationEvent,
    BeforeModelInvocationEvent,
    AfterModelInvocationEvent,
)
from strands.models import BedrockModel

In [2]:
class StepLimitExceededException(Exception):
    """Exception raised when step limits are exceeded."""
    pass

In [3]:
@dataclass
class StepLimits:
    """Configuration for step limits."""
    max_tool_calls: int = 5
    max_llm_calls: int = 10

In [4]:
class StepLimitingHooks(HookProvider):
    """Hook provider that enforces step limits on agent execution."""
    
    def __init__(self, limits: StepLimits):
        self.limits = limits
        self.reset_counters()
    
    def reset_counters(self):
        """Reset all counters to zero."""
        self.tool_call_count = 0
        self.llm_call_count = 0
    
    def register_hooks(self, registry: HookRegistry) -> None:
        """Register all hooks with the registry."""
        # Reset counters at the start of each request
        registry.add_callback(BeforeInvocationEvent, self.reset_request_counters)
        registry.add_callback(AfterInvocationEvent, self.log_final_counts)
        
        # Track tool calls
        registry.add_callback(BeforeToolInvocationEvent, self.before_tool_call)
        registry.add_callback(AfterToolInvocationEvent, self.after_tool_call)
        
        # Track LLM calls
        registry.add_callback(BeforeModelInvocationEvent, self.before_llm_call)
        registry.add_callback(AfterModelInvocationEvent, self.after_llm_call)
    
    def reset_request_counters(self, event: BeforeInvocationEvent) -> None:
        """Reset counters at the start of each request."""
        self.reset_counters()
        print(f"\n🚀 Starting new request for agent: {event.agent.name}")
        print(f"📊 Limits - Tool calls: {self.limits.max_tool_calls}, LLM calls: {self.limits.max_llm_calls}")
    
    def log_final_counts(self, event: AfterInvocationEvent) -> None:
        """Log final counts at the end of the request."""
        print(f"\n✅ Request completed for agent: {event.agent.name}")
        print(f"📈 Final counts - Tool calls: {self.tool_call_count}/{self.limits.max_tool_calls}, "
              f"LLM calls: {self.llm_call_count}/{self.limits.max_llm_calls}")
    
    def before_tool_call(self, event: BeforeToolInvocationEvent) -> None:
        """Check tool call limit before executing a tool."""
        if self.tool_call_count >= self.limits.max_tool_calls:
            raise StepLimitExceededException(
                f"🚫 Tool call limit exceeded: {self.tool_call_count}/{self.limits.max_tool_calls}"
            )
        
        self.tool_call_count += 1
        tool_name = getattr(event.tool_use, 'name', 'unknown') if event.tool_use else "unknown"
        print(f"🔧 Tool call {self.tool_call_count}/{self.limits.max_tool_calls}: {tool_name}")
    
    def after_tool_call(self, event: AfterToolInvocationEvent) -> None:
        """Log tool call completion."""
        tool_name = getattr(event.tool_use, 'name', 'unknown') if event.tool_use else "unknown"
        if event.exception:
            print(f"❌ Tool call failed: {tool_name} - {event.exception}")
        else:
            print(f"✅ Tool call completed: {tool_name}")
    
    def before_llm_call(self, event: BeforeModelInvocationEvent) -> None:
        """Check LLM call limit before making a model inference."""
        if self.llm_call_count >= self.limits.max_llm_calls:
            raise StepLimitExceededException(
                f"🚫 LLM call limit exceeded: {self.llm_call_count}/{self.limits.max_llm_calls}"
            )
        
        self.llm_call_count += 1
        print(f"🧠 LLM call {self.llm_call_count}/{self.limits.max_llm_calls}")
    
    def after_llm_call(self, event: AfterModelInvocationEvent) -> None:
        """Log LLM call completion."""
        if event.exception:
            print(f"❌ LLM call failed: {event.exception}")
        elif event.stop_response:
            print(f"✅ LLM call completed, stop reason: {event.stop_response.stop_reason}")
    
    def get_current_counts(self) -> tuple[int, int]:
        """Get current counts of tool and LLM calls."""
        return self.tool_call_count, self.llm_call_count

### Tools to use with example agent

In [5]:
@tool
def calculator(operation: str, a: float, b: float) -> str:
    """Perform basic mathematical operations.
    
    Args:
        operation: The operation to perform (add, subtract, multiply, divide)
        a: First number
        b: Second number
    
    Returns:
        The result of the operation
    """
    operations = {
        'add': lambda x, y: x + y,
        'subtract': lambda x, y: x - y,
        'multiply': lambda x, y: x * y,
        'divide': lambda x, y: x / y if y != 0 else "Error: Division by zero"
    }
    
    if operation in operations:
        result = operations[operation](a, b)
        return f"{a} {operation} {b} = {result}"
    else:
        return f"Error: Unknown operation '{operation}'"


@tool
def read_file(filename: str) -> str:
    """Read the contents of a file.
    
    Args:
        filename: Path to the file to read
    
    Returns:
        The contents of the file or an error message
    """
    try:
        if not os.path.exists(filename):
            return f"Error: File '{filename}' does not exist"
        
        with open(filename, 'r', encoding='utf-8') as f:
            content = f.read()
            return f"Contents of {filename}:\n{content}"
    except Exception as e:
        return f"Error reading file '{filename}': {str(e)}"


### Define the agent

In [6]:
limits = StepLimits(max_tool_calls=3, max_llm_calls=5)

In [7]:
step_limiter = StepLimitingHooks(limits)

In [49]:
model = BedrockModel(
        model_id="us.amazon.nova-pro-v1:0",
        streaming=False,
        region="us-west-2"  # Adjust region as needed
)

In [50]:
# Create agent with hooks and tools
agent = Agent(
    name="Step limited agent",
    model=model,
    tools=[calculator, read_file],
    hooks=[step_limiter],
    system_prompt="""You are a helpful assistant with access to calculator and file reading tools.
    You should be thorough in your responses and use tools when appropriate.
    However, be aware that there are limits on how many tools you can use and how many times you can call the model."""
)

In [51]:
# Store reference to step limiter for external access
agent._step_limiter = step_limiter

### Test the limits

Let's first try an operation that should exceed our maximum number of tool calls, then a simpler operation that should work.

In [52]:
query = "Use the calculator to: add 5+3, multiply result by 2, then divide by 4, then subtract 1"

In [53]:
try:
    response = agent(query)
    print(f"✅ Completed: {response}")
except StepLimitExceededException as e:
    print(f"🚫 Expected limit exceeded: {e}")
    tool_count, llm_count = agent._step_limiter.get_current_counts()
    print(f"📊 Final counts - Tools: {tool_count}/{limits.max_tool_calls}, LLM: {llm_count}/{limits.max_llm_calls}")



🚀 Starting new request for agent: Step limited agent
📊 Limits - Tool calls: 3, LLM calls: 5
🧠 LLM call 1/5
<thinking> I need to perform a series of mathematical operations in the specified order: addition, multiplication, division, and subtraction. I will use the calculator tool for each operation. </thinking>

Tool #1: calculator
✅ LLM call completed, stop reason: tool_use
🔧 Tool call 1/3: unknown
✅ Tool call completed: unknown
🧠 LLM call 2/5
<thinking> The result of adding 5 and 3 is 8. Next, I need to multiply this result by 2. </thinking> 
Tool #2: calculator
✅ LLM call completed, stop reason: tool_use
🔧 Tool call 2/3: unknown
✅ Tool call completed: unknown
🧠 LLM call 3/5
<thinking> The result of multiplying 8 by 2 is 16. Next, I need to divide this result by 4. </thinking> 
Tool #3: calculator
✅ LLM call completed, stop reason: tool_use
🔧 Tool call 3/3: unknown
✅ Tool call completed: unknown
🧠 LLM call 4/5


cycle failed
Traceback (most recent call last):
  File "/Users/rddefauw/Documents/gen-ai-evaluations-workshop/04-workload-specific-evaluations/04-02-Guardrails/.venv/lib/python3.13/site-packages/strands/event_loop/event_loop.py", line 240, in event_loop_cycle
    async for event in events:
        yield event
  File "/Users/rddefauw/Documents/gen-ai-evaluations-workshop/04-workload-specific-evaluations/04-02-Guardrails/.venv/lib/python3.13/site-packages/strands/event_loop/event_loop.py", line 477, in _handle_tool_execution
    async for tool_event in tool_events:
        yield tool_event
  File "/Users/rddefauw/Documents/gen-ai-evaluations-workshop/04-workload-specific-evaluations/04-02-Guardrails/.venv/lib/python3.13/site-packages/strands/tools/executor.py", line 97, in run_tools
    tool_results.extend([worker.result() for worker in workers])
                         ~~~~~~~~~~~~~^^
  File "/Users/rddefauw/Documents/gen-ai-evaluations-workshop/04-workload-specific-evaluations/04-02-G

<thinking> The result of dividing 16 by 4 is 4. Finally, I need to subtract 1 from this result. </thinking> 
Tool #4: calculator
✅ LLM call completed, stop reason: tool_use

✅ Request completed for agent: Step limited agent
📈 Final counts - Tool calls: 3/3, LLM calls: 4/5


EventLoopException: 🚫 Tool call limit exceeded: 3/3

In [54]:
query = "Use the calculator to add 5+3"

In [55]:
try:
    response = agent(query)
    print(f"✅ Completed: {response}")
except StepLimitExceededException as e:
    print(f"🚫 Expected limit exceeded: {e}")
    tool_count, llm_count = agent._step_limiter.get_current_counts()
    print(f"📊 Final counts - Tools: {tool_count}/{limits.max_tool_calls}, LLM: {llm_count}/{limits.max_llm_calls}")


🚀 Starting new request for agent: Step limited agent
📊 Limits - Tool calls: 3, LLM calls: 5
🧠 LLM call 1/5
<thinking> The user has requested to add 5 and 3. I will use the calculator tool to perform this addition. </thinking> 
Tool #5: calculator
✅ LLM call completed, stop reason: tool_use
🔧 Tool call 1/3: unknown
✅ Tool call completed: unknown
🧠 LLM call 2/5
The result of adding 5 and 3 is 8.✅ LLM call completed, stop reason: end_turn

✅ Request completed for agent: Step limited agent
📈 Final counts - Tool calls: 1/3, LLM calls: 2/5
✅ Completed: The result of adding 5 and 3 is 8.



## Security permission checks

Agent identity and permissions is a complex subject. Agents should not automatically inherit the permissions of the invoking principal (user or system), nor should they have a static set of permissions no matter who the invoking principal is.

In this example, we'll use Amazon Verified Permissions to enforce fine-grained access control based on the calling principal.

We implement the same claims management use case inspired by the AWS blog post: [Design secure generative AI application workflows with Amazon Verified Permissions and Amazon Bedrock Agents](https://aws.amazon.com/blogs/machine-learning/design-secure-generative-ai-application-workflows-with-amazon-verified-permissions-and-amazon-bedrock-agents/).

The cost of using AVP is 0.000005 per authorization request. In this example, we typically make one authorization request each time the agent runs.

### System Components

```
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────────┐
│   Strands       │───▶│  Claims Service  │───▶│  Verified           │
│   Agent         │    │  (Authorization  │    │  Permissions        │
│   (4 tools)     │    │   Layer)         │    │  (Cedar Policies)   │
└─────────────────┘    └──────────────────┘    └─────────────────────┘
         │                        │                       │
         │                        ▼                       ▼
         ▼              ┌──────────────────┐    ┌─────────────────────┐
┌─────────────────┐    │   Mock Claims    │    │  Policy Decisions:  │
│ Mock Cognito    │    │   Database       │    │  • ALLOW/DENY       │
│ Authentication  │    │  (In-memory)     │    │  • Policy Reasons   │
│ (JWT tokens)    │    │                  │    │  • Context-aware    │
└─────────────────┘    └──────────────────┘    └─────────────────────┘
```

### Set up some mock policy data

In [56]:
import json
import logging
import uuid
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
from jose import jwt
from strands import Agent, tool

In [57]:
# Mock user data and claims database (same as before)
MOCK_USERS = {
    "alice": {
        "user_id": "alice",
        "role": "ClaimsAdjuster", 
        "region": "west",
        "groups": ["ClaimsAdjuster"]
    },
    "bob": {
        "user_id": "bob",
        "role": "ClaimsAdministrator",
        "region": "east", 
        "groups": ["ClaimsAdministrator"]
    }
}
MOCK_CLAIMS = [
    {
        "claim_id": "CLM-001",
        "status": "open",
        "region": "west",
        "owner": "alice",
        "amount": 5000,
        "description": "Vehicle damage claim"
    },
    {
        "claim_id": "CLM-002", 
        "status": "open",
        "region": "east",
        "owner": "bob",
        "amount": 3200,
        "description": "Property damage claim"
    },
    {
        "claim_id": "CLM-003",
        "status": "open", 
        "region": "west",
        "owner": "alice",
        "amount": 7500,
        "description": "Medical claim"
    },
    {
        "claim_id": "CLM-004",
        "status": "closed",
        "region": "east", 
        "owner": "bob",
        "amount": 2100,
        "description": "Minor fender bender"
    }
]

In [58]:
# Cedar policies for the claims management system
CEDAR_POLICIES = {
    "claims_administrator_list": """
permit (
    principal in ClaimsApp::Role::"ClaimsAdministrator",
    action in [ClaimsApp::Action::"ListClaims"],
    resource
);
    """,
    
    "claims_administrator_deny_read": """
forbid (
    principal in ClaimsApp::Role::"ClaimsAdministrator",
    action in [ClaimsApp::Action::"ReadClaim"],
    resource
);
    """,
    
    "claims_adjuster_list_own_region": """
permit (
    principal in ClaimsApp::Role::"ClaimsAdjuster",
    action in [ClaimsApp::Action::"ListClaims"],
    resource
)
when {
    principal has region &&
    resource has region &&
    principal.region == resource.region
};
    """,
    
    "claims_adjuster_read_own": """
permit (
    principal in ClaimsApp::Role::"ClaimsAdjuster",
    action in [ClaimsApp::Action::"ReadClaim"],
    resource
)
when {
    principal has user_id &&
    resource has owner &&
    principal.user_id == resource.owner
};
    """,
    
    "claims_adjuster_update_own": """
permit (
    principal in ClaimsApp::Role::"ClaimsAdjuster", 
    action in [ClaimsApp::Action::"UpdateClaim"],
    resource
)
when {
    principal has user_id &&
    resource has owner &&
    principal.user_id == resource.owner
};
    """
}

In [59]:
# Schema for the policy store
CEDAR_SCHEMA = {
    "ClaimsApp": {
        "entityTypes": {
            "Role": {
                "shape": {
                    "type": "Record",
                    "attributes": {
                        "user_id": {"type": "String"},
                        "role": {"type": "String"},
                        "region": {"type": "String"}
                    }
                }
            },
            "Claim": {
                "shape": {
                    "type": "Record",
                    "attributes": {
                        "claim_id": {"type": "String"},
                        "status": {"type": "String"},
                        "region": {"type": "String"},
                        "owner": {"type": "String"},
                        "amount": {"type": "Long"}
                    }
                }
            }
        },
        "actions": {
            "ListClaims": {
                "appliesTo": {
                    "principalTypes": ["Role"],
                    "resourceTypes": ["Claim"]
                }
            },
            "ReadClaim": {
                "appliesTo": {
                    "principalTypes": ["Role"],
                    "resourceTypes": ["Claim"]
                }
            },
            "UpdateClaim": {
                "appliesTo": {
                    "principalTypes": ["Role"],
                    "resourceTypes": ["Claim"]
                }
            }
        }
    }
}

### Helper classes

In [60]:
class MockAuthenticator:
    """Mock authentication system that simulates Amazon Cognito ID tokens"""
    
    def __init__(self):
        self.secret_key = "mock-secret-key-for-demo"
        
    def generate_id_token(self, username: str) -> str:
        """Generate a mock ID token for the user"""
        if username not in MOCK_USERS:
            raise ValueError(f"User {username} not found")
            
        user_data = MOCK_USERS[username]
        payload = {
            "sub": user_data["user_id"],
            "custom:role": user_data["role"],
            "custom:region": user_data["region"],
            "cognito:groups": user_data["groups"],
            "iss": "mock-cognito",
            "aud": "mock-client-id",
            "exp": datetime.utcnow() + timedelta(hours=1),
            "iat": datetime.utcnow(),
            "token_use": "id"
        }
        
        return jwt.encode(payload, self.secret_key, algorithm="HS256")
    
    def decode_id_token(self, token: str) -> Dict[str, Any]:
        """Decode and validate the ID token"""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=["HS256"], 
                               options={"verify_aud": False})
            return payload
        except Exception as e:
            raise ValueError(f"Invalid token: {e}")

In [61]:
class AWSVerifiedPermissions:
    """Real AWS Verified Permissions implementation"""
    
    def __init__(self, region_name: str = "us-east-1"):
        try:
            self.client = boto3.client('verifiedpermissions', region_name=region_name)
            self.policy_store_id = None
            logger.info("✅ AWS Verified Permissions client initialized successfully")
        except NoCredentialsError:
            logger.error("❌ AWS credentials not found. Please configure AWS CLI or set environment variables.")
            raise
        except Exception as e:
            logger.error(f"❌ Failed to initialize AWS client: {e}")
            raise
    
    def create_policy_store(self) -> str:
        """Create a new policy store with the claims management schema"""
        try:
            # First create policy store with OFF validation mode
            response = self.client.create_policy_store(
                description="Claims Management System - Strands Agent Demo",
                validationSettings={
                    'mode': 'OFF'
                }
            )
            self.policy_store_id = response['policyStoreId']
            logger.info(f"✅ Created policy store: {self.policy_store_id}")
            
            # Set the schema
            logger.info("🔧 Setting schema...")
            try:
                self.client.put_schema(
                    policyStoreId=self.policy_store_id,
                    definition={
                        'cedarJson': json.dumps(CEDAR_SCHEMA)
                    }
                )
                logger.info("✅ Schema applied to policy store")
                
                # Now enable STRICT validation
                logger.info("🔧 Enabling strict validation...")
                self.client.update_policy_store(
                    policyStoreId=self.policy_store_id,
                    validationSettings={
                        'mode': 'STRICT'
                    }
                )
                logger.info("✅ Strict validation enabled")
                
            except ClientError as schema_error:
                logger.error(f"❌ Schema error: {schema_error}")
                logger.info("📝 Schema content being applied:")
                logger.info(json.dumps(CEDAR_SCHEMA, indent=2))
                raise
            
            return self.policy_store_id
            
        except ClientError as e:
            logger.error(f"❌ Failed to create policy store: {e}")
            raise
    
    def create_policies(self):
        """Create the Cedar policies in the policy store"""
        if not self.policy_store_id:
            raise ValueError("Policy store not created yet")
            
        created_policies = []
        
        for policy_name, policy_statement in CEDAR_POLICIES.items():
            try:
                response = self.client.create_policy(
                    policyStoreId=self.policy_store_id,
                    definition={
                        'static': {
                            'statement': policy_statement,
                            'description': f"Policy for {policy_name}"
                        }
                    }
                )
                created_policies.append(response['policyId'])
                logger.info(f"✅ Created policy {policy_name}: {response['policyId']}")
                
            except ClientError as e:
                logger.error(f"❌ Failed to create policy {policy_name}: {e}")
                continue
        
        return created_policies
    
    def is_authorized(self, principal: Dict, action: str, resource: Dict, 
                     context: Optional[Dict] = None) -> Dict[str, Any]:
        """Check authorization using AWS Verified Permissions IsAuthorized API"""
        
        if not self.policy_store_id:
            raise ValueError("Policy store not initialized")
        
        # Create Role entity as the principal (this matches our policies)
        role_entity = {
            'entityType': 'ClaimsApp::Role',
            'entityId': principal['role']
        }
        
        # Role gets the user attributes for policy conditions
        role_attrs = {}
        for key, value in principal.items():
            if isinstance(value, int):
                role_attrs[key] = {'long': value}
            else:
                role_attrs[key] = {'string': str(value)}
        
        # Resource entity
        resource_entity = {
            'entityType': 'ClaimsApp::Claim',
            'entityId': resource.get('claim_id', 'unknown')
        }
        
        # Resource attributes
        resource_attrs = {}
        for key, value in resource.items():
            if key not in ['type', 'claim_id']:
                if isinstance(value, int):
                    resource_attrs[key] = {'long': value}
                else:
                    resource_attrs[key] = {'string': str(value)}
        
        # Build entities list in correct format
        entity_list = [
            {
                'identifier': role_entity,
                'attributes': role_attrs
            },
            {
                'identifier': resource_entity,
                'attributes': resource_attrs
            }
        ]
        
        # Format entities and context correctly
        entities_def = {'entityList': entity_list}
        context_def = {'contextMap': context or {}}
        
        try:
            response = self.client.is_authorized(
                policyStoreId=self.policy_store_id,
                principal=role_entity,  # Principal is the Role
                action={
                    'actionType': 'ClaimsApp::Action',
                    'actionId': action
                },
                resource=resource_entity,
                entities=entities_def,
                context=context_def
            )
            
            return {
                'decision': response['decision'],
                'determiningPolicies': response.get('determiningPolicies', []),
                'errors': response.get('errors', [])
            }
            
        except ClientError as e:
            logger.error(f"❌ Authorization request failed: {e}")
            return {
                'decision': 'DENY',
                'determiningPolicies': [],
                'errors': [str(e)]
            }

In [62]:
class ClaimsService:
    """Claims management service with real AWS Verified Permissions integration"""
    
    def __init__(self, verified_permissions: AWSVerifiedPermissions, 
                 authenticator: MockAuthenticator):
        self.avp = verified_permissions
        self.auth = authenticator
    
    def list_claims(self, id_token: str, status_filter: str = "open") -> List[Dict]:
        """List claims with authorization filtering using AWS Verified Permissions"""
        
        # Decode user info from ID token
        user_info = self.auth.decode_id_token(id_token)
        principal = {
            "user_id": user_info["sub"],
            "role": user_info["custom:role"], 
            "region": user_info["custom:region"]
        }
        
        authorized_claims = []
        
        # Filter claims and check authorization for each
        for claim in MOCK_CLAIMS:
            if claim["status"] != status_filter:
                continue
                
            resource = {
                "type": "Claim",
                "claim_id": claim["claim_id"],
                "region": claim["region"],
                "owner": claim["owner"],
                "status": claim["status"],
                "amount": claim["amount"]
            }
            
            # Check authorization for this specific claim using AWS
            auth_result = self.avp.is_authorized(
                principal=principal,
                action="ListClaims", 
                resource=resource
            )
            
            if auth_result["decision"] == "ALLOW":
                authorized_claims.append({
                    "claim_id": claim["claim_id"],
                    "status": claim["status"],
                    "region": claim["region"], 
                    "amount": claim["amount"]
                })
        
        return authorized_claims
    
    def get_claim_details(self, id_token: str, claim_id: str) -> Optional[Dict]:
        """Get detailed claim information with AWS Verified Permissions authorization"""
        
        user_info = self.auth.decode_id_token(id_token)
        principal = {
            "user_id": user_info["sub"],
            "role": user_info["custom:role"],
            "region": user_info["custom:region"]
        }
        
        # Find the claim
        claim = next((c for c in MOCK_CLAIMS if c["claim_id"] == claim_id), None)
        if not claim:
            return None
            
        resource = {
            "type": "Claim", 
            "claim_id": claim["claim_id"],
            "region": claim["region"],
            "owner": claim["owner"],
            "status": claim["status"],
            "amount": claim["amount"]
        }
        
        # Check authorization using AWS Verified Permissions
        auth_result = self.avp.is_authorized(
            principal=principal,
            action="ReadClaim",
            resource=resource
        )
        
        if auth_result["decision"] == "ALLOW":
            return claim
        else:
            policies = [p.get('policyId', 'Unknown') for p in auth_result.get('determiningPolicies', [])]
            raise PermissionError(f"Access denied to claim {claim_id}. Determining policies: {policies}")
    
    def update_claim_status(self, id_token: str, claim_id: str, new_status: str) -> bool:
        """Update claim status with AWS Verified Permissions authorization"""
        
        user_info = self.auth.decode_id_token(id_token)
        principal = {
            "user_id": user_info["sub"], 
            "role": user_info["custom:role"],
            "region": user_info["custom:region"]
        }
        
        # Find the claim
        claim = next((c for c in MOCK_CLAIMS if c["claim_id"] == claim_id), None)
        if not claim:
            return False
            
        resource = {
            "type": "Claim",
            "claim_id": claim["claim_id"], 
            "region": claim["region"],
            "owner": claim["owner"],
            "status": claim["status"],
            "amount": claim["amount"]
        }
        
        # Check authorization using AWS Verified Permissions
        auth_result = self.avp.is_authorized(
            principal=principal,
            action="UpdateClaim",
            resource=resource
        )
        
        if auth_result["decision"] == "ALLOW":
            claim["status"] = new_status
            return True
        else:
            policies = [p.get('policyId', 'Unknown') for p in auth_result.get('determiningPolicies', [])]
            raise PermissionError(f"Access denied to update claim {claim_id}. Determining policies: {policies}")

In [63]:
# Global services - initialized later
claims_service = None
current_user_token = None
avp_service = None

In [64]:
def setup_policy_store(region: str = "us-east-1") -> str:
    """Set up AWS Verified Permissions policy store with policies"""
    global avp_service
    
    print("🔧 Setting up AWS Verified Permissions policy store...")
    
    avp_service = AWSVerifiedPermissions(region_name=region)
    policy_store_id = avp_service.create_policy_store()
    
    # Wait a moment for the policy store to be ready
    time.sleep(2)
    
    print("📋 Creating Cedar policies...")
    avp_service.create_policies()
    
    print(f"✅ Policy store setup complete: {policy_store_id}")
    return policy_store_id

def initialize_services(policy_store_id: Optional[str] = None, region: str = "us-east-1"):
    """Initialize the claims service with AWS Verified Permissions"""
    global claims_service, avp_service
    
    if not avp_service:
        avp_service = AWSVerifiedPermissions(region_name=region)
        if policy_store_id:
            avp_service.policy_store_id = policy_store_id
            print(f"✅ Using existing policy store: {policy_store_id}")
        else:
            raise ValueError("Policy store ID required when initializing services")
    
    auth = MockAuthenticator()
    claims_service = ClaimsService(avp_service, auth)

### Tools for the agent to use

In [65]:
@tool
def authenticate_user(username: str) -> str:
    """
    Authenticate a user and return their session token.
    
    Args:
        username (str): The username to authenticate (alice or bob)
        
    Returns:
        str: Success message with user role and region
    """
    global current_user_token
    
    if username not in MOCK_USERS:
        return f"Authentication failed: User '{username}' not found"
    
    auth = MockAuthenticator()
    try:
        current_user_token = auth.generate_id_token(username)
        user_data = MOCK_USERS[username]
        return f"Successfully authenticated as {username} (Role: {user_data['role']}, Region: {user_data['region']})"
    except Exception as e:
        return f"Authentication failed: {str(e)}"

@tool 
def list_open_claims() -> str:
    """
    List all open claims that the authenticated user is authorized to see.
    
    Returns:
        str: JSON string of authorized claims or error message
    """
    global current_user_token, claims_service
    
    try:
        if not current_user_token:
            return "Error: Please authenticate first using authenticate_user()"
        
        if not claims_service:
            return "Error: Claims service not initialized. Please run the main application first to set up AWS Verified Permissions."
            
        claims = claims_service.list_claims(current_user_token, "open")
        if claims:
            return f"Found {len(claims)} authorized open claims (via AWS Verified Permissions):\n" + json.dumps(claims, indent=2)
        else:
            return "No open claims found that you are authorized to access"
            
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error in list_open_claims: {e}\nDetails: {error_details}")
        return f"Error listing claims: {str(e)}"

@tool
def get_claim_details(claim_id: str) -> str:
    """
    Get detailed information about a specific claim.
    
    Args:
        claim_id (str): The ID of the claim to retrieve (e.g., 'CLM-001')
        
    Returns:
        str: JSON string of claim details or error message
    """
    global current_user_token, claims_service
    
    try:
        if not current_user_token:
            return "Error: Please authenticate first using authenticate_user()"
        
        if not claims_service:
            return "Error: Claims service not initialized. Please run the main application first to set up AWS Verified Permissions."
            
        claim = claims_service.get_claim_details(current_user_token, claim_id)
        if claim:
            return f"Claim details for {claim_id} (authorized by AWS Verified Permissions):\n" + json.dumps(claim, indent=2)
        else:
            return f"Claim {claim_id} not found"
            
    except PermissionError as e:
        return f"Access denied: {str(e)}"
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error in get_claim_details: {e}\nDetails: {error_details}")
        return f"Error retrieving claim: {str(e)}"

@tool
def update_claim_status(claim_id: str, new_status: str) -> str:
    """
    Update the status of a claim (e.g., from 'open' to 'closed').
    
    Args:
        claim_id (str): The ID of the claim to update
        new_status (str): The new status ('open', 'closed', 'pending', etc.)
        
    Returns:
        str: Success or error message
    """
    global current_user_token, claims_service
    
    try:
        if not current_user_token:
            return "Error: Please authenticate first using authenticate_user()"
        
        if not claims_service:
            return "Error: Claims service not initialized. Please run the main application first to set up AWS Verified Permissions."
            
        success = claims_service.update_claim_status(current_user_token, claim_id, new_status)
        if success:
            return f"Successfully updated claim {claim_id} status to '{new_status}' (authorized by AWS Verified Permissions)"
        else:
            return f"Failed to update claim {claim_id} - claim not found"
            
    except PermissionError as e:
        return f"Access denied: {str(e)}"
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error in update_claim_status: {e}\nDetails: {error_details}")
        return f"Error updating claim: {str(e)}"

In [66]:
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

### Set up policy store

In [69]:
policy_store_id = None
region = "us-east-1"
    
policy_store_id = setup_policy_store(region)

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:__main__:✅ AWS Verified Permissions client initialized successfully


🔧 Setting up AWS Verified Permissions policy store...


INFO:__main__:✅ Created policy store: RV7xXDTs8DsgEefgS4fox3
INFO:__main__:🔧 Setting schema...
INFO:__main__:✅ Schema applied to policy store
INFO:__main__:🔧 Enabling strict validation...
INFO:__main__:✅ Strict validation enabled
INFO:__main__:✅ Created policy claims_administrator_list: SKRLJN446WngfmK8DQ4ytX


📋 Creating Cedar policies...


INFO:__main__:✅ Created policy claims_administrator_deny_read: Ap5uS8p9Qf3kbRuuevCoaW
INFO:__main__:✅ Created policy claims_adjuster_list_own_region: SsL3etWw33wUPLW3CbEMd6
INFO:__main__:✅ Created policy claims_adjuster_read_own: VwKRDzdfZjku1PXEnTrdDn
INFO:__main__:✅ Created policy claims_adjuster_update_own: 54SRGQzYdYSW4KRzb7ezoF


✅ Policy store setup complete: RV7xXDTs8DsgEefgS4fox3


In [70]:
print("\n🔧 Initializing Claims Management System...")
initialize_services(policy_store_id, region)


🔧 Initializing Claims Management System...


### Create our agent

In [71]:
# Create Strands agent
agent = Agent(
    model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
    tools=[authenticate_user, list_open_claims, get_claim_details, update_claim_status],
    system_prompt=f"""
You are a helpful claims management assistant powered by Amazon Verified Permissions for fine-grained 
access control. Authorization decisions are made by AWS Verified Permissions using Cedar policies.

Policy Store ID: {policy_store_id}
Region: {region}

Users must authenticate first before accessing any claims data. There are two types of users:

1. Claims Administrator (like 'bob'): 
   - Can list claims across all regions
   - Cannot read individual claim details or update claims (enforced by Cedar forbid policy)

2. Claims Adjuster (like 'alice'):
   - Can list claims only in their assigned region
   - Can read and update claims they own
   - Cannot access claims from other regions or owners

All authorization decisions are made by AWS Verified Permissions using Cedar policy language.
Always authenticate users first, then help them with their authorized operations.

Available users for demo: 'alice' (Claims Adjuster, West region) and 'bob' (Claims Administrator, East region)
    """.strip()
)

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials


### Test scenarios

First, we'll authenticate as Alice, and view all claims. Then we'll authenticate as Bob and do the same thing. We'll see that Bob has permission to see more claims.

In [72]:
response = agent("Authenticate me as alice")
print(response.message)

I'll authenticate you as Alice right away.
Tool #1: authenticate_user


  "exp": datetime.utcnow() + timedelta(hours=1),
  "iat": datetime.utcnow(),


You've been successfully authenticated as Alice with the role of Claims Adjuster for the West region. You now have access to:
- List claims in the West region
- View details of claims you own
- Update claims you own

Is there something specific you'd like to do with the claims system now that you're authenticated?{'role': 'assistant', 'content': [{'text': "You've been successfully authenticated as Alice with the role of Claims Adjuster for the West region. You now have access to:\n- List claims in the West region\n- View details of claims you own\n- Update claims you own\n\nIs there something specific you'd like to do with the claims system now that you're authenticated?"}]}


In [73]:
response = agent("List all open claims")
print(response.message)

I'll list all open claims that you're authorized to see.
Tool #2: list_open_claims
Here are the open claims you're authorized to see:

1. Claim ID: CLM-001
   - Status: Open
   - Region: West
   - Amount: $5,000

2. Claim ID: CLM-003
   - Status: Open
   - Region: West
   - Amount: $7,500

As a Claims Adjuster for the West region, you can view and manage claims within your region. Would you like to see more details about any specific claim or take any other action?{'role': 'assistant', 'content': [{'text': "Here are the open claims you're authorized to see:\n\n1. Claim ID: CLM-001\n   - Status: Open\n   - Region: West\n   - Amount: $5,000\n\n2. Claim ID: CLM-003\n   - Status: Open\n   - Region: West\n   - Amount: $7,500\n\nAs a Claims Adjuster for the West region, you can view and manage claims within your region. Would you like to see more details about any specific claim or take any other action?"}]}


In [74]:
response = agent("Authenticate me as bob")
print(response.message)

I'll authenticate you as Bob right away.
Tool #3: authenticate_user


  "exp": datetime.utcnow() + timedelta(hours=1),
  "iat": datetime.utcnow(),


You've been successfully authenticated as Bob with the role of Claims Administrator for the East region. As a Claims Administrator, you have access to:
- List claims across all regions
- However, you cannot read individual claim details or update claims (due to Cedar forbid policy)

Is there something specific you'd like to do with the claims system now that you're authenticated?{'role': 'assistant', 'content': [{'text': "You've been successfully authenticated as Bob with the role of Claims Administrator for the East region. As a Claims Administrator, you have access to:\n- List claims across all regions\n- However, you cannot read individual claim details or update claims (due to Cedar forbid policy)\n\nIs there something specific you'd like to do with the claims system now that you're authenticated?"}]}


In [75]:
response = agent("List all open claims")
print(response.message)

I'll list all open claims that you're authorized to see.
Tool #4: list_open_claims
Here are all the open claims you're authorized to see:

1. Claim ID: CLM-001
   - Status: Open
   - Region: West
   - Amount: $5,000

2. Claim ID: CLM-002
   - Status: Open
   - Region: East
   - Amount: $3,200

3. Claim ID: CLM-003
   - Status: Open
   - Region: West
   - Amount: $7,500

As a Claims Administrator, you can see all open claims across both East and West regions. Remember that while you can view this list, your role doesn't allow you to access individual claim details or update claims due to policy restrictions. Would you like to perform any other actions?{'role': 'assistant', 'content': [{'text': "Here are all the open claims you're authorized to see:\n\n1. Claim ID: CLM-001\n   - Status: Open\n   - Region: West\n   - Amount: $5,000\n\n2. Claim ID: CLM-002\n   - Status: Open\n   - Region: East\n   - Amount: $3,200\n\n3. Claim ID: CLM-003\n   - Status: Open\n   - Region: West\n   - Amount: 