Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions src/components/PinnedTodoList.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ interface PinnedTodoListProps {

/**
* Pinned TODO list displayed at bottom of chat (before StreamingBarrier).
* Shows current TODOs from active stream only.
* Shows current TODOs from active stream only - automatically cleared when stream ends.
* Reuses TodoList component for consistent styling.
*
* Relies on natural reference stability from MapStore + Aggregator architecture:
* - Aggregator.getCurrentTodos() returns direct reference (not a copy)
* - Reference only changes when todos are actually modified
* - MapStore caches WorkspaceState per version, avoiding unnecessary recomputation
* - Todos are cleared by StreamingMessageAggregator when stream completes
*/
export const PinnedTodoList: React.FC<PinnedTodoListProps> = ({ workspaceId }) => {
const [expanded, setExpanded] = usePersistedState("pinnedTodoExpanded", true);
Expand All @@ -27,17 +28,8 @@ export const PinnedTodoList: React.FC<PinnedTodoListProps> = ({ workspaceId }) =
() => workspaceStore.getWorkspaceState(workspaceId).todos
);

// Get streaming state
const canInterrupt = useSyncExternalStore(
(callback) => workspaceStore.subscribeKey(workspaceId, callback),
() => workspaceStore.getWorkspaceState(workspaceId).canInterrupt
);

// When idle (not streaming), only show completed todos for clean summary
// When streaming, show all todos so user can see active work
const displayTodos = canInterrupt ? todos : todos.filter((todo) => todo.status === "completed");

if (displayTodos.length === 0) {
// Todos are cleared when stream ends, so if there are todos they're from an active stream
if (todos.length === 0) {
return null;
}

Expand All @@ -57,7 +49,7 @@ export const PinnedTodoList: React.FC<PinnedTodoListProps> = ({ workspaceId }) =
</span>
TODO{expanded ? ":" : ""}
</div>
{expanded && <TodoList todos={displayTodos} />}
{expanded && <TodoList todos={todos} />}
</div>
);
};
9 changes: 7 additions & 2 deletions src/stores/WorkspaceStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -910,17 +910,22 @@ export class WorkspaceStore {
const historicalMsgs = this.historicalMessages.get(workspaceId) ?? [];

if (isCaughtUpMessage(data)) {
// Check if there's an active stream in buffered events (reconnection scenario)
const pendingEvents = this.pendingStreamEvents.get(workspaceId) ?? [];
const hasActiveStream = pendingEvents.some(
(event) => "type" in event && event.type === "stream-start"
);

// Load historical messages first
if (historicalMsgs.length > 0) {
aggregator.loadHistoricalMessages(historicalMsgs);
aggregator.loadHistoricalMessages(historicalMsgs, hasActiveStream);
this.historicalMessages.set(workspaceId, []);
}

// Mark that we're replaying buffered history (prevents O(N) scheduling)
this.replayingHistory.add(workspaceId);

// Process buffered stream events now that history is loaded
const pendingEvents = this.pendingStreamEvents.get(workspaceId) ?? [];
for (const event of pendingEvents) {
this.processStreamEvent(workspaceId, aggregator, event);
}
Expand Down
242 changes: 242 additions & 0 deletions src/utils/messages/StreamingMessageAggregator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,246 @@ describe("StreamingMessageAggregator", () => {
expect(messages1).toBe(messages2);
});
});

describe("todo lifecycle", () => {
test("should clear todos when stream ends", () => {
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);

// Start a stream
aggregator.handleStreamStart({
type: "stream-start",
workspaceId: "test-workspace",
messageId: "msg1",
historySequence: 1,
model: "claude-3-5-sonnet-20241022",
});

// Simulate todo_write tool call
aggregator.handleToolCallStart({
messageId: "msg1",
toolCallId: "tool1",
toolName: "todo_write",
args: {
todos: [
{ content: "Do task 1", status: "in_progress" },
{ content: "Do task 2", status: "pending" },
],
},
tokens: 10,
timestamp: Date.now(),
type: "tool-call-start",
workspaceId: "test-workspace",
});

aggregator.handleToolCallEnd({
type: "tool-call-end",
workspaceId: "test-workspace",
messageId: "msg1",
toolCallId: "tool1",
toolName: "todo_write",
result: { success: true },
});

// Verify todos are set
expect(aggregator.getCurrentTodos()).toHaveLength(2);
expect(aggregator.getCurrentTodos()[0].content).toBe("Do task 1");

// End the stream
aggregator.handleStreamEnd({
type: "stream-end",
workspaceId: "test-workspace",
messageId: "msg1",
metadata: {
historySequence: 1,
timestamp: Date.now(),
model: "claude-3-5-sonnet-20241022",
},
parts: [],
});

// Todos should be cleared
expect(aggregator.getCurrentTodos()).toHaveLength(0);
});

test("should clear todos when stream aborts", () => {
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);

aggregator.handleStreamStart({
type: "stream-start",
workspaceId: "test-workspace",
messageId: "msg1",
historySequence: 1,
model: "claude-3-5-sonnet-20241022",
});

// Simulate todo_write
aggregator.handleToolCallStart({
messageId: "msg1",
toolCallId: "tool1",
toolName: "todo_write",
args: {
todos: [{ content: "Task", status: "in_progress" }],
},
tokens: 10,
timestamp: Date.now(),
type: "tool-call-start",
workspaceId: "test-workspace",
});

aggregator.handleToolCallEnd({
type: "tool-call-end",
workspaceId: "test-workspace",
messageId: "msg1",
toolCallId: "tool1",
toolName: "todo_write",
result: { success: true },
});

expect(aggregator.getCurrentTodos()).toHaveLength(1);

// Abort the stream
aggregator.handleStreamAbort({
type: "stream-abort",
workspaceId: "test-workspace",
messageId: "msg1",
metadata: {},
});

// Todos should be cleared
expect(aggregator.getCurrentTodos()).toHaveLength(0);
});

