Fix saveMessages cancellation race with external AbortSignal #1411
Merged
threepointone merged 3 commits into main on Apr 28, 2026
Resolves the race in `Think.saveMessages` and `AIChatAgent.saveMessages` where callers had no way to safely cancel an in-flight programmatic turn without reaching into private `_aborts` state. `saveMessages` and `continueLastTurn` now accept `options.signal`, the signal is bridged into the per-turn `AbortRegistry` controller via a new `AbortRegistry.linkExternal()`, and `SaveMessagesResult.status` reports `"aborted"` when the external signal fires. Adds protected `abortRequest()` / `abortAllRequests()` helpers so subclasses no longer need bracket-access workarounds. Updates the `agents-as-tools` example to use the new contract, expands unit + integration coverage (registry, Think, AIChatAgent, helper stream), and documents the API plus its DO RPC and hibernation limitations. See #1406. Made-with: Cursor
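The bridging described in this commit can be pictured with a small stand-alone sketch. This is not the real `AbortRegistry`: `MiniAbortRegistry`, `getSignal`, and the internal map are hypothetical names used only for illustration. It assumes `linkExternal` attaches one `abort` listener, forwards `signal.reason`, and hands back a detacher.

```typescript
// Hypothetical stand-in for the per-turn registry; names and shapes are assumptions.
class MiniAbortRegistry {
  private controllers = new Map<string, AbortController>();

  // Return the signal for a turn, creating its controller on first use.
  getSignal(id: string): AbortSignal {
    let controller = this.controllers.get(id);
    if (!controller) {
      controller = new AbortController();
      this.controllers.set(id, controller);
    }
    return controller.signal;
  }

  // Abort one turn, optionally with an explicit reason.
  cancel(id: string, reason?: unknown): void {
    this.controllers.get(id)?.abort(reason);
  }

  // Bridge an external signal into the turn's controller.
  // Returns a detacher that removes the listener again.
  linkExternal(id: string, external: AbortSignal): () => void {
    this.getSignal(id); // make sure the turn's controller exists first
    if (external.aborted) {
      // Pre-aborted signal: cancel immediately, nothing to detach.
      this.cancel(id, external.reason);
      return () => {};
    }
    const onAbort = () => this.cancel(id, external.reason);
    external.addEventListener("abort", onAbort, { once: true });
    return () => external.removeEventListener("abort", onAbort);
  }
}
```

Registering the controller before attaching the listener is what closes the window in this sketch: there is no point at which the external signal can fire without a controller to receive it.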
🦋 Changeset detected. Latest commit: 49f3ec0. The changes in this PR will be included in the next version bump. This PR includes changesets to release 3 packages.
agents
@cloudflare/ai-chat
@cloudflare/codemode
hono-agents
@cloudflare/shell
@cloudflare/think
@cloudflare/voice
@cloudflare/worker-bundler
Fixes a listener leak in `_runProgrammaticChatTurn` where `linkExternal` was called before the `runFiber` boundary while the `try/finally` that calls `detachExternal()` lived inside `programmaticBody`. If `runFiber` itself threw — e.g. a SQLite error inserting the fiber row, or `keepAlive()` failing — the body never ran, so the external-signal listener was never removed and the registry entry was never cleaned up. Long-lived parent signals driving many helper turns would accumulate listeners across failures. The fix mirrors the structure already used by `Think.saveMessages` and `Think.continueLastTurn`: the `try/finally` now wraps both the `runFiber` and direct-call branches, so cleanup runs regardless of where the throw originates. `AIChatAgent.continueLastTurn` was already structurally safe (linkExternal runs *inside* the runFiber boundary) and is unchanged. Adds a regression test that monkey-patches `runFiber` to throw synchronously and asserts the abort registry drains and no listener remains on the external signal. Made-with: Cursor
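The shape of this fix can be sketched in isolation. The code below is a simplified illustration, not the actual `_runProgrammaticChatTurn`: `link`, `runTurn`, and the `runFiber` parameter are hypothetical stand-ins. The point is only that the `try/finally` encloses the `runFiber` call itself, so the detacher runs even when `runFiber` throws before the body starts.

```typescript
type Detach = () => void;

// Hypothetical helper: attach an abort listener and return its detacher.
function link(signal: AbortSignal, onAbort: () => void): Detach {
  signal.addEventListener("abort", onAbort, { once: true });
  return () => signal.removeEventListener("abort", onAbort);
}

// Hypothetical turn runner. `runFiber` stands in for any wrapper that may
// throw before it ever invokes `body`.
async function runTurn<T>(
  external: AbortSignal,
  onCancel: (reason: unknown) => void,
  runFiber: (body: () => Promise<T>) => Promise<T>,
  body: () => Promise<T>
): Promise<T> {
  const detach = link(external, () => onCancel(external.reason));
  try {
    // The try block covers the runFiber call, not just the body.
    return await runFiber(body);
  } finally {
    // Runs whether runFiber throws, the body throws, or the turn completes.
    detach();
  }
}
```

If the `try/finally` lived inside `body` instead, a throw from `runFiber` would skip it entirely and the listener would stay attached to the external signal.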
Use `Parameters<...>` of the bound `addEventListener`/`removeEventListener` overloads (instead of redeclaring the signature) and reference the non-polymorphic `RecoverySlowStreamAgent["runFiber"]` so `typeof this` doesn't leak into the cast. Behavior is unchanged. Made-with: Cursor
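As a rough illustration of the typing technique mentioned here, the sketch below derives argument types from the bound `addEventListener` / `removeEventListener` functions with `Parameters<...>` rather than spelling the signature out again. `countingListeners` is a hypothetical test helper, not code from this PR, and its counter naively tracks add and remove calls.

```typescript
// Hypothetical test helper: wraps a signal's listener methods and counts
// how many add calls have not yet been matched by a remove call.
function countingListeners(signal: AbortSignal) {
  const add = signal.addEventListener.bind(signal);
  const remove = signal.removeEventListener.bind(signal);
  let active = 0;
  return {
    // Argument types come from the bound function, not a hand-written copy.
    add(...args: Parameters<typeof add>): void {
      active += 1;
      add(...args);
    },
    remove(...args: Parameters<typeof remove>): void {
      active -= 1;
      remove(...args);
    },
    get active(): number {
      return active;
    }
  };
}
```

For an overloaded method, `Parameters` resolves against the last overload, which for these two methods is the general string-typed signature.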
Summary
Resolves #1406: `Think.saveMessages` (and the equivalent `AIChatAgent.saveMessages`) had no race-free way for callers to cancel an in-flight programmatic turn. The previous workaround required reaching into private `_aborts` state and still had a window where a cancel could land between turn-id assignment and registry insertion.

`saveMessages` and `continueLastTurn` now accept `options.signal: AbortSignal`. The external signal is bridged into the per-turn `AbortRegistry` controller via a new `AbortRegistry.linkExternal(id, signal)`, which:
- attaches an `abort` listener with a returned detacher,
- forwards `signal.reason` into the registry's `cancel(id, reason)`,
- coexists with `MSG_CHAT_CANCEL`, internal cancellation, and per-request listener cleanup.

`SaveMessagesResult.status` now includes `"aborted"` alongside `"completed"` and `"skipped"`. Existing callers that only switch on `"completed"` are unaffected. Two new protected helpers, `abortRequest(id, reason?)` and `abortAllRequests()`, replace the historical `(this as { _aborts: ... })._aborts.destroyAll()` pattern.

The `agents-as-tools` example is updated to use the new contract: each helper turn now owns a local `AbortController` whose signal is passed to `saveMessages`, and the returned `ReadableStream`'s `cancel` callback aborts that controller. This is the canonical pattern for bridging a parent DO's intent into a child DO across RPC.

Changes
Core (`packages/agents`)
- `AbortRegistry.linkExternal(id, signal)` + `cancel(id, reason)` accepting an explicit reason.
- `SaveMessagesOptions` type exported from `chat/`.
- `SaveMessagesResult.status` extended with `"aborted"`.

Think (`packages/think`)
- `saveMessages(messages, options?)` and `continueLastTurn(options?)` accept `options.signal`.
- Protected `abortRequest()` / `abortAllRequests()` methods.
- Listener cleanup in `finally`.

AIChatAgent (`packages/ai-chat`)
- Same `options.signal` support for `saveMessages` / `continueLastTurn`, including `_runProgrammaticChatTurn` honoring the external signal.

Example (`examples/agents-as-tools`)
- Each helper turn owns a local `AbortController`; stream `cancel` aborts it.
- Removes the `_aborts` bracket-access workaround.

Tests
- `AbortRegistry` unit tests covering pre-abort, deferred abort, listener leakage across many turns, double-abort, coexistence with internal cancel, and reason propagation.
- Type-level tests for `SaveMessagesOptions` (`tests-d/save-messages-options.test-d.ts`).
- Integration coverage for Think and AIChatAgent, including `abortAllRequests()` / leak prevention.
- Helper-stream test hooks (`testRunHelperPreCancelled`, `testRunHelperMidCancelled`, `getAbortRegistrySize`, `waitForAbortRegistryDrained`) plus a delayed mock model to make the `cancel` callback land mid-inference reliably.

Docs
- `docs/think/sub-agents.md`: `options.signal`, `"aborted"` status, "Crossing DO boundaries", "Hibernation and recovery".
- `docs/server-driven-messages.md`: cancellation section + limitations.
- `docs/chat-agents.md`: updated signatures and behavior.
- `docs/think/lifecycle-hooks.md`: `onChatResponse` example for `aborted` status.
- `packages/ai-chat/README.md`: usage example referencing #1406 ("`Think.saveMessages` should accept an external `AbortSignal` so callers can cancel an in-flight turn from outside").
- `examples/agents-as-tools/README.md`: updated B4 cancellation note.
- `wip/inline-sub-agent-events.md`: B4 marked resolved.

Changesets (`patch` per repo convention for pre-1.0 additive features):
- `agents-abort-registry-link-external.md`
- `think-savemessages-abort-signal.md`
- `ai-chat-savemessages-abort-signal.md`

Limitations (documented)
- `AbortSignal` cannot cross Durable Object RPC. Construct the controller inside the DO that calls `saveMessages`. To bridge a parent's intent into a child DO, return a `ReadableStream` from the child whose `cancel` callback aborts a per-turn controller (see `examples/agents-as-tools`).
- When `chatRecovery` is enabled, the recovered turn runs without the original signal.

Test plan
- `pnpm -w turbo run test --filter=@cloudflare/agents --filter=@cloudflare/think --filter=@cloudflare/ai-chat --filter=agents-as-tools-example` passes
- `pnpm -w turbo run typecheck lint` passes
- `oxfmt --check` passes (auto-applied via lint-staged on commit)
- `AbortRegistry` unit tests cover pre-abort, deferred abort, listener leak prevention, reason propagation
- Coverage for `abortAllRequests()`

Closes #1406.
Made with Cursor
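The cross-DO bridging pattern described under Limitations can be sketched without any of the real packages. Everything here is an assumption made for illustration: `fakeSaveMessages` merely imitates a turn that honors `options.signal` and reports `"aborted"`, and `runHelperTurn` is a hypothetical child-side entry point that owns a per-turn `AbortController` and returns a `ReadableStream` whose `cancel` callback aborts it.

```typescript
type Status = "completed" | "skipped" | "aborted";

// Hypothetical stand-in for a programmatic turn that honors options.signal.
function fakeSaveMessages(options: { signal: AbortSignal }): Promise<{ status: Status }> {
  return new Promise((resolve) => {
    if (options.signal.aborted) {
      resolve({ status: "aborted" });
      return;
    }
    // Simulated inference delay; cleared if the signal fires first.
    const timer = setTimeout(() => resolve({ status: "completed" }), 50);
    options.signal.addEventListener(
      "abort",
      () => {
        clearTimeout(timer);
        resolve({ status: "aborted" });
      },
      { once: true }
    );
  });
}

// Hypothetical child-side helper: the controller lives next to the turn,
// and only the stream crosses the boundary to the caller.
function runHelperTurn(): { stream: ReadableStream<string>; done: Promise<{ status: Status }> } {
  const controller = new AbortController();
  const done = fakeSaveMessages({ signal: controller.signal });
  const stream = new ReadableStream<string>({
    start(streamController) {
      void done.then((result) => {
        // After a cancel the stream is already closed, so only write on success.
        if (result.status !== "aborted") {
          streamController.enqueue(result.status);
          streamController.close();
        }
      });
    },
    // The consumer cancelling the stream is what aborts the turn.
    cancel(reason) {
      controller.abort(reason);
    }
  });
  return { stream, done };
}
```

A caller that loses interest calls `stream.cancel(reason)`; in this sketch the turn then resolves with `status: "aborted"` instead of running to completion.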