Hi @joeyutong @Sxnan, I noticed that parallel tool execution has not been implemented yet. I think it is really necessary for improving the agent's execution efficiency. When you have time, please take a look at the design in this area; I'd really appreciate it.
Motivation
Background
Flink Agents already supports async durable execution for high-latency I/O:
- `await ctx.durable_execute_async(...)` submits work to a thread pool and yields the operator. See Discussion #404.
- `ctx.durableExecuteAsync(DurableCallable)` uses the Continuation API to yield the mailbox thread while work runs on an async thread pool. See Discussion #429.

The built-in `tool_call_action` listens to `ToolRequestEvent`, looks up each tool resource, executes it through durable execution, and emits `ToolResponseEvent` after all tool calls in the batch are processed. See tool_use.md.

When the LLM returns multiple tool calls in a single response, each call is typically an independent HTTP / MCP / RPC request. Running them in parallel can significantly reduce end-to-end latency.
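To make the latency argument concrete, here is a minimal standalone sketch, not Flink Agents code. `fake_tool` and the three call names are hypothetical stand-ins for independent I/O-bound tool calls, and a plain `ThreadPoolExecutor` is assumed in place of the runtime's async pool.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_tool(name: str, latency_s: float) -> str:
    """Hypothetical stand-in for one HTTP / MCP / RPC tool call."""
    time.sleep(latency_s)  # simulate blocking I/O
    return f"{name}:ok"


# Three independent calls, as if returned in a single LLM response.
calls = [("search", 0.10), ("weather", 0.10), ("stock", 0.10)]


def run_serial() -> tuple[list[str], float]:
    start = time.perf_counter()
    results = [fake_tool(name, latency) for name, latency in calls]
    return results, time.perf_counter() - start


def run_parallel() -> tuple[list[str], float]:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(calls)) as pool:
        futures = [pool.submit(fake_tool, name, latency) for name, latency in calls]
        results = [f.result() for f in futures]  # collect in submission order
    return results, time.perf_counter() - start


serial_results, serial_elapsed = run_serial()
parallel_results, parallel_elapsed = run_parallel()
print(f"serial   ~{serial_elapsed:.2f}s (about the sum of latencies)")
print(f"parallel ~{parallel_elapsed:.2f}s (about the max latency)")
```

Both runs return the same results in the same order; only the wall-clock time differs.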
The Problem
Today, `ToolCallAction.processToolRequest` processes tool calls serially.

Even with `tool-call.async=true` (default) on JDK 21+, each tool call is submitted through `durableExecuteAsync` inside a `for` loop, and the next call is not submitted until the previous `durableExecuteAsync` returns.

So async execution today achieves inter-action concurrency, not intra-batch tool concurrency. This is consistent with Discussion #429.

For a `ToolRequestEvent` with N independent I/O-bound tools, total latency is approximately the sum of individual latencies instead of the max.

Additionally, all tool calls currently share the same durable `functionId`, `"tool-call"`. Fine-grained durable execution (Discussion #404) matches calls by `(callIndex, functionId, argsDigest)`. Reusing `"tool-call"` for every tool in a batch makes recovery matching fragile when call order or batch composition changes.

Goal
Enable **parallel execution of multiple tool calls within a single** `ToolRequestEvent`, while:
- `sendEvent` and durable state recording remain on the mailbox thread.
- Setups with `tool-call.async=false` fall back to serial execution.

Design Goals
- `RunnerContext` (memory, events, metrics) is accessed only on the mailbox thread.

Non-Goals (first version)
- Parallelism across separate `ToolRequestEvent`s (already handled by Flink parallelism + async yield).
- Changes to the `ToolRequestEvent` / `ToolResponseEvent` wire format.

API Design
New Config Option
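The new option `tool-call.parallel` combines with the existing `tool-call.async` in three configurations:
- `tool-call.async=false`: serial execution.
- `tool-call.async=true`, `tool-call.parallel=false`: serial async execution, as today.
- `tool-call.async=true`, `tool-call.parallel=true`: parallel batch execution on JDK 21+.

The existing `num-async-threads` (Discussion #429) caps concurrent async work globally. A batch of N tools may use up to N threads from this pool.

Optional future knob: `tool-call.max-parallelism` to limit concurrent tools per batch.

New RunnerContext Method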
Python equivalent:
Per-Tool Durable Identity
Each tool callable must use a stable, unique `functionId` of the form `"tool-call-" + toolCallId`.

Design Approach
Recommended: Batch Async Durable API (Fan-out / Fan-in)
Extend the runtime with `durableExecuteAllAsync` rather than changing the event model. `ToolCallAction` becomes a thin caller.

The Batch API is the recommended first step. Action Task Split remains a valid future evolution if finer-grained interleaving with other actions is needed.
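The fan-out / fan-in shape of the batch approach can be sketched as follows. This is a simplified model under stated assumptions, not the proposed runtime code: `Slot`, `execute_all`, and `process_tool_request` are hypothetical names, a plain thread pool stands in for the async pool, and mailbox-thread yielding and durable recording are left out.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Slot:
    """Outcome of one callable: at most one of result / error is set."""
    result: Any = None
    error: Optional[Exception] = None


def execute_all(callables: list[Callable[[], Any]], max_workers: int = 4) -> list[Slot]:
    """Hypothetical batch helper: fan out to a pool, fan in by input index."""
    slots = [Slot() for _ in callables]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(fn) for fn in callables]  # fan-out
        for index, future in enumerate(futures):         # fan-in, input order
            try:
                slots[index].result = future.result()
            except Exception as exc:  # one failing tool must not lose its siblings
                slots[index].error = exc
    return slots


def process_tool_request(tool_calls: list[dict], tools: dict[str, Callable[..., Any]]) -> dict:
    """Thin caller: build callables, delegate to the batch helper, build one response."""
    callables = [
        (lambda call=call: tools[call["name"]](**call["args"])) for call in tool_calls
    ]
    slots = execute_all(callables)
    return {
        call["id"]: (f"error: {slot.error}" if slot.error is not None else slot.result)
        for call, slot in zip(tool_calls, slots)
    }


tools = {"add": lambda a, b: a + b, "div": lambda a, b: a / b}
response = process_tool_request(
    [
        {"id": "c1", "name": "add", "args": {"a": 1, "b": 2}},
        {"id": "c2", "name": "div", "args": {"a": 1, "b": 0}},
    ],
    tools,
)
print(response)
```

Keeping all batching logic inside one helper is what lets the caller stay thin: it only builds callables and maps slots back to tool call ids.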
Core Component Design
ContinuationContext Extension
`ContinuationActionExecutor.executeAction` checks `hasPendingAsync()` instead of only `pendingFuture`.

ContinuationActionExecutor.executeAllAsync (JDK 21)
JavaRunnerContextImpl.durableExecuteAllAsync
Three phases, all orchestrated from the mailbox thread:
Phase 1 — Prepare (mailbox thread)
For each `DurableCallable` in input order:
- `tryGetCachedResult(functionId, argsDigest)`: on hit, fill the result slot.
- On miss, add the callable to `pendingList` with its original index.

Phase 2 — Fan-out + Yield (mailbox initiates, pool executes)
- Submit the pending callables through `ContinuationActionExecutor.executeAllAsync`.

Phase 3 — Fan-in + Persist (mailbox thread)
For each index in original order:
- `recordDurableCompletion(functionId, argsDigest, result, exception)`, strictly in tool_calls order.

ToolCallAction Changes
`buildCallables` resolves tools, handles missing-tool errors inline (no async submission for non-existent tools), and assigns `functionId = "tool-call-" + toolCallId`.

Execution Flow
Parallel Batch (JDK 21+)
Compare with current serial async:
Recovery Logic
Aligned with Discussion #404 and Discussion #598.
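Recovery matching relies on the `(callIndex, functionId, argsDigest)` key from Discussion #404. The sketch below illustrates why per-tool ids help. It rests on assumptions that may not hold in the real runtime: the digest is taken as SHA-256 over canonical JSON of the arguments only, records from the first mismatching index onward are treated as stale, and `args_digest`, `call_key`, and `usable_prefix` are hypothetical helpers.

```python
import hashlib
import json


def args_digest(args: dict) -> str:
    """Assumed digest: SHA-256 over canonical JSON (the real scheme may differ)."""
    canonical = json.dumps(args, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def call_key(tool_call: dict, per_tool_id: bool) -> tuple[str, str]:
    function_id = f"tool-call-{tool_call['id']}" if per_tool_id else "tool-call"
    return function_id, args_digest(tool_call["args"])


def usable_prefix(recorded: list[tuple[str, str]], current: list[tuple[str, str]]) -> int:
    """Number of leading records (by callIndex) that still match the current batch."""
    matched = 0
    for old, new in zip(recorded, current):
        if old != new:
            break  # assumption: this record and everything after it is stale
        matched += 1
    return matched


first_attempt = [
    {"id": "a", "name": "search", "args": {"q": "flink"}},
    {"id": "b", "name": "get_time", "args": {}},
]
# On retry the LLM returns a different second tool that happens to take the same args.
retry = [
    {"id": "a", "name": "search", "args": {"q": "flink"}},
    {"id": "c", "name": "get_date", "args": {}},
]

for per_tool_id in (False, True):
    recorded = [call_key(c, per_tool_id) for c in first_attempt]
    current = [call_key(c, per_tool_id) for c in retry]
    keep = usable_prefix(recorded, current)
    print(f"per_tool_id={per_tool_id}: reuse {keep} of {len(current)} record(s)")
```

With the shared `"tool-call"` id, both records look reusable even though the second tool changed; with per-tool ids, only the first record is reused.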
Normal Recovery
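On recovery, `processToolRequest` re-executes and calls `durableExecuteAllAsync` with the same ordered callable list. Each slot is handled according to its recorded state:
- `SUCCEEDED` / cached hit: the recorded result fills the slot.
- `FAILED` / cached exception: the recorded exception fills the slot.
- `PENDING` (reconcilable): resolved through `reconciler()` per #598.

Recording after fan-in always follows tool_calls list order, regardless of which tools completed first in the previous attempt.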
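The recovery behavior can be modeled with a small standalone sketch of the three phases. It is a simplification under stated assumptions: `durable_execute_all`, the dictionary-based `store`, and the string digests are hypothetical, a slot without a record is treated as a plain miss, and `FAILED` / `PENDING` states and reconcilers are not modeled.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable

# Hypothetical durable store key: (function_id, args_digest).
Key = tuple[str, str]


def durable_execute_all(
    calls: list[tuple[Key, Callable[[], Any]]],
    store: dict[Key, Any],
    record_log: list[Key],
) -> list[Any]:
    results: list[Any] = [None] * len(calls)

    # Phase 1 (prepare): cache hits fill their slot, misses go to the pending list.
    pending: list[tuple[int, Callable[[], Any]]] = []
    for index, (key, fn) in enumerate(calls):
        if key in store:
            results[index] = store[key]
        else:
            pending.append((index, fn))

    # Phase 2 (fan-out): only pending callables run on the pool.
    if pending:
        with ThreadPoolExecutor(max_workers=len(pending)) as pool:
            futures = {index: pool.submit(fn) for index, fn in pending}
            for index, future in futures.items():
                results[index] = future.result()

    # Phase 3 (fan-in): record new results in input order, not completion order.
    for index, (key, _) in enumerate(calls):
        if key not in store:
            store[key] = results[index]
            record_log.append(key)
    return results


executed: list[str] = []


def make_tool(name: str) -> Callable[[], str]:
    def run() -> str:
        executed.append(name)  # track which tools actually ran
        return f"{name}-result"
    return run


calls = [((f"tool-call-{i}", f"d{i}"), make_tool(f"t{i}")) for i in range(3)]
# Tools 0 and 1 were persisted before the failover; tool 2 was not.
store = {("tool-call-0", "d0"): "t0-result", ("tool-call-1", "d1"): "t1-result"}
log: list[Key] = []
results = durable_execute_all(calls, store, log)
print(results, executed, log)
```

Only the unrecorded tool runs again, while the returned list still follows the original tool_calls order.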
Partial Batch Failover
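Example: 3 tools, tool 0 and 1 persisted as `SUCCEEDED`, tool 2 was `PENDING` when failover occurred. On recovery, tools 0 and 1 are served from their recorded results, and only tool 2, which is `PENDING` or a cache miss, is resolved again.

Call-Order Mismatch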
If recovery detects a `functionId` / `argsDigest` mismatch at any callIndex (Discussion #404), the stale `CallResult` entries are cleared and the calls are re-executed.

Using per-tool `functionId = "tool-call-{id}"` makes mismatch detection precise when the LLM returns different tool sets across retries.

Reconciler Integration
HTTP / MCP tools with side effects should implement `DurableCallable.reconciler()` per Discussion #598.

For parallel batches, each tool's reconciler runs independently during recovery when its slot is `PENDING`.

Implementation
Module Changes
| Module | Changes |
|---|---|
| api | `TOOL_CALL_PARALLEL`, `RunnerContext.durableExecuteAllAsync` |
| runtime (java21) | `ContinuationContext.pendingBatchFuture`, `executeAllAsync`, update `executeAction` |
| runtime | `JavaRunnerContextImpl.durableExecuteAllAsync` with 3-phase fan-out/fan-in |
| runtime (java11) | `durableExecute` loop |
| plan | `ToolCallAction`; fix per-tool `functionId` |
| python | `durable_execute_all_async` + update `tool_call_action.py` |

Multi-release JAR
Same pattern as Discussion #429:
Migration
- `tool-call.parallel=true`: existing jobs with multi-tool batches get parallel behavior automatically when on JDK 21+.
- The `functionId` change from `"tool-call"` to `"tool-call-{id}"` affects in-flight `ActionState` during rolling upgrade. Mitigation: treat functionId change as call-order mismatch → clear and re-execute (safe, at-most-once).

Documentation
- Document `durableExecuteAllAsync`, and note that `asyncio.gather` remains unsupported in Python actions (use `durable_execute_all_async` instead).
- Document `tool-call.parallel`.

Testing
- Slots that are `PENDING` on recovery → reconcilers invoked independently.
- `tool-call.parallel=false` and JDK < 21 paths unchanged.
- Per-tool `functionId` uses `tool-call-{id}`.