test("should reconstruct todos on reload ONLY when reconnecting to active stream", () => {
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);

const historicalMessage = {
id: "msg1",
role: "assistant" as const,
parts: [
{
type: "dynamic-tool" as const,
toolCallId: "tool1",
toolName: "todo_write",
state: "output-available" as const,
input: {
todos: [
{ content: "Historical task 1", status: "completed" },
{ content: "Historical task 2", status: "completed" },
],
},
output: { success: true },
},
],
metadata: {
historySequence: 1,
timestamp: Date.now(),
model: "claude-3-5-sonnet-20241022",
},
};

// Scenario 1: Reload with active stream (hasActiveStream = true)
aggregator.loadHistoricalMessages([historicalMessage], true);
expect(aggregator.getCurrentTodos()).toHaveLength(2);
expect(aggregator.getCurrentTodos()[0].content).toBe("Historical task 1");

// Reset for next scenario
const aggregator2 = new StreamingMessageAggregator(TEST_CREATED_AT);

// Scenario 2: Reload without active stream (hasActiveStream = false)
aggregator2.loadHistoricalMessages([historicalMessage], false);
expect(aggregator2.getCurrentTodos()).toHaveLength(0);
});

test("should reconstruct agentStatus but NOT todos when no active stream", () => {
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);

const historicalMessage = {
id: "msg1",
role: "assistant" as const,
parts: [
{
type: "dynamic-tool" as const,
toolCallId: "tool1",
toolName: "todo_write",
state: "output-available" as const,
input: {
todos: [{ content: "Task 1", status: "completed" }],
},
output: { success: true },
},
{
type: "dynamic-tool" as const,
toolCallId: "tool2",
toolName: "status_set",
state: "output-available" as const,
input: { emoji: "πŸ”§", message: "Working on it" },
output: { success: true, emoji: "πŸ”§", message: "Working on it" },
},
],
metadata: {
historySequence: 1,
timestamp: Date.now(),
model: "claude-3-5-sonnet-20241022",
},
};

// Load without active stream
aggregator.loadHistoricalMessages([historicalMessage], false);

// agentStatus should be reconstructed (persists across sessions)
expect(aggregator.getAgentStatus()).toEqual({ emoji: "πŸ”§", message: "Working on it" });

// TODOs should NOT be reconstructed (stream-scoped)
expect(aggregator.getCurrentTodos()).toHaveLength(0);
});

test("should clear todos when new user message arrives during active stream", () => {
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);

// Simulate an active stream with todos
aggregator.handleStreamStart({
type: "stream-start",
workspaceId: "test-workspace",
messageId: "msg1",
historySequence: 1,
model: "claude-3-5-sonnet-20241022",
});

aggregator.handleToolCallStart({
messageId: "msg1",
toolCallId: "tool1",
toolName: "todo_write",
args: {
todos: [{ content: "Task", status: "completed" }],
},
tokens: 10,
timestamp: Date.now(),
type: "tool-call-start",
workspaceId: "test-workspace",
});

aggregator.handleToolCallEnd({
type: "tool-call-end",
workspaceId: "test-workspace",
messageId: "msg1",
toolCallId: "tool1",
toolName: "todo_write",
result: { success: true },
});

// TODOs should be set
expect(aggregator.getCurrentTodos()).toHaveLength(1);

// Add new user message (simulating user sending a new message)
aggregator.handleMessage({
id: "msg2",
role: "user",
parts: [{ type: "text", text: "Hello" }],
metadata: { historySequence: 2, timestamp: Date.now() },
});

// Todos should be cleared when new user message arrives
expect(aggregator.getCurrentTodos()).toHaveLength(0);
});
});
});
Loading