# Token calculator
`토큰 비용 계산기`

### 모델 종류:
- gemini
    - gemini-2.5-flash-lite
    - gemini-2.5 flash
    - gemini-2.5-pro
- openai
    - gpt-5-nano
    - gpt-5-mini
    - gpt-5
    - gpt-5-pro
- claude
    - claude-opus-4-1
    - claude-opus-4
    - claude-sonnet-4-5
    - claude-sonnet-4
    - claude-haiku-4-5
    - claude-haiku-3-5
    - claude-haiku-3

### 가용 함수들
- `util/calculate_price_if_message` : AIMessage 입력
- `util/calculate_price_if_messages` : list(AnyMessage) 입력

In [2]:
from util.test_agent import todo_agent, HumanMessage

result = todo_agent.invoke({
    "messages": [HumanMessage("Refactor the authentication module to use async/await and ensure all tests pass")]
})

In [3]:
from util.token_calc import calculate_price_of_messages

calculate_price_of_messages(result["messages"], True)

[다중 메시지] 입력 토큰 비용: 0.00040, 출력 토큰 비용: 0.00014, 총비용: 0.00053
[다중 메시지] 입력 토큰 비용: 0.00043, 출력 토큰 비용: 0.00017, 총비용: 0.00060


(0.0008313, 0.00030500000000000004)

## 커스터미아징 클래스 기반 미들웨어
- 요청들에 대해서 직접 비용 조회
- state에 비용 계산값들 업데이트
- agent의 after_model 훅, wrap_tool_call 훅 체크
- agent의 사고 매번 추적 가능
- tool call의 경우, Command로 리턴값 조정한 함수만 사용가능. => 이렇게 지정한 함수는 해당 미들웨어 없이 오동작하니 주의!!
- 리턴값에서 messages list에 처음에 toolmessage를 넣고(원래 응답) llm 호출결과를 추가하여 해당 메시지들을 추가
- 이후 추가된 메시지들을 읽어 계산하고, 원래 반환되어야 하는 toolmessage를 정산 반환하도록 함
- 해당 로직이 없는 일반 툴의 경우(결과물이 toolmessage인 것), 미들웨어를 그대로 통과.
- tool call에서 해당 추적을 위해서는 기존 toolmessage에 해당 메시지들을 추가해야 하며, 테스트 등을 위해 툴 함수를 별개로 만들어 보는게 좋을듯?

In [1]:
from collections.abc import Callable
import operator
from typing import Annotated, Any, NotRequired

from langchain.agents import AgentState
from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import ToolMessage, AIMessage
from langgraph.prebuilt.tool_node import ToolCallRequest
from langgraph.runtime import Runtime
from langgraph.types import Command


class ToolTrackingState(AgentState):
    total_input_cost: NotRequired[Annotated[float, operator.add]]
    total_output_cost: NotRequired[Annotated[float, operator.add]]


class TokenTrackingMiddleware(AgentMiddleware[ToolTrackingState]):
    """Agent와 Tool 실행 시 토큰 비용을 자동 추적하는 미들웨어"""
    def __init__(self, debug: bool = False) -> None:
        self.debug = debug
    
    def _calculate_price(self, messages: list):
        from util.token_calc import calculate_price_of_messages
        return calculate_price_of_messages(messages, self.debug)

    def after_agent(self, state: ToolTrackingState, runtime: Runtime[None]) -> dict[str, Any] | None:
        in_c = state.get("total_input_cost", 0.0)
        out_c = state.get("total_output_cost", 0.0)
        
        if self.debug:
            print(f"[State Result]\n    total_input_cost: {in_c:.5f}\n    total_output_cost: {out_c:.5f}")
        
        return None
    
    # def before_model(self, state: ToolTrackingState, runtime: Runtime[None]) -> dict[str, Any] | None:
    #     print(f"before_model information: {state.get("total_input_cost")}, {state.get("total_output_cost")}")
    #     ...

    def after_model(self, state: ToolTrackingState, runtime: Runtime[None]) -> dict[str, Any] | None:
        recent_message = state["messages"][-1]
        
        if not isinstance(recent_message, AIMessage):
            return None

        in_p, out_p = self._calculate_price([recent_message])

        return {
            "total_input_cost": in_p,
            "total_output_cost": out_p
        }
    
    def wrap_tool_call(
        self,
        request: ToolCallRequest,
        handler: Callable[[ToolCallRequest], ToolMessage | Command]
    ) -> ToolMessage | Command:
        if self.debug:
            print(f"Executing tool: {request.tool_call['name']}")
            print(f"Arguments: {request.tool_call['args']}")
        
        try:
            result = handler(request)

            if isinstance(result, ToolMessage):
                return result
            
            if not isinstance(result.update, dict):
                raise

            target_messages = result.update["messages"][1:]
            in_p, out_p = self._calculate_price(target_messages)

            tool_message = result.update["messages"][0]
            if not isinstance(tool_message, ToolMessage):
                raise

            result.update["messages"] = [tool_message]
            print("Tool execution completed")
            return Command(update={
                **result.update,
                "total_input_cost": in_p,
                "total_output_cost": out_p
            })
        except Exception as e:
            print(f"Tool failed: {e}")
            raise

