fix(chat): orphaned-stream recovery no longer merges a new turn into the previous message (#1691) #1693
Merged
…the previous message (#1691)

When an AIChatAgent stream is interrupted before its assistant message is persisted (Durable Object hibernation, deploy churn, isolate restart, reconnect), orphan recovery reconstructs the message from stored chunks. If the chunks carry no provider `start.messageId` (the common case with `streamText(...).toUIMessageStreamResponse()`, where the id is assigned client-side), recovery used to fall back to the LAST assistant message in history. That is correct for a continuation, but wrong for a normal new turn after a later user message: the recovered chunks were appended onto the PREVIOUS assistant message, corrupting both the persisted transcript and future model context.

Core fix
- ResumableStream now persists the allocated assistant message id in stream metadata (`message_id` column, added via a one-time, schema-checked migration) and exposes `getStreamMessageId()`.
- `_persistOrphanedStream` keys recovery on that stored id when the chunks carry no provider `start.messageId`, so a new turn becomes its own message and a continuation still merges into the message it was extending (it stored the cloned last-assistant id). A provider `start.messageId` still wins when present. Pre-migration rows keep the legacy last-assistant fallback.
- Dropped the now-unused `is_continuation` metadata column.

Two related variants of the same corruption on the durable (chatRecovery) continuation path, found during review and fixed here:
- Early-persist + recovery (e.g. a tool-approval pause) re-appended chunks it had already stored, duplicating a tool call's parts. Recovery now skips reconstructed parts whose `toolCallId` already exists on the message.
- A new turn interrupted before any assistant part was persisted (cut off before the first chunk materialized, or discarded via `onChatRecovery` returning `{ persist: false }`) was "continued" by cloning the previous assistant message and merging into it. `_handleInternalFiberRecovery` now detects that the conversation leaf is still the unanswered user message (no partial to continue) and re-runs the turn fresh, so it becomes its own message.

@cloudflare/think is unaffected: its session-tree recovery already allocates a distinct message id per orphan and never falls back to the last assistant message.

Tests
- New regression + wiring tests in durable-chat-recovery, resumable-streaming, and the test worker, including the fiber-continuation happy path and the two edge cases (empty partial, persist:false) that previously merged.

Verification
- Verified live against real LLMs (Workers AI, OpenAI, Anthropic) and Think via a SIGKILL-mid-stream / restart harness (wip/issue-1691-live): the recovered turn always lands as its own message and the previous turn is untouched.
- Cross-model continuation with large partials is clean (no duplication, no restarts); OpenAI and Anthropic resume a truncated partial to completion. The harness and its methodology notes are documented in its README.
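
The id-resolution order described under "Core fix" can be sketched as follows. This is a minimal illustration, not the library's code: `StoredMessage`, `OrphanContext`, and `resolveRecoveryTarget` are hypothetical names, and the only behavior assumed is the precedence stated above (provider `start.messageId`, then the stored `message_id`, then the legacy last-assistant fallback for pre-migration rows).

```typescript
// Hypothetical shapes for illustration only; not the library's actual types.
interface StoredMessage {
  id: string;
  role: "user" | "assistant";
}

interface OrphanContext {
  /** `start.messageId` found in the stored chunks, if the provider set one. */
  providerMessageId?: string;
  /** Value of the `message_id` metadata column; null models a pre-migration row. */
  storedMessageId: string | null;
  /** Persisted conversation history, oldest first. */
  history: StoredMessage[];
}

/** Pick the id of the message that the recovered chunks should be written to. */
function resolveRecoveryTarget(ctx: OrphanContext): string | undefined {
  // 1. A provider-assigned id still wins when present.
  if (ctx.providerMessageId) return ctx.providerMessageId;
  // 2. Otherwise use the id that was allocated when the stream started.
  if (ctx.storedMessageId) return ctx.storedMessageId;
  // 3. Pre-migration rows: legacy fallback to the last assistant message.
  return [...ctx.history].reverse().find((m) => m.role === "assistant")?.id;
}

const history: StoredMessage[] = [
  { id: "u1", role: "user" },
  { id: "a1", role: "assistant" },
  { id: "u2", role: "user" },
];

// New turn: the stream stored its own freshly allocated id, so "a1" is untouched.
console.log(resolveRecoveryTarget({ storedMessageId: "a2", history })); // "a2"
// Continuation: the stream stored the cloned last-assistant id, so it still merges.
console.log(resolveRecoveryTarget({ storedMessageId: "a1", history })); // "a1"
```

Keeping the fallback as the last branch is what preserves the old behavior for rows written before the migration, while every new stream takes the second branch.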
🦋 Changeset detected
Latest commit: 63f8da4
The changes in this PR will be included in the next version bump. This PR includes changesets to release 2 packages.
- Report `recoveryKind: "retry"` to `onChatRecovery` and the incident record for an empty-partial new turn (interrupted before any chunk). That case is deterministically a retry, and it is knowable before the hook runs. The `persist: false` sibling case still reports "continue" (it only becomes a retry based on the hook's own return value), and the comment documents why.
- Await `_persistOrphanedStream` in the `triggerInterruptedStreamCheck` test helper so it matches the production fiber-recovery path (latent test-only race, harmless in practice but now correct).
- Rename the two `wip/` package.json names to the `@cloudflare/agents-*` prefix so changesets' ignore glob excludes them from versioning/release.
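
The distinction in the first bullet, between what is reported to the hook and what recovery ends up doing, can be sketched like this. It is an illustration under assumptions: `RecoverySnapshot`, `reportedRecoveryKind`, and `shouldRerunFresh` are hypothetical names, and the real `onChatRecovery` signature is not reproduced here; only the `recoveryKind` values and the `persist: false` return value come from the text above.

```typescript
type RecoveryKind = "continue" | "retry";

// Hypothetical summary of the state recovery inspects; not a library type.
interface RecoverySnapshot {
  /** Role of the conversation leaf at recovery time. */
  leafRole: "user" | "assistant";
  /** Number of assistant parts reconstructed from stored chunks. */
  partialPartCount: number;
}

/** Kind reported to the hook: decided before the hook runs. */
function reportedRecoveryKind(s: RecoverySnapshot): RecoveryKind {
  // An unanswered user message with nothing to continue is always a retry.
  return s.leafRole === "user" && s.partialPartCount === 0 ? "retry" : "continue";
}

/** Whether the turn is re-run fresh: decided after the hook has returned. */
function shouldRerunFresh(
  s: RecoverySnapshot,
  hookResult: { persist?: boolean },
): boolean {
  // The partial is gone if it never existed or the hook discarded it.
  const nothingPersisted = s.partialPartCount === 0 || hookResult.persist === false;
  return s.leafRole === "user" && nothingPersisted;
}

const discarded: RecoverySnapshot = { leafRole: "user", partialPartCount: 3 };
console.log(reportedRecoveryKind(discarded)); // "continue"
console.log(shouldRerunFresh(discarded, { persist: false })); // true
```

The second call shows why the `persist: false` case cannot report "retry" up front: the outcome depends on a value the hook has not yet returned when the kind is reported.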
Summary
Fixes #1691. When an `AIChatAgent` stream is interrupted before its assistant message is persisted (Durable Object hibernation, deploy churn, isolate restart, reconnect), orphan recovery reconstructs the message from stored chunks. If those chunks carry no provider `start.messageId` (the common case with `streamText(...).toUIMessageStreamResponse()`, where the id is assigned client-side), recovery used to fall back to the last assistant message in history.

That is correct for a continuation, but wrong for a normal new turn after a later user message: the recovered chunks were appended onto the previous assistant message, corrupting both the persisted transcript and future model context.
The fix
Core
- `ResumableStream` now persists the allocated assistant message id in stream metadata (`message_id` column, added via a one-time, schema-checked migration) and exposes `getStreamMessageId()`.
- `_persistOrphanedStream` keys recovery on that stored id when the chunks carry no provider `start.messageId`, so a new turn becomes its own message and a continuation still merges into the message it was extending (it stored the cloned last-assistant id). A provider `start.messageId` still wins when present. Pre-migration rows keep the legacy last-assistant fallback.
- Dropped the now-unused `is_continuation` metadata column.

Two related variants of the same corruption (found during review, fixed here)
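- Early-persist + recovery (e.g. a tool-approval pause) re-appended chunks it had already stored, duplicating a tool call's parts. Recovery now skips reconstructed parts whose `toolCallId` already exists on the message.
- A new turn interrupted before any assistant part was persisted (cut off before the first chunk materialized, or discarded via `onChatRecovery` returning `{ persist: false }`) was "continued" by cloning the previous assistant message and merging into it. `_handleInternalFiberRecovery` now detects that the conversation leaf is still the unanswered user message (no partial to continue) and re-runs the turn fresh, so it becomes its own message.

`@cloudflare/think` is unaffected: its session-tree recovery already allocates a distinct message id per orphan and never falls back to the last assistant message.

Tests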
- New regression + wiring tests in `durable-chat-recovery`, `resumable-streaming`, and the test worker, including the fiber-continuation happy path and the two edge cases (empty partial, `persist: false`) that previously merged.
- `pnpm run check` (93 projects) green.

Verification (real LLMs)
A SIGKILL-mid-stream / restart harness (`wip/issue-1691-live/`, included and documented) drives the exact #1691 sequence against real models:
- Across Workers AI, OpenAI, and Anthropic (and Think), the recovered turn always lands as its own message and the previous turn is untouched.

Changeset
`@cloudflare/ai-chat` + `agents`: patch.
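
The `toolCallId` de-duplication described under "Two related variants" can be sketched as below. This is a minimal illustration, assuming a simplified `Part` shape and a hypothetical `mergeRecoveredParts` helper; the real message-part types and the merge logic inside `_persistOrphanedStream` are not reproduced here.

```typescript
// Simplified, hypothetical part shape; real message parts carry more fields.
interface Part {
  type: string;
  toolCallId?: string;
  text?: string;
}

/**
 * Append reconstructed parts to a message's existing parts, skipping any
 * reconstructed part whose toolCallId already exists on the message.
 */
function mergeRecoveredParts(existing: Part[], recovered: Part[]): Part[] {
  // Collect the tool call ids that early-persist already stored.
  const storedToolCallIds = new Set<string>();
  for (const part of existing) {
    if (part.toolCallId !== undefined) storedToolCallIds.add(part.toolCallId);
  }
  const merged = [...existing];
  for (const part of recovered) {
    const alreadyStored =
      part.toolCallId !== undefined && storedToolCallIds.has(part.toolCallId);
    if (!alreadyStored) merged.push(part);
  }
  return merged;
}

// Early-persist stored the tool call before a tool-approval pause ...
const persisted: Part[] = [
  { type: "text", text: "Checking the weather." },
  { type: "tool-call", toolCallId: "call_1" },
];
// ... and recovery reconstructs the same call again from the stored chunks.
const reconstructed: Part[] = [
  { type: "tool-call", toolCallId: "call_1" },
  { type: "text", text: "It is sunny." },
];

console.log(mergeRecoveredParts(persisted, reconstructed).length); // 3
```

Parts without a `toolCallId` (plain text here) are always appended, so the sketch only guards against the duplicated tool call that the early-persist path produced.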