From 57d52b2091c4a84a8cc162eaaa51a1e7e4e5c7b1 Mon Sep 17 00:00:00 2001 From: Ammar Date: Sat, 18 Oct 2025 13:10:29 -0500 Subject: [PATCH 1/2] =?UTF-8?q?=F0=9F=A4=96=20Fix=20assistant=20message=20?= =?UTF-8?q?part=20timestamps=20to=20preserve=20individual=20delta=20times?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before this fix, all parts of an assistant message (text, reasoning, tool calls) were using the message-level timestamp, causing all parts to show the same time before reload, and then all show a different (wrong) time after reload. Changes: - Add timestamp field to text/reasoning parts during streaming (matching tool parts) - Preserve first timestamp when merging adjacent parts of the same type - Use part.timestamp ?? baseTimestamp when displaying (consistent with tool parts) - Preserve metadata.timestamp in reconnection case instead of overwriting with Date.now() Result: Each part now shows its actual creation time, consistent across reloads. Generated with `cmux` --- src/types/stream.ts | 2 + .../StreamingMessageAggregator.test.ts | 148 ++++++++++++++++++ .../messages/StreamingMessageAggregator.ts | 14 +- 3 files changed, 159 insertions(+), 5 deletions(-) diff --git a/src/types/stream.ts b/src/types/stream.ts index e615c2cca..f3990d1dc 100644 --- a/src/types/stream.ts +++ b/src/types/stream.ts @@ -40,6 +40,8 @@ export interface StreamEndEvent { providerMetadata?: Record; duration?: number; systemMessageTokens?: number; + historySequence?: number; // Present when loading from history + timestamp?: number; // Present when loading from history }; // Parts array preserves temporal ordering of reasoning, text, and tool calls parts: CompletedMessagePart[]; diff --git a/src/utils/messages/StreamingMessageAggregator.test.ts b/src/utils/messages/StreamingMessageAggregator.test.ts index eb3127791..0327e2e3c 100644 --- a/src/utils/messages/StreamingMessageAggregator.test.ts +++ b/src/utils/messages/StreamingMessageAggregator.test.ts @@ -1,5 +1,6 @@ import { describe, it, expect } from "bun:test"; import { StreamingMessageAggregator } from "./StreamingMessageAggregator"; +import type { DisplayedMessage } from "@/types/message"; import type { StreamEndEvent } from "@/types/stream"; import type { DynamicToolPart } from "@/types/toolParts"; @@ -543,3 +544,150 @@ it("should clear TODOs on reconnection stream-end", () => { // Verify TODOs were cleared on stream-end (even in reconnection case) expect(aggregator.getCurrentTodos()).toHaveLength(0); }); + +describe("Part-level timestamps", () => { + it("should assign timestamps to text/reasoning parts during streaming", () => { + const aggregator = new StreamingMessageAggregator(); + const startTime = Date.now(); + + // Start a stream + aggregator.handleStreamStart({ + type: "stream-start", + workspaceId: "test-ws", + messageId: "msg-1", + model: "claude-3", + historySequence: 1, + }); + + // Add text deltas + aggregator.handleStreamDelta({ + type: "stream-delta", + workspaceId: "test-ws", + messageId: "msg-1", + delta: "First part ", + tokens: 2, + timestamp: startTime, + }); + + aggregator.handleStreamDelta({ + type: "stream-delta", + workspaceId: "test-ws", + messageId: "msg-1", + delta: "second part", + tokens: 2, + timestamp: startTime + 100, + }); + + // Add reasoning delta + aggregator.handleReasoningDelta({ + type: "reasoning-delta", + workspaceId: "test-ws", + messageId: "msg-1", + delta: "thinking...", + tokens: 1, + timestamp: startTime + 200, + }); + + // End stream + aggregator.handleStreamEnd({ + type: "stream-end", + workspaceId: "test-ws", + messageId: "msg-1", + metadata: { + model: "claude-3", + historySequence: 1, + }, + parts: [], + }); + + // Check that parts have timestamps + const messages = aggregator.getAllMessages(); + expect(messages).toHaveLength(1); + const msg = messages[0]; + + // Text parts should have timestamps + const textParts = msg.parts.filter((p) => p.type === "text"); + expect(textParts.length).toBeGreaterThan(0); + for (const part of textParts) { + if (part.type === "text") { + expect(part.timestamp).toBeNumber(); + } + } + + // Reasoning parts should have timestamps + const reasoningParts = msg.parts.filter((p) => p.type === "reasoning"); + expect(reasoningParts.length).toBeGreaterThan(0); + for (const part of reasoningParts) { + if (part.type === "reasoning") { + expect(part.timestamp).toBeNumber(); + } + } + }); + + it("should preserve individual part timestamps when displaying", () => { + const aggregator = new StreamingMessageAggregator(); + const startTime = 1000; + + // Simulate stream-end with pre-timestamped parts + aggregator.handleStreamEnd({ + type: "stream-end", + workspaceId: "test-ws", + messageId: "msg-1", + metadata: { + model: "claude-3", + historySequence: 1, + timestamp: startTime, // Message-level timestamp + }, + parts: [ + { type: "text", text: "First", timestamp: startTime }, + { type: "text", text: " second", timestamp: startTime + 100 }, + { type: "reasoning", text: "thinking", timestamp: startTime + 200 }, + ], + }); + + // Get displayed messages + const displayed = aggregator.getDisplayedMessages(); + + // Should have merged text parts into one display message and one reasoning message + const assistantMsgs = displayed.filter((m) => m.type === "assistant"); + const reasoningMsgs = displayed.filter((m) => m.type === "reasoning"); + + expect(assistantMsgs).toHaveLength(1); + expect(reasoningMsgs).toHaveLength(1); + + // Assistant message should use the timestamp of the first text part + expect(assistantMsgs[0].timestamp).toBe(startTime); + + // Reasoning message should use its part's timestamp + expect(reasoningMsgs[0].timestamp).toBe(startTime + 200); + }); + + it("should use message-level timestamp as fallback when parts don't have timestamps", () => { + const aggregator = new StreamingMessageAggregator(); + const messageTimestamp = 5000; + + // Load a message without part-level timestamps (e.g., from old history) + aggregator.handleStreamEnd({ + type: "stream-end", + workspaceId: "test-ws", + messageId: "msg-1", + metadata: { + model: "claude-3", + historySequence: 1, + timestamp: messageTimestamp, + }, + parts: [ + { type: "text", text: "No timestamp" }, + { type: "reasoning", text: "thinking" }, + ], + }); + + const displayed = aggregator.getDisplayedMessages(); + const assistantMsgs = displayed.filter((m) => m.type === "assistant"); + const reasoningMsgs = displayed.filter((m) => m.type === "reasoning"); + + // Both should fall back to message-level timestamp + expect(assistantMsgs[0].timestamp).toBe(messageTimestamp); + expect(reasoningMsgs[0].timestamp).toBe(messageTimestamp); + }); +}); diff --git a/src/utils/messages/StreamingMessageAggregator.ts b/src/utils/messages/StreamingMessageAggregator.ts index 29b566b93..40b6d1d24 100644 --- a/src/utils/messages/StreamingMessageAggregator.ts +++ b/src/utils/messages/StreamingMessageAggregator.ts @@ -256,6 +256,7 @@ export class StreamingMessageAggregator { message.parts.push({ type: "text", text: data.delta, + timestamp: data.timestamp, }); // Track delta for token counting and TPS calculation @@ -315,7 +316,7 @@ export class StreamingMessageAggregator { role: "assistant", metadata: { ...data.metadata, - timestamp: Date.now(), + timestamp: data.metadata.timestamp ?? Date.now(), }, parts: data.parts, }; @@ -452,6 +453,7 @@ export class StreamingMessageAggregator { message.parts.push({ type: "reasoning", text: data.delta, + timestamp: data.timestamp, }); // Track delta for token counting and TPS calculation @@ -575,16 +577,18 @@ export class StreamingMessageAggregator { // Try to merge with last part if same type if (lastMerged?.type === "text" && part.type === "text") { - // Merge text parts + // Merge text parts, preserving the first timestamp mergedParts[mergedParts.length - 1] = { type: "text", text: lastMerged.text + part.text, + timestamp: lastMerged.timestamp ?? part.timestamp, }; } else if (lastMerged?.type === "reasoning" && part.type === "reasoning") { - // Merge reasoning parts + // Merge reasoning parts, preserving the first timestamp mergedParts[mergedParts.length - 1] = { type: "reasoning", text: lastMerged.text + part.text, + timestamp: lastMerged.timestamp ?? part.timestamp, }; } else { // Different type or tool part - add new part @@ -624,7 +628,7 @@ export class StreamingMessageAggregator { isStreaming, isPartial: message.metadata?.partial ?? false, isLastPartOfMessage: isLastPart, - timestamp: baseTimestamp, + timestamp: part.timestamp ?? baseTimestamp, }); } else if (part.type === "text" && part.text) { // Skip empty text parts @@ -640,7 +644,7 @@ export class StreamingMessageAggregator { isLastPartOfMessage: isLastPart, isCompacted: message.metadata?.compacted ?? false, model: message.metadata?.model, - timestamp: baseTimestamp, + timestamp: part.timestamp ?? baseTimestamp, }); } else if (isDynamicToolPart(part)) { const status = From 3c1c6aed204e4301f7eb309eb5044ae3cf9ab907 Mon Sep 17 00:00:00 2001 From: Ammar Date: Sat, 18 Oct 2025 13:12:45 -0500 Subject: [PATCH 2/2] Remove unused DisplayedMessage import --- src/utils/messages/StreamingMessageAggregator.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/utils/messages/StreamingMessageAggregator.test.ts b/src/utils/messages/StreamingMessageAggregator.test.ts index 0327e2e3c..7eb9a1afe 100644 --- a/src/utils/messages/StreamingMessageAggregator.test.ts +++ b/src/utils/messages/StreamingMessageAggregator.test.ts @@ -1,6 +1,5 @@ import { describe, it, expect } from "bun:test"; import { StreamingMessageAggregator } from "./StreamingMessageAggregator"; -import type { DisplayedMessage } from "@/types/message"; import type { StreamEndEvent } from "@/types/stream"; import type { DynamicToolPart } from "@/types/toolParts";