fix: wait for the Runner to persist a step's events before the ADK flow's next step (sequential-tool-execution race) #1234
Merged
…ow's next step (sequential-tool-execution race)

`BaseLlmFlow.run()` builds each step's request from the session, but the `Runner` persists events asynchronously downstream of the flow. A slow `appendEvent` could let the next step start from a stale session missing the prior step's events (e.g. a tool's function response), making the model re-call the tool or hallucinate its result.

The `Runner` stays the sole `appendEvent` caller and the flow waits: the `Runner` calls `PersistBarrier.markPersisted(id)` after each append (or `markFailed(id, error)` if the append fails), and the flow calls `PersistBarrier.awaitPersisted(stepEvents)` between steps. The barrier is a reactive per-event signal in the shared `InvocationContext.callbackContextData` and never blocks a thread; `Contents` is unchanged. `PersistBarrier.enable()`, called by the `Runner`, keeps `awaitPersisted` a no-op when the flow runs without a `Runner`.

Each event id maps to a `CompletableSubject`: pending until its append finishes, then terminally completed or failed. The subject retains its terminal state, so `awaitPersisted`/`mark*` may happen in any order and a late await -- e.g. at a higher flow level across an agent transfer -- resolves immediately. If an append fails, the matching await fails rather than blocking forever. The barrier is thread-safe and lock-free: `markPersisted`/`markFailed` may run off-thread when an async `appendEvent` completes, and `ConcurrentHashMap.computeIfAbsent` hands both sides the same subject.

PiperOrigin-RevId: 925858188
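The per-event signal described above can be illustrated with a minimal sketch. This is not the code from this change. It swaps RxJava's `CompletableSubject` for the JDK's `CompletableFuture<Void>` so that it compiles with the standard library alone, and it makes several assumptions for illustration: the class name `PersistBarrierSketch` is hypothetical, the methods are instance methods rather than helpers stored in `InvocationContext.callbackContextData`, and events are identified by plain `String` ids.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a per-event persistence barrier; not the ADK implementation. */
final class PersistBarrierSketch {
  // One signal per event id. computeIfAbsent guarantees that the side marking an
  // event and the side awaiting it observe the same future, whichever arrives first.
  private final ConcurrentHashMap<String, CompletableFuture<Void>> signals =
      new ConcurrentHashMap<>();

  // Assumption: stays false unless a runner-like caller opts in, so a flow that
  // runs on its own never waits for appends that nobody will report.
  private volatile boolean enabled = false;

  void enable() {
    enabled = true;
  }

  private CompletableFuture<Void> signal(String eventId) {
    return signals.computeIfAbsent(eventId, id -> new CompletableFuture<>());
  }

  /** Called after a successful append; the future keeps its completed state. */
  void markPersisted(String eventId) {
    signal(eventId).complete(null);
  }

  /** Called after a failed append, so a matching await fails instead of hanging. */
  void markFailed(String eventId, Throwable error) {
    signal(eventId).completeExceptionally(error);
  }

  /** Non-blocking: returns a future that completes once every listed event is terminal. */
  CompletableFuture<Void> awaitPersisted(List<String> eventIds) {
    if (!enabled) {
      return CompletableFuture.completedFuture(null); // no-op without a runner
    }
    CompletableFuture<?>[] pending =
        eventIds.stream().map(this::signal).toArray(CompletableFuture[]::new);
    return CompletableFuture.allOf(pending);
  }

  public static void main(String[] args) {
    PersistBarrierSketch barrier = new PersistBarrierSketch();
    barrier.enable();
    CompletableFuture<Void> step = barrier.awaitPersisted(List.of("e1", "e2"));
    barrier.markPersisted("e1");
    System.out.println(step.isDone()); // false: e2 is still pending
    barrier.markPersisted("e2");
    System.out.println(step.isDone()); // true: both events are persisted
  }
}
```

Because a completed `CompletableFuture` retains its state, marking and awaiting can happen in either order, which mirrors the terminal-state property the commit message attributes to `CompletableSubject`. One difference worth noting: `CompletableFuture.allOf` reports a failure only after all listed futures have finished, so this sketch does not fail fast when one of several appends fails.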