# Amazon Bedrock AgentCore Runtime での Strands Agents を使用したストリーミングレスポンス

## 概要

このチュートリアルでは、Amazon Bedrock AgentCore Runtime を使用してストリーミングレスポンスを実装する方法を学習します。この例では、利用可能になった部分的な結果をストリーミングする方法を実証し、大量のコンテンツを生成する操作や重要な処理時間を要する操作に対してより応答性の高いユーザー体験を提供します。

### チュートリアル詳細

|項目| 詳細|
|:--------------------|-|:---------------------------------------------------------------------------|
| チュートリアル タイプ | ストリーミング機能付き対話型|
| エージェント タイプ  | シングル         |
| エージェントフレームワーク | Strands Agents |
| LLM モデル          | Anthropic Claude Sonnet 3.7 |
| チュートリアル コンポーネント | AgentCore Runtime、Strands Agent、Amazon Bedrockモデルでのストリーミングレスポンス |
| チュートリアル領域   | 横断的                                                                   |
| サンプルの複雑さ      | 簡単                                                                     |
| 使用 SDK            | Amazon BedrockAgentCore Python SDK and boto3|

### チュートリアル アーキテクチャ

このチュートリアルでは、ストリーミングエージェントをAgentCore runtimeにデプロイする方法について説明します。

デモンストレーション目的として、ストリーミング機能を備えたAmazon BedrockモデルでStrands Agentを使用します。

この例では、2つのツール（`get_weather` と `get_time`）を持つシンプルなエージェントを使用しますが、ストリーミングレスポンス機能を追加します。
    
<div style="text-align:left">
    <img src="images/architecture_runtime.png" width="60%"/>
</div>

### チュートリアルの主な機能

* Amazon Bedrock AgentCore Runtime でのエージェントからのストリーミングレスポンス
* リアルタイムでの部分的な結果の配信
* ストリーミング機能を持つ Amazon Bedrock モデルの使用
* 非同期ストリーミングサポートを持つ Strands Agents の使用

## 前提条件

このチュートリアルを実行するには、以下が必要です：
* Python 3.10+
* AWS 認証情報
* Amazon Bedrock AgentCore SDK
* Strands Agents
* 実行中の Docker

In [None]:
!pip install --force-reinstall -U -r requirements.txt --quiet

## AgentCore Runtime でのデプロイ用ストリーミングエージェントの準備

次に、ストリーミングエージェントを AgentCore Runtime にデプロイしましょう。ストリーミング機能は、エントリーポイント関数で非同期ジェネレーターやyield文を使用する際に、AgentCore SDKによって自動的に処理されます。

ストリーミング実装の要点：
* エントリーポイント関数には `async def` を使用
* 利用可能になったチャンクをストリーミングするには `yield` を使用
* AgentCore SDK は自動的に Server-Sent Events（SSE）形式を処理
* クライアントは Content-Type: text/event-stream レスポンスを受信

### Amazon Bedrockモデルとストリーミング機能を持つ Strands Agents
Amazon Bedrockモデルを使用したStrands Agentのストリーミング実装を見てみましょう。

In [None]:
%%writefile strands_claude_streaming.py
from strands import Agent, tool
from strands_tools import calculator # Import the calculator tool
import argparse
import json
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from strands.models import BedrockModel
import asyncio
from datetime import datetime

app = BedrockAgentCoreApp()

# Create a custom tool 
@tool
def weather():
    """ Get weather """ # Dummy implementation
    return "sunny"

@tool
def get_time():
    """ Get current time """
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

model_id = "us.anthropic.claude-3-7-sonnet-20250219-v1:0"
model = BedrockModel(
    model_id=model_id,
)
agent = Agent(
    model=model,
    tools=[
        calculator, weather, get_time
    ],
    system_prompt="""You're a helpful assistant. You can do simple math calculations, 
    tell the weather, and provide the current time."""
)

