In [None]:
from agents import Agent, Runner, function_tool, ItemHelpers, SQLiteSession

session = SQLiteSession("user_1", "ai-memory.db")

@function_tool
def get_weather(city: str):
    """Get weather by city"""
    return "30 degrees"

agent = Agent(
    name="Assistant Agent",
    instructions="You are a helpful assistant. Use tools when needed to answer questions",
    tools=[get_weather],
)

stream = Runner.run_streamed(
    agent, "Hello how are you? What is the weather in the capital of Spain?"
)

async for event in stream.stream_events():
    if event.type == "raw_response_event":
        continue
    elif event.type == "agent_updated_stream_event":
        print("Agent updated to", event.new_agent.name)
    elif event.type == "run_item_stream_event":
        if event.item.type == "tool_call_item":
            print(event.item.raw_item.to_dict())
        elif event.item.type == "tool_call_output_item":
            print(event.item.output)
        elif event.item.type == "message_output_item":
            print(ItemHelpers.text_message_output(event.item))
    print("=" * 20)

In [None]:
from agents import Agent, Runner, function_tool, ItemHelpers, SQLiteSession

session = SQLiteSession("user_1", "ai-memory.db")

@function_tool
def get_weather(city: str):
    """Get weather by city"""
    return "30 degrees"

agent = Agent(
    name="Assistant Agent",
    instructions="You are a helpful assistant. Use tools when needed to answer questions",
    tools=[get_weather],
)

# 스트리밍 방식
stream = Runner.run_streamed(
    agent, "Hello how are you? What is the weather in the capital of Spain?"
)

message = ""
args = ""

async for event in stream.stream_events():
    if event.type == "raw_response_event":
        event_type = event.data.type
        if event_type == "response.output_text.delta":
            message += event.data.delta
            print(message)
        elif event_type == "response.function_call_arguments.delta":
            args += event.data.delta
            print(args)
        elif event_type == "response.completed":
            message = ""
            args = ""

# 세션을 이용한 메모리 기능
result = await Runner.run(
    agent,
    "What was my name again?",
    session=session,
)

print(result.final_output)

In [None]:
from agents import Agent, Runner, function_tool, ItemHelpers, SQLiteSession

# 세션 초기화 (대화 메모리용)
session = SQLiteSession("user_1", "ai-memory.db")

@function_tool
def get_weather(city: str):
    """Get weather by city"""
    return "30 degrees"

agent = Agent(
    name="Assistant Agent",
    instructions="You are a helpful assistant. Use tools when needed to answer questions",
    tools=[get_weather],
)

# 1. 스트리밍으로 실시간 응답 받기
stream = Runner.run_streamed(
    agent, "Hello how are you? What is the weather in the capital of Spain?"
)

message = ""
args = ""

async for event in stream.stream_events():
    if event.type == "raw_response_event":
        event_type = event.data.type
        if event_type == "response.output_text.delta":
            message += event.data.delta
            print(message)
        elif event_type == "response.function_call_arguments.delta":
            args += event.data.delta
            print(args)
        elif event_type == "response.completed":
            message = ""
            args = ""

# 2. 메모리 기능을 사용한 질문
result = await Runner.run(
    agent,
    "What was my name again?",
    session=session,
)

print(result.final_output)

# 3. 세션에 저장된 대화 내역 확인
conversation_history = await session.get_items()
print(conversation_history)