# Strands Agent SDK

In [None]:
import os
# Set the default AWS region for this process via the environment.
os.environ['AWS_DEFAULT_REGION'] = 'us-west-2'

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import sys, os
# Append the directory two levels above the working directory to sys.path
# (presumably so the project-local `src` package can be imported - confirm).
module_path = "../.."
sys.path.append(os.path.abspath(module_path))

In [None]:
import os
from strands import Agent
from strands.models import BedrockModel
from src.utils.bedrock import bedrock_info
from botocore.config import Config

## 1. Utils

### 1.1 Get llm model by inference type

In [None]:
def get_model(**kwargs):
    """Build the ``BedrockModel`` matching the requested LLM type.

    Keyword Args:
        llm_type: "claude-sonnet-3-7", "claude-sonnet-4" or
            "claude-sonnet-3-5-v-2".
        cache_type: Value forwarded as ``cache_prompt``
            (None / "ephemeral" / "default").
        enable_reasoning: For the 3.7 and 4 models, switches the "thinking"
            request field on (8192 budget tokens, temperature 1) or off
            (temperature 0.01). Not used for the 3.5 v2 model.

    Returns:
        The configured ``BedrockModel`` instance.

    Raises:
        KeyError: If one of the three keyword arguments is missing.
        ValueError: If ``llm_type`` is not a supported value.
    """
    llm_type = kwargs["llm_type"]
    cache_type = kwargs["cache_type"]
    enable_reasoning = kwargs["enable_reasoning"]

    supported_types = ("claude-sonnet-3-7", "claude-sonnet-4", "claude-sonnet-3-5-v-2")
    if llm_type not in supported_types:
        raise ValueError(f"Unknown LLM type: {llm_type}")

    # BedrockModel params:
    # https://strandsagents.com/latest/api-reference/models/?h=bedrockmodel#strands.models.bedrock.BedrockModel
    if llm_type == "claude-sonnet-3-5-v-2":
        return BedrockModel(
            model_id=bedrock_info.get_model_id(model_name="Claude-V3-5-V-2-Sonnet-CRI"),
            streaming=True,
            max_tokens=8192,
            stop_sequences=["\n\nHuman"],
            temperature=0.01,
            cache_prompt=cache_type,  # None / ephemeral / default
            boto_client_config=Config(
                read_timeout=900,
                connect_timeout=900,
                retries={"max_attempts": 50, "mode": "standard"},
            ),
        )

    # Remaining types are the models configured with a "thinking" field.
    if llm_type == "claude-sonnet-3-7":
        model_name = "Claude-V3-7-Sonnet-CRI"
    else:
        model_name = "Claude-V4-Sonnet-CRI"

    if enable_reasoning:
        thinking_config = {"type": "enabled", "budget_tokens": 8192}
        sampling_temperature = 1
    else:
        thinking_config = {"type": "disabled"}
        sampling_temperature = 0.01

    return BedrockModel(
        model_id=bedrock_info.get_model_id(model_name=model_name),
        streaming=True,
        max_tokens=8192 * 5,
        stop_sequences=["\n\nHuman"],
        temperature=sampling_temperature,
        additional_request_fields={"thinking": thinking_config},
        cache_prompt=cache_type,  # None / ephemeral / default
        boto_client_config=Config(
            read_timeout=900,
            connect_timeout=900,
            retries={"max_attempts": 50, "mode": "adaptive"},
        ),
    )

### 1.2 Create agent

In [None]:
from datetime import datetime
from strands.agent.conversation_manager import SlidingWindowConversationManager

In [None]:
class Colors:
    """ANSI escape sequences used to style console output."""

    # Text attributes
    END = "\033[0m"
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"

    # Foreground colors
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"

def apply_prompt_template(prompt_name: str, prompt_context=None) -> str:
    """Load ``./prompts/<prompt_name>.md`` and fill in its placeholders.

    Args:
        prompt_name: Base name (without ``.md``) of the template file inside
            the ``prompts`` directory of the current working directory.
        prompt_context: Optional mapping of placeholder names to values. Its
            entries override the built-in ``CURRENT_TIME`` value on collision.

    Returns:
        The template text after ``str.format`` substitution.

    Raises:
        OSError: If the template file cannot be opened.
        KeyError: If the template uses a placeholder that is not in the context.
    """
    template_path = os.path.join("./prompts", f"{prompt_name}.md")
    # The context manager closes the handle; UTF-8 is explicit so non-ASCII
    # prompt text does not depend on the platform's default encoding.
    with open(template_path, encoding="utf-8") as template_file:
        template = template_file.read()

    # astimezone() makes the timestamp timezone-aware; with a naive datetime
    # the %z directive renders as an empty string.
    now = datetime.now().astimezone()
    context = {"CURRENT_TIME": now.strftime("%a %b %d %Y %H:%M:%S %z")}
    if prompt_context:
        context.update(prompt_context)

    return template.format(**context)

