Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .changeset/chat-recovery-progress-monotonic.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
"@cloudflare/think": patch
"@cloudflare/ai-chat": patch
---

Fix chat recovery prematurely exhausting its retry budget under compaction
(#1628). The deploy-churn forward-progress signal — which resets the recovery
budget when an interrupted turn is actually advancing — was recomputed from the
live transcript by counting assistant messages. Compaction collapses older
assistant messages into a summary, lowering that count, so a turn that had
genuinely advanced could read as "no progress" between recovery attempts and
exhaust at `maxAttempts`, sealing a healthy turn. Progress is now tracked by a
durable, monotonic counter incremented when `_persistOrphanedStream` materializes
a non-empty partial (the exact event the message count was proxying for), so
compaction can never lower it. A turn that genuinely fails to advance still
exhausts at the cap, and the 15-minute wall-clock ceiling is unchanged.
61 changes: 43 additions & 18 deletions packages/ai-chat/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,23 @@ type ChatRecoveryIncident = {
lastAttemptAt: number;
reason?: string;
/**
* High-water mark of a monotonic recovery-progress signal (count of persisted
* assistant messages) observed for this incident. Used to distinguish a turn
* that is making forward progress but keeps getting interrupted by isolate
* resets (deploys) — which should NOT exhaust the budget — from one that
* genuinely fails to advance.
* High-water mark of the durable, monotonic recovery-progress counter (see
* `_chatRecoveryProgressMarker`) observed for this incident. Used to
* distinguish a turn that is making forward progress but keeps getting
* interrupted by isolate resets (deploys) — which should NOT exhaust the
* budget — from one that genuinely fails to advance. Sourced from a persisted
* counter rather than the live transcript so compaction cannot lower it
* (#1628).
*/
progress?: number;
};

const CHAT_RECOVERY_INCIDENT_KEY_PREFIX = "cf:chat-recovery:incident:";
// Durable, monotonic forward-progress counter for recovery budget resets.
// Incremented once per non-empty partial materialized by
// `_persistOrphanedStream`; never recomputed from the (compactable) transcript.
// See `_chatRecoveryProgressMarker`.
const CHAT_RECOVERY_PROGRESS_KEY = "cf:chat-recovery:progress";
const DEFAULT_CHAT_RECOVERY_MAX_ATTEMPTS = 6;
const DEFAULT_CHAT_RECOVERY_STABLE_TIMEOUT_MS = 10_000;
// Delay before retrying a recovery that timed out waiting for stable state.
Expand Down Expand Up @@ -994,7 +1001,7 @@ export class AIChatAgent<
// assistant message from stored chunks and persist it so it
// survives further page refreshes.
if (orphanedStreamId) {
this._persistOrphanedStream(orphanedStreamId);
await this._persistOrphanedStream(orphanedStreamId);
}
} else if (this._resumableStream.hasActiveStream()) {
// Ignore ACKs for a different active stream request id.
Expand Down Expand Up @@ -1311,7 +1318,7 @@ export class AIChatAgent<
* message parts, then persists the result so it survives further refreshes.
* @internal
*/
protected _persistOrphanedStream(streamId: string) {
protected async _persistOrphanedStream(streamId: string): Promise<void> {
const chunks = this._resumableStream.getStreamChunks(streamId);
if (!chunks.length) return;

Expand Down Expand Up @@ -1378,7 +1385,12 @@ export class AIChatAgent<
existingIdx >= 0
? this.messages.map((m, i) => (i === existingIdx ? message : m))
: [...this.messages, message];
this.persistMessages(updatedMessages);
await this.persistMessages(updatedMessages);
// Real forward progress: a non-empty partial was materialized. Advance
// the durable, compaction-immune progress counter so a deploy-churned
// turn that keeps producing content resets its recovery budget instead
// of exhausting it (#1628).
await this._bumpChatRecoveryProgress();
}
}

Expand Down Expand Up @@ -3004,18 +3016,31 @@ export class AIChatAgent<
}

