# 05 Tool
- https://docs.langchain.com/oss/python/langchain/tools
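- What is a tool?
    - A tool is a component that an agent calls to **perform an action**.
    - Tools extend a model's capabilities by letting it interact with the world through **well-defined inputs and outputs**.
    - A tool encapsulates a **callable function** and its **input schema**.
    - Both are passed to a compatible chat model, which decides **whether to call the tool** and **with which arguments**.
    - Tool calling lets the model **generate a request that conforms to the specified input schema**.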
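To make the pairing of a callable with its input schema concrete, here is a minimal standard-library sketch. `build_input_schema` and `JSON_TYPES` are hypothetical helpers written for this note, not LangChain APIs, and the real `@tool` decorator does considerably more (validation, descriptions per argument, and so on).

```python
import inspect
from typing import get_type_hints

# Illustrative subset of Python types mapped onto JSON Schema type names.
JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def build_input_schema(func):
    """Derive a JSON-Schema-like description from a function's signature."""
    hints = get_type_hints(func)
    properties, required = {}, []
    for name, param in inspect.signature(func).parameters.items():
        properties[name] = {"type": JSON_TYPES[hints[name]]}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default, so the caller must supply it
        else:
            properties[name]["default"] = param.default
    return {
        "name": func.__name__,
        "description": inspect.getdoc(func),
        "parameters": {"type": "object", "properties": properties, "required": required},
    }


def search_database(query: str, limit: int = 10) -> str:
    """Search the customer database for records matching the query."""
    return f"Found {limit} results for '{query}'"


schema = build_input_schema(search_database)

# A model that decides to use the tool emits a request shaped like this one.
tool_call = {"name": "search_database", "args": {"query": "alice", "limit": 3}}
result = search_database(**tool_call["args"])
print(schema["parameters"]["required"], result)
```

The model only ever sees the schema. The function itself runs on the application side once a matching request comes back.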

## Tool Definition

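- Simple tool definition
    - Define a tool with the `@tool` decorator.
    - By default, the function's docstring becomes the tool description, which helps the model understand when to use the tool. Keep it concise so the model can grasp the tool's purpose.
    - Type hints are required because they define the tool's input schema.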

In [1]:
from langchain.tools import tool

@tool
def search_database(query: str, limit: int = 10) -> str:
    """Search the customer database for records matching the query.

    Args:
        query: Search terms to look for
        limit: Maximum number of results to return
    """
    return f"Found {limit} results for '{query}'"

In [2]:
from rich.pretty import pprint as rpprint
rpprint(search_database)

- Custom tool name

In [3]:
@tool("web_search")  # Custom name: overrides the function name
def search(query: str) -> str:
    """Search the web for information."""
    return f"Results for: {query}"

print(search.name)  # web_search

web_search


In [4]:
rpprint(search)

- Custom tool description

In [5]:
@tool("calculator", description="Performs arithmetic calculations. Use this for any math problems.")
def calc(expression: str) -> str:
    """Evaluate mathematical expressions."""
    # NOTE: eval() runs arbitrary Python; fine for a demo, unsafe for untrusted input.
    return str(eval(expression))

In [6]:
rpprint(calc)

- Defining a tool with Pydantic

In [7]:
from pydantic import BaseModel, Field
from typing import Literal

class WeatherInput(BaseModel):
    """Input for weather queries."""
    location: str = Field(description="City name or coordinates")
    units: Literal["celsius", "fahrenheit"] = Field(
        default="celsius",
        description="Temperature unit preference"
    )
    include_forecast: bool = Field(
        default=False,
        description="Include 5-day forecast"
    )

@tool(args_schema=WeatherInput)
def get_weather(location: str, units: str = "celsius", include_forecast: bool = False) -> str:
    """Get current weather and optional forecast."""
    temp = 22 if units == "celsius" else 72
    result = f"Current weather in {location}: {temp} degrees {units[0].upper()}"
    if include_forecast:
        result += "\nNext 5 days: Sunny"
    return result

In [8]:
rpprint(get_weather)

In [16]:
get_weather.args_schema.model_json_schema()

{'description': 'Input for weather queries.',
 'properties': {'location': {'description': 'City name or coordinates',
   'title': 'Location',
   'type': 'string'},
  'units': {'default': 'celsius',
   'description': 'Temperature unit preference',
   'enum': ['celsius', 'fahrenheit'],
   'title': 'Units',
   'type': 'string'},
  'include_forecast': {'default': False,
   'description': 'Include 5-day forecast',
   'title': 'Include Forecast',
   'type': 'boolean'}},
 'required': ['location'],
 'title': 'WeatherInput',
 'type': 'object'}

- Defining a tool with a JSON schema

In [17]:
weather_schema = {
    "type": "object",
    "properties": {
        "location": {"type": "string"},
        "units": {"type": "string"},
        "include_forecast": {"type": "boolean"}
    },
    "required": ["location", "units", "include_forecast"]
}

@tool(args_schema=weather_schema)
def get_weather(location: str, units: str = "celsius", include_forecast: bool = False) -> str:
    """Get current weather and optional forecast."""
    temp = 22 if units == "celsius" else 72
    result = f"Current weather in {location}: {temp} degrees {units[0].upper()}"
    if include_forecast:
        result += "\nNext 5 days: Sunny"
    return result

In [18]:
rpprint(get_weather)
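The sketch below shows what it means for a request to conform to a JSON schema like `weather_schema`. `validate_args` and `PY_TYPES` are hypothetical helpers for illustration only. They cover just the `required` and `type` keywords and are not how LangChain validates tool calls.

```python
weather_schema = {
    "type": "object",
    "properties": {
        "location": {"type": "string"},
        "units": {"type": "string"},
        "include_forecast": {"type": "boolean"},
    },
    "required": ["location", "units", "include_forecast"],
}

# Only the JSON Schema types used above are mapped here.
PY_TYPES = {"string": str, "boolean": bool}


def validate_args(args, schema):
    """Return a list of problems; an empty list means the args conform."""
    problems = [
        f"missing: {name}" for name in schema.get("required", []) if name not in args
    ]
    for name, value in args.items():
        spec = schema["properties"].get(name)
        if spec is None:
            problems.append(f"unexpected: {name}")
        elif not isinstance(value, PY_TYPES[spec["type"]]):
            problems.append(f"wrong type: {name}")
    return problems


complete = {"location": "Seoul", "units": "celsius", "include_forecast": False}
partial = {"location": "Seoul"}

print(validate_args(complete, weather_schema))
print(validate_args(partial, weather_schema))
```

Because this schema lists all three fields under `required`, a request carrying only `location` does not conform, even though the Python function has defaults for the other two. Dropping `units` and `include_forecast` from `required` would mirror those defaults.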

## Tool Runtime

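- What is ToolRuntime?
    - State: mutable data that flows through execution (e.g., messages, counters, custom fields)
    - Context: immutable configuration such as user IDs, session details, or application-specific settings
    - Store: persistent long-term memory across conversations
    - Stream Writer: streams custom updates while the tool executes
    - Config: the `RunnableConfig` for the execution
    - Tool Call ID: the unique ID of the current tool call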

![Tool Runtime](../assets/Tool_Runtime.png)

- Accessing state
    - The `ToolRuntime` parameter is hidden from the chat model.

In [19]:
from langchain.tools import tool, ToolRuntime

# Access the current conversation state
@tool
def summarize_conversation(
    runtime: ToolRuntime  # add a ToolRuntime parameter
) -> str:
    """Summarize the conversation so far."""
    messages = runtime.state["messages"]

    human_msgs = sum(1 for m in messages if m.__class__.__name__ == "HumanMessage")
    ai_msgs = sum(1 for m in messages if m.__class__.__name__ == "AIMessage")
    tool_msgs = sum(1 for m in messages if m.__class__.__name__ == "ToolMessage")

    return f"Conversation has {human_msgs} user messages, {ai_msgs} AI responses, and {tool_msgs} tool results"

# Access custom state fields
@tool
def get_user_preference(
    pref_name: str,
    runtime: ToolRuntime  # ToolRuntime parameter is not visible to the model!!
) -> str:
    """Get a user preference value."""
    preferences = runtime.state.get("user_preferences", {})
    return preferences.get(pref_name, "Not set")
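One way to picture how a runtime parameter stays hidden from the model is the standard-library sketch below. `FakeRuntime`, `model_visible_params`, and `call_tool` are hypothetical names used only for illustration. The real `ToolRuntime` injection is handled by the framework and may work differently.

```python
import inspect
from dataclasses import dataclass, field


@dataclass
class FakeRuntime:
    """Hypothetical stand-in for ToolRuntime; it only carries `state`."""
    state: dict = field(default_factory=dict)


def model_visible_params(func):
    """Names of the parameters the model would be asked to fill in."""
    return [
        name
        for name, param in inspect.signature(func).parameters.items()
        if param.annotation is not FakeRuntime
    ]


def call_tool(func, model_args, runtime):
    """Invoke `func` with the model's args plus the injected runtime."""
    kwargs = dict(model_args)
    for name, param in inspect.signature(func).parameters.items():
        if param.annotation is FakeRuntime:
            kwargs[name] = runtime  # supplied by the application, not the model
    return func(**kwargs)


def get_user_preference(pref_name: str, runtime: FakeRuntime) -> str:
    """Get a user preference value."""
    preferences = runtime.state.get("user_preferences", {})
    return preferences.get(pref_name, "Not set")


runtime = FakeRuntime(state={"user_preferences": {"theme": "dark"}})
visible = model_visible_params(get_user_preference)
value = call_tool(get_user_preference, {"pref_name": "theme"}, runtime)
print(visible, value)
```

The model supplies only `pref_name`. The runtime object is attached at call time, so state never has to pass through the model's request.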

- Updating state
    - Return a `Command` from the tool to update state.

In [32]:
from langgraph.types import Command
from langchain.messages import RemoveMessage
from langgraph.graph.message import REMOVE_ALL_MESSAGES
from langchain.tools import tool, ToolRuntime

# Update the conversation history by removing all messages
@tool
def clear_conversation() -> Command:
    """Clear the conversation history."""

    return Command(
        update={
            "messages": [RemoveMessage(id=REMOVE_ALL_MESSAGES)],
        }
    )

# Update the user_name in the agent state
@tool
def update_user_name(
    new_name: str,
    runtime: ToolRuntime
) -> Command:
    """Update the user's name."""
    return Command(update={"user_name": new_name})

- Accessing context
    - `runtime.context` exposes immutable configuration and contextual data such as user IDs, session details, and application-specific settings.

In [20]:
from dotenv import load_dotenv

load_dotenv()

True

In [22]:
from dataclasses import dataclass
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from langchain.tools import tool, ToolRuntime


USER_DATABASE = {
    "user123": {
        "name": "Alice Johnson",
        "account_type": "Premium",
        "balance": 5000,
        "email": "alice@example.com"
    },
    "user456": {
        "name": "Bob Smith",
        "account_type": "Standard",
        "balance": 1200,
        "email": "bob@example.com"
    }
}

@dataclass
class UserContext:
    user_id: str

@tool
def get_account_info(runtime: ToolRuntime) -> str:
    """Get the current user's account information."""
    user_id = runtime.context.user_id  # read the user ID from the context

    if user_id in USER_DATABASE:
        user = USER_DATABASE[user_id]
        return f"Account holder: {user['name']}\nType: {user['account_type']}\nBalance: ${user['balance']}"
    return "User not found"

model = ChatOpenAI(model="gpt-4o")
agent = create_agent(
    model,
    tools=[get_account_info],
    context_schema=UserContext,
    system_prompt="You are a financial assistant."
)

result = agent.invoke(
    {"messages": [{"role": "user", "content": "What's my current balance?"}]},
    context=UserContext(user_id="user123")
)

In [23]:
rpprint(result)

- Memory (Store)
    - The store gives access to data that persists across conversations. It is available as `runtime.store` and can save and retrieve user-specific or application-specific data.

In [24]:
from typing import Any
from langgraph.store.memory import InMemoryStore
from langchain.agents import create_agent
from langchain.tools import tool, ToolRuntime


# Access memory
@tool
def get_user_info(user_id: str, runtime: ToolRuntime) -> str:
    """Look up user info."""
    store = runtime.store
    user_info = store.get(("users",), user_id)
    return str(user_info.value) if user_info else "Unknown user"

# Update memory
@tool
def save_user_info(user_id: str, user_info: dict[str, Any], runtime: ToolRuntime) -> str:
    """Save user info."""
    store = runtime.store
    store.put(("users",), user_id, user_info)
    return "Successfully saved user info."

store = InMemoryStore()
agent = create_agent(
    model,
    tools=[get_user_info, save_user_info],
    store=store
)

# First session: save user info
first = agent.invoke({
    "messages": [{"role": "user", "content": "Save the following user: userid: abc123, name: Foo, age: 25, email: foo@langchain.dev"}]
})

# Second session: get user info
second = agent.invoke({
    "messages": [{"role": "user", "content": "Get user info for user with id 'abc123'"}]
})
# Here is the user info for user with ID "abc123":
# - Name: Foo
# - Age: 25
# - Email: foo@langchain.dev

In [25]:
rpprint(first)

In [26]:
rpprint(second)
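The reason the second invocation can read what the first one saved is that both share the same store object. The toy sketch below illustrates that idea with a hypothetical `TinyStore`. It is not `InMemoryStore`, and unlike the real store (whose `get` returns an item with a `.value` attribute) it returns the stored value directly.

```python
class TinyStore:
    """Toy namespaced key-value store (hypothetical, for illustration only)."""

    def __init__(self):
        self._data = {}

    def put(self, namespace, key, value):
        # The namespace is a tuple such as ("users",), so it can be part of a dict key.
        self._data[(namespace, key)] = value

    def get(self, namespace, key):
        return self._data.get((namespace, key))


def first_session(store):
    """Pretend conversation #1: save a user record."""
    store.put(("users",), "abc123", {"name": "Foo", "age": 25})


def second_session(store):
    """Pretend conversation #2: no shared messages, only the shared store."""
    return store.get(("users",), "abc123")


store = TinyStore()
first_session(store)
found = second_session(store)
missing = store.get(("users",), "nobody")
print(found, missing)
```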

- Stream Writer

In [41]:
from langchain.tools import tool, ToolRuntime

@tool
def get_weather(city: str, runtime: ToolRuntime) -> str:
    """Get weather for a given city."""
    writer = runtime.stream_writer

    # Stream custom updates as the tool executes
    writer(f"Looking up data for city: {city}")
    writer(f"Acquired data for city: {city}")

    return f"It's always sunny in {city}!"

# Practice (to be updated)

## Setup

In [None]:
import uuid
import os
import operator
from typing import Literal, TypedDict, Annotated
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from dotenv import load_dotenv

load_dotenv()

In [None]:
# from langchain.tools import tool  # already imported above
# from langchain_core.tools import ToolException  # use if needed

## Define state schemas

In [None]:
class OverallState(TypedDict):
    topic: str
    subjects: list[str]
    jokes: Annotated[list[str], operator.add]
    best_selected_joke: str
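The `Annotated[list[str], operator.add]` field attaches a reducer to `jokes`, so updates from several runs are combined rather than overwritten. The sketch below imitates that merge with the standard library. `apply_update` is a hypothetical helper, and LangGraph's actual channel logic is more involved.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class OverallState(TypedDict):
    topic: str
    subjects: list[str]
    jokes: Annotated[list[str], operator.add]
    best_selected_joke: str


def apply_update(state, update, schema=OverallState):
    """Merge `update` into `state`, using a reducer where one is annotated."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata and key in merged:
            merged[key] = metadata[0](merged[key], value)  # here: operator.add
        else:
            merged[key] = value  # no reducer: the last write wins
    return merged


state = {"topic": "animals", "jokes": []}
updates = ({"jokes": ["lion joke"]}, {"jokes": ["penguin joke"]}, {"topic": "birds"})
for update in updates:
    state = apply_update(state, update)

print(state)
```

Fields without a reducer, such as `topic`, are simply replaced, which is why only `jokes` is safe to write from several parallel runs in this sketch.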

## Define nodes and edges

In [None]:
def generate_topics(state: OverallState):
    return {"subjects": ["lions", "elephants", "penguins"]}

def generate_joke(state: OverallState):
    joke_map = {
        "lions": "Why don't lions like fast food? Because they can't catch it!",
        "elephants": "Why don't elephants use computers? They're afraid of the mouse!",
        "penguins": "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice."
    }
    # Invoked via Send, so `state` is the payload {"subject": ...}, not the full OverallState.
    return {"jokes": [joke_map[state["subject"]]]}

def continue_to_jokes(state: OverallState):
    """
    Split the work and run generate_joke once per subject.
    The Send API dispatches these runs dynamically, in parallel.

    Important: this function is used as a conditional edge function.
    """
    subjects = state["subjects"]
    print(f"[router] dispatching {len(subjects)} tasks in parallel")

    # Run one worker node per task
    # Send(node_name, per_task_state)
    return [Send("generate_joke", {"subject": s}) for s in subjects]

def best_joke(state: OverallState):
    return {"best_selected_joke": "penguins"}

## Build the graph

In [None]:
# Create the graph
builder = StateGraph(OverallState)
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_joke", generate_joke)
builder.add_node("best_joke", best_joke)


# Add edges
builder.add_edge(START, "generate_topics")
# The Send objects are produced by the routing function of a conditional edge
builder.add_conditional_edges(
    "generate_topics",
    continue_to_jokes,
    ["generate_joke"]  # dynamic routing: every dispatched run finishes before the next node starts, much like asyncio.gather()
)

# NOTE: once all generate_joke runs have completed, execution moves to best_joke (the reduce step)
builder.add_edge("generate_joke", "best_joke")
builder.add_edge("best_joke", END)

# Compile
graph = builder.compile()

In [None]:
# Draw the graph
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))

## Test

In [None]:
for step in graph.stream({"topic": "animals"}):
    print(step)  # print each streamed update as it arrives

---

## Verifying parallel execution

In [None]:
import asyncio
import time

def generate_topics(state: OverallState):
    return {"subjects": ["lions", "elephants", "penguins"]}

async def generate_joke(state: OverallState):
    joke_map = {
        "lions": "Why don't lions like fast food? Because they can't catch it!",
        "elephants": "Why don't elephants use computers? They're afraid of the mouse!",
        "penguins": "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice."
    }

    # Single quotes inside the f-string keep this valid on Python versions before 3.12.
    print(f"🔄 {state['subject']} started (5 s)")
    await asyncio.sleep(5)
    print(f"✅ {state['subject']} done!")
    return {"jokes": [joke_map[state["subject"]]]}

def continue_to_jokes(state: OverallState):
    """
    Split the work and run generate_joke once per subject.
    The Send API dispatches these runs dynamically, in parallel.

    Important: this function is used as a conditional edge function.
    """
    subjects = state["subjects"]
    print(f"[router] dispatching {len(subjects)} tasks in parallel")

    # Run one worker node per task
    # Send(node_name, per_task_state)
    return [Send("generate_joke", {"subject": s}) for s in subjects]

async def best_joke(state: OverallState):
    print("🔄 selecting the best joke (5 s)")
    await asyncio.sleep(5)
    return {"best_selected_joke": "penguins"}

# Create the graph
builder = StateGraph(OverallState)
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_joke", generate_joke)
builder.add_node("best_joke", best_joke)


# Add edges
builder.add_edge(START, "generate_topics")
# The Send objects are produced by the routing function of a conditional edge
builder.add_conditional_edges(
    "generate_topics",
    continue_to_jokes,
    ["generate_joke"]  # dynamic routing
)

# NOTE: once all generate_joke runs have completed, execution moves to best_joke
builder.add_edge("generate_joke", "best_joke")
builder.add_edge("best_joke", END)

# Compile
graph = builder.compile()

In [None]:
# Top-level `async for` works in a notebook cell; in a script, wrap it in an async function.
start = time.time()
async for step in graph.astream({"topic": "animals"}):
    elapsed = time.time() - start
    print(f"[{elapsed:.2f}s] {step}")

In [None]:
start = time.time()
async for step in graph.astream({"topic": "animals"}, stream_mode="debug"):
    elapsed = time.time() - start
    print(f"[{elapsed:.2f}s] {step}")
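The timing you would expect from concurrent workers can be reproduced with plain `asyncio`, independent of LangGraph. In the sketch below, `worker` and `fan_out` are hypothetical names, and the short `delay` stands in for the 5-second sleep used above. If the three workers really overlap, the batch takes roughly one delay rather than three.

```python
import asyncio
import time


async def worker(subject, delay):
    """Pretend to generate a joke; `delay` stands in for model latency."""
    await asyncio.sleep(delay)
    return f"joke about {subject}"


async def fan_out(subjects, delay=0.2):
    """Run one worker per subject concurrently and time the whole batch."""
    start = time.perf_counter()
    jokes = await asyncio.gather(*(worker(s, delay) for s in subjects))
    return jokes, time.perf_counter() - start


# In a notebook cell, use `await fan_out([...])` instead of asyncio.run().
jokes, elapsed = asyncio.run(fan_out(["lions", "elephants", "penguins"]))
print(f"{len(jokes)} jokes in {elapsed:.2f}s")
```

`asyncio.gather` returns results in the order the awaitables were passed, regardless of which one finishes first.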