class ConversationEditor(SlidingWindowConversationManager):
    """Sliding-window conversation manager that only acts on context overflow.

    ``apply_management`` is overridden as a no-op, so the only time history is
    changed is when ``reduce_context`` is invoked; that call is delegated to
    ``SlidingWindowConversationManager.reduce_context``.

    Args:
        window_size (int, optional): Forwarded to the parent as the maximum
            number of messages to retain. Defaults to 7.
        should_truncate_results (bool, optional): Forwarded to the parent;
            controls whether large tool results are truncated on overflow.
            Defaults to False.
    """

    def __init__(self, window_size=7, should_truncate_results=False):
        super().__init__(
            window_size=window_size,
            should_truncate_results=should_truncate_results
        )

    def apply_management(self, agent, **kwargs):
        """Intentionally do nothing after each event loop cycle."""
        return None

    def reduce_context(self, agent, e=None, **kwargs):
        """Trim history through the parent implementation and report the effect.

        Args:
            agent: Agent whose ``messages`` list is being reduced.
            e: Optional exception that triggered the reduction; passed through
                to the parent.
            **kwargs: Passed through to the parent unchanged.
        """
        print(f"⚠️ Overflow occurred! Cleaning up {len(agent.messages)} messages...")

        # The parent's reduce_context handles should_truncate_results itself.
        super().reduce_context(agent, e, **kwargs)

        print(f"✅ Cleanup complete: {len(agent.messages)} messages remaining")

In [None]:
def get_agent(**kwargs):
    """Create a Strands ``Agent`` backed by a Bedrock model.

    Keyword Args:
        agent_name (str): Required. Name shown in the prompt-cache banner.
        system_prompts (str): Required. System prompt given to the agent.
        agent_type (str): ``llm_type`` passed to ``get_model``.
            Defaults to "claude-sonnet-3-7".
        enable_reasoning (bool): Passed to ``get_model``. Defaults to False.
        prompt_cache_info (tuple): ``(enabled, cache_type)``. ``cache_type`` is
            only forwarded to the model when ``enabled`` is truthy.
            Defaults to ``(False, None)``.
        tools: Tools handed to the agent. Defaults to None.
        streaming (bool): Written into the model config. Defaults to True.
        context_overflow_window_size (int): ``window_size`` of the
            ``ConversationEditor``. Defaults to 15.
        context_overflow_should_truncate_results (bool):
            ``should_truncate_results`` of the ``ConversationEditor``.
            Defaults to False.

    Returns:
        The constructed ``Agent``.

    Raises:
        KeyError: If ``agent_name`` or ``system_prompts`` is missing.
        ValueError: Propagated from ``get_model`` for an unknown ``agent_type``.
    """
    agent_name, system_prompts = kwargs["agent_name"], kwargs["system_prompts"]
    agent_type = kwargs.get("agent_type", "claude-sonnet-3-7")
    enable_reasoning = kwargs.get("enable_reasoning", False)
    prompt_cache_info = kwargs.get("prompt_cache_info", (False, None))  # e.g. (True, "default")
    tools = kwargs.get("tools", None)
    streaming = kwargs.get("streaming", True)

    context_overflow_window_size = kwargs.get("context_overflow_window_size", 15)
    context_overflow_should_truncate_results = kwargs.get("context_overflow_should_truncate_results", False)

    prompt_cache, cache_type = prompt_cache_info
    if prompt_cache:
        print(f"{Colors.GREEN}{agent_name.upper()} - Prompt Cache Enabled{Colors.END}")
    else:
        print(f"{Colors.GREEN}{agent_name.upper()} - Prompt Cache Disabled{Colors.END}")
        # Keep the model in line with the banner: a disabled cache must not
        # forward a cache point type such as (False, "default").
        cache_type = None

    llm = get_model(llm_type=agent_type, cache_type=cache_type, enable_reasoning=enable_reasoning)
    # get_model always builds the model with streaming=True; apply the caller's choice.
    llm.config["streaming"] = streaming

    agent = Agent(
        model=llm,
        system_prompt=system_prompts,
        tools=tools,
        conversation_manager=ConversationEditor(
            window_size=context_overflow_window_size,
            should_truncate_results=context_overflow_should_truncate_results
        ),
        callback_handler=None  # None because output is consumed through the async iterator
    )

    return agent

