# Example for usage ARc Policy via a Strands Agent

Install requirements

In [None]:
%pip install -r requirements.txt --no-cache-dir --force-reinstall

In [None]:
import IPython

IPython.Application.instance().kernel.do_shutdown(True)

In [None]:
# Import libraries
import os
import json
import requests
import boto3
import time
from boto3.session import Session
from strands.tools import tool

# Get boto session
boto_session = Session()

## Setup for the agent

### Create Code for the Agent
Create agents folder if it's not created.

In [None]:
![ ! -d "agents" ] && mkdir agents

In [None]:
%%writefile agents/claimvalidation-agent.py
import os
import logging
import asyncio
import boto3
import json
import uvicorn
from datetime import datetime
from strands import Agent
from strands.models import BedrockModel
from strands.tools import tool
from strands.multiagent.a2a import A2AServer
from fastapi import FastAPI
from strands.hooks import HookProvider, HookRegistry, MessageAddedEvent, BeforeModelCallEvent, BeforeToolCallEvent
from pydantic import BaseModel
from botocore.config import Config as BotocoreConfig
from strands.telemetry import StrandsTelemetry
from findings_utils import extract_reasoning_findings
from strands_tools import retrieve

# Configure the root strands logger
# logging.getLogger("strands").setLevel(logging.DEBUG)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Add a handler to see the logs
logging.basicConfig(
    format="%(levelname)s | %(name)s | %(message)s", 
    handlers=[logging.StreamHandler()]
)

# Setup tracing - commented out for now as this adds a lot of trace output that really isn't interesting
# NOTE: To send the OTEL data to an ADOT collector, additional exporter needs to be used
StrandsTelemetry().setup_console_exporter()

# AWS Configuration
AWS_REGION = "us-east-1"
# MODEL_ID = "anthropic.claude-sonnet-4-5-20250929-v1:0"

# Initialize Bedrock client to verify connectivity
bedrock_client = boto3.client(
    service_name="bedrock-runtime",
    region_name=AWS_REGION
)

print(f"✓ AWS Bedrock configured for region: {AWS_REGION}")
# print(f"✓ Using model: {MODEL_ID}")

# Supply the pre-installed polciy and guardrail IDs
ARC_POLICY_ARN = "arn:aws:bedrock:us-east-1:505766489067:automated-reasoning-policy/413037g0my75"
GUARDRAIL_ID = "aq9ga2k7be2c"
GUARDRAIL_VERSION = "DRAFT"
KNOWLEDGE_BASE_ID = "2KSYYY00BD"
# NOTE: the default model for Strands is us.anthropic.claude-sonnet-4-20250514-v1:0

# Setup the environment for the agent and tool
# Allow for the metadata to be retrieved on sources from the KB
os.environ['RETRIEVE_ENABLE_METADATA_DEFAULT'] = 'true'
# Allow for the retrieve tool to interact with the KB
os.environ['KNOWLEDGE_BASE_ID'] = KNOWLEDGE_BASE_ID

