Merged
29 changes: 29 additions & 0 deletions .changeset/stream-callback-on-interrupted.md
@@ -0,0 +1,29 @@
---
"@cloudflare/think": minor
---

Add `StreamCallback.onInterrupted()` so a `chat()`-driven turn that is interrupted and routed into recovery isn't silently abandoned

When a turn driven through `chat(userMessage, callback)` is interrupted and routed
into bounded recovery (a stream-stall watchdog abort), the scheduled continuation
runs in a **later isolate invocation without the original callback** — so neither
`onDone()` nor `onError()` ever fires for that callback. Because the isolate is
still alive, the RPC promise resolves **cleanly**, and a consumer that keys off the
clean resolve mis-reads it as success: it finalizes whatever partial it had
streamed. For the built-in messenger delivery this meant posting a **truncated**
answer as final, while the real recovered answer was produced later and broadcast
only to WebSocket connections.
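
The failure mode above can be reproduced in miniature. The following is a sketch only: `StreamCallbackLike`, `Collector`, `stalledTurn`, `naiveVerdict`, and `strictVerdict` are hypothetical names invented for illustration, not part of `@cloudflare/think`, and the stalled turn is simulated rather than driven by a real watchdog.

```typescript
// Local stand-in for the callback shape (hypothetical name; not imported
// from @cloudflare/think, and onStart is omitted for brevity).
interface StreamCallbackLike {
  onEvent(json: string): void | Promise<void>;
  onDone(): void | Promise<void>;
  onError(error: string): void | Promise<void>;
}

// Records what the turn reported through the callback.
class Collector implements StreamCallbackLike {
  events: string[] = [];
  done = false;
  error?: string;

  onEvent(json: string): void {
    this.events.push(json);
  }
  onDone(): void {
    this.done = true;
  }
  onError(error: string): void {
    this.error = error;
  }
}

// Simulated stalled turn: streams one partial chunk, then returns without
// calling onDone() or onError(). The returned promise still resolves cleanly.
async function stalledTurn(cb: StreamCallbackLike): Promise<void> {
  await cb.onEvent(JSON.stringify({ text: "The answer is" }));
}

// Naive reading: "the promise resolved, so the partial is the final answer".
function naiveVerdict(_cb: Collector): "final" {
  return "final";
}

// Stricter reading: only an explicit onDone() makes the partial final.
function strictVerdict(cb: Collector): "final" | "failed" | "unsettled" {
  if (cb.error !== undefined) return "failed";
  return cb.done ? "final" : "unsettled";
}

async function demo(): Promise<void> {
  const cb = new Collector();
  await stalledTurn(cb); // resolves without throwing
  console.log(naiveVerdict(cb), strictVerdict(cb));
}

void demo();
```

The naive verdict finalizes the truncated partial, while the stricter one notices that no terminal signal arrived. It still cannot tell a recovering turn apart from any other silent return.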

`StreamCallback` now has an optional `onInterrupted?()` signal, emitted from the
stall→recovery branches of the RPC stream path instead of returning silently. It
means "not done, not a terminal error — a continuation owns the final outcome";
consumers should keep the channel open / show a recovering state / re-attach
rather than finalizing the partial. It is **optional**, so existing
`StreamCallback` implementers are unaffected.
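
As a rough illustration of why an optional method leaves existing implementers unaffected, the sketch below routes each attempt ending to one callback signal. `StreamCallbackLike`, `AttemptEnd`, `signalAttemptEnd`, `LegacyCallback`, and `AwareCallback` are hypothetical names for this example; the actual emit sites inside the package's RPC stream path are not shown in this changeset and may look different.

```typescript
// Local stand-in for the callback shape (hypothetical name, not the package type).
interface StreamCallbackLike {
  onDone(): void | Promise<void>;
  onError(error: string): void | Promise<void>;
  onInterrupted?(): void | Promise<void>; // optional, as in the changeset
}

// Hypothetical labels for how one attempt ended.
type AttemptEnd =
  | { kind: "completed" }
  | { kind: "failed"; message: string }
  | { kind: "stalled-recovering" };

// Route each ending to exactly one callback signal.
async function signalAttemptEnd(
  cb: StreamCallbackLike,
  end: AttemptEnd
): Promise<void> {
  switch (end.kind) {
    case "completed":
      await cb.onDone();
      return;
    case "failed":
      await cb.onError(end.message);
      return;
    case "stalled-recovering":
      // Optional call: skipped when the implementer predates onInterrupted.
      await cb.onInterrupted?.();
      return;
  }
}

// An implementer written before onInterrupted existed still type-checks.
class LegacyCallback implements StreamCallbackLike {
  calls: string[] = [];
  onDone(): void {
    this.calls.push("done");
  }
  onError(error: string): void {
    this.calls.push(`error:${error}`);
  }
}

// An updated implementer opts in by adding the method.
class AwareCallback extends LegacyCallback {
  onInterrupted(): void {
    this.calls.push("interrupted");
  }
}
```

With this shape, a legacy callback simply receives no signal on the stalled path (the pre-change behavior), while an updated one is told that a continuation owns the outcome.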

Messenger delivery is wired to it: an interrupted reply now surfaces an
"interrupted, please retry" message instead of finalizing the truncated partial.

Note: a deploy/eviction interruption kills the isolate (and the callback) before
this can fire — the caller observes a transport break instead. `onInterrupted`
covers the in-isolate stall→recovery path.
16 changes: 16 additions & 0 deletions packages/think/README.md
@@ -726,6 +726,11 @@ interface StreamCallback {
  onEvent(json: string): void | Promise<void>;
  onDone(): void | Promise<void>;
  onError(error: string): void | Promise<void>;
  // Optional. The attempt was interrupted (a stream-stall watchdog abort routed
  // into bounded recovery) and a scheduled continuation — in a later isolate,
  // without this callback — owns the final outcome. NOT done, NOT a terminal
  // error. Defaults to a no-op, so existing implementers are unaffected.
  onInterrupted?(): void | Promise<void>;
}

const agent = await this.subAgent(MyAgent, "thread-1");
@@ -736,6 +741,17 @@ await agent.chat("Summarize the project", relay);
`agent.cancelChat(requestId, reason)` if the parent needs to stop the child turn
after it has started.

`onInterrupted` matters for a `chat()`-driven turn that is interrupted and
recovers: the RPC promise resolves **cleanly** (the isolate is still alive), so a
consumer that keys off the clean resolve would mis-read it as success and
finalize whatever partial it had streamed. Treat `onInterrupted` as "not done,
not failed — a continuation owns the answer": keep the channel open, show a
recovering state, or re-attach, rather than finalizing the partial. (The built-in
messenger delivery already does this — it surfaces an "interrupted, please retry"
reply instead of posting the truncated partial.) Note: a deploy/eviction
interruption kills the isolate before this can fire — the caller sees a transport
break instead; `onInterrupted` covers the in-isolate stall→recovery path.
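
A minimal sketch of a relay that tracks which signal it received, so the caller can decide after `chat()` resolves whether the streamed partial is final. The names `StreamCallbackLike`, `TurnOutcome`, and `RelayCallback` are illustrative assumptions, `onStart` is omitted, and a real relay passed over RPC may need a base class that this sketch leaves out.

```typescript
// Local stand-in for the callback shape (hypothetical name, not the package type).
interface StreamCallbackLike {
  onEvent(json: string): void | Promise<void>;
  onDone(): void | Promise<void>;
  onError(error: string): void | Promise<void>;
  onInterrupted?(): void | Promise<void>;
}

type TurnOutcome = "unsettled" | "done" | "failed" | "interrupted";

class RelayCallback implements StreamCallbackLike {
  events: string[] = [];
  lastError?: string;
  private outcome: TurnOutcome = "unsettled";

  onEvent(json: string): void {
    this.events.push(json);
  }
  onDone(): void {
    this.settle("done");
  }
  onError(error: string): void {
    this.lastError = error;
    this.settle("failed");
  }
  onInterrupted(): void {
    this.settle("interrupted");
  }

  result(): TurnOutcome {
    return this.outcome;
  }

  // Only an explicit onDone() makes the streamed partial the final answer.
  shouldFinalizePartial(): boolean {
    return this.outcome === "done";
  }

  // Assumption of this sketch: the first signal wins and later ones are ignored.
  private settle(next: TurnOutcome): void {
    if (this.outcome === "unsettled") this.outcome = next;
  }
}
```

After `await agent.chat("Summarize the project", relay)` resolves, a caller could check `relay.result()`: on `"interrupted"` it would show a recovering state or re-attach, and it would finalize the partial only when `relay.shouldFinalizePartial()` is true.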

Tools belong to the child agent; define them with `getTools()` or use
`agentTool()` / `runAgentTool()` for parent-child orchestration.

36 changes: 36 additions & 0 deletions packages/think/src/messengers/delivery.ts
@@ -28,6 +28,7 @@ export class TextStreamCallback extends RpcTarget implements StreamCallback {
  private readonly visibleSoftLimit?: number;
  private chatRequestId?: string;
  private closed = false;
  private interrupted = false;
  private error?: Error;
  private text = "";
  private visibleClosed = false;
@@ -64,6 +65,20 @@ export class TextStreamCallback extends RpcTarget implements StreamCallback {
    this.fail(new Error(error));
  }

  onInterrupted(): void {
    // The attempt was interrupted and a continuation (not this callback) owns
    // the real answer — delivered only to WebSocket connections, never to this
    // surface. Mark interrupted and stop the visible stream WITHOUT failing it,
    // so delivery surfaces the interrupted apology instead of treating the
    // partial as the final reply (#1644).
    this.interrupted = true;
    this.close();
  }

  wasInterrupted(): boolean {
    return this.interrupted;
  }

  close(): void {
    this.closed = true;
    this.visibleClosed = true;
@@ -365,6 +380,27 @@ export async function deliverMessengerReply(
    } else {
      await options.target.chat(userMessage, callback);
    }
    if (callback.wasInterrupted()) {
      // The model turn was interrupted and routed into bounded recovery; the
      // recovered answer is produced later by a scheduled continuation and
      // broadcast only to WebSocket connections, NOT to this one-shot messenger
      // delivery. Do NOT mark the turn complete or finalize the truncated
      // partial as the reply — surface the interrupted apology so the user
      // knows to retry (#1644). `completedModelTurn` stays false.
      callback.close();
      await post.catch(() => undefined);
      await options.surface
        .post(interruptedResponseText)
        .catch(() => undefined);
      await checkpoint(
        messengerReplySnapshot(
          "completed",
          snapshotEvent,
          options.snapshotThread
        )
      );
      return;
    }
    completedModelTurn = true;
    callback.close();
    await post;
12 changes: 11 additions & 1 deletion packages/think/src/tests/agents/assistant-agent-loop.ts
@@ -21,12 +21,14 @@ type TestChatResult = {
events: string[];
done: boolean;
error?: string;
interruptedCalls: number;
};

class TestCollectingCallback implements StreamCallback {
events: string[] = [];
doneCalled = false;
errorMessage?: string;
interruptedCalls = 0;
onStart(): void {}
onEvent(json: string): void {
this.events.push(json);
@@ -37,6 +39,9 @@ class TestCollectingCallback implements StreamCallback {
onError(error: string): void {
this.errorMessage = error;
}
onInterrupted(): void {
this.interruptedCalls++;
}
}

// ── Mock LanguageModel ──────────────────────────────────────────────
@@ -276,7 +281,12 @@ export class LoopToolTestAgent extends Think {
async testChat(message: string): Promise<TestChatResult> {
const cb = new TestCollectingCallback();
await this.chat(message, cb);
-return { events: cb.events, done: cb.doneCalled, error: cb.errorMessage };
+return {
+events: cb.events,
+done: cb.doneCalled,
+error: cb.errorMessage,
+interruptedCalls: cb.interruptedCalls
+};
}

async getBeforeToolCallLog(): Promise<
51 changes: 38 additions & 13 deletions packages/think/src/tests/agents/think-session.ts
@@ -48,6 +48,7 @@ export type TestChatResult = {
events: string[];
done: boolean;
error?: string;
interruptedCalls: number;
};

/** Shallow JSON object for DO RPC returns (`Record<string, unknown>` fails RPC typing). */
@@ -455,6 +456,7 @@ class TestCollectingCallback implements StreamCallback {
doneCalled = false;
errorMessage?: string;
requestId?: string;
interruptedCalls = 0;

onStart(event: { requestId: string }): void {
this.requestId = event.requestId;
@@ -471,6 +473,10 @@ class TestCollectingCallback implements StreamCallback {
onError(error: string): void {
this.errorMessage = error;
}

onInterrupted(): void {
this.interruptedCalls++;
}
}

// ── ThinkTestAgent ─────────────────────────────────────────
@@ -826,7 +832,8 @@ export class ThinkTestAgent extends Think {
return {
events: cb.events,
done: cb.doneCalled,
-error: cb.errorMessage
+error: cb.errorMessage,
+interruptedCalls: cb.interruptedCalls
};
}

@@ -870,7 +877,8 @@ export class ThinkTestAgent extends Think {
return {
events: cb.events,
done: cb.doneCalled,
-error: cb.errorMessage
+error: cb.errorMessage,
+interruptedCalls: cb.interruptedCalls
};
}

@@ -890,7 +898,8 @@ export class ThinkTestAgent extends Think {
return {
events: cb.events,
done: cb.doneCalled,
-error: cb.errorMessage
+error: cb.errorMessage,
+interruptedCalls: cb.interruptedCalls
};
}

@@ -965,6 +974,7 @@ export class ThinkTestAgent extends Think {
timeoutMs: number
): Promise<{
firstError: string | undefined;
firstInterruptedCalls: number;
scheduledContinues: number;
assistantMessages: number;
finalAssistantText: string;
@@ -1007,6 +1017,7 @@ export class ThinkTestAgent extends Think {
: "";
return {
firstError: first.error,
firstInterruptedCalls: first.interruptedCalls,
scheduledContinues,
assistantMessages: assistant.length,
finalAssistantText
@@ -1233,7 +1244,7 @@ export class ThinkTestAgent extends Think {

await this.chat(message, cb, { signal: controller.signal });

-return { events, done: doneCalled, doneCalled };
+return { events, done: doneCalled, doneCalled, interruptedCalls: 0 };
}

async testChatWithCancelChat(
@@ -1264,7 +1275,13 @@ export class ThinkTestAgent extends Think {

await this.chat(message, cb);

-return { events, done: doneCalled, doneCalled, requestId };
+return {
+events,
+done: doneCalled,
+doneCalled,
+requestId,
+interruptedCalls: 0
+};
}

async setResponse(response: string): Promise<void> {
@@ -2049,7 +2066,8 @@ export class ThinkSessionTestAgent extends Think {
return {
events: cb.events,
done: cb.doneCalled,
-error: cb.errorMessage
+error: cb.errorMessage,
+interruptedCalls: cb.interruptedCalls
};
}

@@ -2137,7 +2155,8 @@ export class ThinkAsyncConfigSessionAgent extends Think {
return {
events: cb.events,
done: cb.doneCalled,
-error: cb.errorMessage
+error: cb.errorMessage,
+interruptedCalls: cb.interruptedCalls
};
}

@@ -2262,7 +2281,8 @@ export class ThinkConfigInSessionAgent extends Think<Cloudflare.Env> {
return {
events: cb.events,
done: cb.doneCalled,
-error: cb.errorMessage
+error: cb.errorMessage,
+interruptedCalls: cb.interruptedCalls
};
}

@@ -2496,7 +2516,8 @@ export class ThinkToolsTestAgent extends Think {
return {
events: cb.events,
done: cb.doneCalled,
-error: cb.errorMessage
+error: cb.errorMessage,
+interruptedCalls: cb.interruptedCalls
};
}

@@ -3202,7 +3223,8 @@ export class ThinkProgrammaticTestAgent extends Think {
return {
events: cb.events,
done: cb.doneCalled,
-error: cb.errorMessage
+error: cb.errorMessage,
+interruptedCalls: cb.interruptedCalls
};
}
}
@@ -3526,7 +3548,8 @@ export class ThinkAsyncHookTestAgent extends Think {
return {
events: cb.events,
done: cb.doneCalled,
-error: cb.errorMessage
+error: cb.errorMessage,
+interruptedCalls: cb.interruptedCalls
};
}

@@ -3638,7 +3661,8 @@ export class ThinkRecoveryTestAgent extends Think {
return {
events: cb.events,
done: cb.doneCalled,
-error: cb.errorMessage
+error: cb.errorMessage,
+interruptedCalls: cb.interruptedCalls
};
}

@@ -4554,7 +4578,8 @@ export class ThinkNonRecoveryTestAgent extends Think {
return {
events: cb.events,
done: cb.doneCalled,
-error: cb.errorMessage
+error: cb.errorMessage,
+interruptedCalls: cb.interruptedCalls
};
}
