StreamCallback is silently abandoned (no onDone/onError) when a chat()-driven turn is interrupted by recovery — add an explicit onInterrupted signal #1644

@threepointone


Summary

When a turn driven through the chat(userMessage, callback) RPC path is interrupted and routed into bounded recovery (a stream-stall watchdog abort from #1626, or — in spirit — a deploy/eviction), the StreamCallback is left in an incomplete state: neither onDone() nor onError() is ever called for that callback. The StreamCallback contract is effectively onStart → onEvent* → (onDone | onError), and an interrupted-then-recovering turn violates it.

This surfaced in review of #1643 (#1626). The fix there intentionally keeps the current behavior (clean return, no terminal callback) because none of the existing terminal signals is correct for a recovering turn — but the contract is dishonest and the failure mode is silent. This issue proposes adding an explicit, optional onInterrupted() signal and wiring the consumers that care.

Where it happens

StreamCallback (in packages/think/src/think.ts):

export interface StreamCallback {
  onStart(event: ChatStartEvent): void | Promise<void>;
  onEvent(json: string): void | Promise<void>;
  onDone(): void | Promise<void>;
  onError(error: string): void | Promise<void>;
}
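The onStart → onEvent* → (onDone | onError) contract from the summary can be checked mechanically. This is a minimal sketch. It assumes a test double that records method names in call order, and checkStreamContract is a hypothetical helper, not part of packages/think:

```typescript
// The four StreamCallback methods, as a test double might record them.
type CallbackCall = "onStart" | "onEvent" | "onDone" | "onError";

// Hypothetical test helper: returns null when `calls` follows
// onStart → onEvent* → (onDone | onError), otherwise a short
// description of the first violation found.
function checkStreamContract(calls: readonly CallbackCall[]): string | null {
  if (calls[0] !== "onStart") return "first call must be onStart";
  let i = 1;
  // Any number of onEvent calls may follow onStart.
  while (i < calls.length && calls[i] === "onEvent") i++;
  if (i === calls.length) return "missing terminal onDone/onError";
  const terminal = calls[i];
  if (terminal !== "onDone" && terminal !== "onError") {
    return `unexpected ${terminal} before a terminal signal`;
  }
  if (i !== calls.length - 1) return "calls after terminal signal";
  return null;
}

// A normally completing turn satisfies the contract.
console.log(checkStreamContract(["onStart", "onEvent", "onEvent", "onDone"])); // null
// A stall-recovering turn today stops after its events with no terminal call.
console.log(checkStreamContract(["onStart", "onEvent"])); // missing terminal onDone/onError
```

If you run this check against the calls that a stall-recovering chat() turn produces today, it reports the missing terminal signal.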

Both stream catch blocks route a ChatStreamStalledError into _routeStallToBoundedRecovery. On "scheduled" or "exhausted", they finalize the stream and return without calling callback.onDone() or callback.onError():

  • _streamResultToRpcCallback — the stall branch (outcome === "scheduled" / "exhausted") returns void; the comment even says "No callback.onError/response hook: the scheduled continuation owns the real terminal outcome."
  • _streamResult — symmetric (returns { status: "aborted" }).

The scheduled continuation runs in a later isolate invocation (_chatRecoveryContinue via an alarm) and does not hold the original callback reference, so onDone/onError will never fire for that callback.
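A simplified model of today's stall branch shows how the RPC promise resolves while the callback never receives a terminal call. Everything except the StreamCallback method names is a stand-in. ChatStartEvent is reduced to one field, routeStallToRecovery replaces _routeStallToBoundedRecovery, and the chunk format is invented for illustration:

```typescript
// Simplified stand-ins; only the callback method names mirror think.ts.
type ChatStartEvent = { requestId: string };

interface StreamCallback {
  onStart(event: ChatStartEvent): void | Promise<void>;
  onEvent(json: string): void | Promise<void>;
  onDone(): void | Promise<void>;
  onError(error: string): void | Promise<void>;
}

class ChatStreamStalledError extends Error {}

// Hypothetical stand-in for _routeStallToBoundedRecovery: pretends an
// alarm-driven continuation (_chatRecoveryContinue) was scheduled.
async function routeStallToRecovery(): Promise<"scheduled" | "exhausted"> {
  return "scheduled";
}

// Model of the current stall branch in _streamResultToRpcCallback.
async function streamToCallback(
  chunks: AsyncIterable<string>,
  callback: StreamCallback,
): Promise<void> {
  await callback.onStart({ requestId: "turn-1" });
  try {
    for await (const chunk of chunks) await callback.onEvent(chunk);
    await callback.onDone();
  } catch (err) {
    if (err instanceof ChatStreamStalledError) {
      await routeStallToRecovery(); // "scheduled" or "exhausted"
      // Current behavior: return cleanly. The continuation runs later without
      // this callback reference, so onDone/onError never fire for it.
      return;
    }
    await callback.onError(String(err));
  }
}

// A stream that delivers one chunk and then trips the stall watchdog.
async function* stallingStream(): AsyncGenerator<string> {
  yield '{"delta":"Hel"}';
  throw new ChatStreamStalledError("no chunk within the watchdog window");
}

const calls: string[] = [];
const recorder: StreamCallback = {
  onStart: () => { calls.push("onStart"); },
  onEvent: () => { calls.push("onEvent"); },
  onDone: () => { calls.push("onDone"); },
  onError: () => { calls.push("onError"); },
};

// The promise resolves "successfully" even though no terminal signal fired.
streamToCallback(stallingStream(), recorder).then(() => console.log(calls));
```

Running it logs [ 'onStart', 'onEvent' ]. The caller's await succeeds, but the callback never learns how the turn ended.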

The deploy/stall asymmetry (the subtle part)

  • Deploy/eviction interruption: the isolate dies, so the callback dies with it and the caller's RPC connection breaks — the caller observes a transport error/disconnect and re-attaches (via the durable child-run ledger).
  • Stall→recovery interruption: the isolate is alive; we return cleanly, so the RPC promise resolves successfully. The caller cannot distinguish "the turn finished" from "the turn was interrupted and is continuing elsewhere."

So even though stall-recovery was designed to mirror deploy interruption, the caller-visible outcome differs: a clean success vs. a broken connection.
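The sketch below shows how the two interruption kinds look from the caller's side. In this hypothetical wrapper, observeTurn and the three stand-in chat functions are illustrative and are not package APIs. The RPC promise alone can only separate "resolved" from "rejected". A recovering stall can be told apart from a finished turn only by noticing that no terminal callback fired, and every caller would have to reimplement that implicit convention:

```typescript
type ChatStartEvent = { requestId: string }; // simplified stand-in

interface StreamCallback {
  onStart(event: ChatStartEvent): void | Promise<void>;
  onEvent(json: string): void | Promise<void>;
  onDone(): void | Promise<void>;
  onError(error: string): void | Promise<void>;
}

type ObservedOutcome = "done" | "error" | "resolved-without-terminal" | "rejected";

// Hypothetical caller-side wrapper around a chat()-style call.
async function observeTurn(
  chat: (callback: StreamCallback) => Promise<void>,
): Promise<ObservedOutcome> {
  const seen: { terminal: "done" | "error" | null } = { terminal: null };
  const callback: StreamCallback = {
    onStart() {},
    onEvent() {},
    onDone() { seen.terminal = "done"; },
    onError() { seen.terminal = "error"; },
  };
  try {
    await chat(callback);
  } catch {
    // Deploy/eviction: the isolate died and the RPC connection broke.
    return "rejected";
  }
  // Stall→recovery: chat() resolved, but no terminal signal ever fired.
  return seen.terminal ?? "resolved-without-terminal";
}

const completes = async (cb: StreamCallback) => { await cb.onStart({ requestId: "a" }); await cb.onDone(); };
const stallRecovers = async (cb: StreamCallback) => { await cb.onStart({ requestId: "b" }); await cb.onEvent("{}"); };
const evicted = async (_cb: StreamCallback): Promise<void> => { throw new Error("RPC connection lost"); };

Promise.all([observeTurn(completes), observeTurn(stallRecovers), observeTurn(evicted)])
  .then((outcomes) => console.log(outcomes));
// → [ 'done', 'resolved-without-terminal', 'rejected' ]
```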

Consumers affected

chat(msg, callback) consumers (grep \.chat( + StreamCallback):

  1. Messengers (packages/think/src/messengers/delivery.ts). The TextStreamCallback's onDone() closes the channel, and onError() fails it. Crucially, delivery itself calls callback.close() right after chat() resolves:

    await options.target.chat(userMessage, callback); // (or chatWithMessengerContext)
    completedModelTurn = true;
    callback.close();
    // ... posts callback text to the surface

    So a recovering stall makes the messenger finalize and post whatever partial text it had streamed, treating the turn as complete — while the real recovered answer is produced later by the continuation and broadcast only to WebSocket connections, never to the messenger surface. Net: the messenger user gets a truncated/empty answer.

  2. chat()-driven "helper" sub-agents — e.g. the e2e HelperChatCallback pattern (packages/think/src/e2e-tests/worker.ts) where a parent drives a child via chat() and keys off onDone. A recovering stall resolves cleanly, so the driver thinks the child finished and reads its (incomplete) output as final.
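The messenger truncation follows directly from the delivery order shown above. This is a minimal model. It assumes a TextStreamCallback that accumulates text deltas from JSON events. The event shape, stallingChat, and this class body are simplified stand-ins, not the code in delivery.ts:

```typescript
// Hypothetical, simplified stand-in for the messenger TextStreamCallback:
// accumulates streamed text deltas; onDone closes the channel, onError fails it.
class TextStreamCallback {
  private text = "";
  state: "open" | "closed" | "failed" = "open";
  onStart(): void {}
  onEvent(json: string): void {
    const event = JSON.parse(json) as { delta?: string };
    if (event.delta) this.text += event.delta;
  }
  onDone(): void { this.close(); }
  onError(_error: string): void { this.state = "failed"; }
  close(): void { if (this.state === "open") this.state = "closed"; }
  get accumulatedText(): string { return this.text; }
}

// Stand-in chat target: streams one delta, stalls, is routed into recovery,
// and resolves cleanly without calling onDone or onError.
async function stallingChat(_message: string, callback: TextStreamCallback): Promise<void> {
  callback.onStart();
  callback.onEvent(JSON.stringify({ delta: "The root cause was" }));
}

// Simplified shape of the current delivery flow.
async function deliver(
  chat: (message: string, callback: TextStreamCallback) => Promise<void>,
  post: (text: string) => void,
): Promise<boolean> {
  const callback = new TextStreamCallback();
  await chat("Summarize the incident", callback);
  const completedModelTurn = true; // set as soon as chat() resolves
  callback.close();
  post(callback.accumulatedText); // the partial text is posted as the final answer
  return completedModelTurn;
}

deliver(stallingChat, (text) => console.log(`posted: ${text}`)); // logs: posted: The root cause was
```

The same shape applies to a chat()-driven helper. Its driver reads whatever the child produced before the stall as final output.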

Note: the primary sub-agent API (runAgentTool() / agentTool()) does not use this callback — it recovers via the durable child-run ledger + bounded re-attach (#1630/N9), so it is unaffected. This issue is specifically about the chat(msg, callback) RPC path.

Why none of the existing signals is correct

For a turn that was interrupted but is recovering:

  • onDone() → falsely signals success of an incomplete turn (messenger finalizes a partial message as the final answer).
  • onError() → falsely signals terminal failure of a turn that is actually recovering (messenger shows an error banner; a sub-agent driver aborts).
  • clean return (current) → the least-wrong of the existing signals, but silent: the caller can't react, and (for messengers) it actively truncates the answer.

The only strictly-correct option is a new, dedicated signal.
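With a dedicated signal, each consumer can handle all three outcomes exhaustively instead of overloading one of two existing signals. The following sketch shows what that could look like for a messenger. The TurnOutcome and MessengerAction types are hypothetical and only name the behaviors discussed above:

```typescript
// The three ways a chat()-driven turn can end, once interruption has its own signal.
type TurnOutcome =
  | { kind: "done" }
  | { kind: "error"; error: string }
  | { kind: "interrupted" };

// Hypothetical messenger reactions, named after the behaviors described above.
type MessengerAction =
  | { type: "post-final-answer" }
  | { type: "show-error"; message: string }
  | { type: "keep-open-recovering" };

function messengerActionFor(outcome: TurnOutcome): MessengerAction {
  switch (outcome.kind) {
    case "done":
      return { type: "post-final-answer" };
    case "error":
      return { type: "show-error", message: outcome.error };
    case "interrupted":
      // Neither success nor failure: keep the channel open and wait for the continuation.
      return { type: "keep-open-recovering" };
    default: {
      // Exhaustiveness check: a new, unhandled outcome kind fails to type-check here.
      const unhandled: never = outcome;
      throw new Error(`unhandled outcome: ${JSON.stringify(unhandled)}`);
    }
  }
}

console.log(messengerActionFor({ kind: "interrupted" }).type); // keep-open-recovering
```

A silent clean return offers no equivalent, because nothing exists for the consumer to switch on.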

Options

A. Status quo — abandon the callback (clean return)

  • Pros: zero API change; matches deploy-interruption intent; avoids a false onDone/onError.
  • Cons: dishonest contract; silent; truncates messenger answers; caller can't distinguish success from interruption.

B. Signal via onError() with a recoverable marker

  • Pros: caller learns the attempt didn't complete normally.
  • Cons: onError means terminal failure; messengers would show an error for a turn that is recovering; every consumer must special-case the marker. Misleading in the other direction.
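The cost of option B is easiest to see at the consumer. The example assumes a hypothetical "recoverable:" prefix; no such marker exists today. Under this scheme, every onError implementation must add a check. Any implementation that predates the marker silently keeps treating recovery as failure:

```typescript
// Hypothetical marker string; option B would require agreeing on something like it.
const RECOVERABLE_PREFIX = "recoverable:";

type ErrorReaction = "show-error-banner" | "show-recovering";

// Every onError implementation would need this check...
function awareOnError(error: string): ErrorReaction {
  return error.startsWith(RECOVERABLE_PREFIX) ? "show-recovering" : "show-error-banner";
}

// ...while an implementation written before the marker existed keeps treating
// every onError as terminal, including turns that are still recovering.
function legacyOnError(_error: string): ErrorReaction {
  return "show-error-banner";
}

const stallSignal = `${RECOVERABLE_PREFIX} stream stalled; continuation scheduled`;
console.log(awareOnError(stallSignal)); // show-recovering
console.log(legacyOnError(stallSignal)); // show-error-banner
```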

C. Add an optional onInterrupted() to StreamCallback (recommended)

export interface StreamCallback {
  onStart(event: ChatStartEvent): void | Promise<void>;
  onEvent(json: string): void | Promise<void>;
  onDone(): void | Promise<void>;
  onError(error: string): void | Promise<void>;
  /**
   * The current attempt was interrupted (stream-stall watchdog abort, or a
   * deploy/eviction) and a scheduled continuation owns the final outcome. This
   * is NOT done and NOT a terminal error. Consumers should avoid finalizing
   * (keep the channel open, show a "recovering…" state, or re-attach) instead
   * of treating a clean resolve as success. Optional → default no-op, so this
   * is fully backward-compatible.
   */
  onInterrupted?(): void | Promise<void>;
}

  • Pros: honest contract; non-breaking (optional); lets each consumer choose the right behavior (keep-open / recovering-UI / re-attach); cleanly distinguishes the three outcomes.
  • Cons: adds interface surface; requires wiring the consumers that care to be useful (the hook alone is inert).
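Because the method is optional, an interruption site can call it with optional chaining, and existing implementers keep working unchanged. The following is a minimal sketch. ChatStartEvent is simplified, and signalInterrupted is a hypothetical helper name:

```typescript
type ChatStartEvent = { requestId: string }; // simplified stand-in

// The proposed interface (option C).
interface StreamCallback {
  onStart(event: ChatStartEvent): void | Promise<void>;
  onEvent(json: string): void | Promise<void>;
  onDone(): void | Promise<void>;
  onError(error: string): void | Promise<void>;
  onInterrupted?(): void | Promise<void>;
}

// How an interruption site could emit the signal: optional chaining makes the
// call a no-op for implementers that never defined onInterrupted.
async function signalInterrupted(callback: StreamCallback): Promise<void> {
  await callback.onInterrupted?.();
}

const log: string[] = [];

// Written against today's interface: still type-checks unchanged.
const legacy: StreamCallback = {
  onStart() {},
  onEvent() {},
  onDone() { log.push("legacy:done"); },
  onError() {},
};

// Opts in to the new signal.
const aware: StreamCallback = {
  ...legacy,
  onInterrupted() { log.push("aware:interrupted"); },
};

Promise.all([signalInterrupted(legacy), signalInterrupted(aware)]).then(() => console.log(log));
// → [ 'aware:interrupted' ]
```

Only the implementer that opted in records anything. The legacy object type-checks and is called safely.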

Recommended solution (C) — implementation sketch

  1. Interface: add optional onInterrupted?() to StreamCallback (think.ts). Keep it optional so all existing implementers compile unchanged.

  2. Emit it from the interruption sites — replace the silent return in the stall branches with await callback.onInterrupted?.() before returning, in BOTH:

    • _streamResultToRpcCallback (the outcome === "scheduled" and "exhausted" branches)
    • _streamResult (symmetric branches)

    (Consider also emitting it where a deploy-interrupted chat()-driven turn is detected, so that deploy and stall behave identically from the callback's perspective. This first requires checking whether the live chat() path can observe a deploy interruption at all, or whether the isolate simply dies along with the callback.)

  3. Wire the messenger delivery (messengers/delivery.ts + the TextStreamCallback in the same package): on onInterrupted, do not finalize/close-and-post. Options to decide during implementation:

    • keep the channel open and let a later mechanism deliver the recovered tail, or
    • surface a transient "recovering…" indicator and rely on the continuation.

    In either case, make delivery.ts NOT set completedModelTurn = true and NOT call callback.close() when the turn was interrupted, so it doesn't post a truncated answer.

    This is the bulk of the work and the reason the hook is worth adding.

  4. Decide the chat()-driven sub-agent behavior — document that a clean resolve without onDone means "interrupted; await/re-attach," and update the e2e helper pattern accordingly.
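Steps 2 and 3 together could look roughly like the sketch below, which rests on several assumptions. MessengerCallback, Surface, stallingChat, and showRecovering are hypothetical names. stallingChat stands in for the stall branch of _streamResultToRpcCallback after the change: it emits onInterrupted and then returns:

```typescript
type ChatStartEvent = { requestId: string }; // simplified stand-in

interface StreamCallback {
  onStart(event: ChatStartEvent): void | Promise<void>;
  onEvent(json: string): void | Promise<void>;
  onDone(): void | Promise<void>;
  onError(error: string): void | Promise<void>;
  onInterrupted?(): void | Promise<void>;
}

// Hypothetical messenger callback that records an interruption instead of finalizing.
class MessengerCallback implements StreamCallback {
  text = "";
  interrupted = false;
  closed = false;
  onStart(): void {}
  onEvent(json: string): void {
    const event = JSON.parse(json) as { delta?: string };
    if (event.delta) this.text += event.delta;
  }
  onDone(): void { this.close(); }
  onError(_error: string): void { this.close(); }
  onInterrupted(): void { this.interrupted = true; }
  close(): void { this.closed = true; }
}

// Step 2 stand-in: the stall branch signals interruption, then returns.
async function stallingChat(_message: string, callback: StreamCallback): Promise<void> {
  await callback.onStart({ requestId: "turn-1" });
  await callback.onEvent(JSON.stringify({ delta: "The root cause was" }));
  // outcome === "scheduled" or "exhausted": emit instead of returning silently.
  await callback.onInterrupted?.();
}

interface Surface {
  post(text: string): void;
  showRecovering(): void;
}

// Step 3: delivery only finalizes when the turn was not interrupted.
async function deliver(
  chat: (message: string, callback: StreamCallback) => Promise<void>,
  surface: Surface,
): Promise<{ completedModelTurn: boolean }> {
  const callback = new MessengerCallback();
  await chat("Summarize the incident", callback);
  if (callback.interrupted) {
    // Leave the channel open; the scheduled continuation owns the real answer.
    surface.showRecovering();
    return { completedModelTurn: false };
  }
  callback.close();
  surface.post(callback.text);
  return { completedModelTurn: true };
}

deliver(stallingChat, {
  post: (text) => console.log(`posted: ${text}`),
  showRecovering: () => console.log("recovering…"),
}).then((result) => console.log(result)); // logs "recovering…", then { completedModelTurn: false }
```

In this model, delivery reports completedModelTurn: false and shows a recovering state instead of posting the partial text as the answer. How the recovered tail reaches the surface remains the open design question in step 3.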

Test plan

  • Unit: a stall-recovering chat() turn calls onInterrupted exactly once and does NOT call onDone/onError (both _streamResult and _streamResultToRpcCallback).
  • Unit: a normally-completing turn still calls onDone (no onInterrupted); a terminal failure still calls onError.
  • Messenger: a stall-recovering messenger turn does not post a truncated/empty final answer; the recovered answer is delivered (or the channel stays open / shows recovering), per the chosen wiring.
  • Backward-compat: an implementer that does not define onInterrupted behaves exactly as today.
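The unit cases in the test plan can be written against any function that drives a StreamCallback. The sketch below uses streamTurn as a hypothetical stand-in for the fixed _streamResult / _streamResultToRpcCallback paths. Real tests would instead drive those methods with a stalled stream:

```typescript
type ChatStartEvent = { requestId: string }; // simplified stand-in

interface StreamCallback {
  onStart(event: ChatStartEvent): void | Promise<void>;
  onEvent(json: string): void | Promise<void>;
  onDone(): void | Promise<void>;
  onError(error: string): void | Promise<void>;
  onInterrupted?(): void | Promise<void>;
}

type Call = "onStart" | "onEvent" | "onDone" | "onError" | "onInterrupted";

// Test double that records every call it receives; optionally omits onInterrupted.
function recordingCallback(calls: Call[], withInterrupted = true): StreamCallback {
  const cb: StreamCallback = {
    onStart: () => { calls.push("onStart"); },
    onEvent: () => { calls.push("onEvent"); },
    onDone: () => { calls.push("onDone"); },
    onError: () => { calls.push("onError"); },
  };
  if (withInterrupted) cb.onInterrupted = () => { calls.push("onInterrupted"); };
  return cb;
}

// Hypothetical stand-in for the fixed stream paths.
async function streamTurn(scenario: "complete" | "fail" | "stall", cb: StreamCallback): Promise<void> {
  await cb.onStart({ requestId: scenario });
  await cb.onEvent("{}");
  if (scenario === "complete") await cb.onDone();
  else if (scenario === "fail") await cb.onError("model error");
  else await cb.onInterrupted?.();
}

const count = (calls: Call[], name: Call) => calls.filter((c) => c === name).length;

async function runTestPlan(): Promise<void> {
  // Stall-recovering turn: onInterrupted exactly once, no onDone/onError.
  const stall: Call[] = [];
  await streamTurn("stall", recordingCallback(stall));
  if (count(stall, "onInterrupted") !== 1 || count(stall, "onDone") + count(stall, "onError") !== 0) {
    throw new Error(`stall: ${stall.join(",")}`);
  }
  // A normal turn still ends in onDone; a terminal failure still ends in onError.
  const done: Call[] = [];
  await streamTurn("complete", recordingCallback(done));
  if (done[done.length - 1] !== "onDone" || count(done, "onInterrupted") !== 0) {
    throw new Error(`complete: ${done.join(",")}`);
  }
  const failed: Call[] = [];
  await streamTurn("fail", recordingCallback(failed));
  if (failed[failed.length - 1] !== "onError") throw new Error(`fail: ${failed.join(",")}`);
  // Backward compat: an implementer without onInterrupted sees no extra calls.
  const legacy: Call[] = [];
  await streamTurn("stall", recordingCallback(legacy, false));
  if (legacy.join(",") !== "onStart,onEvent") throw new Error(`legacy: ${legacy.join(",")}`);
}

runTestPlan().then(() => console.log("all test-plan checks passed"));
```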

Out of scope / related

Acceptance criteria

  • StreamCallback.onInterrupted?() exists and is documented.
  • The two stall branches (and, ideally, the deploy-interrupted chat() path) emit it instead of returning silently.
  • Messenger delivery no longer posts a truncated answer on a recovering interruption.
  • Existing StreamCallback implementers are unaffected (optional method).