# Define a notification hook to listen to events and then process the result and call
# Automated Reasoning attached via the Guardrail and report on the findings.  This
# can be used possibly re-write the output or add a flag on if the output is correct.
# Define a notification hook to listen to events and then process the result and call
# Automated Reasoning attached via the Guardrail and report on the findings.  This
# can be used possibly re-write the output or add a flag on if the output is correct.
class NotifyOnlyGuardrailsHook(HookProvider):
    
    def __init__(self, guardrail_id: str, guardrail_version: str, arc_policy_arn: str):
        self.guardrail_id = guardrail_id
        self.guardrail_version = guardrail_version
        self.arc_policy_arn = arc_policy_arn
        self.bedrock_client = boto3.client("bedrock-runtime")
        self.input = ''
        self.claim_valid = True
        self.findings = ''
        self.policy_definition = {}
        self.before_tool_event_flag = False
        self.before_model_event_flag = False

        if self.arc_policy_arn:
            try:
                bedrock_client = boto3.client('bedrock')
                response = bedrock_client.export_automated_reasoning_policy_version(policyArn=self.arc_policy_arn)
                self.policy_definition = response.get('policyDefinition', {})
            except Exception as e:
                print(f"Error getting policy definition: {str(e)}")
                raise

    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(BeforeModelCallEvent, self.before_model_event)
        registry.add_callback(BeforeToolCallEvent, self.before_tool_event)
        registry.add_callback(MessageAddedEvent, self.message_added)

    def message_added(self, event: MessageAddedEvent) -> None:
        if self.before_tool_event_flag:
            # Since a tool was called, just ignore this message addition
            self.before_tool_event_flag = False
            return
        
        # Get the content
        content = "".join(block.get("text", "") for block in event.message.get("content", []))

        # Determine the source
        if event.message.get("role") == "user":
            # Store the input for later usage and allow the loop to continue to process
            self.input = content
            return

        if not content:
            return
            #do something 

        # Capture if this is the first time that findings will be created
        first_findings = (not self.findings)

        # Format a request to send to the guardrail
        content_to_validate = [
            {"text": {"text": self.input, "qualifiers": ["query"]}},
            {"text": {"text": content, "qualifiers": ["guard_content"]}}
        ]
        
        # Call the guardrail
        response = self.bedrock_client.apply_guardrail(
            guardrailIdentifier=self.guardrail_id,
            guardrailVersion=self.guardrail_version,
            source="OUTPUT",
            content=content_to_validate
        )

        # Determine if the output is correct
        self.findings = extract_reasoning_findings(response, self.policy_definition)
         
        assessments = response.get("assessments", [])
        if assessments and len(assessments):
            self.claim_valid = False

        # Add information to the output
        if self.findings and first_findings:
            new_output = content
            new_output = new_output + f"\n*** FINDINGS: ***:\n{self.findings}"
            new_output = new_output + f"\n*** CLAIM VALID: ***:\n{self.claim_valid}"
            event.message["content"][0]["text"] = new_output
        
    def before_model_event(self, event: BeforeModelCallEvent) -> None:
        self.before_model_event_flag = True

    def before_tool_event(self, event: BeforeToolCallEvent) -> None:
        self.before_tool_event_flag = True

# Create structured output
class StructuredOutputModel(BaseModel):
    claim_valid: bool
    content: str
    findings: str

agent_instructions="""You are an expert automotive claims validaiton specialist that determines if the users auto insurance claim is valid based on the provided information and details within the policy contract.
    
You will be provided with JSON data that has claim information and vehicle damage information, you should:
1. Extract from the JSON data required claim information to be validated
2. Focus on time of event and time of claim creation
3. Focus on claims and coverage inconsistencies

Your responses should :
- If your response is "Valid claim", then output the provide the full JSON structure provided from the input
- If you response is "Invalid claim", then output the response "This Claim is Invalid and no appraisal nor settlement is required"
- If you response is "Invalid claim", then provide clear explanation on why is invalid in the output
- In cases where a clear outcome is not present, recommend the user to check with their insurance agent directly. 

Take your time to think though the answer and evalute carefully."

"""

# Create agent with the guardrail-protected model
agent = Agent(
    name="Claims Validator - ARC",
    description="A Single agent with Claims Validation tools capabilities",
    hooks=[NotifyOnlyGuardrailsHook(GUARDRAIL_ID, GUARDRAIL_VERSION, ARC_POLICY_ARN)],
    tools=[retrieve],
    system_prompt=agent_instructions
)

################# A2A ################
app = FastAPI()
runtime_url = os.environ.get('AGENTCORE_RUNTIME_URL', 'http://127.0.0.1:9000/')
host, port = "0.0.0.0", 9000

a2a_server = A2AServer(
    agent=agent,
    http_url=runtime_url,
    serve_at_root=True,
)

# Respond to pings
@app.get("/ping")
def ping():
    return {"status": "healthy"}

# Any request, call the server
app.mount("/", a2a_server.to_fastapi_app())

# If this is running from main, start the server
if __name__ == "__main__":
    uvicorn.run(app, host=host, port=port)

################# A2A ################

Let's write a requirements.txt file with dependencies that are needed for the agent.