### 1.3 Response with streaming

In [None]:
import traceback
from langchain_core.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

In [None]:
class ColoredStreamingCallback(StreamingStdOutCallbackHandler):
    """Stdout streaming handler that wraps every token in an ANSI color code."""

    # Supported color names and their ANSI escape sequences.
    COLORS = {
        "blue": "\033[94m",
        "green": "\033[92m",
        "yellow": "\033[93m",
        "red": "\033[91m",
        "purple": "\033[95m",
        "cyan": "\033[96m",
        "white": "\033[97m",
    }

    def __init__(self, color='blue'):
        """Pick the escape code for ``color``; unknown names fall back to blue."""
        super().__init__()
        self.reset_code = "\033[0m"
        self.color_code = self.COLORS.get(color, "\033[94m")

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        """Print ``token`` in the configured color without a trailing newline."""
        colored_token = f"{self.color_code}{token}{self.reset_code}"
        print(colored_token, end="", flush=True)

In [None]:
async def process_streaming_response(agent, message):
    """Stream one agent turn to stdout and collect what was produced.

    Reasoning text is printed in purple and answer text in white. Any exception
    raised while streaming is printed together with its traceback and the
    partial response collected so far is still returned.

    Args:
        agent: Object exposing ``stream_async(message)`` as an async iterator.
        message: Prompt passed to ``agent.stream_async``.

    Returns:
        tuple: ``(agent, response)`` where ``response`` is a dict with the keys
        ``text``, ``reasoning``, ``signature``, ``tool_use`` and ``cycle``.
    """
    reasoning_printer = ColoredStreamingCallback('purple')
    answer_printer = ColoredStreamingCallback('white')
    response = {"text": "", "reasoning": "", "signature": "", "tool_use": None, "cycle": 0}

    try:
        async for event in agent.stream_async(message):
            if "reasoningText" in event:
                reasoning_chunk = event["reasoningText"]
                response["reasoning"] += reasoning_chunk
                reasoning_printer.on_llm_new_token(reasoning_chunk)
                continue

            if "reasoning_signature" in event:
                response["signature"] += event["reasoning_signature"]
                continue

            if "data" in event:
                text_chunk = event["data"]
                response["text"] += text_chunk
                answer_printer.on_llm_new_token(text_chunk)
                continue

            if "current_tool_use" not in event:
                continue
            tool_name = event["current_tool_use"].get("name")
            if not tool_name:
                continue
            response["tool_use"] = tool_name

            if "event_loop_metrics" not in event:
                continue
            cycle_count = event["event_loop_metrics"].cycle_count
            # Announce the tool call only once per event-loop cycle.
            if response["cycle"] != cycle_count:
                response["cycle"] = cycle_count
                answer_printer.on_llm_new_token(f' \n## Calling tool: {tool_name} - # Cycle: {cycle_count}\n')
    except Exception as e:
        print(f"Error in streaming response: {e}")
        print(traceback.format_exc())  # Detailed error logging

    return agent, response

In [None]:
async def process_streaming_response_yeild(agent, message):
    """Backward-compatible alias of ``process_streaming_response``.

    This function was a line-for-line copy of ``process_streaming_response``;
    it now delegates to it so the streaming logic lives in one place. Despite
    its name it does not yield events. The (misspelled) name is kept so that
    existing callers keep working.

    Args:
        agent: Object exposing ``stream_async(message)`` as an async iterator.
        message: Prompt passed to ``agent.stream_async``.

    Returns:
        tuple: ``(agent, response)`` exactly as returned by
        ``process_streaming_response``.
    """
    return await process_streaming_response(agent, message)

## 2. Usage

### 2.1 Agent definition

- system prompt

In [None]:
%%writefile ./prompts/task_agent.md
---
CURRENT_TIME: {CURRENT_TIME}
---