@app.entrypoint
async def strands_agent_bedrock_streaming(payload):
    """
    Invoke the agent with streaming capabilities
    This function demonstrates how to implement streaming responses
    with AgentCore Runtime using async generators
    """
    user_input = payload.get("prompt")
    print("User input:", user_input)
    
    try:
        # Stream each chunk as it becomes available
        async for event in agent.stream_async(user_input):
            if "data" in event:
                yield event["data"]
            
    except Exception as e:
        # Handle errors gracefully in streaming context
        error_response = {"error": str(e), "type": "stream_error"}
        print(f"Streaming error: {error_response}")
        yield error_response

if __name__ == "__main__":
    app.run()

## AgentCore Runtime でのストリーミングの理解

AgentCore Runtime でストリーミングを使用する場合、いくつかのことが自動的に実行されます：

### Server-Sent Events（SSE）形式
* AgentCore SDK は、yielded データを自動的に SSE 形式に変換します
* 各 yield は SSE ストリーム内の `data: ` イベントになります
* Content-Type は自動的に `text/event-stream` に設定されます

### クライアント処理
* クライアントはエージェントが要求を処理する際にリアルタイムで更新を受信します
* これにより、プログレッシブレスポンス表示とより良いユーザー体験が可能になります
* クライアントは完全なレスポンスが準備される前に、部分的な結果を処理できます

### エラー処理
* ストリーミングレスポンスには適切なエラー処理を含める必要があります
* エラーはストリームの一部として yield することができます
* ストリームは、関数が完了するか未処理の例外が発生した場合に終了します

## ストリーミングエージェントの AgentCore Runtime へのデプロイ

`CreateAgentRuntime` オペレーションは包括的な設定オプションをサポートし、コンテナイメージ、環境変数、暗号化設定を指定できます。また、プロトコル設定（HTTP、MCP）と認証メカニズムを構成して、クライアントがエージェントとどのように通信するかを制御することもできます。

**注意:** オペレーションのベストプラクティスは、CI/CDパイプラインとIaCを使用してコードをコンテナとしてパッケージ化し、ECRにプッシュすることです。

このチュートリアルでは、Amazon Bedrock AgentCode Python SDK を使用して、アーティファクトを簡単にパッケージ化し、AgentCore runtime にデプロイします。

### AgentCore Runtime デプロイメントの設定

次に、スターターツールキットを使用して、エントリーポイント、先ほど作成した実行ロール、および requirements ファイルでAgentCore Runtimeデプロイメントを設定します。また、起動時にAmazon ECRリポジトリを自動作成するようにスターターキットを設定します。

設定ステップ中に、アプリケーションコードに基づいてdockerファイルが生成されます

<div style="text-align:left">
    <img src="images/configure.png" width="60%"/>
</div>

In [None]:
from bedrock_agentcore_starter_toolkit import Runtime
from boto3.session import Session
boto_session = Session()
region = boto_session.region_name
region

agentcore_runtime = Runtime()

response = agentcore_runtime.configure(
    entrypoint="strands_claude_streaming.py",
    auto_create_execution_role=True,
    auto_create_ecr=True,
    requirements_file="requirements.txt",
    region=region,
    agent_name="strands_claude_streaming"
)
response

### ストリーミングエージェントの AgentCore Runtime への起動

dockerファイルができたので、ストリーミングエージェントをAgentCore Runtimeに起動しましょう。これによりAmazon ECRリポジトリとAgentCore Runtimeが作成されます

<div style="text-align:left">
    <img src="images/launch.png" width="85%"/>
</div>

In [None]:
launch_result = agentcore_runtime.launch()

### AgentCore Runtime ステータスの確認
AgentCore Runtime をデプロイしたので、そのデプロイメントステータスを確認しましょう

In [None]:
import time

status_response = agentcore_runtime.status()
status = status_response.endpoint['status']
end_status = ['READY', 'CREATE_FAILED', 'DELETE_FAILED', 'UPDATE_FAILED']
while status not in end_status:
    time.sleep(10)
    status_response = agentcore_runtime.status()
    status = status_response.endpoint['status']
    print(status)
status

### ストリーミング機能付き AgentCore Runtime の呼び出し

最後に、ペイロードでAgentCore Runtimeを呼び出し、ストリーミングレスポンスを受信することができます

<div style="text-align:left">
    <img src="images/invoke.png" width="85%"/>
</div>

In [None]:
invoke_response = agentcore_runtime.invoke({
    "prompt": 
    "what the weather is like?"
})
invoke_response