In [None]:
%%writefile agents/requirements.txt
boto3
botocore
bedrock-agentcore
bedrock-agentcore-starter-toolkit
strands-agents
strands-agents[a2a]
strands-agents-tools
pyyaml
ddgs
PyYAML
PyPDF2
opentelemetry-sdk
opentelemetry-exporter-otlp
opentelemetry-instrumentation-boto
opentelemetry-sdk-extension-aws
pydantic 
httpx
fastapi
uvicorn[standard]
requests

###  Deploy to AgentCore Runtime

#### Setup Cognito User Pool

In [None]:
from helpers.utils import setup_cognito_user_pool, reauthenticate_user

print("Setting up Amazon Cognito user pool...")
cognito_config = (
    setup_cognito_user_pool()
)  # You'll get your bearer token from this output cell.
print("Cognito setup completed ✓")

#### Create IAM Role for the Agents

In [None]:
from helpers.utils import create_agentcore_runtime_execution_role, AWS_CLAIMSVALIDATION_ROLE_NAME

execution_role_arn = create_agentcore_runtime_execution_role(AWS_CLAIMSVALIDATION_ROLE_NAME)

In [None]:
execution_role_arn

##### Configure and deploy our  agent:

In [None]:
from bedrock_agentcore_starter_toolkit import Runtime

agentcore_runtime_claimvalidation_agent = Runtime()
claimvalidation_agent_name="aws_claimvalidation_assistant"

region = boto_session.region_name

# Configure the deployment
response_claimvalidation_agent = agentcore_runtime_claimvalidation_agent.configure(
    entrypoint="agents/claimvalidation-agent.py",
    execution_role=execution_role_arn,
    auto_create_ecr=True,
    requirements_file="agents/requirements.txt",
    region=region,
    agent_name=claimvalidation_agent_name,
    authorizer_configuration={
        "customJWTAuthorizer": {
            "allowedClients": [cognito_config.get("client_id")],
            "discoveryUrl": cognito_config.get("discovery_url"),
        }
    },
    protocol="A2A",
)

print("Configuration completed:", response_claimvalidation_agent)




In [None]:
launch_result_claimvalidation = agentcore_runtime_claimvalidation_agent.launch()
print("Launch completed:", launch_result_claimvalidation.agent_arn)

claimvalidation_agent_arn = launch_result_claimvalidation.agent_arn

In [None]:
status_response = agentcore_runtime_claimvalidation_agent.status()
status = status_response.endpoint["status"]

print(f"Final status: {status}")

#### Export and save outputs

Export variables to be used in next notebooks:

In [None]:
CLAIMSVALIDATION_AGENT_ID = launch_result_claimvalidation.agent_id
CLAIMSVALIDATION_AGENT_ARN = launch_result_claimvalidation.agent_arn
CLAIMSVALIDATION_AGENT_NAME = claimvalidation_agent_name


CLAIMSVALIDATION_COGNITO_CLIENT_ID = cognito_config.get("client_id")
CLAIMSVALIDATION_COGNITO_SECRET = cognito_config.get("client_secret")
CLAIMSVALIDATION_DISCOVERY_URL = cognito_config.get("discovery_url")

%store CLAIMSVALIDATION_AGENT_ID
%store CLAIMSVALIDATION_AGENT_ARN
%store CLAIMSVALIDATION_AGENT_NAME
%store CLAIMSVALIDATION_COGNITO_CLIENT_ID
%store CLAIMSVALIDATION_COGNITO_SECRET
%store CLAIMSVALIDATION_DISCOVERY_URL

In [None]:
from helpers.utils import put_ssm_parameter, SSM_CLAIMSVALIDATION_AGENT_ARN

put_ssm_parameter(SSM_CLAIMSVALIDATION_AGENT_ARN, claimvalidation_agent_arn)

### Invoking A2A agents

Firstly, let's refresh the auth token:

In [None]:
bearer_token = reauthenticate_user(
    cognito_config.get("client_id"), 
    cognito_config.get("client_secret")
)

In [None]:
cognito_config.get("client_id")

In [None]:
CLAIMSVALIDATION_COGNITO_CLIENT_ID

In [None]:
from uuid import uuid4
from urllib.parse import quote

