18 changes: 6 additions & 12 deletions src/services/ipcMain.ts
@@ -21,6 +21,7 @@ import type {
   StreamStartEvent,
   StreamDeltaEvent,
   StreamEndEvent,
+  StreamAbortEvent,
   ToolCallStartEvent,
   ToolCallDeltaEvent,
   ToolCallEndEvent,
@@ -1156,19 +1157,12 @@ export class IpcMain {
     });

     // Handle stream abort events
-    this.aiService.on(
-      "stream-abort",
-      (data: { type: string; workspaceId: string; messageId?: string }) => {
-        if (this.mainWindow) {
-          // Send the stream-abort event to frontend
-          this.mainWindow.webContents.send(getChatChannel(data.workspaceId), {
-            type: "stream-abort",
-            workspaceId: data.workspaceId,
-            messageId: data.messageId,
-          });
-        }
+    this.aiService.on("stream-abort", (data: StreamAbortEvent) => {
+      if (this.mainWindow) {
+        // Forward complete abort event including metadata (usage, duration)
+        this.mainWindow.webContents.send(getChatChannel(data.workspaceId), data);
       }
-    );
+    });
   }

   /**
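The handler above now forwards the complete StreamAbortEvent object instead of rebuilding a three-field payload, so whatever metadata the emitter attaches (usage, duration) reaches the renderer untouched. A minimal sketch of what a renderer-side consumer could look like; the handleChatEvent helper and its wiring are hypothetical, only the StreamAbortEvent shape comes from this change:

import type { StreamAbortEvent } from "@/types/stream";

// Hypothetical renderer-side handler; only StreamAbortEvent is defined by this PR.
function handleChatEvent(event: { type: string } & Record<string, unknown>): void {
  if (event.type === "stream-abort") {
    const abort = event as unknown as StreamAbortEvent;
    const duration = abort.metadata?.duration ?? 0;
    const outputTokens = abort.metadata?.usage?.outputTokens;
    // Usage may be undefined when the abort happened before the stream finished.
    console.log(
      `Stream ${abort.messageId} aborted after ${duration}ms` +
        (outputTokens !== undefined ? ` (${outputTokens} output tokens)` : "")
    );
  }
}
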
40 changes: 36 additions & 4 deletions src/services/streamManager.ts
@@ -10,6 +10,7 @@ import {
   APICallError,
   RetryError,
 } from "ai";
+import type { LanguageModelV2Usage } from "@ai-sdk/provider";
 import type { Result } from "@/types/result";
 import { Ok, Err } from "@/types/result";
 import { log } from "./log";
@@ -219,6 +220,33 @@ export class StreamManager extends EventEmitter {
     return randomUUID() as StreamToken;
   }

+  /**
+   * Extracts usage and duration metadata from stream result.
+   *
+   * Usage is only available after stream completes naturally.
+   * On abort, the usage promise may hang - we use a timeout to return quickly.
+   */
+  private async getStreamMetadata(
+    streamInfo: WorkspaceStreamInfo,
+    timeoutMs = 1000
+  ): Promise<{ usage?: LanguageModelV2Usage; duration: number }> {
+    let usage = undefined;
+    try {
+      // Race usage retrieval against timeout to prevent hanging on abort
+      usage = await Promise.race([
+        streamInfo.streamResult.usage,
+        new Promise<undefined>((resolve) => setTimeout(() => resolve(undefined), timeoutMs)),
+      ]);
+    } catch (error) {
+      log.debug("Could not retrieve usage:", error);
+    }
+
+    return {
+      usage,
+      duration: Date.now() - streamInfo.startTime,
+    };
+  }
+
   /**
    * Safely cancels an existing stream with proper cleanup
    *
@@ -243,11 +271,15 @@
     // while a new stream starts (e.g., old stream writing to partial.json)
     await streamInfo.processingPromise;

-    // Emit abort event
+    // Get usage and duration metadata (usage may be undefined if aborted early)
+    const { usage, duration } = await this.getStreamMetadata(streamInfo);
+
+    // Emit abort event with usage if available
     this.emit("stream-abort", {
       type: "stream-abort",
       workspaceId: workspaceId as string,
       messageId: streamInfo.messageId,
+      metadata: { usage, duration },
     });

     // Clean up immediately
@@ -580,8 +612,8 @@

     // Check if stream completed successfully
     if (!streamInfo.abortController.signal.aborted) {
-      // Get usage and provider metadata from stream result
-      const usage = await streamInfo.streamResult.usage;
+      // Get usage, duration, and provider metadata from stream result
+      const { usage, duration } = await this.getStreamMetadata(streamInfo);
       const providerMetadata = await streamInfo.streamResult.providerMetadata;

       // Emit stream end event with parts preserved in temporal order
@@ -594,7 +626,7 @@
           model: streamInfo.model,
           usage, // AI SDK normalized usage
           providerMetadata, // Raw provider metadata
-          duration: Date.now() - streamInfo.startTime,
+          duration,
         },
         parts: streamInfo.parts, // Parts array with temporal ordering (includes reasoning)
       };
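The core of getStreamMetadata is racing the SDK's usage promise against a timer, since that promise only settles when the stream finishes naturally and can otherwise hang after an abort. A standalone sketch of that pattern (not code from the PR), with the one refinement of clearing the timer once the race settles; the inline version simply lets its 1-second timer expire, which is harmless:

// Race a promise against a timeout; resolves to undefined instead of hanging.
async function withTimeout<T>(promise: Promise<T>, timeoutMs: number): Promise<T | undefined> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  try {
    return await Promise.race([
      promise,
      new Promise<undefined>((resolve) => {
        timer = setTimeout(() => resolve(undefined), timeoutMs);
      }),
    ]);
  } finally {
    // Clear the pending timer so it does not fire needlessly after the race settles.
    if (timer !== undefined) clearTimeout(timer);
  }
}
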
5 changes: 5 additions & 0 deletions src/types/stream.ts
@@ -47,6 +47,11 @@ export interface StreamAbortEvent {
type: "stream-abort";
workspaceId: string;
messageId: string;
// Metadata may contain usage if abort occurred after stream completed processing
metadata?: {
usage?: LanguageModelV2Usage;
duration?: number;
};
}

export interface ErrorEvent {
8 changes: 6 additions & 2 deletions src/utils/messages/StreamingMessageAggregator.ts
@@ -265,10 +265,14 @@ export class StreamingMessageAggregator {
     const activeStream = this.activeStreams.get(data.messageId);

     if (activeStream) {
-      // Mark the message as interrupted
+      // Mark the message as interrupted and merge metadata (consistent with handleStreamEnd)
       const message = this.messages.get(data.messageId);
       if (message?.metadata) {
-        message.metadata.partial = true;
+        message.metadata = {
+          ...message.metadata,
+          partial: true,
+          ...data.metadata, // Spread abort metadata (usage, duration)
+        };
       }

       // Clean up active stream - direct delete by messageId
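Replacing the in-place partial = true write with an object spread means the abort path builds metadata the same way handleStreamEnd does: existing fields are kept, partial is set, and whatever the abort event carries is layered on top, with later spreads winning on duplicate keys. A toy illustration of that merge order (the field values are made up):

// Later spreads overwrite earlier keys, so abort metadata takes precedence.
const existing = { model: "example-model", partial: false };
const abortMetadata = { usage: { inputTokens: 12, outputTokens: 34 }, duration: 850 };

const merged = {
  ...existing,
  partial: true,
  ...abortMetadata,
};
// merged = { model: "example-model", partial: true, usage: {...}, duration: 850 }
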
60 changes: 60 additions & 0 deletions tests/ipcMain/sendMessage.test.ts
@@ -124,6 +124,66 @@ describeIntegration("IpcMain sendMessage integration tests", () => {
     15000
   );

+  test.concurrent(
+    "should include usage data in stream-abort events",
+    async () => {
+      // Setup test environment
+      const { env, workspaceId, cleanup } = await setupWorkspace(provider);
+      try {
+        // Start a stream that will generate some tokens
+        const message = "Write a haiku about coding";
+        void sendMessageWithModel(env.mockIpcRenderer, workspaceId, message, provider, model);
+
+        // Wait for stream to start and get some deltas
+        const collector = createEventCollector(env.sentEvents, workspaceId);
+        await collector.waitForEvent("stream-start", 5000);
+
+        // Wait a bit for some content to be generated
+        await new Promise((resolve) => setTimeout(resolve, 1000));
+
+        // Interrupt the stream with an empty message
+        const interruptResult = await sendMessageWithModel(
+          env.mockIpcRenderer,
+          workspaceId,
+          "",
+          provider,
+          model
+        );
+
+        expect(interruptResult.success).toBe(true);
+
+        // Collect all events and find abort event
+        await waitFor(() => {
+          collector.collect();
+          return collector.getEvents().some((e) => "type" in e && e.type === "stream-abort");
+        }, 5000);
+
+        const abortEvent = collector
+          .getEvents()
+          .find((e) => "type" in e && e.type === "stream-abort");
+        expect(abortEvent).toBeDefined();
+
+        // Verify abort event structure
+        if (abortEvent && "metadata" in abortEvent) {
+          // Metadata should exist with duration
+          expect(abortEvent.metadata).toBeDefined();
+          expect(abortEvent.metadata?.duration).toBeGreaterThan(0);
+
+          // Usage MAY be present depending on abort timing:
+          // - Early abort: usage is undefined (stream didn't complete)
+          // - Late abort: usage available (stream finished before UI processed it)
+          if (abortEvent.metadata?.usage) {
+            expect(abortEvent.metadata.usage.inputTokens).toBeGreaterThan(0);
+            expect(abortEvent.metadata.usage.outputTokens).toBeGreaterThanOrEqual(0);
+          }
+        }
+      } finally {
+        await cleanup();
+      }
+    },
+    15000
+  );
+
   test.concurrent(
     "should handle reconnection during active stream",
     async () => {