Skip to content

Commit 73b0a22

Browse files
committed
🤖 feat: mid-stream token usage updates
Emit usage-delta events on AI SDK finish-step, allowing the UI to display live token counts as the AI generates multi-step responses. Backend: - Add UsageDeltaEvent type emitted on each finish-step - StreamManager tracks and emits usage after each tool call step - Forward events through AIService → AgentSession → IPC Frontend: - StreamingMessageAggregator stores active stream usage per messageId - WorkspaceStore exposes liveUsage in WorkspaceUsageState - CostsTab, RightSidebar, ChatMetaSidebar consume liveUsage for real-time context window display Tests: - Unit tests for usage-delta handling in StreamingMessageAggregator - Integration test for usage-delta events during multi-step streams
1 parent a253090 commit 73b0a22

File tree

12 files changed

+237
-22
lines changed

12 files changed

+237
-22
lines changed

src/browser/components/ChatMetaSidebar.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const ChatMetaSidebarComponent: React.FC<ChatMetaSidebarProps> = ({ workspaceId,
1919
const use1M = options.anthropic?.use1MContext ?? false;
2020
const chatAreaSize = useResizeObserver(chatAreaRef);
2121

22-
const lastUsage = usage?.usageHistory[usage.usageHistory.length - 1];
22+
const lastUsage = usage?.liveUsage ?? usage?.usageHistory[usage.usageHistory.length - 1];
2323

2424
// Memoize vertical meter data calculation to prevent unnecessary re-renders
2525
const verticalMeterData = React.useMemo(() => {

src/browser/components/RightSidebar.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ const RightSidebarComponent: React.FC<RightSidebarProps> = ({
134134
const costsPanelId = `${baseId}-panel-costs`;
135135
const reviewPanelId = `${baseId}-panel-review`;
136136

137-
const lastUsage = usage?.usageHistory[usage.usageHistory.length - 1];
137+
const lastUsage = usage?.liveUsage ?? usage?.usageHistory[usage.usageHistory.length - 1];
138138

139139
// Memoize vertical meter data calculation to prevent unnecessary re-renders
140140
const verticalMeterData = React.useMemo(() => {

src/browser/components/RightSidebar/CostsTab.tsx

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,15 @@ const CostsTabComponent: React.FC<CostsTabProps> = ({ workspaceId }) => {
6363
const { options } = useProviderOptions();
6464
const use1M = options.anthropic?.use1MContext ?? false;
6565

66-
// Check if we have any data to display
67-
const hasUsageData = usage && usage.usageHistory.length > 0;
66+
// Session usage for cost
67+
const sessionUsage = React.useMemo(() => {
68+
const historicalSum = sumUsageHistory(usage.usageHistory);
69+
if (!usage.liveUsage) return historicalSum;
70+
if (!historicalSum) return usage.liveUsage;
71+
return sumUsageHistory([historicalSum, usage.liveUsage]);
72+
}, [usage.usageHistory, usage.liveUsage]);
73+
74+
const hasUsageData = usage && (usage.usageHistory.length > 0 || usage.liveUsage !== undefined);
6875
const hasConsumerData = consumers && (consumers.totalTokens > 0 || consumers.isCalculating);
6976
const hasAnyData = hasUsageData || hasConsumerData;
7077

@@ -80,28 +87,22 @@ const CostsTabComponent: React.FC<CostsTabProps> = ({ workspaceId }) => {
8087
);
8188
}
8289

83-
// Context Usage always shows Last Request data
84-
const lastRequestUsage = hasUsageData
85-
? usage.usageHistory[usage.usageHistory.length - 1]
86-
: undefined;
90+
// Last Request (for Cost section): always the last completed request
91+
const lastRequestUsage = usage.usageHistory[usage.usageHistory.length - 1];
8792

8893
// Cost and Details table use viewMode
89-
const displayUsage =
90-
viewMode === "last-request"
91-
? usage.usageHistory[usage.usageHistory.length - 1]
92-
: sumUsageHistory(usage.usageHistory);
94+
const displayUsage = viewMode === "last-request" ? lastRequestUsage : sessionUsage;
9395

9496
return (
9597
<div className="text-light font-primary text-[13px] leading-relaxed">
9698
{hasUsageData && (
9799
<div data-testid="context-usage-section" className="mt-2 mb-5">
98100
<div data-testid="context-usage-list" className="flex flex-col gap-3">
99101
{(() => {
100-
// Context Usage always uses last request
101-
const contextUsage = lastRequestUsage;
102-
103-
// Get model from last request (for context window display)
104-
const model = lastRequestUsage?.model ?? "unknown";
102+
// Context usage: live when streaming, else last historical
103+
const contextUsage =
104+
usage.liveUsage ?? usage.usageHistory[usage.usageHistory.length - 1];
105+
const model = contextUsage?.model ?? "unknown";
105106

106107
// Get max tokens for the model from the model stats database
107108
const modelStats = getModelStats(model);

src/browser/stores/WorkspaceStore.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import {
1717
isRestoreToInput,
1818
} from "@/common/types/ipc";
1919
import { MapStore } from "./MapStore";
20-
import { collectUsageHistory } from "@/common/utils/tokens/displayUsage";
20+
import { collectUsageHistory, createDisplayUsage } from "@/common/utils/tokens/displayUsage";
2121
import { WorkspaceConsumerManager } from "./WorkspaceConsumerManager";
2222
import type { ChatUsageDisplay } from "@/common/utils/tokens/usageAggregator";
2323
import type { TokenConsumer } from "@/common/types/chatStats";
@@ -63,6 +63,8 @@ type DerivedState = Record<string, number>;
6363
export interface WorkspaceUsageState {
6464
usageHistory: ChatUsageDisplay[];
6565
totalTokens: number;
66+
/** Live usage during streaming (inputTokens = current context window) */
67+
liveUsage?: ChatUsageDisplay;
6668
}
6769

6870
/**
@@ -178,6 +180,10 @@ export class WorkspaceStore {
178180
aggregator.handleReasoningEnd(data as never);
179181
this.states.bump(workspaceId);
180182
},
183+
"usage-delta": (workspaceId, aggregator, data) => {
184+
aggregator.handleUsageDelta(data as never);
185+
this.usageStore.bump(workspaceId);
186+
},
181187
"init-start": (workspaceId, aggregator, data) => {
182188
aggregator.handleMessage(data);
183189
this.states.bump(workspaceId);
@@ -449,7 +455,12 @@ export class WorkspaceStore {
449455
0
450456
);
451457

452-
return { usageHistory, totalTokens };
458+
// Include active stream usage if currently streaming (already converted)
459+
const activeStreamId = aggregator.getActiveStreamMessageId();
460+
const rawUsage = activeStreamId ? aggregator.getActiveStreamUsage(activeStreamId) : undefined;
461+
const liveUsage = rawUsage && model ? createDisplayUsage(rawUsage, model) : undefined;
462+
463+
return { usageHistory, totalTokens, liveUsage };
453464
});
454465
}
455466

src/browser/utils/messages/StreamingMessageAggregator.test.ts

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,4 +379,96 @@ describe("StreamingMessageAggregator", () => {
379379
expect(aggregator.getCurrentTodos()).toHaveLength(0);
380380
});
381381
});
382+
383+
describe("usage-delta handling", () => {
384+
test("handleUsageDelta stores usage by messageId", () => {
385+
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
386+
387+
aggregator.handleUsageDelta({
388+
type: "usage-delta",
389+
workspaceId: "ws-1",
390+
messageId: "msg-1",
391+
usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 },
392+
});
393+
394+
expect(aggregator.getActiveStreamUsage("msg-1")).toEqual({
395+
inputTokens: 1000,
396+
outputTokens: 50,
397+
totalTokens: 1050,
398+
});
399+
});
400+
401+
test("clearTokenState removes usage", () => {
402+
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
403+
404+
aggregator.handleUsageDelta({
405+
type: "usage-delta",
406+
workspaceId: "ws-1",
407+
messageId: "msg-1",
408+
usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 },
409+
});
410+
411+
expect(aggregator.getActiveStreamUsage("msg-1")).toBeDefined();
412+
413+
aggregator.clearTokenState("msg-1");
414+
415+
expect(aggregator.getActiveStreamUsage("msg-1")).toBeUndefined();
416+
});
417+
418+
test("latest usage-delta replaces previous for same messageId", () => {
419+
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
420+
421+
// First step usage
422+
aggregator.handleUsageDelta({
423+
type: "usage-delta",
424+
workspaceId: "ws-1",
425+
messageId: "msg-1",
426+
usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 },
427+
});
428+
429+
// Second step usage (larger context after tool result added)
430+
aggregator.handleUsageDelta({
431+
type: "usage-delta",
432+
workspaceId: "ws-1",
433+
messageId: "msg-1",
434+
usage: { inputTokens: 1500, outputTokens: 100, totalTokens: 1600 },
435+
});
436+
437+
// Should have latest values, not summed
438+
expect(aggregator.getActiveStreamUsage("msg-1")).toEqual({
439+
inputTokens: 1500,
440+
outputTokens: 100,
441+
totalTokens: 1600,
442+
});
443+
});
444+
445+
test("tracks usage independently per messageId", () => {
446+
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
447+
448+
aggregator.handleUsageDelta({
449+
type: "usage-delta",
450+
workspaceId: "ws-1",
451+
messageId: "msg-1",
452+
usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 },
453+
});
454+
455+
aggregator.handleUsageDelta({
456+
type: "usage-delta",
457+
workspaceId: "ws-1",
458+
messageId: "msg-2",
459+
usage: { inputTokens: 2000, outputTokens: 100, totalTokens: 2100 },
460+
});
461+
462+
expect(aggregator.getActiveStreamUsage("msg-1")).toEqual({
463+
inputTokens: 1000,
464+
outputTokens: 50,
465+
totalTokens: 1050,
466+
});
467+
expect(aggregator.getActiveStreamUsage("msg-2")).toEqual({
468+
inputTokens: 2000,
469+
outputTokens: 100,
470+
totalTokens: 2100,
471+
});
472+
});
473+
});
382474
});

src/browser/utils/messages/StreamingMessageAggregator.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { createMuxMessage } from "@/common/types/message";
99
import type {
1010
StreamStartEvent,
1111
StreamDeltaEvent,
12+
UsageDeltaEvent,
1213
StreamEndEvent,
1314
StreamAbortEvent,
1415
ToolCallStartEvent,
@@ -17,6 +18,7 @@ import type {
1718
ReasoningDeltaEvent,
1819
ReasoningEndEvent,
1920
} from "@/common/types/stream";
21+
import type { LanguageModelV2Usage } from "@ai-sdk/provider";
2022
import type { TodoItem, StatusSetToolResult } from "@/common/types/tools";
2123

2224
import type { WorkspaceChatMessage, StreamErrorMessage, DeleteMessage } from "@/common/types/ipc";
@@ -72,6 +74,10 @@ export class StreamingMessageAggregator {
7274
// Delta history for token counting and TPS calculation
7375
private deltaHistory = new Map<string, DeltaRecordStorage>();
7476

77+
// Active stream step usage (updated on each stream-step event)
78+
// Tracks cumulative usage for the currently streaming message
79+
private activeStreamStepUsage = new Map<string, LanguageModelV2Usage>();
80+
7581
// Current TODO list (updated when todo_write succeeds, cleared on stream end)
7682
// Stream-scoped: automatically reset when stream completes
7783
// On reload: only reconstructed if reconnecting to active stream
@@ -992,5 +998,20 @@ export class StreamingMessageAggregator {
992998
*/
993999
clearTokenState(messageId: string): void {
9941000
this.deltaHistory.delete(messageId);
1001+
this.activeStreamStepUsage.delete(messageId);
1002+
}
1003+
1004+
/**
1005+
* Handle usage-delta event: update cumulative usage for active stream
1006+
*/
1007+
handleUsageDelta(data: UsageDeltaEvent): void {
1008+
this.activeStreamStepUsage.set(data.messageId, data.usage);
1009+
}
1010+
1011+
/**
1012+
* Get active stream usage for a message (if streaming)
1013+
*/
1014+
getActiveStreamUsage(messageId: string): LanguageModelV2Usage | undefined {
1015+
return this.activeStreamStepUsage.get(messageId);
9951016
}
9961017
}

src/common/types/ipc.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import type {
2121
StreamDeltaEvent,
2222
StreamEndEvent,
2323
StreamAbortEvent,
24+
UsageDeltaEvent,
2425
ToolCallStartEvent,
2526
ToolCallDeltaEvent,
2627
ToolCallEndEvent,
@@ -103,6 +104,7 @@ export type WorkspaceChatMessage =
103104
| DeleteMessage
104105
| StreamStartEvent
105106
| StreamDeltaEvent
107+
| UsageDeltaEvent
106108
| StreamEndEvent
107109
| StreamAbortEvent
108110
| ToolCallStartEvent
@@ -149,6 +151,11 @@ export function isStreamAbort(msg: WorkspaceChatMessage): msg is StreamAbortEven
149151
return "type" in msg && msg.type === "stream-abort";
150152
}
151153

154+
// Type guard for usage delta events
155+
export function isUsageDelta(msg: WorkspaceChatMessage): msg is UsageDeltaEvent {
156+
return "type" in msg && msg.type === "usage-delta";
157+
}
158+
152159
// Type guard for tool call start events
153160
export function isToolCallStart(msg: WorkspaceChatMessage): msg is ToolCallStartEvent {
154161
return "type" in msg && msg.type === "tool-call-start";

src/common/types/stream.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,17 @@ export interface ReasoningEndEvent {
121121
messageId: string;
122122
}
123123

124+
/**
125+
* Emitted on each AI SDK finish-step event, providing incremental usage updates.
126+
* Allows UI to update token display as steps complete (after each tool call or at stream end).
127+
*/
128+
export interface UsageDeltaEvent {
129+
type: "usage-delta";
130+
workspaceId: string;
131+
messageId: string;
132+
usage: LanguageModelV2Usage; // This step's usage (inputTokens = full context)
133+
}
134+
124135
export type AIServiceEvent =
125136
| StreamStartEvent
126137
| StreamDeltaEvent
@@ -132,4 +143,5 @@ export type AIServiceEvent =
132143
| ToolCallEndEvent
133144
| ReasoningStartEvent
134145
| ReasoningDeltaEvent
135-
| ReasoningEndEvent;
146+
| ReasoningEndEvent
147+
| UsageDeltaEvent;

src/node/services/agentSession.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,7 @@ export class AgentSession {
464464
});
465465
forward("reasoning-delta", (payload) => this.emitChatEvent(payload));
466466
forward("reasoning-end", (payload) => this.emitChatEvent(payload));
467+
forward("usage-delta", (payload) => this.emitChatEvent(payload));
467468

468469
forward("stream-end", async (payload) => {
469470
const handled = await this.compactionHandler.handleCompletion(payload as StreamEndEvent);

src/node/services/aiService.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ export class AIService extends EventEmitter {
203203
// Forward reasoning events
204204
this.streamManager.on("reasoning-delta", (data) => this.emit("reasoning-delta", data));
205205
this.streamManager.on("reasoning-end", (data) => this.emit("reasoning-end", data));
206+
this.streamManager.on("usage-delta", (data) => this.emit("usage-delta", data));
206207
}
207208

208209
private async ensureSessionsDir(): Promise<void> {

0 commit comments

Comments
 (0)