def fetch_agent_card(agent_arn):
    # URL encode the agent ARN
    escaped_agent_arn = quote(agent_arn, safe='')

    # Construct the URL
    url = f"https://bedrock-agentcore.{region}.amazonaws.com/runtimes/{escaped_agent_arn}/invocations/.well-known/agent-card.json"
    
    # Generate a unique session ID
    session_id = str(uuid4())
    print(f"Generated session ID: {session_id}")

    # Set headers
    headers = {
        'Accept': '*/*',
        'Authorization': f'Bearer {bearer_token}',
        'X-Amzn-Bedrock-AgentCore-Runtime-Session-Id': session_id,
    }

    try:
        # Make the request
        response = requests.get(url, headers=headers, timeout=900)
        response.raise_for_status()

        # Parse and pretty print JSON
        agent_card = response.json()
        print(json.dumps(agent_card, indent=2))

        return agent_card

    except requests.exceptions.RequestException as e:
        print(f"Error fetching agent card: {e}")
        raise e

In [None]:
fetch_agent_card(claimvalidation_agent_arn)

#### Test agents

Now, let's invoke the first agent, using A2A:

In [None]:
import asyncio
import logging
import os
from uuid import uuid4

import httpx
from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.types import Message, Part, Role, TextPart

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)

DEFAULT_TIMEOUT = 300  # set request timeout to 5 minutes

def format_agent_response(response):
    """Extract and format agent response for human readability."""
    # Get the main response text from artifacts
    if response.artifacts and len(response.artifacts) > 0:
        artifact = response.artifacts[0]
        if artifact.parts and len(artifact.parts) > 0:
            return artifact.parts[0].root.text
    
    # Fallback: concatenate all agent messages from history
    agent_messages = [
        msg.parts[0].root.text 
        for msg in response.history 
        if msg.role.value == 'agent' and msg.parts
    ]
    return ''.join(agent_messages)


def create_message(*, role: Role = Role.user, text: str) -> Message:
    return Message(
        kind="message",
        role=role,
        parts=[Part(TextPart(kind="text", text=text))],
        message_id=uuid4().hex,
    )

async def send_sync_message(agent_arn, message: str):
    # Get runtime URL from environment variable
    escaped_agent_arn = quote(agent_arn, safe='')

    # Construct the URL
    runtime_url = f"https://bedrock-agentcore.{region}.amazonaws.com/runtimes/{escaped_agent_arn}/invocations/"
    
    # Generate a unique session ID
    session_id = str(uuid4())
    print(f"Generated session ID: {session_id}")

    # Add authentication headers for AgentCore
    headers = {"Authorization": f"Bearer {bearer_token}",
              'X-Amzn-Bedrock-AgentCore-Runtime-Session-Id': session_id}
        
    async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT, headers=headers) as httpx_client:
        # Get agent card from the runtime URL
        resolver = A2ACardResolver(httpx_client=httpx_client, base_url=runtime_url)
        agent_card = await resolver.get_agent_card()
        print(agent_card)

        # Agent card contains the correct URL (same as runtime_url in this case)
        # No manual override needed - this is the path-based mounting pattern

        # Create client using factory
        config = ClientConfig(
            httpx_client=httpx_client,
            streaming=False,  # Use non-streaming mode for sync response
        )
        factory = ClientFactory(config)
        client = factory.create(agent_card)

        # Create and send message
        msg = create_message(text=message)

        print(msg)

        # With streaming=False, this will yield exactly one result
        async for event in client.send_message(msg):
            if isinstance(event, Message):
                logger.info(event.model_dump_json(exclude_none=True, indent=2))
                return event
            elif isinstance(event, tuple) and len(event) == 2:
                # (Task, UpdateEvent) tuple
                task, update_event = event
                logger.info(f"Task: {task.model_dump_json(exclude_none=True, indent=2)}")
                if update_event:
                    logger.info(f"Update: {update_event.model_dump_json(exclude_none=True, indent=2)}")
                return task
            else:
                # Fallback for other response types
                logger.info(f"Response: {str(event)}")
                return event

In [None]:
result = await send_sync_message(claimvalidation_agent_arn, "I was delivering goods for my company using my personal car. I got into an accident, this was over 3 months ago. Does my policy cover this?")
formatted_output = format_agent_response(result)
print(formatted_output)