You are Bedrock-Manus, a friendly AI assistant developed by AWS AIML Specialist SA Dongjin Jang.
You specialize in handling greetings, small talk, and knowledge-based question answering using available tools.

## Available Tools

You have access to the following tools that you should use when appropriate:

### 1. RAG Tool (rag_tool)
**When to use**: Use this tool when users ask questions that require information from a knowledge base or document collection. This includes:
- Questions about specific topics that might be documented
- Requests for factual information that could be in indexed documents
- Queries about policies, procedures, or technical documentation
- Any question where you need to retrieve and reference specific information

**What it does**: Performs Retrieval-Augmented Generation (RAG) by searching through indexed documents in OpenSearch and generating contextual answers based on retrieved information.

**Input**: A query string containing the user's question

**Example scenarios**:
- "What is the investment return rate for maturity repayment?"
- "Can you explain the company's vacation policy?"
- "How does the authentication system work?"

### 2. Python REPL Tool (python_repl_tool)
**When to use**: Use this tool when users need to execute Python code or perform data analysis:
- Running Python scripts or code snippets
- Data analysis and calculations
- Testing code functionality
- Mathematical computations

**What it does**: Executes Python code in a REPL environment and returns the output

**Input**: Python code string

### 3. Bash Tool (bash_tool) 
**When to use**: Use this tool when users need to execute system commands or perform file operations:
- Running shell commands
- File system operations (ls, mkdir, etc.)
- System information queries
- Development tasks requiring command line operations

**What it does**: Executes bash commands and returns the output

**Input**: A bash command string

## Tool Usage Guidelines

1. **Assess the user's request** - Determine if the question requires tool usage
2. **Choose the appropriate tool** - Select based on the type of information needed
3. **Use RAG tool for knowledge queries** - When the user asks about topics that might be in your knowledge base
4. **Use Python REPL for code execution** - When the user needs to run Python code or perform calculations
5. **Use Bash tool for system operations** - When the user needs to interact with the system
6. **Provide helpful responses** - Always explain the results in a user-friendly way

## Response Style

- Be friendly and conversational
- Provide clear, helpful answers
- When using tools, explain what you're doing and why
- If a tool doesn't provide the needed information, acknowledge this and offer alternatives
- Always prioritize user experience and clarity

Remember to use tools proactively when they can help answer user questions more accurately or completely.

In [None]:
# Build the demo agent from the task_agent prompt template; no tools are attached here.
agent = get_agent(
    agent_name="task_agent",
    system_prompts=apply_prompt_template(prompt_name="task_agent", prompt_context={}),
    agent_type="claude-sonnet-4",  # supported here: claude-sonnet-3-7, claude-sonnet-4
    prompt_cache_info=(True, "default"),  # (enabled, cache_type): turn prompt caching on
    streaming=True,
)

### 2.2 Execution

In [None]:
import asyncio
import nest_asyncio
# Patch asyncio so asyncio.run() can be called from inside the notebook's running event loop.
nest_asyncio.apply()

In [None]:
# Send a greeting and stream the agent's reply to stdout.
message = "안녕 나는 장동진이야"
agent, response = asyncio.run(process_streaming_response(agent, message))


In [None]:
from typing import Dict, Any
async def _convert_to_agentcore_event(
    strands_event: Dict[str, Any],
    agent_name: str,
    session_id: str
) -> Dict[str, Any]:
    """Strands 이벤트를 AgentCore 스트리밍 형식으로 변환"""
    
    base_event = {
        "timestamp": datetime.now().isoformat(),
        "session_id": session_id,
        "agent_name": agent_name,
        "source": "strands_data_analysis_graph"
    }
    
    # 텍스트 데이터 이벤트
    if "data" in strands_event:
        return {
            **base_event,
            "type": "agent_text_stream",
            "event_type": "text_chunk",
            "data": strands_event["data"],
            "chunk_size": len(strands_event["data"])
        }
    
    # 도구 사용 이벤트
    elif "current_tool_use" in strands_event:
        tool_info = strands_event["current_tool_use"]
        return {
            **base_event,
            "type": "agent_tool_stream",
            "event_type": "tool_use",
            "tool_name": tool_info.get("name", "unknown"),
            "tool_id": tool_info.get("toolUseId"),
            "tool_input": tool_info.get("input", {})
        }
    
    # 추론 이벤트
    elif "reasoning" in strands_event and strands_event.get("reasoning"):
        return {
            **base_event,
            "type": "agent_reasoning_stream",
            "event_type": "reasoning",
            "reasoning_text": strands_event.get("reasoningText", "")[:200]
        }
    
    return None