### ストリーミング用 boto3 での AgentCore Runtime の呼び出し

AgentCore Runtime が作成されたので、任意の AWS SDK で呼び出すことができます。ストリーミングレスポンスの場合、Server-Sent Events 形式を処理する必要があります。

In [None]:
import boto3
import json
from IPython.display import Markdown, display

agent_arn = launch_result.agent_arn
agentcore_client = boto3.client(
    'bedrock-agentcore',
    region_name=region
)

# For streaming responses, we need to handle the EventStream
boto3_response = agentcore_client.invoke_agent_runtime(
    agentRuntimeArn=agent_arn,
    qualifier="DEFAULT",
    payload=json.dumps({"prompt": "How much is 2+1"})
)

# Check if the response is streaming
if "text/event-stream" in boto3_response.get("contentType", ""):
    print("Processing streaming response with boto3:")
    content = []
    for line in boto3_response["response"].iter_lines(chunk_size=1):
        if line:
            line = line.decode("utf-8")
            if line.startswith("data: "):
                data = line[6:].replace('"', '')  # Remove "data: " prefix
                print(f"Received streaming chunk: {data}")
                content.append(data.replace('"', ''))
    
    # Display the complete streamed response
    full_response = " ".join(content)
    display(Markdown(full_response))
else:
    # Handle non-streaming response
    try:
        events = []
        for event in boto3_response.get("response", []):
            events.append(event)
    except Exception as e:
        events = [f"Error reading EventStream: {e}"]
    
    if events:
        try:
            response_data = json.loads(events[0].decode("utf-8"))
            display(Markdown(response_data))
        except:
            print(f"Raw response: {events[0]}")

## ストリーミングレスポンスの利点

ストリーミングレスポンスには、いくつかの重要な利点があります：

### ユーザー体験
* **即座のフィードバック**: 利用可能になった部分的な結果をユーザーがすぐに確認
* **パフォーマンス感の向上**: 総時間が同じでも、レスポンスがより高速に感じられる
* **プログレッシブ表示**: 長いレスポンスを段階的に表示可能

### 技術的利点
* **メモリ効率**: すべてをメモリに読み込むことなく、大きなレスポンスを処理
* **タイムアウト防止**: 長時間実行される操作でのタイムアウトを回避
* **リアルタイム処理**: 利用可能になったリアルタイムデータの処理

### 使用例
* **コンテンツ生成**: 長文の記述、レポート、ドキュメント
* **データ分析**: 複雑な計算からの段階的な結果
* **マルチステップワークフロー**: 複雑なエージェント推論を通じての進捗表示
* **リアルタイム監視**: 監視エージェントからのライブ更新

## クリーンアップ（オプション）

作成された AgentCore Runtime をクリーンアップしましょう

In [None]:
launch_result.ecr_uri, launch_result.agent_id, launch_result.ecr_uri.split('/')[1]

In [None]:
agentcore_control_client = boto3.client(
    'bedrock-agentcore-control',
    region_name=region
)
ecr_client = boto3.client(
    'ecr',
    region_name=region
)

runtime_delete_response = agentcore_control_client.delete_agent_runtime(
    agentRuntimeId=launch_result.agent_id,
)

response = ecr_client.delete_repository(
    repositoryName=launch_result.ecr_uri.split('/')[1],
    force=True
)

# おめでとうございます！

Amazon Bedrock AgentCore Runtime を使用したストリーミングエージェントの実装とデプロイに成功しました！

## 学んだこと：
* 非同期ジェネレーターを使用したストリーミングレスポンスの実装方法
* AgentCore Runtime が自動的に SSE 形式を処理する方法
* クライアント側でのストリーミングレスポンスの処理方法
* ユーザー体験とパフォーマンスにおけるストリーミングの利点

## 次のステップ：
* 使用事例に合わせたさまざまなストリーミングパターンの実験
* 複雑なマルチステップワークフロー用のカスタムストリーミングロジックの実装
* ストリーミングと他の AgentCore 機能（Memory や Gateway など）の組み合わせの探索
* より良い UX のためのクライアント側ストリーミング視覚化の実装の検討