From e090d22467a4bc42f382f1ff3781aa25fee4cf49 Mon Sep 17 00:00:00 2001 From: Ammar Date: Fri, 10 Oct 2025 14:01:10 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=A4=96=20Track=20token=20usage=20in=20str?= =?UTF-8?q?eam=20abort=20events?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Captures usage data when streams are interrupted, consistent with normal stream completion. Provides better visibility into API costs for interrupted requests. Changes: - StreamAbortEvent includes optional metadata (usage, duration) - StreamManager extracts usage with timeout to handle early aborts - IPC forwards complete abort event (not just type/workspaceId) - StreamingMessageAggregator merges abort metadata into message - Integration test verifies abort events include duration Usage availability on abort depends on timing: - Early abort (before API completes): usage is undefined - Late abort (after API finishes): usage is available The timeout prevents hanging when usage promise doesn't resolve. _Generated with `cmux`_ --- src/services/ipcMain.ts | 18 ++---- src/services/streamManager.ts | 40 +++++++++++-- src/types/stream.ts | 5 ++ .../messages/StreamingMessageAggregator.ts | 8 ++- tests/ipcMain/sendMessage.test.ts | 60 +++++++++++++++++++ 5 files changed, 113 insertions(+), 18 deletions(-) diff --git a/src/services/ipcMain.ts b/src/services/ipcMain.ts index 96c5c8113..a654af266 100644 --- a/src/services/ipcMain.ts +++ b/src/services/ipcMain.ts @@ -21,6 +21,7 @@ import type { StreamStartEvent, StreamDeltaEvent, StreamEndEvent, + StreamAbortEvent, ToolCallStartEvent, ToolCallDeltaEvent, ToolCallEndEvent, @@ -1156,19 +1157,12 @@ export class IpcMain { }); // Handle stream abort events - this.aiService.on( - "stream-abort", - (data: { type: string; workspaceId: string; messageId?: string }) => { - if (this.mainWindow) { - // Send the stream-abort event to frontend - this.mainWindow.webContents.send(getChatChannel(data.workspaceId), { - type: "stream-abort", - workspaceId: data.workspaceId, - messageId: data.messageId, - }); - } + this.aiService.on("stream-abort", (data: StreamAbortEvent) => { + if (this.mainWindow) { + // Forward complete abort event including metadata (usage, duration) + this.mainWindow.webContents.send(getChatChannel(data.workspaceId), data); } - ); + }); } /** diff --git a/src/services/streamManager.ts b/src/services/streamManager.ts index 70e155eeb..7a8ebcfd8 100644 --- a/src/services/streamManager.ts +++ b/src/services/streamManager.ts @@ -10,6 +10,7 @@ import { APICallError, RetryError, } from "ai"; +import type { LanguageModelV2Usage } from "@ai-sdk/provider"; import type { Result } from "@/types/result"; import { Ok, Err } from "@/types/result"; import { log } from "./log"; @@ -219,6 +220,33 @@ export class StreamManager extends EventEmitter { return randomUUID() as StreamToken; } + /** + * Extracts usage and duration metadata from stream result. + * + * Usage is only available after stream completes naturally. + * On abort, the usage promise may hang - we use a timeout to return quickly. + */ + private async getStreamMetadata( + streamInfo: WorkspaceStreamInfo, + timeoutMs = 1000 + ): Promise<{ usage?: LanguageModelV2Usage; duration: number }> { + let usage = undefined; + try { + // Race usage retrieval against timeout to prevent hanging on abort + usage = await Promise.race([ + streamInfo.streamResult.usage, + new Promise((resolve) => setTimeout(() => resolve(undefined), timeoutMs)), + ]); + } catch (error) { + log.debug("Could not retrieve usage:", error); + } + + return { + usage, + duration: Date.now() - streamInfo.startTime, + }; + } + /** * Safely cancels an existing stream with proper cleanup * @@ -243,11 +271,15 @@ export class StreamManager extends EventEmitter { // while a new stream starts (e.g., old stream writing to partial.json) await streamInfo.processingPromise; - // Emit abort event + // Get usage and duration metadata (usage may be undefined if aborted early) + const { usage, duration } = await this.getStreamMetadata(streamInfo); + + // Emit abort event with usage if available this.emit("stream-abort", { type: "stream-abort", workspaceId: workspaceId as string, messageId: streamInfo.messageId, + metadata: { usage, duration }, }); // Clean up immediately @@ -580,8 +612,8 @@ export class StreamManager extends EventEmitter { // Check if stream completed successfully if (!streamInfo.abortController.signal.aborted) { - // Get usage and provider metadata from stream result - const usage = await streamInfo.streamResult.usage; + // Get usage, duration, and provider metadata from stream result + const { usage, duration } = await this.getStreamMetadata(streamInfo); const providerMetadata = await streamInfo.streamResult.providerMetadata; // Emit stream end event with parts preserved in temporal order @@ -594,7 +626,7 @@ export class StreamManager extends EventEmitter { model: streamInfo.model, usage, // AI SDK normalized usage providerMetadata, // Raw provider metadata - duration: Date.now() - streamInfo.startTime, + duration, }, parts: streamInfo.parts, // Parts array with temporal ordering (includes reasoning) }; diff --git a/src/types/stream.ts b/src/types/stream.ts index 79088293f..29c9c040d 100644 --- a/src/types/stream.ts +++ b/src/types/stream.ts @@ -47,6 +47,11 @@ export interface StreamAbortEvent { type: "stream-abort"; workspaceId: string; messageId: string; + // Metadata may contain usage if abort occurred after stream completed processing + metadata?: { + usage?: LanguageModelV2Usage; + duration?: number; + }; } export interface ErrorEvent { diff --git a/src/utils/messages/StreamingMessageAggregator.ts b/src/utils/messages/StreamingMessageAggregator.ts index 23a7253fb..6db8d2fc0 100644 --- a/src/utils/messages/StreamingMessageAggregator.ts +++ b/src/utils/messages/StreamingMessageAggregator.ts @@ -265,10 +265,14 @@ export class StreamingMessageAggregator { const activeStream = this.activeStreams.get(data.messageId); if (activeStream) { - // Mark the message as interrupted + // Mark the message as interrupted and merge metadata (consistent with handleStreamEnd) const message = this.messages.get(data.messageId); if (message?.metadata) { - message.metadata.partial = true; + message.metadata = { + ...message.metadata, + partial: true, + ...data.metadata, // Spread abort metadata (usage, duration) + }; } // Clean up active stream - direct delete by messageId diff --git a/tests/ipcMain/sendMessage.test.ts b/tests/ipcMain/sendMessage.test.ts index 3cab00cb5..b02b6fb0c 100644 --- a/tests/ipcMain/sendMessage.test.ts +++ b/tests/ipcMain/sendMessage.test.ts @@ -124,6 +124,66 @@ describeIntegration("IpcMain sendMessage integration tests", () => { 15000 ); + test.concurrent( + "should include usage data in stream-abort events", + async () => { + // Setup test environment + const { env, workspaceId, cleanup } = await setupWorkspace(provider); + try { + // Start a stream that will generate some tokens + const message = "Write a haiku about coding"; + void sendMessageWithModel(env.mockIpcRenderer, workspaceId, message, provider, model); + + // Wait for stream to start and get some deltas + const collector = createEventCollector(env.sentEvents, workspaceId); + await collector.waitForEvent("stream-start", 5000); + + // Wait a bit for some content to be generated + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Interrupt the stream with an empty message + const interruptResult = await sendMessageWithModel( + env.mockIpcRenderer, + workspaceId, + "", + provider, + model + ); + + expect(interruptResult.success).toBe(true); + + // Collect all events and find abort event + await waitFor(() => { + collector.collect(); + return collector.getEvents().some((e) => "type" in e && e.type === "stream-abort"); + }, 5000); + + const abortEvent = collector + .getEvents() + .find((e) => "type" in e && e.type === "stream-abort"); + expect(abortEvent).toBeDefined(); + + // Verify abort event structure + if (abortEvent && "metadata" in abortEvent) { + // Metadata should exist with duration + expect(abortEvent.metadata).toBeDefined(); + expect(abortEvent.metadata?.duration).toBeGreaterThan(0); + + // Usage MAY be present depending on abort timing: + // - Early abort: usage is undefined (stream didn't complete) + // - Late abort: usage available (stream finished before UI processed it) + if (abortEvent.metadata?.usage) { + expect(abortEvent.metadata.usage.inputTokens).toBeGreaterThan(0); + expect(abortEvent.metadata.usage.outputTokens).toBeGreaterThanOrEqual(0); + } + } + } finally { + await cleanup(); + } + }, + 15000 + ); + test.concurrent( "should handle reconnection during active stream", async () => {