In [None]:
async def process_agent_stream(agent, message, agent_name="coordinator", session_id="123"):
    """Stream an agent turn as AgentCore-formatted events.

    Args:
        agent: Object exposing ``stream_async(message)`` as an async iterator.
        message: Prompt passed to ``agent.stream_async``.
        agent_name: Name recorded in each converted event.
            Defaults to "coordinator".
        session_id: Session identifier recorded in each converted event.
            Defaults to "123".

    Yields:
        Each converted event for which ``_convert_to_agentcore_event`` returned
        a truthy value; other events are skipped.
    """
    async for event in agent.stream_async(message):
        # Convert the Strands event into the AgentCore format.
        agentcore_event = await _convert_to_agentcore_event(
            event, agent_name, session_id
        )
        if agentcore_event:
            yield agentcore_event

async def node(agent, message):
    """Re-yield every event produced by ``process_agent_stream``."""
    event_stream = process_agent_stream(agent, message)
    async for converted_event in event_stream:
        yield converted_event

# Entry-point coroutine that consumes the async generator returned by node().
async def main():
    """Send a fixed greeting to the module-level ``agent`` and print each event."""
    greeting = "안녕 나는 장동진이야"
    event_stream = node(agent, greeting)
    async for event in event_stream:
        print(f"Event: {event}")

# Run the demo coroutine to completion.
asyncio.run(main())

## 3. Tools

In [None]:
from src.tools import python_repl_tool, bash_tool, rag_tool

In [None]:
get_agent?


In [None]:
# Rebuild the agent, this time with reasoning enabled and the three project tools attached.
agent = get_agent(
    agent_name="task_agent",
    system_prompts=apply_prompt_template(prompt_name="task_agent", prompt_context={}),
    agent_type="claude-sonnet-4",  # supported here: claude-sonnet-3-7, claude-sonnet-4
    enable_reasoning=True,
    prompt_cache_info=(True, "default"),  # (enabled, cache_type): turn prompt caching on
    streaming=True,
    tools=[python_repl_tool, bash_tool, rag_tool],
)

In [None]:
# Prompt asking which files exist in ./prompts (listed as a file-system task for the bash tool in the system prompt).
message = "./prompts 디렉토리에 어떤 파일이 있는지 확인해 줄래?"
agent, response = asyncio.run(process_streaming_response(agent, message))

In [None]:
# Prompt asking the agent to write and run a Python "Hello world" program.
message = "Hello world 를 프린팅하는 파이썬 코드를 작성하고 실행시켜 줄래?"
agent, response = asyncio.run(process_streaming_response(agent, message))

In [None]:
# Knowledge question about the return rate on maturity repayment (the RAG example from the system prompt).
message = "만기 상환에 따른 수익률을 알려줄래?"
# Alternative prompt (disabled): the same question plus an instruction to reply
# only "done" instead of summarizing the tool result.
agent, response = asyncio.run(process_streaming_response(agent, message))

## 4. built-in utility

In [None]:
from pprint import pprint

### 4.1 Check agent

- System prompt

In [None]:
# Keep a reference to the agent's system prompt (reused by the "Resume" cell) and display it.
system_prompt = agent.system_prompt
pprint(system_prompt)

- Message history

In [None]:
# Keep a reference to the agent's message history (reused by the "Resume" cell) and display it.
agent_messages = agent.messages
pprint(agent_messages)

- Observability

In [None]:
# Display the metrics object the agent exposes as event_loop_metrics.
pprint(agent.event_loop_metrics)

- Resume

In [None]:
# Model for the resumed agent. The keyword was previously misspelled as
# `stop_sequencesb`, so the intended stop sequence was never passed under the
# name used by every other BedrockModel call in this notebook.
llm_ = BedrockModel(
    model_id=bedrock_info.get_model_id(model_name="Claude-V3-7-Sonnet-CRI"),
    streaming=True,
    max_tokens=8192,
    stop_sequences=["\n\nHuman"],
    temperature=0.01,
    cache_prompt=None,  # None / ephemeral / default
    # cache_tools: cache point type for tools
    boto_client_config=Config(
        read_timeout=900,
        connect_timeout=900,
        retries=dict(max_attempts=50, mode="standard"),
    )
)


