Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/types/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ export interface StreamEndEvent {
providerMetadata?: Record<string, unknown>;
duration?: number;
systemMessageTokens?: number;
historySequence?: number; // Present when loading from history
timestamp?: number; // Present when loading from history
};
// Parts array preserves temporal ordering of reasoning, text, and tool calls
parts: CompletedMessagePart[];
Expand Down
147 changes: 147 additions & 0 deletions src/utils/messages/StreamingMessageAggregator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -543,3 +543,150 @@ it("should clear TODOs on reconnection stream-end", () => {
// Verify TODOs were cleared on stream-end (even in reconnection case)
expect(aggregator.getCurrentTodos()).toHaveLength(0);
});

describe("Part-level timestamps", () => {
it("should assign timestamps to text/reasoning parts during streaming", () => {
const aggregator = new StreamingMessageAggregator();
const startTime = Date.now();

// Start a stream
aggregator.handleStreamStart({
type: "stream-start",
workspaceId: "test-ws",
messageId: "msg-1",
model: "claude-3",
historySequence: 1,
});

// Add text deltas
aggregator.handleStreamDelta({
type: "stream-delta",
workspaceId: "test-ws",
messageId: "msg-1",
delta: "First part ",
tokens: 2,
timestamp: startTime,
});

aggregator.handleStreamDelta({
type: "stream-delta",
workspaceId: "test-ws",
messageId: "msg-1",
delta: "second part",
tokens: 2,
timestamp: startTime + 100,
});

// Add reasoning delta
aggregator.handleReasoningDelta({
type: "reasoning-delta",
workspaceId: "test-ws",
messageId: "msg-1",
delta: "thinking...",
tokens: 1,
timestamp: startTime + 200,
});

// End stream
aggregator.handleStreamEnd({
type: "stream-end",
workspaceId: "test-ws",
messageId: "msg-1",
metadata: {
model: "claude-3",
historySequence: 1,
},
parts: [],
});

// Check that parts have timestamps
const messages = aggregator.getAllMessages();
expect(messages).toHaveLength(1);
const msg = messages[0];

// Text parts should have timestamps
const textParts = msg.parts.filter((p) => p.type === "text");
expect(textParts.length).toBeGreaterThan(0);
for (const part of textParts) {
if (part.type === "text") {
expect(part.timestamp).toBeNumber();
}
}

// Reasoning parts should have timestamps
const reasoningParts = msg.parts.filter((p) => p.type === "reasoning");
expect(reasoningParts.length).toBeGreaterThan(0);
for (const part of reasoningParts) {
if (part.type === "reasoning") {
expect(part.timestamp).toBeNumber();
}
}
});

it("should preserve individual part timestamps when displaying", () => {
const aggregator = new StreamingMessageAggregator();
const startTime = 1000;

// Simulate stream-end with pre-timestamped parts
aggregator.handleStreamEnd({
type: "stream-end",
workspaceId: "test-ws",
messageId: "msg-1",
metadata: {
model: "claude-3",
historySequence: 1,
timestamp: startTime, // Message-level timestamp
},
parts: [
{ type: "text", text: "First", timestamp: startTime },
{ type: "text", text: " second", timestamp: startTime + 100 },
{ type: "reasoning", text: "thinking", timestamp: startTime + 200 },
],
});

// Get displayed messages
const displayed = aggregator.getDisplayedMessages();

// Should have merged text parts into one display message and one reasoning message
const assistantMsgs = displayed.filter((m) => m.type === "assistant");
const reasoningMsgs = displayed.filter((m) => m.type === "reasoning");

expect(assistantMsgs).toHaveLength(1);
expect(reasoningMsgs).toHaveLength(1);

// Assistant message should use the timestamp of the first text part
expect(assistantMsgs[0].timestamp).toBe(startTime);

// Reasoning message should use its part's timestamp
expect(reasoningMsgs[0].timestamp).toBe(startTime + 200);
});

it("should use message-level timestamp as fallback when parts don't have timestamps", () => {
const aggregator = new StreamingMessageAggregator();
const messageTimestamp = 5000;

// Load a message without part-level timestamps (e.g., from old history)
aggregator.handleStreamEnd({
type: "stream-end",
workspaceId: "test-ws",
messageId: "msg-1",
metadata: {
model: "claude-3",
historySequence: 1,
timestamp: messageTimestamp,
},
parts: [
{ type: "text", text: "No timestamp" },
{ type: "reasoning", text: "thinking" },
],
});

const displayed = aggregator.getDisplayedMessages();
const assistantMsgs = displayed.filter((m) => m.type === "assistant");
const reasoningMsgs = displayed.filter((m) => m.type === "reasoning");

// Both should fall back to message-level timestamp
expect(assistantMsgs[0].timestamp).toBe(messageTimestamp);
expect(reasoningMsgs[0].timestamp).toBe(messageTimestamp);
});
});
14 changes: 9 additions & 5 deletions src/utils/messages/StreamingMessageAggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ export class StreamingMessageAggregator {
message.parts.push({
type: "text",
text: data.delta,
timestamp: data.timestamp,
});

// Track delta for token counting and TPS calculation
Expand Down Expand Up @@ -315,7 +316,7 @@ export class StreamingMessageAggregator {
role: "assistant",
metadata: {
...data.metadata,
timestamp: Date.now(),
timestamp: data.metadata.timestamp ?? Date.now(),
},
parts: data.parts,
};
Expand Down Expand Up @@ -452,6 +453,7 @@ export class StreamingMessageAggregator {
message.parts.push({
type: "reasoning",
text: data.delta,
timestamp: data.timestamp,
});

// Track delta for token counting and TPS calculation
Expand Down Expand Up @@ -575,16 +577,18 @@ export class StreamingMessageAggregator {

// Try to merge with last part if same type
if (lastMerged?.type === "text" && part.type === "text") {
// Merge text parts
// Merge text parts, preserving the first timestamp
mergedParts[mergedParts.length - 1] = {
type: "text",
text: lastMerged.text + part.text,
timestamp: lastMerged.timestamp ?? part.timestamp,
};
} else if (lastMerged?.type === "reasoning" && part.type === "reasoning") {
// Merge reasoning parts
// Merge reasoning parts, preserving the first timestamp
mergedParts[mergedParts.length - 1] = {
type: "reasoning",
text: lastMerged.text + part.text,
timestamp: lastMerged.timestamp ?? part.timestamp,
};
} else {
// Different type or tool part - add new part
Expand Down Expand Up @@ -624,7 +628,7 @@ export class StreamingMessageAggregator {
isStreaming,
isPartial: message.metadata?.partial ?? false,
isLastPartOfMessage: isLastPart,
timestamp: baseTimestamp,
timestamp: part.timestamp ?? baseTimestamp,
});
} else if (part.type === "text" && part.text) {
// Skip empty text parts
Expand All @@ -640,7 +644,7 @@ export class StreamingMessageAggregator {
isLastPartOfMessage: isLastPart,
isCompacted: message.metadata?.compacted ?? false,
model: message.metadata?.model,
timestamp: baseTimestamp,
timestamp: part.timestamp ?? baseTimestamp,
});
} else if (isDynamicToolPart(part)) {
const status =
Expand Down