diff --git a/src/types/stream.ts b/src/types/stream.ts
index e615c2cca..f3990d1dc 100644
--- a/src/types/stream.ts
+++ b/src/types/stream.ts
@@ -40,6 +40,8 @@ export interface StreamEndEvent {
     providerMetadata?: Record;
     duration?: number;
     systemMessageTokens?: number;
+    historySequence?: number; // Present when loading from history
+    timestamp?: number; // Present when loading from history
   };
   // Parts array preserves temporal ordering of reasoning, text, and tool calls
   parts: CompletedMessagePart[];
diff --git a/src/utils/messages/StreamingMessageAggregator.test.ts b/src/utils/messages/StreamingMessageAggregator.test.ts
index eb3127791..7eb9a1afe 100644
--- a/src/utils/messages/StreamingMessageAggregator.test.ts
+++ b/src/utils/messages/StreamingMessageAggregator.test.ts
@@ -543,3 +543,150 @@ it("should clear TODOs on reconnection stream-end", () => {
   // Verify TODOs were cleared on stream-end (even in reconnection case)
   expect(aggregator.getCurrentTodos()).toHaveLength(0);
 });
+
+describe("Part-level timestamps", () => {
+  it("should assign timestamps to text/reasoning parts during streaming", () => {
+    const aggregator = new StreamingMessageAggregator();
+    const startTime = Date.now();
+
+    // Start a stream
+    aggregator.handleStreamStart({
+      type: "stream-start",
+      workspaceId: "test-ws",
+      messageId: "msg-1",
+      model: "claude-3",
+      historySequence: 1,
+    });
+
+    // Add text deltas
+    aggregator.handleStreamDelta({
+      type: "stream-delta",
+      workspaceId: "test-ws",
+      messageId: "msg-1",
+      delta: "First part ",
+      tokens: 2,
+      timestamp: startTime,
+    });
+
+    aggregator.handleStreamDelta({
+      type: "stream-delta",
+      workspaceId: "test-ws",
+      messageId: "msg-1",
+      delta: "second part",
+      tokens: 2,
+      timestamp: startTime + 100,
+    });
+
+    // Add reasoning delta
+    aggregator.handleReasoningDelta({
+      type: "reasoning-delta",
+      workspaceId: "test-ws",
+      messageId: "msg-1",
+      delta: "thinking...",
+      tokens: 1,
+      timestamp: startTime + 200,
+    });
+
+    // End stream
+    aggregator.handleStreamEnd({
+      type: "stream-end",
+      workspaceId: "test-ws",
+      messageId: "msg-1",
+      metadata: {
+        model: "claude-3",
+        historySequence: 1,
+      },
+      parts: [],
+    });
+
+    // Check that parts have timestamps
+    const messages = aggregator.getAllMessages();
+    expect(messages).toHaveLength(1);
+    const msg = messages[0];
+
+    // Text parts should have timestamps
+    const textParts = msg.parts.filter((p) => p.type === "text");
+    expect(textParts.length).toBeGreaterThan(0);
+    for (const part of textParts) {
+      if (part.type === "text") {
+        expect(part.timestamp).toBeNumber();
+      }
+    }
+
+    // Reasoning parts should have timestamps
+    const reasoningParts = msg.parts.filter((p) => p.type === "reasoning");
+    expect(reasoningParts.length).toBeGreaterThan(0);
+    for (const part of reasoningParts) {
+      if (part.type === "reasoning") {
+        expect(part.timestamp).toBeNumber();
+      }
+    }
+  });
+
+  it("should preserve individual part timestamps when displaying", () => {
+    const aggregator = new StreamingMessageAggregator();
+    const startTime = 1000;
+
+    // Simulate stream-end with pre-timestamped parts
+    aggregator.handleStreamEnd({
+      type: "stream-end",
+      workspaceId: "test-ws",
+      messageId: "msg-1",
+      metadata: {
+        model: "claude-3",
+        historySequence: 1,
+        timestamp: startTime, // Message-level timestamp
+      },
+      parts: [
+        { type: "text", text: "First", timestamp: startTime },
+        { type: "text", text: " second", timestamp: startTime + 100 },
+        { type: "reasoning", text: "thinking", timestamp: startTime + 200 },
+      ],
+    });
+
+    // Get displayed messages
+    const displayed = aggregator.getDisplayedMessages();
+
+    // Should have merged text parts into one display message and one reasoning message
+    const assistantMsgs = displayed.filter((m) => m.type === "assistant");
+    const reasoningMsgs = displayed.filter((m) => m.type === "reasoning");
+
+    expect(assistantMsgs).toHaveLength(1);
+    expect(reasoningMsgs).toHaveLength(1);
+
+    // Assistant message should use the timestamp of the first text part
+    expect(assistantMsgs[0].timestamp).toBe(startTime);
+
+    // Reasoning message should use its part's timestamp
+    expect(reasoningMsgs[0].timestamp).toBe(startTime + 200);
+  });
+
+  it("should use message-level timestamp as fallback when parts don't have timestamps", () => {
+    const aggregator = new StreamingMessageAggregator();
+    const messageTimestamp = 5000;
+
+    // Load a message without part-level timestamps (e.g., from old history)
+    aggregator.handleStreamEnd({
+      type: "stream-end",
+      workspaceId: "test-ws",
+      messageId: "msg-1",
+      metadata: {
+        model: "claude-3",
+        historySequence: 1,
+        timestamp: messageTimestamp,
+      },
+      parts: [
+        { type: "text", text: "No timestamp" },
+        { type: "reasoning", text: "thinking" },
+      ],
+    });
+
+    const displayed = aggregator.getDisplayedMessages();
+    const assistantMsgs = displayed.filter((m) => m.type === "assistant");
+    const reasoningMsgs = displayed.filter((m) => m.type === "reasoning");
+
+    // Both should fall back to message-level timestamp
+    expect(assistantMsgs[0].timestamp).toBe(messageTimestamp);
+    expect(reasoningMsgs[0].timestamp).toBe(messageTimestamp);
+  });
+});
diff --git a/src/utils/messages/StreamingMessageAggregator.ts b/src/utils/messages/StreamingMessageAggregator.ts
index 29b566b93..40b6d1d24 100644
--- a/src/utils/messages/StreamingMessageAggregator.ts
+++ b/src/utils/messages/StreamingMessageAggregator.ts
@@ -256,6 +256,7 @@ export class StreamingMessageAggregator {
     message.parts.push({
       type: "text",
       text: data.delta,
+      timestamp: data.timestamp,
     });
 
     // Track delta for token counting and TPS calculation
@@ -315,7 +316,7 @@ export class StreamingMessageAggregator {
       role: "assistant",
       metadata: {
         ...data.metadata,
-        timestamp: Date.now(),
+        timestamp: data.metadata.timestamp ?? Date.now(),
       },
       parts: data.parts,
     };
@@ -452,6 +453,7 @@ export class StreamingMessageAggregator {
     message.parts.push({
       type: "reasoning",
       text: data.delta,
+      timestamp: data.timestamp,
     });
 
     // Track delta for token counting and TPS calculation
@@ -575,16 +577,18 @@ export class StreamingMessageAggregator {
 
       // Try to merge with last part if same type
      if (lastMerged?.type === "text" && part.type === "text") {
-        // Merge text parts
+        // Merge text parts, preserving the first timestamp
         mergedParts[mergedParts.length - 1] = {
           type: "text",
           text: lastMerged.text + part.text,
+          timestamp: lastMerged.timestamp ?? part.timestamp,
         };
       } else if (lastMerged?.type === "reasoning" && part.type === "reasoning") {
-        // Merge reasoning parts
+        // Merge reasoning parts, preserving the first timestamp
         mergedParts[mergedParts.length - 1] = {
           type: "reasoning",
           text: lastMerged.text + part.text,
+          timestamp: lastMerged.timestamp ?? part.timestamp,
         };
       } else {
         // Different type or tool part - add new part
@@ -624,7 +628,7 @@ export class StreamingMessageAggregator {
           isStreaming,
           isPartial: message.metadata?.partial ?? false,
           isLastPartOfMessage: isLastPart,
-          timestamp: baseTimestamp,
+          timestamp: part.timestamp ?? baseTimestamp,
         });
       } else if (part.type === "text" && part.text) {
         // Skip empty text parts
@@ -640,7 +644,7 @@ export class StreamingMessageAggregator {
           isLastPartOfMessage: isLastPart,
           isCompacted: message.metadata?.compacted ?? false,
           model: message.metadata?.model,
-          timestamp: baseTimestamp,
+          timestamp: part.timestamp ?? baseTimestamp,
         });
       } else if (isDynamicToolPart(part)) {
         const status =