13 changes: 13 additions & 0 deletions .changeset/agents-abort-registry-link-external.md
@@ -0,0 +1,13 @@
---
"agents": patch
---

Add `AbortRegistry.linkExternal(id, signal)` for connecting external `AbortSignal`s to per-request abort controllers, and add `"aborted"` to the `SaveMessagesResult.status` union ([#1406](https://github.com/cloudflare/agents/issues/1406)).

`linkExternal` is the integration point for callers that drive a chat turn programmatically and want to cancel it from outside without knowing the internally generated request id. The motivating case is the helper-as-sub-agent pattern, where a parent's `AbortSignal` from the AI SDK tool `execute` needs to land inside a sub-agent's `saveMessages` call. When the external signal aborts, the registry's controller for `id` is cancelled, the same path `chat-request-cancel` takes over the WebSocket. Call the returned detacher in a `finally` block to avoid leaking listeners on long-lived parent signals.
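
The link-then-detach pattern can be sketched roughly as follows. `SketchAbortRegistry` and `runLinkedTurn` are hypothetical stand-ins written for illustration, not the real `AbortRegistry` implementation; only the `linkExternal(id, signal)` and `cancel(id, reason?)` shapes are taken from this changeset, and `getSignal` is an assumed accessor.

```typescript
// Hypothetical stand-in for illustration only; NOT the real AbortRegistry.
class SketchAbortRegistry {
  private controllers = new Map<string, AbortController>();

  // Assumed accessor: return the signal for `id`, creating its controller on first use.
  getSignal(id: string): AbortSignal {
    let controller = this.controllers.get(id);
    if (!controller) {
      controller = new AbortController();
      this.controllers.set(id, controller);
    }
    return controller.signal;
  }

  // Abort the controller for `id`, forwarding the optional reason; no-op if none exists.
  cancel(id: string, reason?: unknown): void {
    this.controllers.get(id)?.abort(reason);
  }

  // Forward an external abort to the controller for `id`.
  // Returns a detacher that removes the listener again.
  linkExternal(id: string, external: AbortSignal): () => void {
    if (external.aborted) {
      this.cancel(id, external.reason);
      return () => {};
    }
    const onAbort = () => this.cancel(id, external.reason);
    external.addEventListener("abort", onAbort, { once: true });
    return () => external.removeEventListener("abort", onAbort);
  }
}

// Hypothetical helper: run `work` with the external signal linked for its duration.
function runLinkedTurn(
  registry: SketchAbortRegistry,
  id: string,
  external: AbortSignal,
  work: (turnSignal: AbortSignal) => void
): void {
  const turnSignal = registry.getSignal(id);
  const detach = registry.linkExternal(id, external);
  try {
    work(turnSignal);
  } finally {
    // Always detach so a long-lived parent signal does not accumulate listeners.
    detach();
  }
}
```

Detaching in `finally` means the listener is removed whether the turn completes, throws, or is aborted, which is what makes it safe to reuse one parent signal across many turns.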

`SaveMessagesResult.status` now includes `"aborted"` alongside `"completed"` and `"skipped"`. Existing callers that only switch on `"completed"` are unaffected; turns cancelled via the new signal API surface as `"aborted"` rather than `"completed"`.

Also exposes `SaveMessagesOptions` from `agents/chat` for use by `@cloudflare/think` and `@cloudflare/ai-chat` typed APIs. `AbortRegistry.cancel(id, reason?)` now accepts an optional reason that flows through to `signal.reason` on the cancelled controller.

See [`cloudflare/agents#1406`](https://github.com/cloudflare/agents/issues/1406) for the motivating use case.
27 changes: 27 additions & 0 deletions .changeset/ai-chat-savemessages-abort-signal.md
@@ -0,0 +1,27 @@
---
"@cloudflare/ai-chat": patch
---

Add `options.signal` to `AIChatAgent.saveMessages` and `continueLastTurn` for external cancellation of programmatic turns, plus protected `abortRequest(id, reason?)` / `abortAllRequests()` methods ([#1406](https://github.com/cloudflare/agents/issues/1406)).

`saveMessages` and `continueLastTurn` accept a second `SaveMessagesOptions` argument:

```typescript
const result = await this.saveMessages(messages, { signal: controller.signal });
if (result.status === "aborted") {
  // Inference loop terminated mid-stream; partial chunks persisted.
}
```

The signal is linked to AIChatAgent's per-turn `AbortController` and produces the same end state as a `chat-request-cancel` WebSocket message: the inference loop's signal aborts, partial chunks persist, the result reports `status: "aborted"`, and `onChatResponse` fires with the same status. Pre-aborted signals short-circuit before any model work runs. Listeners are detached cleanly when the turn finishes, so the same long-lived signal can be passed to many turns without leaking.

`abortRequest(id, reason?)` and `abortAllRequests()` are protected entry points for subclasses: `abortRequest` cancels one in-flight turn by id, and `abortAllRequests` cancels every in-flight turn for callers that don't track ids.

`SaveMessagesResult.status` now includes `"aborted"` alongside `"completed"` and `"skipped"`. Existing callers that only switch on `"completed"` are unaffected.

**Limitations.**

- `AbortSignal` cannot cross Durable Object RPC. Construct the controller inside the DO that calls `saveMessages`.
- The signal lives in memory only. If the DO hibernates mid-turn and `chatRecovery` is enabled, the recovered turn runs without the original signal.

See [`cloudflare/agents#1406`](https://github.com/cloudflare/agents/issues/1406) for the motivating use case.
34 changes: 34 additions & 0 deletions .changeset/think-savemessages-abort-signal.md
@@ -0,0 +1,34 @@
---
"@cloudflare/think": patch
---

Add `options.signal` to `Think.saveMessages` and `Think.continueLastTurn` for external cancellation of programmatic turns, plus protected `abortRequest(id, reason?)` / `abortAllRequests()` methods to replace bracket access into the private `_aborts` registry ([#1406](https://github.com/cloudflare/agents/issues/1406)).

`saveMessages` and `continueLastTurn` accept a second `SaveMessagesOptions` argument:

```typescript
const result = await this.saveMessages(messages, { signal: controller.signal });
if (result.status === "aborted") {
  // Inference loop terminated mid-stream; partial chunks persisted.
}
```

The signal is linked to Think's per-turn `AbortController` for the duration of the call. When it aborts:

- the inference loop's signal aborts (the same path `chat-request-cancel` takes);
- partial chunks already streamed are persisted to the resumable stream;
- `saveMessages` resolves with `{ status: "aborted" }`;
- `onChatResponse` fires with `status: "aborted"`.

Pre-aborted signals short-circuit before any model work runs. Listeners are detached cleanly when the turn finishes, so passing the same long-lived `AbortSignal` to many turns (e.g. a parent chat-turn signal driving multiple sub-agent calls) is safe and leak-free.

`abortRequest(id, reason?)` and `abortAllRequests()` are protected entry points for DO subclasses (e.g. RPC-driven helpers): `abortRequest` cancels one in-flight turn by id, and `abortAllRequests` cancels every in-flight turn without tracking ids. They replace the historical `(this as unknown as { _aborts: ... })._aborts.destroyAll()` workaround used by helper-as-sub-agent implementations.

`SaveMessagesResult.status` now includes `"aborted"` alongside `"completed"` and `"skipped"`. Existing callers that only switch on `"completed"` are unaffected.

**Limitations.**

- `AbortSignal` cannot cross Durable Object RPC. Construct the controller inside the DO that calls `saveMessages`. To bridge a parent's intent into a child DO, return a `ReadableStream` from the child whose `cancel` callback aborts a per-turn controller — `examples/agents-as-tools` shows the canonical pattern.
- The signal lives in memory only. If the DO hibernates mid-turn and `chatRecovery` is enabled, the recovered turn calls `continueLastTurn()` internally without the original signal — an abort fired after restart has no effect on the recovered turn.

See [`cloudflare/agents#1406`](https://github.com/cloudflare/agents/issues/1406) for the motivating use case.
32 changes: 28 additions & 4 deletions docs/chat-agents.md
@@ -334,9 +334,28 @@ messages against the latest persisted transcript when the turn actually starts.
This avoids stale baselines when multiple `saveMessages()` calls queue up
behind active work.

`saveMessages()` returns `{ requestId, status }` so callers can detect whether
the turn ran (`"completed"`) or was skipped because the chat was cleared
(`"skipped"`).
`saveMessages()` returns `{ requestId, status }`. The `status` is `"completed"`
when the turn ran, `"skipped"` when it was invalidated (chat cleared mid-flight),
or `"aborted"` when an external `AbortSignal` cancelled it before completion.

Pass `options.signal` to cancel the turn from outside. Useful for forwarding
an upstream `AbortSignal` (e.g. an AI SDK tool `execute`'s `abortSignal` on a
parent agent) into a child DO's `saveMessages` call without knowing the
internally-generated request id:

```typescript
const result = await this.saveMessages([...this.messages, newMessage], {
  signal: abortController.signal
});
if (result.status === "aborted") {
  // Partial chunks already streamed are persisted; the inference
  // loop terminated when the signal aborted.
}
```

The same `options.signal` is accepted by `continueLastTurn()`. See
[`cloudflare/agents#1406`](https://github.com/cloudflare/agents/issues/1406)
for the helper-as-sub-agent pattern that motivated the API.

### `onChatResponse`

@@ -678,9 +697,14 @@ override async onChatResponse(result: ChatResponseResult): Promise<void> {
Appends to the last assistant message by re-calling `onChatMessage` with the saved request body. The response is streamed as a continuation — appended to the existing assistant message, not a new one. No synthetic user message is created.

```typescript
protected continueLastTurn(body?: Record<string, unknown>): Promise<SaveMessagesResult>;
protected continueLastTurn(
body?: Record<string, unknown>,
options?: SaveMessagesOptions
): Promise<SaveMessagesResult>;
```

The optional `options.signal` accepts an external `AbortSignal` for cancellation, matching the `saveMessages` contract.

Called automatically by the default recovery path. Can also be called manually from scheduled callbacks or other entry points. The optional `body` parameter merges with the saved `_lastBody`.

#### Stashing recovery data
34 changes: 34 additions & 0 deletions docs/server-driven-messages.md
@@ -388,6 +388,39 @@ The `messageConcurrency` setting on `AIChatAgent` controls how overlapping user
| `onRequest()` | Handle webhooks and call `saveMessages` |
| `this.broadcast()` | Broadcast custom state from `onChatResponse` |

## Cancelling a server-driven turn

Pass `options.signal` to cancel a programmatic turn from outside without knowing the internally-generated request id:

```typescript
async runLongTask(query: string, abortSignal: AbortSignal) {
  const result = await this.saveMessages(
    [{ id: crypto.randomUUID(), role: "user", parts: [{ type: "text", text: query }] }],
    { signal: abortSignal }
  );

  if (result.status === "aborted") {
    // The signal aborted mid-stream. Partial chunks are still persisted.
  }
}
```

When the signal aborts:

- the inference loop's signal aborts (same path `chat-request-cancel` takes);
- partial chunks streamed before the abort are persisted;
- `saveMessages` resolves with `{ status: "aborted" }`;
- `onChatResponse` fires with `status: "aborted"`.

Pre-aborted signals short-circuit before any model work runs.
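
The abort semantics listed above can be modelled with a small, self-contained sketch. `runSketchTurn` and `SketchTurnStatus` are hypothetical names, and the `steps` array merely stands in for chunks of model work; this is an illustration of the contract under those assumptions, not the library's implementation.

```typescript
type SketchTurnStatus = "completed" | "aborted";

// Hypothetical model of a turn: each entry in `steps` stands in for one chunk of work.
function runSketchTurn(
  steps: Array<() => void>,
  external?: AbortSignal
): SketchTurnStatus {
  // A pre-aborted signal short-circuits before any work runs.
  if (external?.aborted) return "aborted";

  // Per-turn controller; the external signal is linked to it for the turn's duration.
  const turn = new AbortController();
  const onAbort = () => turn.abort(external?.reason);
  external?.addEventListener("abort", onAbort, { once: true });
  try {
    for (const step of steps) {
      // Stop before the next step; whatever earlier steps produced is kept.
      if (turn.signal.aborted) return "aborted";
      step();
    }
    return turn.signal.aborted ? "aborted" : "completed";
  } finally {
    // Detach so the same long-lived signal can be passed to later turns.
    external?.removeEventListener("abort", onAbort);
  }
}
```

Work finished before the abort is retained, mirroring how partial chunks stay persisted when a real turn reports `"aborted"`.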

### Limitations

- **Signals cannot cross Durable Object boundaries.** `AbortSignal` is not an RPC-serializable type. Construct the controller inside the DO that calls `saveMessages`. To bridge a parent's abort intent into a child DO, return a `ReadableStream` from the child and let the parent cancel it — workerd propagates the cancel back to the source's `cancel` callback. See `examples/agents-as-tools` for the canonical helper-as-sub-agent pattern.
- **Hibernation drops the listener.** The signal lives in memory. If the DO hibernates mid-turn and `chatRecovery` is enabled, the recovered turn calls `continueLastTurn()` internally without the original signal — an abort fired after restart has no effect on the recovered turn. Override `onChatRecovery` (Think) or set `chatRecovery = false` for callers that need stronger guarantees.

This is the integration point for helper-as-sub-agent patterns where the parent's AI SDK abort signal needs to propagate into a child DO's `saveMessages` call. See [`cloudflare/agents#1406`](https://github.com/cloudflare/agents/issues/1406) for the original use case.

## Important notes

- **`saveMessages` is awaitable.** After it returns, the LLM has responded and the message is persisted. Use this when you control the trigger.
@@ -398,4 +431,5 @@ The `messageConcurrency` setting on `AIChatAgent` controls how overlapping user
- **Messages are persisted before `onChatResponse` fires.** If the Durable Object evicts during the hook, the conversation is safe in SQLite — only the hook callback is lost.
- **`waitUntilStable()` before injecting.** Always call this from schedule callbacks, webhooks, or other non-chat entry points to avoid overlapping with an in-flight stream or pending tool interaction.
- **The client sees `done: true` before `onChatResponse` runs.** The server-side hook does not delay the client.
- **`saveMessages` accepts `options.signal` for external cancellation.** Useful when forwarding an upstream `AbortSignal` (e.g. from an AI SDK tool `execute` on a parent agent) into a child DO's chat turn.
- **`messageConcurrency` does not affect `saveMessages`.** Server-driven messages always queue and execute in order.
16 changes: 16 additions & 0 deletions docs/think/lifecycle-hooks.md
@@ -606,6 +606,22 @@ async onChatResponse(result: ChatResponseResult) {
}
```

Distinguish abort from error:

```typescript
async onChatResponse(result: ChatResponseResult) {
  if (result.status === "aborted") {
    // Cancelled via chat-request-cancel or saveMessages({ signal }).
    // Partial chunks are persisted; the message is the partial
    // assistant transcript at the moment of abort.
    this.logAbortMetric(result.requestId);
  } else if (result.status === "error") {
    // Inference threw; `result.error` carries the error message.
    console.error(`Turn ${result.requestId} errored: ${result.error}`);
  }
}
```

---

## onChatError
113 changes: 109 additions & 4 deletions docs/think/sub-agents.md
@@ -146,11 +146,18 @@ When aborted, the partial assistant message is still persisted.

```typescript
async saveMessages(
messages: UIMessage[] | ((current: UIMessage[]) => UIMessage[] | Promise<UIMessage[]>)
messages: UIMessage[] | ((current: UIMessage[]) => UIMessage[] | Promise<UIMessage[]>),
options?: SaveMessagesOptions
): Promise<SaveMessagesResult>
```

Returns `{ requestId, status }` where `status` is `"completed"` or `"skipped"`.
Returns `{ requestId, status }` where `status` is `"completed"`, `"skipped"`, or `"aborted"`.

| `status` | When |
| ------------- | ------------------------------------------------------------------------------------------------------------------------- |
| `"completed"` | Turn ran to completion. |
| `"skipped"` | Turn invalidated mid-flight (e.g. by `chat-clear`); user message persisted, no model run. |
| `"aborted"` | Turn cancelled before completion via `options.signal` or `chat-request-cancel`. Partial assistant chunks still persisted. |
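
One way for callers to handle all three values is an exhaustive `switch`. The sketch below declares local stand-in types (`SketchStatus`, `SketchSaveMessagesResult`) rather than importing the real `SaveMessagesResult`, so the names and the helper `describeSketchResult` are assumptions made for illustration.

```typescript
// Local stand-ins mirroring the documented shape; not imported from the library.
type SketchStatus = "completed" | "skipped" | "aborted";

interface SketchSaveMessagesResult {
  requestId: string;
  status: SketchStatus;
}

function describeSketchResult(result: SketchSaveMessagesResult): string {
  switch (result.status) {
    case "completed":
      return `turn ${result.requestId} ran to completion`;
    case "skipped":
      return `turn ${result.requestId} was skipped; no model run`;
    case "aborted":
      return `turn ${result.requestId} was aborted; partial chunks persisted`;
    default: {
      // Compile-time exhaustiveness check: fails to type-check if a new
      // status is added to the union without a matching case.
      const unreachable: never = result.status;
      throw new Error(`unhandled status: ${String(unreachable)}`);
    }
  }
}
```

Code that only checks for `"completed"` keeps working, while the `never` assignment flags any status the caller has not yet handled.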

### Static messages

@@ -217,6 +224,87 @@ async onChatResponse(result: ChatResponseResult) {
}
```

### External cancellation with `options.signal`

`saveMessages` accepts an `AbortSignal` so callers can cancel the turn from outside without knowing the internally-generated request id. The signal is linked to Think's per-turn `AbortController`; when it aborts:

- the inference loop's signal aborts (the same path `chat-request-cancel` takes);
- partial chunks already streamed are persisted to the resumable stream;
- `saveMessages` resolves with `{ status: "aborted" }`;
- `onChatResponse` fires with `status: "aborted"`.

If the signal is **already aborted** when `saveMessages` is called, no inference work runs.

```typescript
class MyAgent extends Think<Env> {
  async runWithTimeout(text: string) {
    const controller = new AbortController();
    const timer = setTimeout(() => controller.abort(), 30_000);

    try {
      const { status } = await this.saveMessages(
        [
          {
            id: crypto.randomUUID(),
            role: "user",
            parts: [{ type: "text", text }]
          }
        ],
        { signal: controller.signal }
      );

      if (status === "aborted") {
        console.log("Turn cancelled by external signal");
      }
    } finally {
      // Clear the timer so it does not outlive a turn that finished early.
      clearTimeout(timer);
    }
  }
}
```

#### Crossing DO boundaries

`AbortSignal` cannot be passed as an RPC argument across Durable Object boundaries — workerd's JSRPC layer rejects it at serialization time. Construct the controller **inside** the DO that calls `saveMessages` and bridge the parent's intent through a different mechanism (typically a `ReadableStream` returned from the child whose `cancel` callback aborts the per-turn controller).

The reference implementation lives in `examples/agents-as-tools`:

```typescript
// In the helper sub-agent (Think subclass).
async runTurnAndStream(query: string): Promise<ReadableStream<Uint8Array>> {
  // Per-turn controller, owned in this DO so the signal is local.
  const turnAbort = new AbortController();

  return new ReadableStream<Uint8Array>({
    // Arrow functions keep `this` bound to the agent rather than to the
    // underlying-source object passed to ReadableStream.
    start: async (controller) => {
      await this.saveMessages([userMsg(query)], { signal: turnAbort.signal });
      // ... stream chunks via the broadcast tee ...
      controller.close();
    },
    cancel: (reason) => {
      // workerd propagates the parent's reader.cancel() here.
      turnAbort.abort(reason);
    }
  });
}

// In the parent's tool execute.
const stream = await helper.runTurnAndStream(query);
const reader = stream.getReader();
// Forward the parent's AI SDK abort signal by cancelling the local reader;
// workerd propagates the cancel back across RPC to the helper's source.
parentSignal.addEventListener(
  "abort",
  () => reader.cancel(parentSignal.reason),
  { once: true }
);
```

#### Hibernation and recovery

The external signal lives in memory only. If the Durable Object hibernates mid-turn and `chatRecovery` is enabled, the recovered turn runs via `continueLastTurn()` **without** the original `options.signal` — the listener was lost on eviction, and the recovery path has no way to reach back to the original caller.

In practice this means:

- A signal that aborts **after** the DO restarts has no effect on the recovered turn.
- Subclasses that need the recovered turn to honor a fresh signal should override `onChatRecovery` and reject continuation (`return { continue: false }`) when the original caller is gone.
- The `examples/agents-as-tools` helper sets `chatRecovery = false` for exactly this reason: helpers are per-turn workers driven by an active parent, and silently re-running on wake would spend an inference call that no consumer is waiting for.

See [`cloudflare/agents#1406`](https://github.com/cloudflare/agents/issues/1406) for the original motivation, and `examples/agents-as-tools` for the helper-as-sub-agent reference implementation.

---

## continueLastTurn
@@ -225,13 +313,30 @@ Resume the last assistant turn without injecting a new user message. Useful afte

```typescript
protected async continueLastTurn(
body?: Record<string, unknown>
body?: Record<string, unknown>,
options?: SaveMessagesOptions
): Promise<SaveMessagesResult>
```

Returns `{ requestId, status: "skipped" }` if the last message is not an assistant message.

The optional `body` parameter overrides the stored body for this continuation. If omitted, the last body from the previous turn is used.
The optional `body` parameter overrides the stored body for this continuation. If omitted, the last body from the previous turn is used. The optional `options.signal` accepts an external `AbortSignal` for cancellation, matching the `saveMessages` contract.

---

## Aborting in-flight turns

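For callers that need to cancel from inside the agent rather than through a signal, Think exposes two protected methods. `abortAllRequests()` in particular gives a coarse "cancel whatever is running" handle to callers that don't have access to the `requestId` (e.g. an RPC-driven sub-agent helper that runs one turn at a time):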

```typescript
protected abortRequest(requestId: string, reason?: unknown): void
protected abortAllRequests(): void
```

- **`abortRequest(id, reason?)`** — abort a specific in-flight turn by id. No-op if no controller exists for that id. Equivalent to a client `chat-request-cancel`.
- **`abortAllRequests()`** — abort every in-flight controller in the registry. Used by single-purpose sub-agents that don't track ids.

Both methods produce the same end state as `chat-request-cancel`: inference loop terminates, partial chunks persist, the turn's `ChatResponseResult` reports `status: "aborted"`. Prefer `options.signal` on `saveMessages` / `continueLastTurn` when driving a turn programmatically — it threads the abort intent in from turn start without requiring the caller to know the id.
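
The difference between the two methods can be illustrated with a self-contained model. `SketchTurnTracker`, `start`, and `finish` are hypothetical names invented for this sketch and do not reflect Think's internal registry; only the `abortRequest(requestId, reason?)` and `abortAllRequests()` signatures come from the documentation above.

```typescript
// Hypothetical stand-in for illustration only; NOT Think's internal registry.
class SketchTurnTracker {
  private inFlight = new Map<string, AbortController>();

  // Register a turn and return the signal its inference loop would watch.
  start(requestId: string): AbortSignal {
    const controller = new AbortController();
    this.inFlight.set(requestId, controller);
    return controller.signal;
  }

  // Drop the controller once the turn settles.
  finish(requestId: string): void {
    this.inFlight.delete(requestId);
  }

  // Abort one turn by id; no-op if no controller exists for that id.
  abortRequest(requestId: string, reason?: unknown): void {
    this.inFlight.get(requestId)?.abort(reason);
  }

  // Abort every in-flight turn without needing any ids.
  abortAllRequests(): void {
    this.inFlight.forEach((controller) => controller.abort());
  }
}
```

A single-purpose helper that runs one turn at a time would typically reach for the second method, since it never needs to learn the generated id.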

---
