Skip to content

Commit 8fb7c03

Browse files
Stop provider tool-call replays from regressing tool part state (#1404) (#1412)
1 parent 290bd2c commit 8fb7c03

8 files changed

Lines changed: 1300 additions & 59 deletions

File tree

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
---
2+
"agents": patch
3+
---
4+
5+
Make `applyChunkToParts` idempotent against an existing tool part with the same `toolCallId`, and add `isReplayChunk(parts, chunk)` for stream broadcasters that want to drop provider replay chunks ([#1404](https://github.com/cloudflare/agents/issues/1404)).
6+
7+
Some providers (notably the OpenAI Responses API) re-emit a prior tool call in continuation streams. The previous `tool-input-start` handler unconditionally pushed a fresh tool part, which produced duplicate parts in the message; `tool-input-delta` and `tool-input-available` overwrote a fully resolved input/state if a chunk happened to arrive for an already-known toolCallId. The new behavior:
8+
9+
- `tool-input-start` for a `toolCallId` that already exists in `parts` is a no-op (it does not push a duplicate or regress state).
10+
- `tool-input-delta` only mutates input while the existing part is still `input-streaming`.
11+
- `tool-input-available` only advances from `input-streaming` to `input-available`; replays against parts that have already moved past `input-streaming` (including `approval-requested`/`approval-responded` and any terminal state) are no-ops.
12+
13+
`isReplayChunk(parts, chunk)` is exported from `agents/chat` for stream broadcasters (e.g. `AIChatAgent._streamSSEReply`) that want to detect "this chunk is a replay of an already-known tool call" and skip re-broadcasting it. AI SDK v6's `updateToolPart` on the client mutates an existing tool part in place when the toolCallId matches, so re-broadcasting these replay chunks would visibly regress an `output-available` part to `input-streaming` on connected clients. `tool-output-available` is _not_ treated as a replay because its in-place update is safe when the output already matches.
14+
15+
Tool calls that the model genuinely wants to re-issue always carry a new toolCallId, so an existing match is never a legitimate "start over".
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
---
2+
"@cloudflare/ai-chat": patch
3+
---
4+
5+
Stop provider tool-call replays from regressing tool part state during continuation streams ([#1404](https://github.com/cloudflare/agents/issues/1404)).
6+
7+
Some providers (notably the OpenAI Responses API) re-emit prior tool calls in continuation streams as a `tool-input-start``tool-input-delta``tool-input-available``tool-output-available` sequence carrying the _same_ `toolCallId` and the _same_ `output` the part already holds. The AI SDK's `updateToolPart` mutates an existing tool part in place when the toolCallId matches, so a replayed `tool-input-start` was clobbering an `output-available` part back to `input-streaming` on the client and producing the worker warn `_applyToolResult: Tool part with toolCallId X not in expected state`.
8+
9+
Two fixes:
10+
11+
- `_streamSSEReply` now drops replay tool-input chunks before broadcasting them to clients or storing them for resume, using the new shared `isReplayChunk` helper. The cloned server-side streaming message is never corrupted because `applyChunkToParts` is idempotent against existing toolCallIds for these chunk types (also fixed below).
12+
- `_applyToolResult` accepts `output-available` and `output-error` as valid starting states for _idempotent_ re-application. A duplicate `cf_agent_tool_result` (cross-tab re-run, redelivered WS frame, provider replay round-trip) is now a silent no-op rather than a warn + skipped update. The cross-message `tool-output-available`/`tool-output-error` fallback in `_streamSSEReply` gets the same tolerance.
13+
14+
`_findAndUpdateToolPart` skips the SQLite write and `MESSAGE_UPDATED` broadcast when the apply produced no semantic change, so idempotent re-applies don't churn UI on connected tabs.

packages/agents/src/chat/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
export {
22
applyChunkToParts,
3+
isReplayChunk,
34
type MessageParts,
45
type MessagePart,
56
type StreamChunkData

packages/agents/src/chat/message-builder.ts

Lines changed: 110 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,22 @@ export function applyChunkToParts(
182182
}
183183

184184
case "tool-input-start": {
185+
// Idempotent against an existing tool part with the same toolCallId.
186+
// Some providers (notably the OpenAI Responses API) replay prior
187+
// tool calls in continuation streams as a fresh `tool-input-start`
188+
// → `tool-input-delta` → `tool-input-available` →
189+
// `tool-output-available` sequence carrying the original toolCallId
190+
// and original output. Without this guard a replay would push a
191+
// duplicate part into the streaming message *and* clobber the
192+
// original part's state when the AI SDK's mutate-in-place
193+
// `updateToolPart` processes the replay on the client (issue #1404).
194+
// A model that genuinely wants a fresh tool call always emits a
195+
// new toolCallId, so an existing match is never a legitimate
196+
// "start over".
197+
const existing = findToolPartByCallId(parts, chunk.toolCallId);
198+
if (existing) {
199+
return true;
200+
}
185201
parts.push({
186202
type: `tool-${chunk.toolName}`,
187203
toolCallId: chunk.toolCallId,
@@ -200,8 +216,15 @@ export function applyChunkToParts(
200216
}
201217

202218
case "tool-input-delta": {
219+
// Only mutate input while the tool is still actively input-streaming.
220+
// Deltas arriving after the tool has already advanced (input-available
221+
// or any terminal state) are provider replay and must not regress
222+
// a fully-formed input back to a partial one.
203223
const toolPart = findToolPartByCallId(parts, chunk.toolCallId);
204-
if (toolPart) {
224+
if (
225+
toolPart &&
226+
(toolPart as Record<string, unknown>).state === "input-streaming"
227+
) {
205228
(toolPart as Record<string, unknown>).input = chunk.input;
206229
}
207230
return true;
@@ -211,40 +234,59 @@ export function applyChunkToParts(
211234
const existing = findToolPartByCallId(parts, chunk.toolCallId);
212235
if (existing) {
213236
const p = existing as Record<string, unknown>;
214-
p.state = "input-available";
215-
p.input = chunk.input;
216-
if (chunk.providerExecuted != null) {
217-
p.providerExecuted = chunk.providerExecuted;
218-
}
219-
if (chunk.providerMetadata != null) {
220-
p.callProviderMetadata = chunk.providerMetadata;
221-
}
222-
if (chunk.title != null) {
223-
p.title = chunk.title;
237+
// Only advance from the streaming-input phase. Once the tool is
238+
// already at input-available or any terminal state
239+
// (output-available, output-error, output-denied,
240+
// approval-requested, approval-responded), this chunk is a
241+
// provider replay and must not regress state or overwrite a
242+
// resolved input/output. See the comment on tool-input-start.
243+
if (p.state === "input-streaming") {
244+
p.state = "input-available";
245+
p.input = chunk.input;
246+
if (chunk.providerExecuted != null) {
247+
p.providerExecuted = chunk.providerExecuted;
248+
}
249+
if (chunk.providerMetadata != null) {
250+
p.callProviderMetadata = chunk.providerMetadata;
251+
}
252+
if (chunk.title != null) {
253+
p.title = chunk.title;
254+
}
224255
}
225-
} else {
226-
parts.push({
227-
type: `tool-${chunk.toolName}`,
228-
toolCallId: chunk.toolCallId,
229-
toolName: chunk.toolName,
230-
state: "input-available",
231-
input: chunk.input,
232-
...(chunk.providerExecuted != null
233-
? { providerExecuted: chunk.providerExecuted }
234-
: {}),
235-
...(chunk.providerMetadata != null
236-
? { callProviderMetadata: chunk.providerMetadata }
237-
: {}),
238-
...(chunk.title != null ? { title: chunk.title } : {})
239-
} as MessagePart);
256+
return true;
240257
}
258+
parts.push({
259+
type: `tool-${chunk.toolName}`,
260+
toolCallId: chunk.toolCallId,
261+
toolName: chunk.toolName,
262+
state: "input-available",
263+
input: chunk.input,
264+
...(chunk.providerExecuted != null
265+
? { providerExecuted: chunk.providerExecuted }
266+
: {}),
267+
...(chunk.providerMetadata != null
268+
? { callProviderMetadata: chunk.providerMetadata }
269+
: {}),
270+
...(chunk.title != null ? { title: chunk.title } : {})
271+
} as MessagePart);
241272
return true;
242273
}
243274

244275
case "tool-input-error": {
245276
const existing = findToolPartByCallId(parts, chunk.toolCallId);
246277
if (existing) {
247278
const p = existing as Record<string, unknown>;
279+
// First-write-wins: a tool that's already terminal must not be
280+
// regressed (or re-decided as an error) by a later chunk. A
281+
// tool-input-error here is either provider replay or a confused
282+
// upstream — preserve the existing terminal state.
283+
if (
284+
p.state === "output-available" ||
285+
p.state === "output-error" ||
286+
p.state === "output-denied"
287+
) {
288+
return true;
289+
}
248290
p.state = "output-error";
249291
p.errorText = chunk.errorText;
250292
p.input = chunk.input;
@@ -362,6 +404,48 @@ export function applyChunkToParts(
362404
}
363405
}
364406

407+
/**
408+
* Returns true if `chunk` would be a no-op replay against the already-known
409+
* `parts` — i.e. some upstream is re-emitting events for a tool call that
410+
* the message has already advanced past.
411+
*
412+
* Used by stream broadcasters to suppress re-broadcasting these chunks to
413+
* connected clients. AI SDK v6's `updateToolPart` mutates an existing tool
414+
* part in place when a chunk arrives with a matching `toolCallId`, so a
415+
* replayed `tool-input-start` would clobber an `output-available` part back
416+
* to `input-streaming` on the client (issue #1404).
417+
*
418+
* Only returns true when re-broadcasting would *visibly regress* state on
419+
* a v6 client. Safe-by-construction chunk types (e.g. `tool-output-available`
420+
* carrying the same output the part already has) return false.
421+
*
422+
* Conditions:
423+
* - `tool-input-start` for a `toolCallId` that already exists in `parts`.
424+
* - `tool-input-delta` for a `toolCallId` whose existing part is no longer
425+
* `input-streaming`.
426+
* - `tool-input-available` for a `toolCallId` whose existing part is no
427+
* longer `input-streaming` (i.e. has already advanced to `input-available`
428+
* or any terminal state).
429+
*/
430+
export function isReplayChunk(
431+
parts: MessagePart[],
432+
chunk: StreamChunkData
433+
): boolean {
434+
if (
435+
chunk.type !== "tool-input-start" &&
436+
chunk.type !== "tool-input-delta" &&
437+
chunk.type !== "tool-input-available"
438+
) {
439+
return false;
440+
}
441+
if (!chunk.toolCallId) return false;
442+
const existing = findToolPartByCallId(parts, chunk.toolCallId);
443+
if (!existing) return false;
444+
if (chunk.type === "tool-input-start") return true;
445+
const state = (existing as Record<string, unknown>).state;
446+
return state !== "input-streaming";
447+
}
448+
365449
/**
366450
* Finds the last part in the array matching the given type.
367451
* Searches from the end for efficiency (the part we want is usually recent).

0 commit comments

Comments
 (0)