From b30861f21a8481c8878e6a787e53db50417a3ddd Mon Sep 17 00:00:00 2001 From: Ammar Date: Fri, 31 Oct 2025 16:10:33 +0000 Subject: [PATCH 1/3] =?UTF-8?q?=F0=9F=A4=96=20fix:=20reset=20todos=20on=20?= =?UTF-8?q?stream=20end=20and=20persist=20correctly=20on=20reload?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem 1. **Todos not resetting on stream end**: Todos persisted after stream completion instead of being cleared 2. **Incorrect persistence on reload**: TODOs were reconstructed from history even for completed streams, causing stale todos to reappear 3. **Missing reconnection logic**: No distinction between reloading during an active stream vs after completion ## Solution ### Reset todos on stream end - Modified `cleanupStreamState()` to clear `currentTodos` when stream completes (end/abort/error) - Todos are now truly stream-scoped - cleared immediately when streams end ### Smart reconstruction on reload - Added `hasActiveStream` parameter to `loadHistoricalMessages()` - **WorkspaceStore** checks buffered events for `stream-start` to detect active streams - **Active stream reconnection** (hasActiveStream=true): Reconstruct TODOs from history ✅ - **Completed stream reload** (hasActiveStream=false): Don't reconstruct TODOs ✅ - **agentStatus** always reconstructed (persists across sessions) ✅ ### Simplified UI - Removed filtering logic from `PinnedTodoList` - no longer needed since todos auto-clear - Cleaner component with fewer moving parts ## Implementation **StreamingMessageAggregator.loadHistoricalMessages():** ```typescript loadHistoricalMessages(messages: CmuxMessage[], hasActiveStream: boolean = false): void { // ... add messages to map ... // Reconstruct based on tool type and stream state const shouldReconstructTodos = part.toolName === "todo_write" && hasActiveStream; const shouldReconstructStatus = part.toolName === "status_set"; if (shouldReconstructTodos || shouldReconstructStatus) { this.processToolResult(part.toolName, part.input, part.output); } } ``` **WorkspaceStore.handleChatMessage():** ```typescript // Check if there's an active stream in buffered events const pendingEvents = this.pendingStreamEvents.get(workspaceId) ?? []; const hasActiveStream = pendingEvents.some( (event) => "type" in event && event.type === "stream-start" ); aggregator.loadHistoricalMessages(historicalMsgs, hasActiveStream); ``` ## Testing ✅ **Todo reset on stream end** - todos cleared on completion ✅ **Todo reset on stream abort** - todos cleared on interruption ✅ **Active stream reconnection** - todos reconstructed with hasActiveStream=true ✅ **Completed stream reload** - todos NOT reconstructed with hasActiveStream=false ✅ **agentStatus persistence** - always reconstructed regardless of stream state ✅ **User message clears todos** - existing behavior maintained All 146 tests pass ✅ ## Behavior After This PR **During streaming:** - Todos update in real-time as `todo_write` is called - UI shows all todos (pending, in_progress, completed) **On stream end:** - Todos cleared immediately - UI shows nothing (clean slate) **On reload - active stream (reconnection):** - Backend sends buffered `stream-start` event - TODOs reconstructed from historical tool calls ✅ - Stream continues with todos visible **On reload - completed stream:** - No buffered stream events - TODOs NOT reconstructed ✅ - UI remains clean **On new user message:** - Todos cleared for fresh start - Existing behavior unchanged --- _Generated with `cmux`_ --- src/components/PinnedTodoList.tsx | 18 +- src/stores/WorkspaceStore.ts | 9 +- .../StreamingMessageAggregator.test.ts | 242 ++++++++++++++++++ .../messages/StreamingMessageAggregator.ts | 55 +++- 4 files changed, 295 insertions(+), 29 deletions(-) diff --git a/src/components/PinnedTodoList.tsx b/src/components/PinnedTodoList.tsx index 2e06db5fe..df1e8569e 100644 --- a/src/components/PinnedTodoList.tsx +++ b/src/components/PinnedTodoList.tsx @@ -10,13 +10,14 @@ interface PinnedTodoListProps { /** * Pinned TODO list displayed at bottom of chat (before StreamingBarrier). - * Shows current TODOs from active stream only. + * Shows current TODOs from active stream only - automatically cleared when stream ends. * Reuses TodoList component for consistent styling. * * Relies on natural reference stability from MapStore + Aggregator architecture: * - Aggregator.getCurrentTodos() returns direct reference (not a copy) * - Reference only changes when todos are actually modified * - MapStore caches WorkspaceState per version, avoiding unnecessary recomputation + * - Todos are cleared by StreamingMessageAggregator when stream completes */ export const PinnedTodoList: React.FC = ({ workspaceId }) => { const [expanded, setExpanded] = usePersistedState("pinnedTodoExpanded", true); @@ -27,17 +28,8 @@ export const PinnedTodoList: React.FC = ({ workspaceId }) = () => workspaceStore.getWorkspaceState(workspaceId).todos ); - // Get streaming state - const canInterrupt = useSyncExternalStore( - (callback) => workspaceStore.subscribeKey(workspaceId, callback), - () => workspaceStore.getWorkspaceState(workspaceId).canInterrupt - ); - - // When idle (not streaming), only show completed todos for clean summary - // When streaming, show all todos so user can see active work - const displayTodos = canInterrupt ? todos : todos.filter((todo) => todo.status === "completed"); - - if (displayTodos.length === 0) { + // Todos are cleared when stream ends, so if there are todos they're from an active stream + if (todos.length === 0) { return null; } @@ -57,7 +49,7 @@ export const PinnedTodoList: React.FC = ({ workspaceId }) = TODO{expanded ? ":" : ""} - {expanded && } + {expanded && } ); }; diff --git a/src/stores/WorkspaceStore.ts b/src/stores/WorkspaceStore.ts index 1815c86a2..6c899455e 100644 --- a/src/stores/WorkspaceStore.ts +++ b/src/stores/WorkspaceStore.ts @@ -910,9 +910,15 @@ export class WorkspaceStore { const historicalMsgs = this.historicalMessages.get(workspaceId) ?? []; if (isCaughtUpMessage(data)) { + // Check if there's an active stream in buffered events (reconnection scenario) + const pendingEvents = this.pendingStreamEvents.get(workspaceId) ?? []; + const hasActiveStream = pendingEvents.some( + (event) => "type" in event && event.type === "stream-start" + ); + // Load historical messages first if (historicalMsgs.length > 0) { - aggregator.loadHistoricalMessages(historicalMsgs); + aggregator.loadHistoricalMessages(historicalMsgs, hasActiveStream); this.historicalMessages.set(workspaceId, []); } @@ -920,7 +926,6 @@ export class WorkspaceStore { this.replayingHistory.add(workspaceId); // Process buffered stream events now that history is loaded - const pendingEvents = this.pendingStreamEvents.get(workspaceId) ?? []; for (const event of pendingEvents) { this.processStreamEvent(workspaceId, aggregator, event); } diff --git a/src/utils/messages/StreamingMessageAggregator.test.ts b/src/utils/messages/StreamingMessageAggregator.test.ts index 69252f2d7..21f0af510 100644 --- a/src/utils/messages/StreamingMessageAggregator.test.ts +++ b/src/utils/messages/StreamingMessageAggregator.test.ts @@ -137,4 +137,246 @@ describe("StreamingMessageAggregator", () => { expect(messages1).toBe(messages2); }); }); + + describe("todo lifecycle", () => { + test("should clear todos when stream ends", () => { + const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT); + + // Start a stream + aggregator.handleStreamStart({ + type: "stream-start", + workspaceId: "test-workspace", + messageId: "msg1", + historySequence: 1, + model: "claude-3-5-sonnet-20241022", + }); + + // Simulate todo_write tool call + aggregator.handleToolCallStart({ + messageId: "msg1", + toolCallId: "tool1", + toolName: "todo_write", + args: { + todos: [ + { content: "Do task 1", status: "in_progress" }, + { content: "Do task 2", status: "pending" }, + ], + }, + tokens: 10, + timestamp: Date.now(), + type: "tool-call-start", + workspaceId: "test-workspace", + }); + + aggregator.handleToolCallEnd({ + type: "tool-call-end", + workspaceId: "test-workspace", + messageId: "msg1", + toolCallId: "tool1", + toolName: "todo_write", + result: { success: true }, + }); + + // Verify todos are set + expect(aggregator.getCurrentTodos()).toHaveLength(2); + expect(aggregator.getCurrentTodos()[0].content).toBe("Do task 1"); + + // End the stream + aggregator.handleStreamEnd({ + type: "stream-end", + workspaceId: "test-workspace", + messageId: "msg1", + metadata: { + historySequence: 1, + timestamp: Date.now(), + model: "claude-3-5-sonnet-20241022", + }, + parts: [], + }); + + // Todos should be cleared + expect(aggregator.getCurrentTodos()).toHaveLength(0); + }); + + test("should clear todos when stream aborts", () => { + const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT); + + aggregator.handleStreamStart({ + type: "stream-start", + workspaceId: "test-workspace", + messageId: "msg1", + historySequence: 1, + model: "claude-3-5-sonnet-20241022", + }); + + // Simulate todo_write + aggregator.handleToolCallStart({ + messageId: "msg1", + toolCallId: "tool1", + toolName: "todo_write", + args: { + todos: [{ content: "Task", status: "in_progress" }], + }, + tokens: 10, + timestamp: Date.now(), + type: "tool-call-start", + workspaceId: "test-workspace", + }); + + aggregator.handleToolCallEnd({ + type: "tool-call-end", + workspaceId: "test-workspace", + messageId: "msg1", + toolCallId: "tool1", + toolName: "todo_write", + result: { success: true }, + }); + + expect(aggregator.getCurrentTodos()).toHaveLength(1); + + // Abort the stream + aggregator.handleStreamAbort({ + type: "stream-abort", + workspaceId: "test-workspace", + messageId: "msg1", + metadata: {}, + }); + + // Todos should be cleared + expect(aggregator.getCurrentTodos()).toHaveLength(0); + }); + + test("should reconstruct todos on reload ONLY when reconnecting to active stream", () => { + const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT); + + const historicalMessage = { + id: "msg1", + role: "assistant" as const, + parts: [ + { + type: "dynamic-tool" as const, + toolCallId: "tool1", + toolName: "todo_write", + state: "output-available" as const, + input: { + todos: [ + { content: "Historical task 1", status: "completed" }, + { content: "Historical task 2", status: "completed" }, + ], + }, + output: { success: true }, + }, + ], + metadata: { + historySequence: 1, + timestamp: Date.now(), + model: "claude-3-5-sonnet-20241022", + }, + }; + + // Scenario 1: Reload with active stream (hasActiveStream = true) + aggregator.loadHistoricalMessages([historicalMessage], true); + expect(aggregator.getCurrentTodos()).toHaveLength(2); + expect(aggregator.getCurrentTodos()[0].content).toBe("Historical task 1"); + + // Reset for next scenario + const aggregator2 = new StreamingMessageAggregator(TEST_CREATED_AT); + + // Scenario 2: Reload without active stream (hasActiveStream = false) + aggregator2.loadHistoricalMessages([historicalMessage], false); + expect(aggregator2.getCurrentTodos()).toHaveLength(0); + }); + + test("should reconstruct agentStatus but NOT todos when no active stream", () => { + const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT); + + const historicalMessage = { + id: "msg1", + role: "assistant" as const, + parts: [ + { + type: "dynamic-tool" as const, + toolCallId: "tool1", + toolName: "todo_write", + state: "output-available" as const, + input: { + todos: [{ content: "Task 1", status: "completed" }], + }, + output: { success: true }, + }, + { + type: "dynamic-tool" as const, + toolCallId: "tool2", + toolName: "status_set", + state: "output-available" as const, + input: { emoji: "🔧", message: "Working on it" }, + output: { success: true, emoji: "🔧", message: "Working on it" }, + }, + ], + metadata: { + historySequence: 1, + timestamp: Date.now(), + model: "claude-3-5-sonnet-20241022", + }, + }; + + // Load without active stream + aggregator.loadHistoricalMessages([historicalMessage], false); + + // agentStatus should be reconstructed (persists across sessions) + expect(aggregator.getAgentStatus()).toEqual({ emoji: "🔧", message: "Working on it" }); + + // TODOs should NOT be reconstructed (stream-scoped) + expect(aggregator.getCurrentTodos()).toHaveLength(0); + }); + + test("should clear todos when new user message arrives during active stream", () => { + const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT); + + // Simulate an active stream with todos + aggregator.handleStreamStart({ + type: "stream-start", + workspaceId: "test-workspace", + messageId: "msg1", + historySequence: 1, + model: "claude-3-5-sonnet-20241022", + }); + + aggregator.handleToolCallStart({ + messageId: "msg1", + toolCallId: "tool1", + toolName: "todo_write", + args: { + todos: [{ content: "Task", status: "completed" }], + }, + tokens: 10, + timestamp: Date.now(), + type: "tool-call-start", + workspaceId: "test-workspace", + }); + + aggregator.handleToolCallEnd({ + type: "tool-call-end", + workspaceId: "test-workspace", + messageId: "msg1", + toolCallId: "tool1", + toolName: "todo_write", + result: { success: true }, + }); + + // TODOs should be set + expect(aggregator.getCurrentTodos()).toHaveLength(1); + + // Add new user message (simulating user sending a new message) + aggregator.handleMessage({ + id: "msg2", + role: "user", + parts: [{ type: "text", text: "Hello" }], + metadata: { historySequence: 2, timestamp: Date.now() }, + }); + + // Todos should be cleared when new user message arrives + expect(aggregator.getCurrentTodos()).toHaveLength(0); + }); + }); }); diff --git a/src/utils/messages/StreamingMessageAggregator.ts b/src/utils/messages/StreamingMessageAggregator.ts index 68f2e3340..61c22d934 100644 --- a/src/utils/messages/StreamingMessageAggregator.ts +++ b/src/utils/messages/StreamingMessageAggregator.ts @@ -66,7 +66,9 @@ export class StreamingMessageAggregator { // Delta history for token counting and TPS calculation private deltaHistory = new Map(); - // Current TODO list (updated when todo_write succeeds) + // Current TODO list (updated when todo_write succeeds, cleared on stream end) + // Stream-scoped: automatically reset when stream completes + // On reload: only reconstructed if reconnecting to active stream private currentTodos: TodoItem[] = []; // Current agent status (updated when status_set is called) @@ -172,12 +174,18 @@ export class StreamingMessageAggregator { * Clean up stream-scoped state when stream ends (normally or abnormally). * Called by handleStreamEnd, handleStreamAbort, and handleStreamError. * - * NOTE: Does NOT clear todos or agentStatus - those are cleared when a new - * user message arrives (see handleMessage), ensuring consistent behavior - * whether loading from history or processing live events. + * Clears: + * - Active stream tracking (this.activeStreams) + * - Current TODOs (this.currentTodos) - reconstructed from history on reload + * + * Does NOT clear: + * - agentStatus - persists after stream completion to show last activity */ private cleanupStreamState(messageId: string): void { this.activeStreams.delete(messageId); + // Clear todos when stream ends - they're stream-scoped state + // On reload, todos will be reconstructed from completed tool_write calls in history + this.currentTodos = []; } addMessage(message: CmuxMessage): void { @@ -200,16 +208,35 @@ export class StreamingMessageAggregator { /** * Load historical messages in batch, preserving their historySequence numbers. * This is more efficient than calling addMessage() repeatedly. + * + * @param messages - Historical messages to load + * @param hasActiveStream - Whether there's an active stream in buffered events (for reconnection scenario) */ - loadHistoricalMessages(messages: CmuxMessage[]): void { + loadHistoricalMessages(messages: CmuxMessage[], hasActiveStream: boolean = false): void { + // First, add all messages to the map for (const message of messages) { this.messages.set(message.id, message); + } + + // Then, reconstruct derived state from the most recent assistant message + // TODOs: only if there's an active stream (stream-scoped, only during reconnection) + // agentStatus: always (persists across sessions) + const sortedMessages = [...messages].sort( + (a, b) => (b.metadata?.historySequence ?? 0) - (a.metadata?.historySequence ?? 0) + ); - // Process completed tool calls to reconstruct derived state (todos, agentStatus) - // This ensures state persists across page reloads and workspace switches - if (message.role === "assistant") { - for (const part of message.parts) { - if (isDynamicToolPart(part) && part.state === "output-available") { + // Find the most recent assistant message + const lastAssistantMessage = sortedMessages.find((msg) => msg.role === "assistant"); + + if (lastAssistantMessage) { + // Only process tool results from the most recent assistant message + for (const part of lastAssistantMessage.parts) { + if (isDynamicToolPart(part) && part.state === "output-available") { + // Reconstruct based on tool type and stream state + const shouldReconstructTodos = part.toolName === "todo_write" && hasActiveStream; + const shouldReconstructStatus = part.toolName === "status_set"; + + if (shouldReconstructTodos || shouldReconstructStatus) { this.processToolResult(part.toolName, part.input, part.output); } } @@ -401,7 +428,7 @@ export class StreamingMessageAggregator { } } - // Clean up stream-scoped state (TODOs, active stream tracking) + // Clean up stream-scoped state (active stream tracking, TODOs) this.cleanupStreamState(data.messageId); } else { // Reconnection case: user reconnected after stream completed @@ -422,7 +449,7 @@ export class StreamingMessageAggregator { this.messages.set(data.messageId, message); - // Clean up stream-scoped state (TODOs, active stream tracking) + // Clean up stream-scoped state (active stream tracking, TODOs) this.cleanupStreamState(data.messageId); } this.invalidateCache(); @@ -443,7 +470,7 @@ export class StreamingMessageAggregator { }; } - // Clean up stream-scoped state (TODOs, active stream tracking) + // Clean up stream-scoped state (active stream tracking, TODOs) this.cleanupStreamState(data.messageId); this.invalidateCache(); } @@ -462,7 +489,7 @@ export class StreamingMessageAggregator { message.metadata.errorType = data.errorType; } - // Clean up stream-scoped state (TODOs, active stream tracking) + // Clean up stream-scoped state (active stream tracking, TODOs) this.cleanupStreamState(data.messageId); this.invalidateCache(); } From d97679e298f0f1b6911cd67804c04c2c428a92b4 Mon Sep 17 00:00:00 2001 From: Ammar Date: Fri, 31 Oct 2025 17:29:07 +0000 Subject: [PATCH 2/3] fix: remove type annotation for hasActiveStream parameter --- src/utils/messages/StreamingMessageAggregator.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/utils/messages/StreamingMessageAggregator.ts b/src/utils/messages/StreamingMessageAggregator.ts index 61c22d934..afebdcdf8 100644 --- a/src/utils/messages/StreamingMessageAggregator.ts +++ b/src/utils/messages/StreamingMessageAggregator.ts @@ -208,11 +208,11 @@ export class StreamingMessageAggregator { /** * Load historical messages in batch, preserving their historySequence numbers. * This is more efficient than calling addMessage() repeatedly. - * + * * @param messages - Historical messages to load * @param hasActiveStream - Whether there's an active stream in buffered events (for reconnection scenario) */ - loadHistoricalMessages(messages: CmuxMessage[], hasActiveStream: boolean = false): void { + loadHistoricalMessages(messages: CmuxMessage[], hasActiveStream = false): void { // First, add all messages to the map for (const message of messages) { this.messages.set(message.id, message); @@ -235,7 +235,7 @@ export class StreamingMessageAggregator { // Reconstruct based on tool type and stream state const shouldReconstructTodos = part.toolName === "todo_write" && hasActiveStream; const shouldReconstructStatus = part.toolName === "status_set"; - + if (shouldReconstructTodos || shouldReconstructStatus) { this.processToolResult(part.toolName, part.input, part.output); } From e6c05590a079995d263820fef0a0855cbbd72ed0 Mon Sep 17 00:00:00 2001 From: Ammar Date: Fri, 31 Oct 2025 17:47:21 +0000 Subject: [PATCH 3/3] refactor: centralize tool persistence logic in processToolResult Improved separation of concerns by moving tool-specific behavior into processToolResult: - Added 'context' parameter to processToolResult (streaming vs historical) - loadHistoricalMessages now just passes context, doesn't know about specific tools - processToolResult decides what to do based on tool type and context - Cleaner abstraction: tool behavior is centralized in one place --- .../messages/StreamingMessageAggregator.ts | 35 ++++++++++++------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/src/utils/messages/StreamingMessageAggregator.ts b/src/utils/messages/StreamingMessageAggregator.ts index afebdcdf8..ad5eafb02 100644 --- a/src/utils/messages/StreamingMessageAggregator.ts +++ b/src/utils/messages/StreamingMessageAggregator.ts @@ -219,8 +219,9 @@ export class StreamingMessageAggregator { } // Then, reconstruct derived state from the most recent assistant message - // TODOs: only if there's an active stream (stream-scoped, only during reconnection) - // agentStatus: always (persists across sessions) + // Use "streaming" context if there's an active stream (reconnection), otherwise "historical" + const context = hasActiveStream ? "streaming" : "historical"; + const sortedMessages = [...messages].sort( (a, b) => (b.metadata?.historySequence ?? 0) - (a.metadata?.historySequence ?? 0) ); @@ -229,16 +230,11 @@ export class StreamingMessageAggregator { const lastAssistantMessage = sortedMessages.find((msg) => msg.role === "assistant"); if (lastAssistantMessage) { - // Only process tool results from the most recent assistant message + // Process all tool results from the most recent assistant message + // processToolResult will decide what to do based on tool type and context for (const part of lastAssistantMessage.parts) { if (isDynamicToolPart(part) && part.state === "output-available") { - // Reconstruct based on tool type and stream state - const shouldReconstructTodos = part.toolName === "todo_write" && hasActiveStream; - const shouldReconstructStatus = part.toolName === "status_set"; - - if (shouldReconstructTodos || shouldReconstructStatus) { - this.processToolResult(part.toolName, part.input, part.output); - } + this.processToolResult(part.toolName, part.input, part.output, context); } } } @@ -539,10 +535,21 @@ export class StreamingMessageAggregator { * * This is the single source of truth for updating state from tool results, * ensuring consistency whether processing live events or historical messages. + * + * @param toolName - Name of the tool that was called + * @param input - Tool input arguments + * @param output - Tool output result + * @param context - Whether this is from live streaming or historical reload */ - private processToolResult(toolName: string, input: unknown, output: unknown): void { + private processToolResult( + toolName: string, + input: unknown, + output: unknown, + context: "streaming" | "historical" + ): void { // Update TODO state if this was a successful todo_write - if (toolName === "todo_write" && hasSuccessResult(output)) { + // TODOs are stream-scoped: only update during live streaming, not on historical reload + if (toolName === "todo_write" && hasSuccessResult(output) && context === "streaming") { const args = input as { todos: TodoItem[] }; // Only update if todos actually changed (prevents flickering from reference changes) if (!this.todosEqual(this.currentTodos, args.todos)) { @@ -551,6 +558,7 @@ export class StreamingMessageAggregator { } // Update agent status if this was a successful status_set + // agentStatus persists: update both during streaming and on historical reload // Use output instead of input to get the truncated message if (toolName === "status_set" && hasSuccessResult(output)) { const result = output as Extract; @@ -584,7 +592,8 @@ export class StreamingMessageAggregator { (toolPart as DynamicToolPartAvailable).output = data.result; // Process tool result to update derived state (todos, agentStatus, etc.) - this.processToolResult(data.toolName, toolPart.input, data.result); + // This is from a live stream, so use "streaming" context + this.processToolResult(data.toolName, toolPart.input, data.result, "streaming"); } this.invalidateCache(); }