# Advanced processing of Strands Agents Response

Strands Agents allows you to intercept and process events as they happen during agent execution using two methods: 
    
- **Async iterators**: ideal for asynchronous frameworks like FastAPI, aiohttp, or Django Channels. For these environments, the SDK offers the `stream_async` method which returns an asynchronous iterator. 
- **Callback handlers**: allow you to intercept and process events as they happen during agent execution. This enables real-time monitoring, custom output formatting, and integration with external systems.

In this lab we will use both async iterators and callback handlers to stream the agent's responses. At an AWS event, the environment is provisioned for you. For an ad hoc setup, follow the sections below or go to Streaming Agent Response.



## Agent Details
<div style="float: left; margin-right: 20px; ">
    
|Feature             |Description                                        |
|--------------------|---------------------------------------------------|
|Feature used        |async iterators, callback handlers                 |
|Agent Structure     |single agent architecture                          |
|Native tools used   |calculator                                         |
|Custom tools created|Weather forecast                                   |

</div>

## Architecture

<div style="text-align:left;">
    <img src="images/architecture.png" width="65%" />
</div>

## Key Features
* Async Iterators for Streaming
* Callback Handlers


## Setup and prerequisites

### Prerequisites
* Python 3.10+
* AWS account
* Anthropic Claude 3.7 enabled on Amazon Bedrock

Let's now install the required packages for our Strands agent.

In [1]:
# installing pre-requisites
!pip install -r requirements.txt



### Importing dependency packages

Now let's import the dependency packages

In [2]:
import asyncio

import httpx
import nest_asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from strands import Agent, tool
from strands_tools import calculator

## Method 1 - Async Iterators for Streaming


Strands Agents provides support for asynchronous iterators through the `stream_async` method, enabling real-time streaming of agent responses in asynchronous environments like web servers, APIs, and other async applications.

Since we are showcasing this example in a notebook, which already runs its own event loop, we need to apply `nest_asyncio` to allow nested use of `asyncio.run` and `loop.run_until_complete`.

In [3]:
nest_asyncio.apply()

### Creating and invoking agent with stream_async

Let's now create our agent with a built-in calculator tool and no `callback_handler`. We will use the `stream_async` method to iterate over the streamed agent events.

In [6]:
# Initialize our agent without a callback handler
agent = Agent(
    model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",  # Optional: Specify the model ID
    tools=[calculator], 
    callback_handler=None)

# Async function that iterates over streamed agent events


async def process_streaming_response():
    agent_stream = agent.stream_async("Calculate derivative of sqrt(sin(x))")
    async for event in agent_stream:
        print(event)


# Run the agent
asyncio.run(process_streaming_response())

{'init_event_loop': True}
{'start': True}
{'start_event_loop': True}
{'event': {'messageStart': {'role': 'assistant'}}}
{'event': {'contentBlockDelta': {'delta': {'text': "I'll calculate the"}, 'contentBlockIndex': 0}}}
{'data': "I'll calculate the", 'delta': {'text': "I'll calculate the"}, 'agent': <strands.agent.agent.Agent object at 0x7f6116e46690>, 'event_loop_cycle_id': UUID('b67ab6cf-26e1-4792-8d58-6528b203195f'), 'request_state': {}, 'event_loop_cycle_trace': <strands.telemetry.metrics.Trace object at 0x7f6116a096d0>, 'event_loop_cycle_span': NonRecordingSpan(SpanContext(trace_id=0x00000000000000000000000000000000, span_id=0x0000000000000000, trace_flags=0x00, trace_state=[], is_remote=False))}
{'event': {'contentBlockDelta': {'delta': {'text': ' derivative of sqrt(sin(x'}, 'contentBlockIndex': 0}}}
{'data': ' derivative of sqrt(sin(x', 'delta': {'text': ' derivative of sqrt(sin(x'}, 'agent': <strands.agent.agent.Agent object at 0x7f6116e46690>, 'event_loop_cycle_id': UUID('b67a

{'message': {'role': 'user', 'content': [{'toolResult': {'status': 'success', 'content': [{'text': 'Result: cos(x)/(2*sqrt(sin(x)))'}], 'toolUseId': 'tooluse_FjX47jAMTTyaPQexrs-QIA'}}]}}
{'start': True}
{'start': True}
{'start_event_loop': True}
{'event': {'messageStart': {'role': 'assistant'}}}
{'event': {'contentBlockDelta': {'delta': {'text': 'The derivative of'}, 'contentBlockIndex': 0}}}
{'data': 'The derivative of', 'delta': {'text': 'The derivative of'}, 'agent': <strands.agent.agent.Agent object at 0x7f6116e46690>, 'event_loop_cycle_id': UUID('d27e3db4-8a60-4156-8928-3f710e59012e'), 'request_state': {}, 'event_loop_cycle_trace': <strands.telemetry.metrics.Trace object at 0x7f61229c9400>, 'event_loop_cycle_span': NonRecordingSpan(SpanContext(trace_id=0x00000000000000000000000000000000, span_id=0x0000000000000000, trace_flags=0x00, trace_state=[], is_remote=False)), 'model': <strands.models.bedrock.BedrockModel object at 0x7f6154356ab0>, 'system_prompt': None, 'messages': [{'role
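
The raw events above carry a lot of metadata, while the generated text itself sits in the `data` key. As a minimal sketch of pulling out only that text, the snippet below joins the `data` chunks of a stream. `fake_agent_stream` and `collect_text` are hypothetical names introduced for illustration; the fake stream only imitates the event shapes printed above so the example runs without calling a model.

```python
import asyncio
from typing import Any, AsyncIterator, Dict


async def fake_agent_stream() -> AsyncIterator[Dict[str, Any]]:
    """Hypothetical stand-in for agent.stream_async(...), imitating the events above."""
    yield {"init_event_loop": True}
    yield {"start": True}
    yield {"event": {"messageStart": {"role": "assistant"}}}
    yield {"data": "I'll calculate the"}
    yield {"data": " derivative of sqrt(sin(x"}
    yield {"data": "))"}


async def collect_text(stream: AsyncIterator[Dict[str, Any]]) -> str:
    """Join the 'data' chunks of a stream and ignore every other event."""
    chunks = []
    async for event in stream:
        if "data" in event:
            chunks.append(event["data"])
    return "".join(chunks)


collected = asyncio.run(collect_text(fake_agent_stream()))
print(collected)
```

With a real agent, the same `collect_text` function could presumably be given `agent.stream_async(prompt)` instead of the fake stream.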

###  Tracking event loop lifecycle

This example illustrates the event loop lifecycle and how events relate to each other. It's useful for understanding the flow of execution in the Strands Agent:

Let's add some formatted printing to make the agent's stream events easier to analyse. We will continue to use the same agent.

In [8]:
# Async function that iterates over streamed agent events

async def process_streaming_response():
    agent_stream = agent.stream_async("What is the derivative of sqrt(sin(x))?")
    async for event in agent_stream:
        # Track event loop lifecycle
        if event.get("init_event_loop", False):
            print("🔄 Event loop initialized")
        elif event.get("start_event_loop", False):
            print("▶️ Event loop cycle starting")
        elif event.get("start", False):
            print("📝 New cycle started")
        elif "message" in event:
            print(f"📬 New message created: {event['message']['role']}")
        elif event.get("force_stop", False):
            print(
                f"🛑 Event loop force-stopped: {event.get('force_stop_reason', 'unknown reason')}"
            )

        # Track tool usage
        if "current_tool_use" in event and event["current_tool_use"].get("name"):
            tool_name = event["current_tool_use"]["name"]
            print(f"🔧 Using tool: {tool_name}")

        # Show only a snippet of text to keep output clean
        if "data" in event:
            # Only show first 20 chars of each chunk for demo purposes
            data_snippet = event["data"][:20] + (
                "..." if len(event["data"]) > 20 else ""
            )
            print(f"📟 Text: {data_snippet}")

    return event["result"]


# Run the agent
asyncio.run(process_streaming_response())

🔄 Event loop initialized
📝 New cycle started
▶️ Event loop cycle starting
📟 Text: I'll calculate the
📟 Text:  derivative of sqrt
📟 Text: (sin(x
📟 Text: ))
📟 Text:  for you.
🔧 Using tool: calculator
🔧 Using tool: calculator
🔧 Using tool: calculator
🔧 Using tool: calculator
🔧 Using tool: calculator
🔧 Using tool: calculator
🔧 Using tool: calculator
🔧 Using tool: calculator
🔧 Using tool: calculator
🔧 Using tool: calculator
🔧 Using tool: calculator
📬 New message created: assistant


📬 New message created: user
📝 New cycle started
📝 New cycle started
▶️ Event loop cycle starting
📟 Text: The derivative of
📟 Text:  √sin(x
📟 Text: ) is:

cos
📟 Text: (x)/(2·√sin
📟 Text: (x))

This result ca...
📟 Text:  be written as:
cos(...
📟 Text: )/(2·sin(x)
📟 Text: ^(1/2))

The
📟 Text:  calculation uses th...
📟 Text: 
- For the
📟 Text:  outer function f(u
📟 Text: ) = √u, the
📟 Text:  derivative is f'(u)...
📟 Text:  1/(2√u)
📟 Text: 
- For the inner fun...
📟 Text:  = sin(x), the deriv...
📟 Text:  u' = cos(x
📟 Text: )
- Applying the cha...
📟 Text: : f'(
📟 Text: u)·u' =
📟 Text:  [1/(2√
📟 Text: sin(x))]
📟 Text: ·cos(x) = cos(
📟 Text: x)/(2
📟 Text: ·√sin(x))
📬 New message created: assistant


AgentResult(stop_reason='end_turn', message={'role': 'assistant', 'content': [{'text': "The derivative of √sin(x) is:\n\ncos(x)/(2·√sin(x))\n\nThis result can also be written as:\ncos(x)/(2·sin(x)^(1/2))\n\nThe calculation uses the chain rule:\n- For the outer function f(u) = √u, the derivative is f'(u) = 1/(2√u)\n- For the inner function u = sin(x), the derivative is u' = cos(x)\n- Applying the chain rule: f'(u)·u' = [1/(2√sin(x))]·cos(x) = cos(x)/(2·√sin(x))"}]}, metrics=EventLoopMetrics(cycle_count=6, tool_metrics={'calculator': ToolMetrics(tool={'toolUseId': 'tooluse_gXwQDgD3QzqZNNM9eh8Rbw', 'name': 'calculator', 'input': {'mode': 'derive', 'wrt': 'x', 'expression': 'sqrt(sin(x))'}}, call_count=3, success_count=3, error_count=0, total_time=0.016903400421142578)}, cycle_durations=[2.897745132446289, 1.2306797504425049, 3.173321008682251], traces=[<strands.telemetry.metrics.Trace object at 0x7f6116a096d0>, <strands.telemetry.metrics.Trace object at 0x7f61229c9400>, <strands.telemetry

In [11]:
# Async function that iterates over streamed agent events

async def process_streaming_response():
    agent_stream = agent.stream_async("What is the capital of France and what are the largest numbers in any system?")
    async for event in agent_stream:
        # Track event loop lifecycle
        if event.get("init_event_loop", False):
            print("🔄 Event loop initialized")
        elif event.get("start_event_loop", False):
            print("▶️ Event loop cycle starting")
        elif event.get("start", False):
            print("📝 New cycle started")
        elif "message" in event:
            print(f"📬 New message created: {event['message']['role']}")
        elif event.get("force_stop", False):
            print(
                f"🛑 Event loop force-stopped: {event.get('force_stop_reason', 'unknown reason')}"
            )

        # Track tool usage
        if "current_tool_use" in event and event["current_tool_use"].get("name"):
            tool_name = event["current_tool_use"]["name"]
            print(f"🔧 Using tool: {tool_name}")

        # Show only a snippet of text to keep output clean
        if "data" in event:
            # Only show first 20 chars of each chunk for demo purposes
            data_snippet = event["data"][:20] + (
                "..." if len(event["data"]) > 20 else ""
            )
            print(f"📟 Text: {data_snippet}")

    return event["result"]


# Run the agent
asyncio.run(process_streaming_response())

🔄 Event loop initialized
📝 New cycle started
▶️ Event loop cycle starting
📟 Text: I'll answer
📟 Text:  both of your questi...
📟 Text: .

1.
📟 Text:  The capital of Fran...
📟 Text:  is Paris.
📟 Text: 

2.
📟 Text:  Regarding the large...
📟 Text:  various systems:

I...
📟 Text:  the International
📟 Text:  (Short
📟 Text:  Scale) System
📟 Text: :
* Million
📟 Text:  = 10^
📟 Text: 6
📟 Text: 
* Billion =
📟 Text:  10^9
📟 Text: 
* Trillion
📟 Text:  = 10^
📟 Text: 12
*
📟 Text:  Quadrillion =
📟 Text:  10^15
📟 Text: 
* Quint
📟 Text: illion = 10
📟 Text: ^18
*
📟 Text:  Sextillion
📟 Text:  = 10^
📟 Text: 21
*
📟 Text:  Septillion = 
📟 Text: 10^24
📟 Text: 
* Octillion
📟 Text:  = 10^
📟 Text: 27
*
📟 Text:  Nonillion = 
📟 Text: 10^30
📟 Text: 
* Decillion
📟 Text:  = 10^
📟 Text: 33
* Cent
📟 Text: illion = 10
📟 Text: ^303
📟 Text: 

In the Hindu-
📟 Text: Arabic System:
📟 Text: 
* Lakh
📟 Text:  = 10^
📟 Text: 5
*
📟 Text:  Crore = 
📟 Text: 10^7
📟 Text: 
* Arab = 
📟 Text: 10^9
📟 Text: 
* Kha
📟 Text: rab = 10
📟 Text:

AgentResult(stop_reason='end_turn', message={'role': 'assistant', 'content': [{'text': "I'll answer both of your questions.\n\n1. The capital of France is Paris.\n\n2. Regarding the largest numbers in various systems:\n\nIn the International (Short Scale) System:\n* Million = 10^6\n* Billion = 10^9\n* Trillion = 10^12\n* Quadrillion = 10^15\n* Quintillion = 10^18\n* Sextillion = 10^21\n* Septillion = 10^24\n* Octillion = 10^27\n* Nonillion = 10^30\n* Decillion = 10^33\n* Centillion = 10^303\n\nIn the Hindu-Arabic System:\n* Lakh = 10^5\n* Crore = 10^7\n* Arab = 10^9\n* Kharab = 10^11\n* Neel = 10^13\n* Padma = 10^15\n* Shankh = 10^17\n\nIn Mathematics, there are even larger named numbers:\n* Googol = 10^100\n* Googolplex = 10^(10^100) (impossibly large)\n* Graham's Number (far larger than a googolplex)\n\nThere's also Infinity (∞) which isn't a specific number but represents a quantity without bound.\n\nSome of the largest named numbers were created by mathematicians specifically to co

### FastAPI Integration

You can also integrate `stream_async` with FastAPI to expose a streaming endpoint to your applications. For this, we will add a `weather_forecast` tool to our agent. The updated architecture looks as follows:

<div style="text-align:left;">
    <img src="images/architecture_2.png" width="65%" />
</div>

In [12]:
# Tool definition

@tool
def weather_forecast(city: str, days: int = 3) -> str:
    """Return a placeholder weather forecast for a city over the next `days` days."""
    return f"Weather forecast for {city} for the next {days} days..."


# FastAPI app
app = FastAPI()


class PromptRequest(BaseModel):
    prompt: str


@app.post("/stream")
async def stream_response(request: PromptRequest):
    async def generate():
        agent = Agent(tools=[calculator, weather_forecast], callback_handler=None)
        try:
            async for event in agent.stream_async(request.prompt):
                if "data" in event:
                    yield event["data"]
        except Exception as e:
            yield f"Error: {str(e)}"

    return StreamingResponse(generate(), media_type="text/plain")


# Function to start server without blocking


async def start_server():
    config = uvicorn.Config(app, host="0.0.0.0", port=8001, log_level="info")
    server = uvicorn.Server(config)
    await server.serve()


# Run server as background task
if "server_task" not in globals():
    server_task = asyncio.create_task(start_server())
    await asyncio.sleep(0.1)  # Give server time to start

print("✅ Server is running at http://0.0.0.0:8001")

INFO:     Started server process [26740]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8001 (Press CTRL+C to quit)


✅ Server is running at http://0.0.0.0:8001
INFO:     127.0.0.1:60670 - "POST /stream HTTP/1.1" 200 OK


#### Invoking the FastAPI agent
We can now invoke the agent with a prompt:

In [13]:
async def fetch_stream():
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "http://0.0.0.0:8001/stream",
            json={"prompt": "What is weather in NYC?"},
        ) as response:
            async for line in response.aiter_lines():
                if line.strip():  # Skip empty lines
                    print("Received:", line)


await fetch_stream()

Received: I'll check the current weather forecast for New York City for you.Here is the current weather forecast for New York City (NYC) for the next 3 days. Let me know if you'd like information for a different time period or if you have any other questions about the weather!
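
In the output above, all chunks arrive as a single `Received:` line ("for you.Here is" runs together) because the endpoint yields raw text and the client splits only on newlines. One possible way to keep chunk boundaries, sketched here under assumptions, is to frame each chunk as one JSON line (newline-delimited JSON). `encode_chunk` and `decode_line` are hypothetical helpers, not part of Strands, FastAPI, or httpx.

```python
import json


def encode_chunk(text: str) -> str:
    """Frame one text chunk as a single JSON line (hypothetical helper)."""
    # json.dumps escapes embedded newlines, so each frame stays on one line.
    return json.dumps({"data": text}) + "\n"


def decode_line(line: str) -> str:
    """Recover the original chunk from one received line (hypothetical helper)."""
    return json.loads(line)["data"]


# Simulate the round trip: server-side framing, then client-side line parsing.
chunks = ["I'll check the forecast.", "Here is the\nforecast for NYC."]
wire = "".join(encode_chunk(c) for c in chunks)
received = [decode_line(line) for line in wire.splitlines() if line.strip()]
print(received)
```

Under this assumption, the endpoint would yield `encode_chunk(event["data"])` and the client would call `decode_line(line)` inside its `aiter_lines()` loop.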


## Method 2 - Callback Handlers for streaming

Callback handlers are a feature of Strands Agents that lets you intercept and process events as they happen during agent execution. This enables real-time monitoring, custom output formatting, and integration with external systems.



Callback handlers receive events in real-time as they occur during an agent's lifecycle:

- Text generation from the model
- Tool selection and execution
- Reasoning process
- Errors and completions


Let's now create a custom callback handler function that formats incoming events to highlight tool usage and model output. To do so, we will again use an agent with only the calculator tool.

<div style="text-align:left;">
    <img src="images/architecture.png" width="65%" />
</div>

In [14]:
def custom_callback_handler(**kwargs):
    # Process stream data
    if "data" in kwargs:
        print(f"MODEL OUTPUT: {kwargs['data']}")
    elif "current_tool_use" in kwargs and kwargs["current_tool_use"].get("name"):
        print(f"\nUSING TOOL: {kwargs['current_tool_use']['name']}")


# Create an agent with custom callback handler
agent = Agent(   
        model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",  # Optional: Specify the model ID
        tools=[calculator], 
        callback_handler=custom_callback_handler)

agent("Calculate 2+2")

MODEL OUTPUT: I can
MODEL OUTPUT:  help you calculate 
MODEL OUTPUT: 2+2 
MODEL OUTPUT: using the calculator tool
MODEL OUTPUT: .

USING TOOL: calculator

USING TOOL: calculator

USING TOOL: calculator

USING TOOL: calculator

USING TOOL: calculator


MODEL OUTPUT: The answer
MODEL OUTPUT:  to
MODEL OUTPUT:  2+2
MODEL OUTPUT:  is 
MODEL OUTPUT: 4.


AgentResult(stop_reason='end_turn', message={'role': 'assistant', 'content': [{'text': 'The answer to 2+2 is 4.'}]}, metrics=EventLoopMetrics(cycle_count=2, tool_metrics={'calculator': ToolMetrics(tool={'toolUseId': 'tooluse_YAW6icKBS3uBEv1DlMg9KA', 'name': 'calculator', 'input': {'expression': '2+2'}}, call_count=1, success_count=1, error_count=0, total_time=0.004476308822631836)}, cycle_durations=[0.8183438777923584], traces=[<strands.telemetry.metrics.Trace object at 0x7f611629be60>, <strands.telemetry.metrics.Trace object at 0x7f612014a900>], accumulated_usage={'inputTokens': 3978, 'outputTokens': 85, 'totalTokens': 4063}, accumulated_metrics={'latencyMs': 2303}), state={})
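
The output above prints `USING TOOL: calculator` several times for a single tool call, since the handler is invoked for every streamed chunk of the tool request. A minimal sketch of a handler that reports each tool call once and buffers the text is shown below. `DedupingHandler` is a hypothetical name, and the sketch assumes that `current_tool_use` carries a `toolUseId` key; the events are simulated by calling the handler directly so the example runs without a model.

```python
from typing import Any, List, Set


class DedupingHandler:
    """Callable handler that reports each tool use once and buffers text chunks."""

    def __init__(self) -> None:
        self.seen_tool_ids: Set[Any] = set()
        self.tools_used: List[str] = []
        self.text_chunks: List[str] = []

    def __call__(self, **kwargs: Any) -> None:
        if "data" in kwargs:
            self.text_chunks.append(kwargs["data"])
            return
        tool_use = kwargs.get("current_tool_use") or {}
        tool_id = tool_use.get("toolUseId")  # assumption: key present on tool events
        if tool_use.get("name") and tool_id not in self.seen_tool_ids:
            self.seen_tool_ids.add(tool_id)
            self.tools_used.append(tool_use["name"])
            print(f"USING TOOL: {tool_use['name']}")


handler = DedupingHandler()
# Simulated events: the same tool call reported three times, then two text chunks.
for _ in range(3):
    handler(current_tool_use={"toolUseId": "tooluse_demo", "name": "calculator", "input": ""})
handler(data="The answer")
handler(data=" is 4.")
print("".join(handler.text_chunks))
```

Because the instance is callable with keyword arguments, it could presumably be passed as `callback_handler=handler` in place of `custom_callback_handler`.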

### Congratulations!

In this notebook you learned how to stream your agent's outputs using async iterators and callback handlers.