fix(think): add StreamCallback.onInterrupted for recovering chat() turns (#1644)#1647
Merged
Merged
Conversation
…urn isn't silently abandoned (#1644) When a turn driven through `chat(userMessage, callback)` is interrupted and routed into bounded recovery (a stream-stall watchdog abort, #1626/#1643), the scheduled continuation runs in a LATER isolate invocation that does not hold the original `callback` — so neither `onDone()` nor `onError()` ever fires for it. Because the isolate is still alive, the RPC promise resolves CLEANLY, and the callback contract `onStart → onEvent* → (onDone | onError)` is silently violated. The deploy/stall asymmetry is the subtle part: a deploy/eviction kills the isolate, so the caller sees a transport break and re-attaches; a stall→recovery interruption returns cleanly, so the caller cannot distinguish "finished" from "interrupted, continuing elsewhere." A consumer that keys off the clean resolve mis-reads it as success and finalizes whatever partial it streamed. Real-world impact: messenger delivery (`deliverMessengerReply`) calls `callback.close()` right after `chat()` resolves and posts the streamed text as the final reply — so a recovering stall made it post a TRUNCATED answer, while the real recovered answer was produced later by the continuation and broadcast only to WebSocket connections (never to the messenger surface). ## What changed - **API:** add optional `onInterrupted?()` to `StreamCallback` (think.ts). It means "not done, not a terminal error — a continuation owns the final outcome"; consumers should keep the channel open / show a recovering state / re-attach instead of finalizing. Optional → default no-op, so existing implementers are unaffected (fully backward-compatible). - **Emit:** `_streamResultToRpcCallback` now calls `await callback.onInterrupted?.()` in BOTH stall branches (`scheduled` and `exhausted`) instead of returning silently. (`_streamResult` is the WebSocket-broadcast path and has no `StreamCallback`, so it is unaffected. A deploy/eviction can't fire this — the isolate is already gone.) - **Messenger wiring:** `TextStreamCallback` gains `onInterrupted()` (+ `wasInterrupted()`); `deliverMessengerReply` no longer marks the turn complete or posts the truncated partial on interruption — it surfaces the "interrupted, please retry" reply (`INTERRUPTED_MESSENGER_RESPONSE`) and checkpoints the one-shot delivery completed. ## Tests - `think-session.test.ts`: a stall-recovering `chat()` turn calls `onInterrupted` exactly once (NOT onDone/onError); a normally-completing turn calls onDone and an in-band stream error calls onError, neither firing `onInterrupted`. - `messengers.test.ts`: an interrupted messenger turn surfaces the interrupted apology (not the truncated partial, not the empty-response fallback) and checkpoints completed. - Harness: `TestCollectingCallback` captures `interruptedCalls`; `TestChatResult` carries it; `testChatWithStallThenRecover` surfaces `firstInterruptedCalls`. - README: documents `onInterrupted` on the `StreamCallback` sub-agent-streaming section. Full think suite green (481). Backward-compatible: a `StreamCallback` implementer that does not define `onInterrupted` behaves exactly as before. Co-authored-by: Cursor <cursoragent@cursor.com>
🦋 Changeset detectedLatest commit: ffbfb51 The changes in this PR will be included in the next version bump. This PR includes changesets to release 1 package
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
agents
@cloudflare/ai-chat
@cloudflare/codemode
hono-agents
@cloudflare/shell
@cloudflare/think
@cloudflare/voice
@cloudflare/worker-bundler
commit: |
…e exhausted case The JSDoc said a "scheduled continuation owns the final outcome", but `onInterrupted` also fires from the budget-EXHAUSTED stall branch, where the turn is terminally over (terminalMessage + onExhausted already delivered out-of-band) and NO continuation will run. A custom consumer taking the docstring literally — e.g. parking to re-attach to the continuation — would block/leak on exhaustion. Broadened the contract to cover both branches (continuation will produce it OR it was already terminalized via exhaustion) and added the explicit warning to not block indefinitely waiting for a continuation. Behavior unchanged (the messenger apology + checkpoint-completed UX is already correct for both cases); this only fixes the documented contract so it can't mislead custom onInterrupted handlers. Co-authored-by: Cursor <cursoragent@cursor.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #1644.
Summary
When a turn driven through
chat(userMessage, callback)is interrupted and routed into bounded recovery (a stream-stall watchdog abort — #1626/#1643), the scheduled continuation runs in a later isolate invocation that doesn't hold the originalcallback— so neitheronDone()noronError()ever fires for it. Because the isolate is still alive, the RPC promise resolves cleanly, silently violating theonStart → onEvent* → (onDone | onError)contract.The deploy/stall asymmetry (the subtle part): a deploy/eviction kills the isolate, so the caller observes a transport break and re-attaches; a stall→recovery interruption returns cleanly, so the caller cannot distinguish "finished" from "interrupted, continuing elsewhere." A consumer that keys off the clean resolve mis-reads it as success and finalizes whatever partial it streamed.
Real-world impact (messenger truncation):
deliverMessengerReplycallscallback.close()right afterchat()resolves and posts the streamed text as the final reply. So a recovering stall made it post a truncated answer, while the real recovered answer was produced later by the continuation and broadcast only to WebSocket connections — never to the messenger surface. The messenger user got a truncated/empty reply.This was flagged in #1643 review (option C, recommended). The fix there intentionally kept the silent clean-return because none of the existing terminal signals is correct for a recovering turn; this PR adds the dedicated signal and wires the consumer that truncates.
What changed
onInterrupted?()onStreamCallback(think.ts). Means not done, not a terminal error — a continuation owns the final outcome; consumers should keep the channel open / show a recovering state / re-attach instead of finalizing. Optional → default no-op, fully backward-compatible._streamResultToRpcCallbackcallsawait callback.onInterrupted?.()in both stall branches (scheduledandexhausted) instead of returning silently._streamResultis the WebSocket-broadcast path (noStreamCallback) so it's unaffected; a deploy/eviction can't fire this (the isolate is already gone).TextStreamCallbackgainsonInterrupted()/wasInterrupted();deliverMessengerReplyno longer marks the turn complete or posts the truncated partial on interruption — it surfaces theINTERRUPTED_MESSENGER_RESPONSE("interrupted, please retry") reply and checkpoints the one-shot delivery completed.StreamCallbacksub-agent-streaming section documentsonInterrupted+ the clean-resolve gotcha.Why a new signal (not onDone/onError)
For an interrupted-but-recovering turn:
onDone()falsely signals success (messenger finalizes a partial as final);onError()falsely signals terminal failure (error banner / sub-agent driver aborts); the current clean return is least-wrong but silent and truncates messenger answers. Only a dedicated signal is correct.Tests
think-session.test.ts: a stall-recoveringchat()turn callsonInterruptedexactly once (NOT onDone/onError); a normally-completing turn calls onDone and an in-band stream error calls onError — neither firesonInterrupted.messengers.test.ts: an interrupted messenger turn surfaces the interrupted apology (not the truncated partial, not the empty-response fallback) and checkpoints completed.TestCollectingCallbackcapturesinterruptedCalls;TestChatResultcarries it;testChatWithStallThenRecoversurfacesfirstInterruptedCalls.Scope / compatibility
runAgentTool()/agentTool()(durable ledger + re-attach path, bug: sub-agent (agent-tool) runs are abandoned and re-run on parent recovery instead of re-attaching #1630/fix(agents,think,ai-chat): re-attach to still-running sub-agent runs on parent recovery (#1630) #1640) — unaffected.StreamCallbackimplementer that doesn't defineonInterruptedbehaves exactly as before.Changeset
@cloudflare/thinkminor (new optional API + messenger behavior fix).Test plan
npm run check— all 91 projects typecheck; format + lint cleanpackages/thinkfull unit suite — 481 pass (think-session 160, messengers 27)Made with Cursor