Think.saveMessages should accept an external AbortSignal so callers can cancel an in-flight turn from outside #1406

@threepointone

TL;DR

Today there is no public way for an external caller to abort an in-flight Think.saveMessages turn. The method generates its own requestId synchronously at the top of the body and only surfaces it via the resolved return value — by which time the inference loop has already finished. The only abort surfaces are MSG_CHAT_CANCEL over a chat WebSocket and _aborts.destroyAll() (private) on agent shutdown, neither of which fits the "I, the caller, want to cancel THIS turn" use case.

This blocks helper-as-sub-agent patterns where the helper drives a Think turn on behalf of a parent and the parent needs to propagate its own abort signal into the helper's inference loop.

Where this surfaces

examples/agents-as-tools (PR #1405) builds a chat-helper-as-Think-sub-agent pattern. The parent's tool execute receives the AI SDK's abortSignal, and threads it into a per-helper _runHelperTurn call. The helper itself is a Think instance whose runTurnAndStream opens a ReadableStream over DO RPC; the parent reads that stream and forwards its chunks live as helper-event envelopes.

When the parent's chat turn aborts (Stop button, tab close, sibling abort), the parent cancels the helper RPC reader. Workerd propagates that cancel to the helper's ReadableStream cancel callback. The cancel callback then has no good way to actually abort the helper's in-flight Think turn, because the only way to cancel a specific Think turn is _aborts.cancel(requestId), and the helper doesn't have access to the requestId Think generated for THIS particular saveMessages call.

We tried two approaches; both have problems:

Approach 1: capture requestId from saveMessages's return — DOES NOT WORK

async runTurnAndStream(query: string, helperId: string) {
  const self = this;
  return new ReadableStream({
    async start(controller) {
      const result = await self.saveMessages([{ /* ... */ }]);
      self._activeRequestId = result.requestId;
      // ... releaseClaim() clears _activeRequestId synchronously below
    },
    cancel() {
      // Targeted: cancel just this turn.
      if (self._activeRequestId !== undefined) {
        self._aborts.cancel(self._activeRequestId);
      }
    }
  });
}

This is dead code. _activeRequestId is only assigned after saveMessages resolves (i.e. after the inference loop has finished). The synchronous span between the assignment and releaseClaim() clearing it back to undefined has no awaits, so cancel() cannot ever observe it set during a real cancellation. The reviewer of #1405 caught this; we removed the field.
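
The timing problem can be reproduced without Think at all. The following is a minimal sketch in which `fakeSaveMessages` is a hypothetical stand-in for Think.saveMessages (a single `await Promise.resolve()` plays the role of the whole inference loop) and a local variable plays the role of _activeRequestId. A cancel that arrives while the turn is in flight only ever observes `undefined`.

```typescript
// Hypothetical stand-in for Think.saveMessages: the requestId is only
// available on the resolved value, i.e. after the "turn" has finished.
async function fakeSaveMessages(log: string[]): Promise<{ requestId: string }> {
  log.push("turn started");
  await Promise.resolve(); // stands in for the entire inference loop
  log.push("turn finished");
  return { requestId: "req-1" };
}

async function demo(): Promise<string[]> {
  const log: string[] = [];
  let activeRequestId: string | undefined;

  const pending = fakeSaveMessages(log).then((result) => {
    activeRequestId = result.requestId;
    log.push("requestId captured and cleared");
    activeRequestId = undefined; // what releaseClaim() does, with no await in between
  });

  // A cancel arriving mid-turn runs here, before the promise has resolved.
  log.push(`cancel sees ${activeRequestId}`);

  await pending;
  return log;
}
```

Under these assumptions the cancel step always runs before the id is captured, which matches the reviewer's observation on #1405.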

Approach 2: best-effort _aborts.destroyAll() — RACES

cancel() {
  // Helper is single-purpose (one in-flight turn at a time), so
  // destroying all controllers is equivalent to "cancel the one
  // turn that's running".
  this._aborts.destroyAll();
}

This works if saveMessages has already reached its internal _aborts.getSignal(requestId) call by the time cancel() fires. But that call is buried after several internal awaits (keepAliveWhile → _turnQueue.enqueue → appendMessage → _broadcastMessages → getSignal). For an early cancel, such as a pre-aborted parent signal or a very fast tab close, the registry is empty when destroyAll() runs, the call is a no-op, and the inference still runs to completion. The cost is one wasted inference pass of Workers AI tokens per early-cancelled helper turn.

This is what examples/agents-as-tools ships today. It's correct in steady state ("Stop after several seconds of streaming"), wrong on early cancel.
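
The early-cancel failure can be illustrated with a small mock. `MockAbortRegistry` below is a hypothetical model of the _aborts registry, assuming that getSignal(requestId) is the point where a controller is first registered; the real internals may differ. `simulateCancel` is likewise an illustrative name.

```typescript
// Hypothetical mock of the abort registry; not Think's actual implementation.
class MockAbortRegistry {
  private controllers = new Map<string, AbortController>();

  // Assumption: the controller for a request is created lazily here.
  getSignal(requestId: string): AbortSignal {
    let controller = this.controllers.get(requestId);
    if (!controller) {
      controller = new AbortController();
      this.controllers.set(requestId, controller);
    }
    return controller.signal;
  }

  // Aborts every registered controller and returns how many were aborted.
  destroyAll(): number {
    const count = this.controllers.size;
    this.controllers.forEach((controller) => controller.abort());
    this.controllers.clear();
    return count;
  }
}

// Returns whether the turn's signal ends up aborted.
function simulateCancel(cancelBeforeRegistration: boolean): boolean {
  const registry = new MockAbortRegistry();
  if (cancelBeforeRegistration) {
    registry.destroyAll(); // registry is still empty: nothing to abort
  }
  const signal = registry.getSignal("turn-1"); // turn registers after its awaits
  if (!cancelBeforeRegistration) {
    registry.destroyAll(); // steady state: the controller exists and is aborted
  }
  return signal.aborted;
}
```

In this model a cancel issued before registration leaves the signal untouched, so the turn would run to completion, while a cancel issued after registration aborts it.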

Proposed fix

Add an optional signal: AbortSignal option to Think.saveMessages (and, as a follow-up for consistency, to parallel methods that drive a turn, such as submitMessage). When it is set, saveMessages links it to the internal signal returned by _aborts.getSignal(requestId), for example through a linkSignals(internal, options.signal) helper, so the inference loop aborts when either Think's own registry or the external signal fires:

interface SaveMessagesOptions {
  /**
   * External abort signal. If aborted, the in-flight turn is
   * cancelled the same way `MSG_CHAT_CANCEL` would cancel it
   * over a chat WebSocket: the inference loop's signal aborts,
   * `_streamResult` short-circuits, and the row's status
   * resolves to `error` with the abort reason as the message.
   *
   * Linked with the request's own `_aborts` controller so
   * either source can trigger the abort; the registry's
   * controller still surfaces via `MSG_CHAT_CANCEL` etc.
   */
  signal?: AbortSignal;
}

async saveMessages(
  messages: UIMessage[] | ((curr: UIMessage[]) => UIMessage[] | Promise<UIMessage[]>),
  options?: SaveMessagesOptions
): Promise<SaveMessagesResult>;

Implementation sketch (rough — linkSignals is illustrative; could be inlined as addEventListener("abort", () => internal.abort(reason))):

async saveMessages(messages, options) {
  const requestId = crypto.randomUUID();
  // ... existing setup ...
  await this.keepAliveWhile(async () => {
    await this._turnQueue.enqueue(requestId, async () => {
      // ... existing setup ...
      const internal = this._aborts.getSignal(requestId)!;
      if (options?.signal) {
        if (options.signal.aborted) {
          this._aborts.cancel(requestId);
        } else {
          options.signal.addEventListener(
            "abort",
            () => this._aborts.cancel(requestId),
            { once: true }
          );
        }
      }
      // ... existing _runInferenceLoop({ signal: internal, ... }) ...
    });
  });
}
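
One detail the sketch leaves open is listener cleanup: if a caller passes a long-lived signal and the turn finishes normally, the abort listener stays attached. A possible shape for the linkSignals helper, written against the standard AbortController and AbortSignal only, is shown below. The name, the signature, and the returned unlink function are assumptions for illustration and are not part of Think's API; the sketch above calls this._aborts.cancel(requestId) where this version aborts a controller directly.

```typescript
// Hypothetical helper: forwards an external abort to an internal controller.
// Returns a function that detaches the listener once the turn has finished.
function linkSignals(
  internal: AbortController,
  external?: AbortSignal
): () => void {
  if (!external) {
    return () => {};
  }
  if (external.aborted) {
    // Pre-aborted caller signal: abort immediately, nothing to clean up.
    internal.abort(external.reason);
    return () => {};
  }
  const onAbort = () => internal.abort(external.reason);
  external.addEventListener("abort", onAbort, { once: true });
  return () => external.removeEventListener("abort", onAbort);
}
```

A caller would typically invoke the returned function in a `finally` block around the inference loop, so that a signal reused across many turns does not accumulate listeners.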

With that, the helper-as-sub-agent pattern becomes trivial:

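async runTurnAndStream(query: string, helperId: string) {
  // Capture the Think instance: inside the underlying-source callbacks,
  // `this` refers to the source object, not the agent.
  const self = this;
  const abortController = new AbortController();
  return new ReadableStream({
    async start(streamController) {
      try {
        await self.saveMessages([/* ... */], { signal: abortController.signal });
      } catch (err) {
        streamController.error(err);
        return;
      }
      streamController.close();
    },
    cancel() {
      // Fires when the parent cancels its reader; aborts just this turn.
      abortController.abort(new Error("parent cancelled the helper turn"));
    }
  });
}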

No race window, no destroyAll blast radius, no reaching into _aborts via bracket access. The same pattern works for any external "abort this specific Think turn" caller (an HTTP endpoint, an alarm-driven cleanup, a sibling DO).

Backwards-compatibility

Fully additive. Default behavior unchanged — callers that don't pass options.signal get exactly today's flow. Existing MSG_CHAT_CANCEL-driven aborts also keep working because the registry's own controller is still in place.

Out of scope

  • A public Think.abortRequest(id) method. Would be useful for a different shape of caller ("I have a requestId and want to cancel it from outside") but doesn't help the helper-as-sub-agent pattern, where the requestId isn't known to the caller.
  • An equivalent for submitMessage and other turn-driving methods. Worth adding for consistency once the saveMessages shape is settled.

Workaround

Until this lands, callers can fall back to calling _aborts.destroyAll() through bracket access to the private field. The examples/agents-as-tools example does this, with explicit comments about the race window. See the helper's abortCurrentTurn method and the cancellation paragraph in its README.

Context
