[Bug] Java chat token metrics are recorded inside async call boundary #706

@joeyutong

Search before asking

  • I searched in the issues and found nothing similar.

Description

The Java built-in chat action can record token metrics from inside the async chat callable.

ChatModelAction can execute chatModel.chat(...) through ctx.durableExecuteAsync(...) when chat async execution is enabled. On the Java chat-model path, token metrics are recorded in BaseChatModelConnection.recordTokenMetrics(...), which the chat-model connection calls while the chat call is still running, that is, inside the callable passed to ctx.durableExecuteAsync(...).

This is different from the Python built-in chat path. The Python chat action collects token usage from the chat response and calls chat_model._record_token_metrics(...) only after await ctx.durable_execute_async(...) resumes. That keeps metric recording outside the async callable.

The metric group is exposed through RunnerContext#getAgentMetricGroup() / getActionMetricGroup() and Python ctx.agent_metric_group / ctx.action_metric_group. If the runtime metric group is intended to be used only from the operator/mailbox execution path, the Java built-in chat token recording should follow the Python pattern and avoid touching the metric group inside the async call boundary.
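To make the "operator/mailbox path only" option concrete, here is a minimal Java sketch of a confinement check. MailboxConfinedCounter and ConfinementDemo are hypothetical and are not part of flink-agents or the Flink metrics API. The async callable is modeled with CompletableFuture on a separate executor, which is an assumption about the thread hop introduced by durableExecuteAsync(...), not its actual implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConfinementDemo {

    // Hypothetical counter, NOT the flink-agents or Flink metrics API. It captures
    // the thread that constructs it (assumed to be the operator/mailbox thread)
    // and rejects updates coming from any other thread.
    public static final class MailboxConfinedCounter {
        private final Thread owner = Thread.currentThread();
        private long count; // only mutated by the owner thread

        public void inc(long n) {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException(
                        "metric updated outside the mailbox thread: "
                                + Thread.currentThread().getName());
            }
            count += n;
        }

        public long get() {
            return count;
        }
    }

    // Update from the owning thread: accepted.
    public static long mailboxUpdate(long n) {
        MailboxConfinedCounter tokens = new MailboxConfinedCounter();
        tokens.inc(n);
        return tokens.get();
    }

    // Update from inside an async callable (hypothetical stand-in for
    // durableExecuteAsync): rejected. Returns true if the guard fired.
    public static boolean asyncUpdateRejected() {
        MailboxConfinedCounter tokens = new MailboxConfinedCounter();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture.runAsync(() -> tokens.inc(42), pool).join();
            return false;
        } catch (CompletionException e) {
            // join() wraps the callable's exception in a CompletionException.
            return e.getCause() instanceof IllegalStateException;
        } finally {
            pool.shutdown();
        }
    }
}
```

A guard like this could sit behind getAgentMetricGroup() / getActionMetricGroup() in tests or debug builds. Whether the runtime should enforce confinement, or instead document the metric group as safe to use from async callables, is the contract question raised in this issue.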

Expected behavior:

  • Java and Python built-in chat actions should record token metrics at the same execution boundary.
  • Token metrics for Java chat models should be recorded after the async chat call returns, not inside the durableExecuteAsync callable.
  • The context metric-group getter contract should make it clear whether the returned metric group may be used from async callables, or whether callers must only use it from the operator/mailbox path.

Possible fix directions:

  • Move Java chat token metric recording out of BaseChatModelConnection.recordTokenMetrics(...) / the connection call path and into ChatModelAction after ctx.durableExecuteAsync(...) returns.
  • Mirror the Python approach by carrying token usage in the ChatMessage metadata/extra args and recording it after the chat response is available.
  • Clarify the context metric-group getter contract so future code does not accidentally record metrics inside async callables.
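A minimal Java sketch of the first two fix directions, under stated assumptions: the connection only attaches token usage to the response's extra args, and the action records metrics after the async call returns. ChatResponse, MetricsSink, chat(...) and runAction(...) are hypothetical stand-ins for ChatMessage, the action metric group, the connection call and ChatModelAction. The durableExecuteAsync(...) boundary is modeled as CompletableFuture.supplyAsync(...) followed by join(), and the token counts are toy word counts, not real tokenizer output.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RecordAfterReturnSketch {

    // Hypothetical response type standing in for ChatMessage: token usage
    // travels in extraArgs instead of being recorded by the connection.
    public static final class ChatResponse {
        public final String content;
        public final Map<String, Object> extraArgs;

        ChatResponse(String content, Map<String, Object> extraArgs) {
            this.content = content;
            this.extraArgs = extraArgs;
        }
    }

    // Hypothetical stand-in for the action metric group.
    public static final class MetricsSink {
        public final Map<String, Long> counters = new HashMap<>();
        public String recordedOn;

        void inc(String name, long n) {
            counters.merge(name, n, Long::sum);
            recordedOn = Thread.currentThread().getName();
        }
    }

    // Connection side: no metric access, only attach usage to the response.
    // Counts are toy whitespace word counts, not a real tokenizer.
    static ChatResponse chat(String prompt) {
        String reply = "ok";
        Map<String, Object> extra = new HashMap<>();
        extra.put("promptTokens", (long) prompt.split("\\s+").length);
        extra.put("completionTokens", (long) reply.split("\\s+").length);
        return new ChatResponse(reply, extra);
    }

    // Action side: run chat asynchronously, then record once the result is back
    // on the calling thread (assumed here to be the mailbox thread).
    public static MetricsSink runAction(String prompt) {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "async-chat"));
        MetricsSink metrics = new MetricsSink();
        try {
            ChatResponse response =
                    CompletableFuture.supplyAsync(() -> chat(prompt), pool).join();
            for (String key : new String[] {"promptTokens", "completionTokens"}) {
                Object value = response.extraArgs.get(key);
                if (value instanceof Long) {
                    metrics.inc(key, (Long) value);
                }
            }
        } finally {
            pool.shutdown();
        }
        return metrics;
    }
}
```

Because the connection never touches the metric group, the same response path works whether the call runs synchronously or through the async boundary, which is the property the Python chat action already has.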

How to reproduce

Use a Java chat model with async chat execution enabled.

  1. ChatModelAction creates a DurableCallable whose call() invokes chatModel.chat(messages, Map.of()).
  2. With chat async enabled, ChatModelAction executes the callable through ctx.durableExecuteAsync(callable).
  3. The Java chat-model connection records token metrics from inside the chat call via BaseChatModelConnection.recordTokenMetrics(...).
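The three steps above can be modeled in a self-contained Java sketch that reports which thread records the token metrics. All names are hypothetical, and executeAsync(...) only approximates ctx.durableExecuteAsync(...) by running the callable on a separate pool thread named async-chat and waiting for the result; the real runtime may schedule and resume differently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class InsideBoundaryRepro {

    // Hypothetical stand-in for ctx.durableExecuteAsync(...): run the callable on
    // a separate pool and wait for its result. Only the thread hop matters here.
    static <T> T executeAsync(Supplier<T> callable, ExecutorService pool) {
        return CompletableFuture.supplyAsync(callable, pool).join();
    }

    // Hypothetical connection that records token metrics from inside chat(),
    // mirroring where BaseChatModelConnection.recordTokenMetrics(...) is reached.
    static final class RecordingConnection {
        final AtomicReference<String> recordedOn = new AtomicReference<>();

        String chat(String prompt) {
            long promptTokens = prompt.split("\\s+").length; // toy token count
            recordTokenMetrics(promptTokens);
            return "reply";
        }

        void recordTokenMetrics(long tokens) {
            // A real implementation would update the metric group here; the
            // sketch only remembers which thread it ran on.
            recordedOn.set(Thread.currentThread().getName());
        }
    }

    // Steps 1-3: the action wraps chat() in a callable and runs it asynchronously.
    // Returns the name of the thread on which token metrics were recorded.
    public static String recordingThread() {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "async-chat"));
        try {
            RecordingConnection conn = new RecordingConnection();
            executeAsync(() -> conn.chat("hello flink agents"), pool);
            return conn.recordedOn.get();
        } finally {
            pool.shutdown();
        }
    }
}
```

In this model, recordingThread() returns async-chat rather than the caller's thread, which is the boundary crossing described in step 3.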

The Python path already avoids this by recording token metrics after await ctx.durable_execute_async(...) returns.

Version and environment

Observed in the current repository code on a main-line checkout, local commit 58a18b5bb84b476826f8c03d16a115537b92b0cb.

Relevant files:

  • api/src/main/java/org/apache/flink/agents/api/chat/model/BaseChatModelConnection.java
  • plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java
  • python/flink_agents/plan/actions/chat_model_action.py
  • python/flink_agents/api/chat_models/chat_model.py
  • api/src/main/java/org/apache/flink/agents/api/context/RunnerContext.java

Flink version in the root pom.xml: 2.2.0.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