In [2]:
from util.test_agent import plan_tool, info_tool, llm
from langchain.agents import create_agent
from langchain.messages import HumanMessage

class State(ToolTrackingState):
    destination: str

tour_agent_1 = create_agent(
    model=llm,
    tools=[plan_tool, info_tool],
    middleware=[TokenTrackingMiddleware(debug=True)],
    state_schema=State,
    system_prompt="당신은 한국의 여행사 직원입니다. 도구를 활용하여 고객의 여행 계획을 수립하세요." \
    "계획 수립에 앞서 먼저 여행지에 대한 조사를 수행하세요.",
)

# for chunk in tour_agent_1.stream({
#     "messages": [HumanMessage("제주도 여행 계획을 세워주세요. 부모님을 모시고 효도 관광을 하고 싶습니다.")]
# }):
#     print(chunk)

response = tour_agent_1.invoke({
    "messages": [HumanMessage("제주도 여행 계획을 세워주세요. 부모님을 모시고 효도 관광을 하고 싶습니다.")]
})

# print(f"input cost: {response["total_input_cost"]:.5f}, output cost: {response["total_output_cost"]:.5f}\ntotal: {response["total_input_cost"]+response["total_output_cost"]:.5f}")

[단일 메시지] 입력 토큰 비용: 0.00006, 출력 토큰 비용: 0.00038, 총비용: 0.00044
Executing tool: info_tool
Arguments: {'query': '제주도'}
[단일 메시지] 입력 토큰 비용: 0.00006, 출력 토큰 비용: 0.00020, 총비용: 0.00026
[단일 메시지] 입력 토큰 비용: 0.00065, 출력 토큰 비용: 0.00122, 총비용: 0.00187
Tool execution completed
[단일 메시지] 입력 토큰 비용: 0.00022, 출력 토큰 비용: 0.00222, 총비용: 0.00244
[State Result]
    total_input_cost: 0.00099
    total_output_cost: 0.00403


In [3]:
print(response.keys())
print(f"input cost: {response["total_input_cost"]:.5f}, output cost: {response["total_output_cost"]:.5f}\ntotal: {response["total_input_cost"]+response["total_output_cost"]:.5f}")
for message in response["messages"]:
    print(message.type)
    print(message.content[:100])

dict_keys(['messages', 'total_input_cost', 'total_output_cost'])
input cost: 0.00099, output cost: 0.00403
total: 0.00502
human
제주도 여행 계획을 세워주세요. 부모님을 모시고 효도 관광을 하고 싶습니다.
ai

tool
제주도는 정말 매력적인 곳이죠! 제주 바다와 땅이 선물하는 맛있는 먹거리, 자연과 문화가 어우러진 아름다운 명소, 그리고 독특한 섬 문화와 아픈 역사가 공존하는 곳입니다.

**먹
ai
부모님과 함께하는 효도 관광을 위한 제주도 여행 계획을 세워드리겠습니다.

**여행 테마:** 자연 속 휴식과 힐링, 맛있는 음식 즐기기, 제주의 역사와 문화 체험

**추천 명소


## 반복 수행하며 평균 비용을 계산하는 함수

In [4]:
from langgraph.graph.state import CompiledStateGraph

def get_mean_price(agent: CompiledStateGraph, initial_state: dict, rep: int) -> None:
    store = []
    
    try:
        for _ in range(rep):
            response = agent.invoke(initial_state)
            store.append((response["total_input_cost"], response["total_output_cost"]))
        
        in_p = sum(x[0] for x in store) / rep
        out_p = sum(x[1] for x in store) / rep
        total_p = in_p + out_p
        
        min_in = min(x[0] for x in store)
        max_in = max(x[0] for x in store)
        min_out = min(x[1] for x in store)
        max_out = max(x[1] for x in store)
        min_total = min(x[0] + x[1] for x in store)
        max_total = max(x[0] + x[1] for x in store)
        
        print(f"Input  - Min: {min_in:.5f} | Average: {in_p:.5f} | Max: {max_in:.5f}")
        print(f"Output - Min: {min_out:.5f} | Average: {out_p:.5f} | Max: {max_out:.5f}")
        print(f"Total  - Min: {min_total:.5f} | Average: {total_p:.5f} | Max: {max_total:.5f}")
    except Exception as e:
        print(e)

In [6]:
class State(ToolTrackingState):
    destination: str

tour_agent_1 = create_agent(
    model=llm,
    tools=[plan_tool, info_tool],
    middleware=[TokenTrackingMiddleware()],
    state_schema=State,
    system_prompt="당신은 한국의 여행사 직원입니다. 도구를 활용하여 고객의 여행 계획을 수립하세요." \
    "계획 수립에 앞서 먼저 여행지에 대한 조사를 수행하세요.",
)

initial_state = {
    "messages": [HumanMessage("제주도 여행 계획을 세워주세요. 부모님을 모시고 효도 관광을 하고 싶습니다.")]
}

get_mean_price(tour_agent_1, initial_state, 3)

Tool execution completed
Tool execution completed
Tool execution completed
Tool execution completed
Input  - Min: 0.00085 | Average: 0.00129 | Max: 0.00181
Output - Min: 0.00369 | Average: 0.00637 | Max: 0.01075
Total  - Min: 0.00454 | Average: 0.00765 | Max: 0.01256