# Second agent that resumes the conversation: it is given the system prompt and
# message history captured from the first agent.
agent_ = Agent(
    model=llm_,
    tools=[python_repl_tool, bash_tool],
    system_prompt=system_prompt,
    messages=agent_messages,
    callback_handler=None # None because output is consumed through the async iterator
)

In [None]:
# Ask the resumed agent whether it remembers the earlier conversation.
message = "이어서 대화 하는거 맞니?"
# Bind the result to agent_ so the original tool-enabled `agent` is not silently
# replaced by the resumed one for the cells that follow.
agent_, response = asyncio.run(process_streaming_response(agent_, message))

### 4.2 [Conversation management](https://strandsagents.com/latest/documentation/docs/user-guide/concepts/agents/conversation-management/?h=conversa)

As conversations grow, managing this context becomes increasingly important for several reasons:

- **Token Limits**: Language models have fixed context windows (maximum tokens they can process)
- **Performance**: Larger contexts require more processing time and resources
- **Relevance**: Older messages may become less relevant to the current conversation
- **Coherence**: Maintaining logical flow and preserving important information


#### 4.2.1. SlidingWindowConversationManager
고정된 수의 최근 메시지를 유지하는 슬라이딩 윈도우 전략을 구현합니다. Agent 클래스에서 기본적으로 사용하는 대화 매니저입니다.

In [None]:
from strands.agent.conversation_manager import SlidingWindowConversationManager

In [None]:
# Create a conversation manager with custom window size
conversation_manager = SlidingWindowConversationManager(
    window_size=3,  # Maximum number of messages to keep
    should_truncate_results=True, # Enable truncating the tool result when a message is too large for the model's context window 
)

In [None]:
# Swap the agent's conversation manager for the sliding-window one created above.
agent.conversation_manager = conversation_manager

In [None]:
# Run one more turn, then print the message history held by the agent afterwards.
message = "안녕 나는 장동진이야"
agent, response = asyncio.run(process_streaming_response(agent, message))
print ("\n")
pprint (agent.messages)

#### 4.2.2. SummarizingConversationManager

오래된 메시지를 요약하여 중요한 정보를 보존하면서 컨텍스트 한계 내에서 대화를 관리합니다.

**주요 설정:**

| 파라미터 | 타입 | 기본값 | 설명 |
|---------|------|--------|------|
| `summary_ratio` | `float` | `0.3` | 컨텍스트 축소 시 요약할 메시지 비율 (0.1~0.8 범위) |
| `preserve_recent_messages` | `int` | `10` | 항상 유지할 최근 메시지 수 |
| `summarization_agent` | `Agent` | `None` | 요약 생성용 커스텀 에이전트 (system_prompt와 동시 사용 불가) |
| `summarization_system_prompt` | `str` | `None` | 요약용 커스텀 시스템 프롬프트 (agent와 동시 사용 불가) |

> **기본 요약 방식**: 커스텀 설정이 없을 경우, 주요 토픽, 사용된 도구, 기술적 정보를 3인칭 형태의 구조화된 불릿 포인트로 요약합니다.

In [None]:
from strands.agent.conversation_manager import SummarizingConversationManager

In [None]:
# Custom system prompt for technical conversations
custom_system_prompt = """
You are summarizing a technical conversation. Create a concise bullet-point summary that:
- Focuses on code changes, architectural decisions, and technical solutions
- Preserves specific function names, file paths, and configuration details
- Omits conversational elements and focuses on actionable information
- Uses technical terminology appropriate for software development

Format as bullet points without conversational language.
"""

conversation_manager = SummarizingConversationManager(
    summary_ratio=0.3,  # Summarize 30% of messages when context reduction is needed
    preserve_recent_messages=3,  # Always keep 10 most recent messages
    summarization_system_prompt=custom_system_prompt
)

In [None]:
# Swap the agent's conversation manager for the summarizing one created above.
agent.conversation_manager = conversation_manager

In [None]:
# Run one more turn, then print the message history held by the agent afterwards.
message = "안녕 나는 장동진이야"
agent, response = asyncio.run(process_streaming_response(agent, message))
print ("\n")
pprint (agent.messages)