/**
* Monotonic-ish recovery-progress signal: the number of persisted assistant
* messages. An interrupted partial is persisted as an assistant message (and
* not pruned), so this grows whenever recovery makes forward progress and
* never shrinks mid-incident.
* Monotonic forward-progress signal for recovery budget resets.
*
* This used to count assistant messages in `this.messages`, but that is
* recomputed from the live, mutable transcript. Compaction collapses older
* assistant messages into a summary, lowering the count — so a turn that had
* genuinely advanced could read as "no progress" between attempts and exhaust
* its budget prematurely (#1628). Instead we read a durably-persisted counter
* that only ever increments — bumped once per non-empty partial materialized
* by `_persistOrphanedStream`, which is the exact "forward progress" event the
* old message-count tracked — so compaction can never lower it.
*/
private _chatRecoveryProgressMarker(): number {
return this.messages.reduce(
(count, message) => (message.role === "assistant" ? count + 1 : count),
0
private async _chatRecoveryProgressMarker(): Promise<number> {
return (
(await this.ctx.storage.get<number>(CHAT_RECOVERY_PROGRESS_KEY)) ?? 0
);
}

/** Advance the durable recovery-progress counter. Called when a partial
* assistant message is materialized (real forward progress). */
private async _bumpChatRecoveryProgress(): Promise<void> {
const current =
(await this.ctx.storage.get<number>(CHAT_RECOVERY_PROGRESS_KEY)) ?? 0;
await this.ctx.storage.put(CHAT_RECOVERY_PROGRESS_KEY, current + 1);
}

/** Sweep recovery incidents that have been inactive past the TTL. */
private async _sweepStaleChatRecoveryIncidents(now: number): Promise<void> {
const entries = await this.ctx.storage.list<ChatRecoveryIncident>({
Expand Down Expand Up @@ -3060,7 +3085,7 @@ export class AIChatAgent<
// attempt saw) as environmental and reset the budget, while a turn that
// never advances still exhausts at `maxAttempts`.
const prevProgress = existing?.progress ?? 0;
const currentProgress = this._chatRecoveryProgressMarker();
const currentProgress = await this._chatRecoveryProgressMarker();
const madeProgress = existing != null && currentProgress > prevProgress;
const windowExceeded =
existing != null &&
Expand Down Expand Up @@ -3276,7 +3301,7 @@ export class AIChatAgent<
this._resumableStream.activeStreamId === streamId;

if (options.persist !== false && streamStillActive) {
this._persistOrphanedStream(streamId);
await this._persistOrphanedStream(streamId);
}

if (streamStillActive) {
Expand Down
37 changes: 33 additions & 4 deletions packages/ai-chat/src/tests/durable-chat-recovery.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ interface ChatRecoveryTestStub {
}): Promise<void>;
getChatRecoveryIncidentsForTest(): Promise<unknown[]>;
addAssistantMessageForTest(id: string): Promise<void>;
bumpRecoveryProgressForTest(): Promise<void>;
dropAssistantMessagesForTest(): Promise<void>;
setRecoveryShouldThrowForTest(shouldThrow: boolean): Promise<void>;
enableThrowingOnExhaustedForTest(
maxAttempts: number,
Expand Down Expand Up @@ -263,9 +265,9 @@ describe("onChatRecovery", () => {
expect((await agentStub.beginIncidentForTest(input)).attempt).toBe(1);
expect((await agentStub.beginIncidentForTest(input)).attempt).toBe(2);

// Forward progress (an assistant message persisted, as `_persistOrphanedStream`
// does after a partial) resets the budget — this is the deploy-churn fix.
await agentStub.addAssistantMessageForTest("asst-1");
// Forward progress (the durable counter advances, as `_persistOrphanedStream`
// does after materializing a partial) resets the budget — the deploy-churn fix.
await agentStub.bumpRecoveryProgressForTest();
const afterProgress = await agentStub.beginIncidentForTest(input);
expect(afterProgress.attempt).toBe(1);
expect(afterProgress.exhausted).toBe(false);
Expand All @@ -277,6 +279,33 @@ describe("onChatRecovery", () => {
expect(exhausted.exhausted).toBe(true);
});

it("detects forward progress even after compaction collapses the transcript (#1628)", async () => {
const room = crypto.randomUUID();
const agentStub = await getTestAgent(room);
await agentStub.setChatRecoveryConfigForTest({ maxAttempts: 2 });

const input = {
requestId: "req-compact",
recoveryRootRequestId: "req-compact",
latestUserMessageId: "u1",
recoveryKind: "continue" as const
};

// First detection opens the incident.
expect((await agentStub.beginIncidentForTest(input)).attempt).toBe(1);

// The turn advances (a partial is materialized) AND compaction then
// collapses every assistant message out of the live transcript. The old
// message-count marker would now read FEWER messages than the previous
// attempt and miss the progress; the durable counter is immune.
await agentStub.bumpRecoveryProgressForTest();
await agentStub.dropAssistantMessagesForTest();

const afterProgress = await agentStub.beginIncidentForTest(input);
expect(afterProgress.attempt).toBe(1);
expect(afterProgress.exhausted).toBe(false);
});

it("exhausts via the wall-clock window even while making progress", async () => {
const room = crypto.randomUUID();
const agentStub = await getTestAgent(room);
Expand All @@ -295,7 +324,7 @@ describe("onChatRecovery", () => {
});

// Even with fresh progress, the wall-clock ceiling terminalizes it.
await agentStub.addAssistantMessageForTest("asst-old");
await agentStub.bumpRecoveryProgressForTest();
const next = await agentStub.beginIncidentForTest({
requestId: "req-old-2",
recoveryRootRequestId: "req-old",
Expand Down
18 changes: 18 additions & 0 deletions packages/ai-chat/src/tests/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1718,6 +1718,24 @@ export class ChatRecoveryTestAgent extends AIChatAgent<Env> {
await this.persistMessages(this.messages);
}

/** Simulate recovery forward progress: advance the durable progress counter
* exactly as `_persistOrphanedStream` does when it materializes a non-empty
* partial. The recovery budget keys off this counter (not the live message
* count), so this is how a test marks "the turn advanced". */
async bumpRecoveryProgressForTest(): Promise<void> {
const self = this as unknown as {
_bumpChatRecoveryProgress(): Promise<void>;
};
await self._bumpChatRecoveryProgress();
}

/** Simulate compaction collapsing the transcript by dropping all assistant
* messages from the live cache. Used to prove the recovery progress signal
* is compaction-immune (#1628). */
async dropAssistantMessagesForTest(): Promise<void> {
this.messages = this.messages.filter((m) => m.role !== "assistant");
}

getPersistedMessages(): ChatMessage[] {
return (
this.sql`select * from cf_ai_chat_agent_messages order by created_at` ||
Expand Down
21 changes: 21 additions & 0 deletions packages/think/src/tests/agents/think-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3406,6 +3406,27 @@ export class ThinkRecoveryTestAgent extends Think {
];
}

/** Simulate recovery forward progress: advance the durable progress counter
* exactly as `_persistOrphanedStream` does when it materializes a non-empty
* partial. The recovery budget keys off this counter (not the live message
* count), so this is how a test marks "the turn advanced". */
async bumpRecoveryProgressForTest(): Promise<void> {
const self = this as unknown as {
_bumpChatRecoveryProgress(): Promise<void>;
};
await self._bumpChatRecoveryProgress();
}

/** Simulate compaction collapsing the transcript by dropping all assistant
* messages from the live cache. Used to prove the recovery progress signal
* is compaction-immune (#1628). */
async dropAssistantMessagesForTest(): Promise<void> {
const self = this as unknown as { _cachedMessages: UIMessage[] };
self._cachedMessages = self._cachedMessages.filter(
(m) => m.role !== "assistant"
);
}

/**
* Stream a couple of text chunks (throttled → buffered) then a settled tool
* result, and report how many chunks are durably persisted (raw SQLite, no
Expand Down
34 changes: 30 additions & 4 deletions packages/think/src/tests/think-session.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2215,9 +2215,9 @@ describe("Think — onChatRecovery", () => {
expect((await agent.beginIncidentForTest(input)).attempt).toBe(1);
expect((await agent.beginIncidentForTest(input)).attempt).toBe(2);

// Forward progress (an assistant message persisted, as `_persistOrphanedStream`
// does after a partial) resets the budget — this is the deploy-churn fix.
await agent.addAssistantMessageForTest("asst-1");
// Forward progress (the durable counter advances, as `_persistOrphanedStream`
// does after materializing a partial) resets the budget — the deploy-churn fix.
await agent.bumpRecoveryProgressForTest();
const afterProgress = await agent.beginIncidentForTest(input);
expect(afterProgress.attempt).toBe(1);
expect(afterProgress.exhausted).toBe(false);
Expand All @@ -2229,6 +2229,32 @@ describe("Think — onChatRecovery", () => {
expect(exhausted.exhausted).toBe(true);
});

it("detects forward progress even after compaction collapses the transcript (#1628)", async () => {
const agent = await freshRecoveryAgent("recovery-progress-compaction");
await agent.setChatRecoveryConfigForTest({ maxAttempts: 2 });

const input = {
requestId: "req-compact",
recoveryRootRequestId: "req-compact",
latestUserMessageId: "u1",
recoveryKind: "continue" as const
};

// First detection opens the incident.
expect((await agent.beginIncidentForTest(input)).attempt).toBe(1);

// The turn advances (a partial is materialized) AND compaction then
// collapses every assistant message out of the live transcript. The old
// message-count marker would now read FEWER messages than the previous
// attempt and miss the progress; the durable counter is immune.
await agent.bumpRecoveryProgressForTest();
await agent.dropAssistantMessagesForTest();

const afterProgress = await agent.beginIncidentForTest(input);
expect(afterProgress.attempt).toBe(1);
expect(afterProgress.exhausted).toBe(false);
});

it("exhausts via the wall-clock window even while making progress", async () => {
const agent = await freshRecoveryAgent("recovery-window");
await agent.setChatRecoveryConfigForTest({ maxAttempts: 6 });
Expand All @@ -2246,7 +2272,7 @@ describe("Think — onChatRecovery", () => {
});

// Even with fresh progress, the wall-clock ceiling terminalizes it.
await agent.addAssistantMessageForTest("asst-old");
await agent.bumpRecoveryProgressForTest();
const next = await agent.beginIncidentForTest({
requestId: "req-old-2",
recoveryRootRequestId: "req-old",
Expand Down
53 changes: 39 additions & 14 deletions packages/think/src/think.ts
Original file line number Diff line number Diff line change
Expand Up @@ -604,16 +604,23 @@ type ChatRecoveryIncident = {
lastAttemptAt: number;
reason?: string;
/**
* High-water mark of a monotonic recovery-progress signal (count of persisted
* assistant messages) observed for this incident. Used to distinguish a turn
* that is making forward progress but keeps getting interrupted by isolate
* resets (deploys) — which should NOT exhaust the budget — from one that
* genuinely fails to advance.
* High-water mark of the durable, monotonic recovery-progress counter (see
* `_chatRecoveryProgressMarker`) observed for this incident. Used to
* distinguish a turn that is making forward progress but keeps getting
* interrupted by isolate resets (deploys) — which should NOT exhaust the
* budget — from one that genuinely fails to advance. Sourced from a persisted
* counter rather than the live transcript so compaction cannot lower it
* (#1628).
*/
progress?: number;
};

const CHAT_RECOVERY_INCIDENT_KEY_PREFIX = "cf:chat-recovery:incident:";
// Durable, monotonic forward-progress counter for recovery budget resets.
// Incremented once per non-empty partial materialized by
// `_persistOrphanedStream`; never recomputed from the (compactable) transcript.
// See `_chatRecoveryProgressMarker`.
const CHAT_RECOVERY_PROGRESS_KEY = "cf:chat-recovery:progress";
const DEFAULT_CHAT_RECOVERY_MAX_ATTEMPTS = 6;
const DEFAULT_CHAT_RECOVERY_STABLE_TIMEOUT_MS = 10_000;
// Delay before retrying a continuation that timed out waiting for stable state.
Expand Down Expand Up @@ -6773,18 +6780,31 @@ export class Think<
}

/**
* Monotonic-ish recovery-progress signal: the number of persisted assistant
* messages. `_persistOrphanedStream` turns each interrupted partial into a
* persisted assistant message (and does not prune it), so this grows whenever
* recovery makes forward progress and never shrinks mid-incident.
* Monotonic forward-progress signal for recovery budget resets.
*
* This used to count assistant messages in `this.messages`, but that is
* recomputed from the live, mutable transcript. Compaction collapses older
* assistant messages into a summary, lowering the count — so a turn that had
* genuinely advanced could read as "no progress" between attempts and exhaust
* its budget prematurely (#1628). Instead we read a durably-persisted counter
* that only ever increments — bumped once per non-empty partial materialized
* by `_persistOrphanedStream`, which is the exact "forward progress" event the
* old message-count tracked — so compaction can never lower it.
*/
private _chatRecoveryProgressMarker(): number {
return this.messages.reduce(
(count, message) => (message.role === "assistant" ? count + 1 : count),
0
private async _chatRecoveryProgressMarker(): Promise<number> {
return (
(await this.ctx.storage.get<number>(CHAT_RECOVERY_PROGRESS_KEY)) ?? 0
);
}

/** Advance the durable recovery-progress counter. Called when a partial
* assistant message is materialized (real forward progress). */
private async _bumpChatRecoveryProgress(): Promise<void> {
const current =
(await this.ctx.storage.get<number>(CHAT_RECOVERY_PROGRESS_KEY)) ?? 0;
await this.ctx.storage.put(CHAT_RECOVERY_PROGRESS_KEY, current + 1);
}

/** Sweep recovery incidents that have been inactive past the TTL. */
private async _sweepStaleChatRecoveryIncidents(now: number): Promise<void> {
const entries = await this.ctx.storage.list<ChatRecoveryIncident>({
Expand Down Expand Up @@ -6829,7 +6849,7 @@ export class Think<
// attempt saw) as environmental and reset the budget, while a turn that
// never advances still exhausts at `maxAttempts`.
const prevProgress = existing?.progress ?? 0;
const currentProgress = this._chatRecoveryProgressMarker();
const currentProgress = await this._chatRecoveryProgressMarker();
const madeProgress = existing != null && currentProgress > prevProgress;
const windowExceeded =
existing != null &&
Expand Down Expand Up @@ -7980,6 +8000,11 @@ export class Think<

if (accumulator.parts.length > 0) {
await this._persistAssistantMessage(accumulator.toMessage());
// Real forward progress: a non-empty partial was materialized. Advance
// the durable, compaction-immune progress counter so a deploy-churned
// turn that keeps producing content resets its recovery budget instead
// of exhausting it (#1628).
await this._bumpChatRecoveryProgress();
this._broadcastMessages();
}
}
Expand Down
Loading