diff --git a/src/browser/components/ChatMetaSidebar.tsx b/src/browser/components/ChatMetaSidebar.tsx index 7b539b1cd7..46e604ac8d 100644 --- a/src/browser/components/ChatMetaSidebar.tsx +++ b/src/browser/components/ChatMetaSidebar.tsx @@ -19,7 +19,8 @@ const ChatMetaSidebarComponent: React.FC = ({ workspaceId, const use1M = options.anthropic?.use1MContext ?? false; const chatAreaSize = useResizeObserver(chatAreaRef); - const lastUsage = usage?.liveUsage ?? usage?.usageHistory[usage.usageHistory.length - 1]; + // Use lastContextUsage for context window display (last step = actual context size) + const lastUsage = usage?.liveUsage ?? usage?.lastContextUsage; // Memoize vertical meter data calculation to prevent unnecessary re-renders const verticalMeterData = React.useMemo(() => { diff --git a/src/browser/components/RightSidebar.tsx b/src/browser/components/RightSidebar.tsx index 736dad60bd..6ee1cbb904 100644 --- a/src/browser/components/RightSidebar.tsx +++ b/src/browser/components/RightSidebar.tsx @@ -135,7 +135,8 @@ const RightSidebarComponent: React.FC = ({ const costsPanelId = `${baseId}-panel-costs`; const reviewPanelId = `${baseId}-panel-review`; - const lastUsage = usage?.liveUsage ?? usage?.usageHistory[usage.usageHistory.length - 1]; + // Use lastContextUsage for context window display (last step = actual context size) + const lastUsage = usage?.liveUsage ?? usage?.lastContextUsage; const model = lastUsage?.model ?? null; // Auto-compaction settings: threshold per-model diff --git a/src/browser/components/RightSidebar/CostsTab.tsx b/src/browser/components/RightSidebar/CostsTab.tsx index b9da3f7cc0..2614910856 100644 --- a/src/browser/components/RightSidebar/CostsTab.tsx +++ b/src/browser/components/RightSidebar/CostsTab.tsx @@ -65,22 +65,28 @@ const CostsTabComponent: React.FC = ({ workspaceId }) => { const use1M = options.anthropic?.use1MContext ?? false; // Get model from context usage for per-model threshold storage - const contextUsage = usage.liveUsage ?? usage.usageHistory[usage.usageHistory.length - 1]; - const currentModel = contextUsage?.model ?? null; + // Use lastContextUsage for context window display (last step's usage) + const contextUsageForModel = usage.liveUsage ?? usage.lastContextUsage; + const currentModel = contextUsageForModel?.model ?? 
null; // Auto-compaction settings: threshold per-model (100 = disabled) const { threshold: autoCompactThreshold, setThreshold: setAutoCompactThreshold } = useAutoCompactionSettings(workspaceId, currentModel); - // Session usage for cost + // Session usage for cost calculation + // Uses usageHistory (total across all steps) + liveCostUsage (cumulative during streaming) const sessionUsage = React.useMemo(() => { const historicalSum = sumUsageHistory(usage.usageHistory); - if (!usage.liveUsage) return historicalSum; - if (!historicalSum) return usage.liveUsage; - return sumUsageHistory([historicalSum, usage.liveUsage]); - }, [usage.usageHistory, usage.liveUsage]); - - const hasUsageData = usage && (usage.usageHistory.length > 0 || usage.liveUsage !== undefined); + if (!usage.liveCostUsage) return historicalSum; + if (!historicalSum) return usage.liveCostUsage; + return sumUsageHistory([historicalSum, usage.liveCostUsage]); + }, [usage.usageHistory, usage.liveCostUsage]); + + const hasUsageData = + usage && + (usage.usageHistory.length > 0 || + usage.lastContextUsage !== undefined || + usage.liveUsage !== undefined); const hasConsumerData = consumers && (consumers.totalTokens > 0 || consumers.isCalculating); const hasAnyData = hasUsageData || hasConsumerData; @@ -109,8 +115,8 @@ const CostsTabComponent: React.FC = ({ workspaceId }) => {
{(() => { // Context usage: live when streaming, else last historical - const contextUsage = - usage.liveUsage ?? usage.usageHistory[usage.usageHistory.length - 1]; + // Uses lastContextUsage (last step) for accurate context window size + const contextUsage = usage.liveUsage ?? usage.lastContextUsage; const model = contextUsage?.model ?? "unknown"; // Get max tokens for the model from the model stats database diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index 5ab7324565..295d0acd49 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -59,12 +59,21 @@ type DerivedState = Record; /** * Usage metadata extracted from API responses (no tokenization). * Updates instantly when usage metadata arrives. + * + * For multi-step tool calls, cost and context usage differ: + * - usageHistory: Total usage per message (sum of all steps) for cost calculation + * - lastContextUsage: Last step's usage for context window display (inputTokens = actual context size) */ export interface WorkspaceUsageState { + /** Usage history for cost calculation (total across all steps per message) */ usageHistory: ChatUsageDisplay[]; + /** Last message's context usage (last step only, for context window display) */ + lastContextUsage?: ChatUsageDisplay; totalTokens: number; - /** Live usage during streaming (inputTokens = current context window) */ + /** Live context usage during streaming (last step's inputTokens = current context window) */ liveUsage?: ChatUsageDisplay; + /** Live cost usage during streaming (cumulative across all steps) */ + liveCostUsage?: ChatUsageDisplay; } /** @@ -441,6 +450,8 @@ export class WorkspaceStore { const messages = aggregator.getAllMessages(); const model = aggregator.getCurrentModel(); + + // Collect usage history for cost calculation (total across all steps per message) const usageHistory = collectUsageHistory(messages, model); // Calculate total from usage history (now includes historical) @@ -455,12 +466,47 @@ export class WorkspaceStore { 0 ); - // Include active stream usage if currently streaming (already converted) + // Get last message's context usage for context window display + // Uses contextUsage (last step) if available, falls back to usage for old messages + const lastContextUsage = (() => { + for (let i = messages.length - 1; i >= 0; i--) { + const msg = messages[i]; + if (msg.role === "assistant") { + const rawUsage = msg.metadata?.contextUsage ?? msg.metadata?.usage; + const providerMeta = + msg.metadata?.contextProviderMetadata ?? msg.metadata?.providerMetadata; + if (rawUsage) { + const msgModel = msg.metadata?.model ?? model ?? "unknown"; + return createDisplayUsage(rawUsage, msgModel, providerMeta); + } + } + } + return undefined; + })(); + + // Include active stream usage if currently streaming const activeStreamId = aggregator.getActiveStreamMessageId(); - const rawUsage = activeStreamId ? aggregator.getActiveStreamUsage(activeStreamId) : undefined; - const liveUsage = rawUsage && model ? createDisplayUsage(rawUsage, model) : undefined; - return { usageHistory, totalTokens, liveUsage }; + // Live context usage (last step's inputTokens = current context window) + const rawContextUsage = activeStreamId + ? aggregator.getActiveStreamUsage(activeStreamId) + : undefined; + const liveUsage = + rawContextUsage && model ? 
createDisplayUsage(rawContextUsage, model) : undefined; + + // Live cost usage (cumulative across all steps, with accumulated cache creation tokens) + const rawCumulativeUsage = activeStreamId + ? aggregator.getActiveStreamCumulativeUsage(activeStreamId) + : undefined; + const rawCumulativeProviderMetadata = activeStreamId + ? aggregator.getActiveStreamCumulativeProviderMetadata(activeStreamId) + : undefined; + const liveCostUsage = + rawCumulativeUsage && model + ? createDisplayUsage(rawCumulativeUsage, model, rawCumulativeProviderMetadata) + : undefined; + + return { usageHistory, lastContextUsage, totalTokens, liveUsage, liveCostUsage }; }); } diff --git a/src/browser/utils/compaction/autoCompactionCheck.test.ts b/src/browser/utils/compaction/autoCompactionCheck.test.ts index 1b511758a8..a2e0040f12 100644 --- a/src/browser/utils/compaction/autoCompactionCheck.test.ts +++ b/src/browser/utils/compaction/autoCompactionCheck.test.ts @@ -40,9 +40,11 @@ const createMockUsage = ( } // Add recent usage - usageHistory.push(createUsageEntry(lastEntryTokens, model)); + const recentUsage = createUsageEntry(lastEntryTokens, model); + usageHistory.push(recentUsage); - return { usageHistory, totalTokens: 0, liveUsage }; + // lastContextUsage is the most recent context window state + return { usageHistory, lastContextUsage: recentUsage, totalTokens: 0, liveUsage }; }; describe("checkAutoCompaction", () => { @@ -136,17 +138,17 @@ describe("checkAutoCompaction", () => { test("includes all token types in calculation", () => { // Create usage with all token types specified + const usageEntry = { + input: { tokens: 10_000 }, + cached: { tokens: 5_000 }, + cacheCreate: { tokens: 2_000 }, + output: { tokens: 3_000 }, + reasoning: { tokens: 1_000 }, + model: KNOWN_MODELS.SONNET.id, + }; const usage: WorkspaceUsageState = { - usageHistory: [ - { - input: { tokens: 10_000 }, - cached: { tokens: 5_000 }, - cacheCreate: { tokens: 2_000 }, - output: { tokens: 3_000 }, - reasoning: { tokens: 1_000 }, - model: KNOWN_MODELS.SONNET.id, - }, - ], + usageHistory: [usageEntry], + lastContextUsage: usageEntry, totalTokens: 0, }; @@ -232,17 +234,17 @@ describe("checkAutoCompaction", () => { }); test("handles zero tokens gracefully", () => { + const zeroEntry = { + input: { tokens: 0 }, + cached: { tokens: 0 }, + cacheCreate: { tokens: 0 }, + output: { tokens: 0 }, + reasoning: { tokens: 0 }, + model: KNOWN_MODELS.SONNET.id, + }; const usage: WorkspaceUsageState = { - usageHistory: [ - { - input: { tokens: 0 }, - cached: { tokens: 0 }, - cacheCreate: { tokens: 0 }, - output: { tokens: 0 }, - reasoning: { tokens: 0 }, - model: KNOWN_MODELS.SONNET.id, - }, - ], + usageHistory: [zeroEntry], + lastContextUsage: zeroEntry, totalTokens: 0, }; @@ -357,7 +359,11 @@ describe("checkAutoCompaction", () => { test("shouldForceCompact triggers with empty history but liveUsage near limit", () => { // Bug fix: empty history but liveUsage should still trigger const liveUsage = createUsageEntry(SONNET_MAX_TOKENS - BUFFER); - const usage: WorkspaceUsageState = { usageHistory: [], totalTokens: 0, liveUsage }; + const usage: WorkspaceUsageState = { + usageHistory: [], + totalTokens: 0, + liveUsage, + }; const result = checkAutoCompaction(usage, KNOWN_MODELS.SONNET.id, false); expect(result.shouldForceCompact).toBe(true); diff --git a/src/browser/utils/compaction/autoCompactionCheck.ts b/src/browser/utils/compaction/autoCompactionCheck.ts index 107221c424..80e56ff0ae 100644 --- a/src/browser/utils/compaction/autoCompactionCheck.ts +++ 
b/src/browser/utils/compaction/autoCompactionCheck.ts @@ -94,8 +94,9 @@ export function checkAutoCompaction( }; } - // Current usage: live when streaming, else last historical (pattern from CostsTab) - const lastUsage = usage.usageHistory[usage.usageHistory.length - 1]; + // Current usage: live when streaming, else last historical + // Use lastContextUsage (last step) for accurate context window size + const lastUsage = usage.lastContextUsage; const currentUsage = usage.liveUsage ?? lastUsage; // Force-compact when approaching context limit (can trigger even with empty history if streaming) diff --git a/src/browser/utils/messages/StreamingMessageAggregator.test.ts b/src/browser/utils/messages/StreamingMessageAggregator.test.ts index cac12d623f..2e239a626f 100644 --- a/src/browser/utils/messages/StreamingMessageAggregator.test.ts +++ b/src/browser/utils/messages/StreamingMessageAggregator.test.ts @@ -389,6 +389,7 @@ describe("StreamingMessageAggregator", () => { workspaceId: "ws-1", messageId: "msg-1", usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, + cumulativeUsage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, }); expect(aggregator.getActiveStreamUsage("msg-1")).toEqual({ @@ -406,6 +407,7 @@ describe("StreamingMessageAggregator", () => { workspaceId: "ws-1", messageId: "msg-1", usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, + cumulativeUsage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, }); expect(aggregator.getActiveStreamUsage("msg-1")).toBeDefined(); @@ -424,6 +426,7 @@ describe("StreamingMessageAggregator", () => { workspaceId: "ws-1", messageId: "msg-1", usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, + cumulativeUsage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, }); // Second step usage (larger context after tool result added) @@ -432,14 +435,21 @@ describe("StreamingMessageAggregator", () => { workspaceId: "ws-1", messageId: "msg-1", usage: { inputTokens: 1500, outputTokens: 100, totalTokens: 1600 }, + cumulativeUsage: { inputTokens: 2500, outputTokens: 150, totalTokens: 2650 }, }); - // Should have latest values, not summed + // Should have latest step's values (for context window display) expect(aggregator.getActiveStreamUsage("msg-1")).toEqual({ inputTokens: 1500, outputTokens: 100, totalTokens: 1600, }); + // Cumulative should be sum of all steps (for cost display) + expect(aggregator.getActiveStreamCumulativeUsage("msg-1")).toEqual({ + inputTokens: 2500, + outputTokens: 150, + totalTokens: 2650, + }); }); test("tracks usage independently per messageId", () => { @@ -450,6 +460,7 @@ describe("StreamingMessageAggregator", () => { workspaceId: "ws-1", messageId: "msg-1", usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, + cumulativeUsage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, }); aggregator.handleUsageDelta({ @@ -457,6 +468,7 @@ describe("StreamingMessageAggregator", () => { workspaceId: "ws-1", messageId: "msg-2", usage: { inputTokens: 2000, outputTokens: 100, totalTokens: 2100 }, + cumulativeUsage: { inputTokens: 2000, outputTokens: 100, totalTokens: 2100 }, }); expect(aggregator.getActiveStreamUsage("msg-1")).toEqual({ @@ -470,5 +482,118 @@ describe("StreamingMessageAggregator", () => { totalTokens: 2100, }); }); + + test("stores and retrieves cumulativeProviderMetadata", () => { + const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT); + + aggregator.handleUsageDelta({ + type: "usage-delta", + workspaceId: "ws-1", + 
messageId: "msg-1", + usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, + cumulativeUsage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, + cumulativeProviderMetadata: { + anthropic: { cacheCreationInputTokens: 500, cacheReadInputTokens: 200 }, + }, + }); + + expect(aggregator.getActiveStreamCumulativeProviderMetadata("msg-1")).toEqual({ + anthropic: { cacheCreationInputTokens: 500, cacheReadInputTokens: 200 }, + }); + }); + + test("cumulativeProviderMetadata is undefined when not provided", () => { + const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT); + + aggregator.handleUsageDelta({ + type: "usage-delta", + workspaceId: "ws-1", + messageId: "msg-1", + usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, + cumulativeUsage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, + // No cumulativeProviderMetadata + }); + + expect(aggregator.getActiveStreamCumulativeProviderMetadata("msg-1")).toBeUndefined(); + }); + + test("clearTokenState clears all usage tracking (step, cumulative, metadata)", () => { + const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT); + + aggregator.handleUsageDelta({ + type: "usage-delta", + workspaceId: "ws-1", + messageId: "msg-1", + usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, + cumulativeUsage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, + cumulativeProviderMetadata: { anthropic: { cacheCreationInputTokens: 500 } }, + }); + + // All should be defined + expect(aggregator.getActiveStreamUsage("msg-1")).toBeDefined(); + expect(aggregator.getActiveStreamCumulativeUsage("msg-1")).toBeDefined(); + expect(aggregator.getActiveStreamCumulativeProviderMetadata("msg-1")).toBeDefined(); + + aggregator.clearTokenState("msg-1"); + + // All should be cleared + expect(aggregator.getActiveStreamUsage("msg-1")).toBeUndefined(); + expect(aggregator.getActiveStreamCumulativeUsage("msg-1")).toBeUndefined(); + expect(aggregator.getActiveStreamCumulativeProviderMetadata("msg-1")).toBeUndefined(); + }); + + test("multi-step scenario: step usage replaced, cumulative accumulated", () => { + const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT); + + // Step 1: Initial request with cache creation + aggregator.handleUsageDelta({ + type: "usage-delta", + workspaceId: "ws-1", + messageId: "msg-1", + usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, + cumulativeUsage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, + cumulativeProviderMetadata: { anthropic: { cacheCreationInputTokens: 800 } }, + }); + + // Verify step 1 state + expect(aggregator.getActiveStreamUsage("msg-1")?.inputTokens).toBe(1000); + expect(aggregator.getActiveStreamCumulativeUsage("msg-1")?.inputTokens).toBe(1000); + expect( + ( + aggregator.getActiveStreamCumulativeProviderMetadata("msg-1")?.anthropic as { + cacheCreationInputTokens: number; + } + ).cacheCreationInputTokens + ).toBe(800); + + // Step 2: After tool call, larger context, more cache creation + aggregator.handleUsageDelta({ + type: "usage-delta", + workspaceId: "ws-1", + messageId: "msg-1", + usage: { inputTokens: 1500, outputTokens: 100, totalTokens: 1600 }, // Last step only + cumulativeUsage: { inputTokens: 2500, outputTokens: 150, totalTokens: 2650 }, // Sum of all + cumulativeProviderMetadata: { anthropic: { cacheCreationInputTokens: 1200 } }, // Sum of all + }); + + // Step usage should be REPLACED (last step only) + expect(aggregator.getActiveStreamUsage("msg-1")).toEqual({ + inputTokens: 1500, + 
outputTokens: 100, + totalTokens: 1600, + }); + + // Cumulative usage should show SUM of all steps + expect(aggregator.getActiveStreamCumulativeUsage("msg-1")).toEqual({ + inputTokens: 2500, + outputTokens: 150, + totalTokens: 2650, + }); + + // Cumulative metadata should show SUM of cache creation tokens + expect(aggregator.getActiveStreamCumulativeProviderMetadata("msg-1")).toEqual({ + anthropic: { cacheCreationInputTokens: 1200 }, + }); + }); }); }); diff --git a/src/browser/utils/messages/StreamingMessageAggregator.ts b/src/browser/utils/messages/StreamingMessageAggregator.ts index fcb824c195..b8f6eac45d 100644 --- a/src/browser/utils/messages/StreamingMessageAggregator.ts +++ b/src/browser/utils/messages/StreamingMessageAggregator.ts @@ -75,8 +75,12 @@ export class StreamingMessageAggregator { private deltaHistory = new Map(); // Active stream step usage (updated on each stream-step event) - // Tracks cumulative usage for the currently streaming message + // Tracks last step's usage for context window display private activeStreamStepUsage = new Map(); + // Tracks cumulative usage across all steps for live cost display + private activeStreamCumulativeUsage = new Map(); + // Tracks cumulative provider metadata for live cost display (with cache creation tokens) + private activeStreamCumulativeProviderMetadata = new Map>(); // Current TODO list (updated when todo_write succeeds, cleared on stream end) // Stream-scoped: automatically reset when stream completes @@ -1064,19 +1068,47 @@ export class StreamingMessageAggregator { clearTokenState(messageId: string): void { this.deltaHistory.delete(messageId); this.activeStreamStepUsage.delete(messageId); + this.activeStreamCumulativeUsage.delete(messageId); + this.activeStreamCumulativeProviderMetadata.delete(messageId); } /** - * Handle usage-delta event: update cumulative usage for active stream + * Handle usage-delta event: update usage tracking for active stream */ handleUsageDelta(data: UsageDeltaEvent): void { + // Store last step's usage for context window display this.activeStreamStepUsage.set(data.messageId, data.usage); + // Store cumulative usage for cost display + this.activeStreamCumulativeUsage.set(data.messageId, data.cumulativeUsage); + // Store cumulative provider metadata for live cost display (with cache creation tokens) + if (data.cumulativeProviderMetadata) { + this.activeStreamCumulativeProviderMetadata.set( + data.messageId, + data.cumulativeProviderMetadata + ); + } } /** - * Get active stream usage for a message (if streaming) + * Get active stream usage for context window display (last step's inputTokens = context size) */ getActiveStreamUsage(messageId: string): LanguageModelV2Usage | undefined { return this.activeStreamStepUsage.get(messageId); } + + /** + * Get active stream cumulative usage for cost display (sum of all steps) + */ + getActiveStreamCumulativeUsage(messageId: string): LanguageModelV2Usage | undefined { + return this.activeStreamCumulativeUsage.get(messageId); + } + + /** + * Get cumulative provider metadata for cost display (with accumulated cache creation tokens) + */ + getActiveStreamCumulativeProviderMetadata( + messageId: string + ): Record | undefined { + return this.activeStreamCumulativeProviderMetadata.get(messageId); + } } diff --git a/src/common/types/message.ts b/src/common/types/message.ts index cfb11bea7a..fb54622d61 100644 --- a/src/common/types/message.ts +++ b/src/common/types/message.ts @@ -37,8 +37,14 @@ export interface MuxMetadata { duration?: number; timestamp?: number; model?: 
string; - usage?: LanguageModelV2Usage; // AI SDK normalized usage (verbatim from streamResult.usage) - providerMetadata?: Record; // Raw AI SDK provider data + // Total usage across all steps (for cost calculation) + usage?: LanguageModelV2Usage; + // Last step's usage only (for context window display - inputTokens = current context size) + contextUsage?: LanguageModelV2Usage; + // Aggregated provider metadata across all steps (for cost calculation) + providerMetadata?: Record; + // Last step's provider metadata (for context window cache display) + contextProviderMetadata?: Record; systemMessageTokens?: number; // Token count for system message sent with this request (calculated by AIService) partial?: boolean; // Whether this message was interrupted and is incomplete synthetic?: boolean; // Whether this message was synthetically generated (e.g., [CONTINUE] sentinel) diff --git a/src/common/types/stream.ts b/src/common/types/stream.ts index e667f7a7e0..306abfc7f8 100644 --- a/src/common/types/stream.ts +++ b/src/common/types/stream.ts @@ -36,8 +36,14 @@ export interface StreamEndEvent { // Structured metadata from backend - directly mergeable with MuxMetadata metadata: { model: string; + // Total usage across all steps (for cost calculation) usage?: LanguageModelV2Usage; + // Last step's usage only (for context window display - inputTokens = current context size) + contextUsage?: LanguageModelV2Usage; + // Aggregated provider metadata across all steps (for cost calculation) providerMetadata?: Record; + // Last step's provider metadata (for context window cache display) + contextProviderMetadata?: Record; duration?: number; systemMessageTokens?: number; historySequence?: number; // Present when loading from history @@ -129,7 +135,12 @@ export interface UsageDeltaEvent { type: "usage-delta"; workspaceId: string; messageId: string; - usage: LanguageModelV2Usage; // This step's usage (inputTokens = full context) + // This step's usage (inputTokens = current context size, for context window display) + usage: LanguageModelV2Usage; + // Cumulative usage across all steps so far (for live cost display) + cumulativeUsage: LanguageModelV2Usage; + // Cumulative provider metadata across all steps (for live cost display with cache tokens) + cumulativeProviderMetadata?: Record; } export type AIServiceEvent = diff --git a/src/common/utils/tokens/displayUsage.test.ts b/src/common/utils/tokens/displayUsage.test.ts index b5e98e7d7a..9031cec55a 100644 --- a/src/common/utils/tokens/displayUsage.test.ts +++ b/src/common/utils/tokens/displayUsage.test.ts @@ -1,5 +1,5 @@ import { describe, test, expect } from "bun:test"; -import { collectUsageHistory } from "./displayUsage"; +import { collectUsageHistory, createDisplayUsage } from "./displayUsage"; import { createMuxMessage, type MuxMessage } from "@/common/types/message"; import type { LanguageModelV2Usage } from "@ai-sdk/provider"; import type { ChatUsageDisplay } from "./usageAggregator"; @@ -146,3 +146,134 @@ describe("collectUsageHistory", () => { expect(result[1].model).toBe("model-2"); }); }); + +describe("createDisplayUsage", () => { + describe("Provider-specific cached token handling", () => { + // OpenAI reports inputTokens INCLUSIVE of cachedInputTokens + // We must subtract cached from input to avoid double-counting + const openAIUsage: LanguageModelV2Usage = { + inputTokens: 108200, // Includes 71600 cached + outputTokens: 227, + totalTokens: 108427, + cachedInputTokens: 71600, + }; + + test("subtracts cached tokens for direct OpenAI model", () => { 
+ const result = createDisplayUsage(openAIUsage, "openai:gpt-5.1"); + + expect(result).toBeDefined(); + expect(result!.cached.tokens).toBe(71600); + // Input should be raw minus cached: 108200 - 71600 = 36600 + expect(result!.input.tokens).toBe(36600); + }); + + test("subtracts cached tokens for gateway OpenAI model", () => { + // Gateway format: mux-gateway:openai/model-name + const result = createDisplayUsage(openAIUsage, "mux-gateway:openai/gpt-5.1"); + + expect(result).toBeDefined(); + expect(result!.cached.tokens).toBe(71600); + // Should also subtract: 108200 - 71600 = 36600 + expect(result!.input.tokens).toBe(36600); + }); + + test("does NOT subtract cached tokens for Anthropic model", () => { + // Anthropic reports inputTokens EXCLUDING cachedInputTokens + const anthropicUsage: LanguageModelV2Usage = { + inputTokens: 36600, // Already excludes cached + outputTokens: 227, + totalTokens: 108427, + cachedInputTokens: 71600, + }; + + const result = createDisplayUsage(anthropicUsage, "anthropic:claude-sonnet-4-5"); + + expect(result).toBeDefined(); + expect(result!.cached.tokens).toBe(71600); + // Input stays as-is for Anthropic + expect(result!.input.tokens).toBe(36600); + }); + + test("does NOT subtract cached tokens for gateway Anthropic model", () => { + const anthropicUsage: LanguageModelV2Usage = { + inputTokens: 36600, + outputTokens: 227, + totalTokens: 108427, + cachedInputTokens: 71600, + }; + + const result = createDisplayUsage(anthropicUsage, "mux-gateway:anthropic/claude-sonnet-4-5"); + + expect(result).toBeDefined(); + expect(result!.cached.tokens).toBe(71600); + // Input stays as-is for gateway Anthropic + expect(result!.input.tokens).toBe(36600); + }); + + test("subtracts cached tokens for direct Google model", () => { + // Google also reports inputTokens INCLUSIVE of cachedInputTokens + const googleUsage: LanguageModelV2Usage = { + inputTokens: 74300, // Includes 42600 cached + outputTokens: 1600, + totalTokens: 75900, + cachedInputTokens: 42600, + }; + + const result = createDisplayUsage(googleUsage, "google:gemini-3-pro-preview"); + + expect(result).toBeDefined(); + expect(result!.cached.tokens).toBe(42600); + // Input should be raw minus cached: 74300 - 42600 = 31700 + expect(result!.input.tokens).toBe(31700); + }); + + test("subtracts cached tokens for gateway Google model", () => { + const googleUsage: LanguageModelV2Usage = { + inputTokens: 74300, + outputTokens: 1600, + totalTokens: 75900, + cachedInputTokens: 42600, + }; + + const result = createDisplayUsage(googleUsage, "mux-gateway:google/gemini-3-pro-preview"); + + expect(result).toBeDefined(); + expect(result!.cached.tokens).toBe(42600); + // Should also subtract: 74300 - 42600 = 31700 + expect(result!.input.tokens).toBe(31700); + }); + }); + + test("returns undefined for undefined usage", () => { + expect(createDisplayUsage(undefined, "openai:gpt-5.1")).toBeUndefined(); + }); + + test("handles zero cached tokens", () => { + const usage: LanguageModelV2Usage = { + inputTokens: 1000, + outputTokens: 500, + totalTokens: 1500, + cachedInputTokens: 0, + }; + + const result = createDisplayUsage(usage, "openai:gpt-5.1"); + + expect(result).toBeDefined(); + expect(result!.input.tokens).toBe(1000); + expect(result!.cached.tokens).toBe(0); + }); + + test("handles missing cachedInputTokens field", () => { + const usage: LanguageModelV2Usage = { + inputTokens: 1000, + outputTokens: 500, + totalTokens: 1500, + }; + + const result = createDisplayUsage(usage, "openai:gpt-5.1"); + + expect(result).toBeDefined(); + 
expect(result!.input.tokens).toBe(1000); + expect(result!.cached.tokens).toBe(0); + }); +}); diff --git a/src/common/utils/tokens/displayUsage.ts b/src/common/utils/tokens/displayUsage.ts index 886548fd1a..db308cccfc 100644 --- a/src/common/utils/tokens/displayUsage.ts +++ b/src/common/utils/tokens/displayUsage.ts @@ -9,6 +9,7 @@ import type { LanguageModelV2Usage } from "@ai-sdk/provider"; import { getModelStats } from "./modelStats"; import type { ChatUsageDisplay } from "./usageAggregator"; import type { MuxMessage } from "@/common/types/message"; +import { normalizeGatewayModel } from "../ai/models"; /** * Create a display-friendly usage object from AI SDK usage @@ -29,11 +30,19 @@ export function createDisplayUsage( const cachedTokens = usage.cachedInputTokens ?? 0; const rawInputTokens = usage.inputTokens ?? 0; - // Detect provider from model string - const isOpenAI = model.startsWith("openai:"); + // Normalize gateway models (e.g., "mux-gateway:openai/gpt-5.1" → "openai:gpt-5.1") + // before detecting provider, so gateway-routed requests get correct handling + const normalizedModel = normalizeGatewayModel(model); - // For OpenAI, subtract cached tokens to get uncached input tokens - const inputTokens = isOpenAI ? Math.max(0, rawInputTokens - cachedTokens) : rawInputTokens; + // Detect provider from normalized model string + const isOpenAI = normalizedModel.startsWith("openai:"); + const isGoogle = normalizedModel.startsWith("google:"); + + // OpenAI and Google report inputTokens INCLUSIVE of cachedInputTokens + // Anthropic reports them separately (inputTokens EXCLUDES cached) + // Subtract cached tokens for providers that include them to avoid double-counting + const inputTokens = + isOpenAI || isGoogle ? Math.max(0, rawInputTokens - cachedTokens) : rawInputTokens; // Extract cache creation tokens from provider metadata (Anthropic-specific) const cacheCreateTokens = @@ -92,6 +101,10 @@ export function createDisplayUsage( }; } +/** + * Collect usage history for cost calculation. + * Uses totalUsage (sum of all steps) for accurate cost reporting. + */ export function collectUsageHistory( messages: MuxMessage[], fallbackModel?: string @@ -108,7 +121,7 @@ export function collectUsageHistory( cumulativeHistorical = msg.metadata.historicalUsage; } - // Extract current message's usage + // Extract current message's usage (total across all steps) if (msg.metadata?.usage) { // Use the model from this specific message (not global) const model = msg.metadata.model ?? fallbackModel ?? 
"unknown"; diff --git a/src/common/utils/tokens/usageHelpers.test.ts b/src/common/utils/tokens/usageHelpers.test.ts new file mode 100644 index 0000000000..3e027a003f --- /dev/null +++ b/src/common/utils/tokens/usageHelpers.test.ts @@ -0,0 +1,245 @@ +import { describe, test, expect } from "bun:test"; +import { addUsage, accumulateProviderMetadata } from "./usageHelpers"; +import type { LanguageModelV2Usage } from "@ai-sdk/provider"; + +describe("addUsage", () => { + test("sums all fields when both arguments have values", () => { + const a: LanguageModelV2Usage = { + inputTokens: 100, + outputTokens: 50, + totalTokens: 150, + cachedInputTokens: 20, + reasoningTokens: 10, + }; + const b: LanguageModelV2Usage = { + inputTokens: 200, + outputTokens: 100, + totalTokens: 300, + cachedInputTokens: 30, + reasoningTokens: 15, + }; + + expect(addUsage(a, b)).toEqual({ + inputTokens: 300, + outputTokens: 150, + totalTokens: 450, + cachedInputTokens: 50, + reasoningTokens: 25, + }); + }); + + test("handles undefined first argument", () => { + const b: LanguageModelV2Usage = { + inputTokens: 100, + outputTokens: 50, + totalTokens: 150, + }; + + expect(addUsage(undefined, b)).toEqual({ + inputTokens: 100, + outputTokens: 50, + totalTokens: 150, + cachedInputTokens: 0, + reasoningTokens: 0, + }); + }); + + test("handles sparse usage objects (missing fields treated as 0)", () => { + // Simulating sparse SDK responses where not all fields are present + // Using Partial to represent incomplete usage data from the SDK + const a: Partial = { inputTokens: 100 }; + const b: Partial = { outputTokens: 50 }; + + expect(addUsage(a as LanguageModelV2Usage, b as LanguageModelV2Usage)).toEqual({ + inputTokens: 100, + outputTokens: 50, + totalTokens: 0, + cachedInputTokens: 0, + reasoningTokens: 0, + }); + }); + + test("handles zero values correctly", () => { + const a: LanguageModelV2Usage = { + inputTokens: 0, + outputTokens: 0, + totalTokens: 0, + cachedInputTokens: 0, + reasoningTokens: 0, + }; + const b: LanguageModelV2Usage = { + inputTokens: 0, + outputTokens: 0, + totalTokens: 0, + cachedInputTokens: 0, + reasoningTokens: 0, + }; + + expect(addUsage(a, b)).toEqual({ + inputTokens: 0, + outputTokens: 0, + totalTokens: 0, + cachedInputTokens: 0, + reasoningTokens: 0, + }); + }); + + test("accumulates across multiple calls (simulating multi-step)", () => { + let cumulative: LanguageModelV2Usage | undefined = undefined; + + // Step 1 + cumulative = addUsage(cumulative, { inputTokens: 1000, outputTokens: 100, totalTokens: 1100 }); + expect(cumulative.inputTokens).toBe(1000); + expect(cumulative.outputTokens).toBe(100); + + // Step 2 + cumulative = addUsage(cumulative, { inputTokens: 1200, outputTokens: 150, totalTokens: 1350 }); + expect(cumulative.inputTokens).toBe(2200); + expect(cumulative.outputTokens).toBe(250); + + // Step 3 + cumulative = addUsage(cumulative, { inputTokens: 1500, outputTokens: 200, totalTokens: 1700 }); + expect(cumulative.inputTokens).toBe(3700); + expect(cumulative.outputTokens).toBe(450); + }); +}); + +describe("accumulateProviderMetadata", () => { + test("returns undefined when both arguments are undefined", () => { + expect(accumulateProviderMetadata(undefined, undefined)).toBeUndefined(); + }); + + test("returns existing when step is undefined", () => { + const existing = { anthropic: { cacheCreationInputTokens: 100 } }; + expect(accumulateProviderMetadata(existing, undefined)).toBe(existing); + }); + + test("returns step when existing is undefined", () => { + const step = { anthropic: { 
cacheCreationInputTokens: 50 } }; + expect(accumulateProviderMetadata(undefined, step)).toBe(step); + }); + + test("returns step when neither has cache creation tokens", () => { + const existing = { anthropic: { cacheReadInputTokens: 100 } }; + const step = { anthropic: { cacheReadInputTokens: 200 } }; + expect(accumulateProviderMetadata(existing, step)).toBe(step); + }); + + test("sums cache creation tokens when both have them", () => { + const existing = { anthropic: { cacheCreationInputTokens: 100 } }; + const step = { anthropic: { cacheCreationInputTokens: 50 } }; + + const result = accumulateProviderMetadata(existing, step); + expect(result).toEqual({ + anthropic: { cacheCreationInputTokens: 150 }, + }); + }); + + test("preserves step cache tokens when existing has none", () => { + const existing = { anthropic: { cacheReadInputTokens: 100 } }; + const step = { anthropic: { cacheCreationInputTokens: 50, cacheReadInputTokens: 200 } }; + + const result = accumulateProviderMetadata(existing, step); + expect(result).toEqual({ + anthropic: { cacheCreationInputTokens: 50, cacheReadInputTokens: 200 }, + }); + }); + + test("preserves other anthropic fields when merging", () => { + const existing = { anthropic: { cacheCreationInputTokens: 100 } }; + const step = { + anthropic: { + cacheCreationInputTokens: 50, + cacheReadInputTokens: 200, + modelId: "claude-sonnet-4-5", + }, + }; + + const result = accumulateProviderMetadata(existing, step); + expect(result).toEqual({ + anthropic: { + cacheCreationInputTokens: 150, + cacheReadInputTokens: 200, + modelId: "claude-sonnet-4-5", + }, + }); + }); + + test("handles non-anthropic providers (returns step as-is when no cache tokens)", () => { + const existing = { openai: { reasoningTokens: 100 } }; + const step = { openai: { reasoningTokens: 200 } }; + + // No cache creation tokens, so returns step + expect(accumulateProviderMetadata(existing, step)).toBe(step); + }); + + test("preserves non-anthropic provider fields alongside anthropic", () => { + const existing = { + anthropic: { cacheCreationInputTokens: 100 }, + openai: { reasoningTokens: 50 }, + }; + const step = { + anthropic: { cacheCreationInputTokens: 50, cacheReadInputTokens: 200 }, + openai: { reasoningTokens: 100 }, + }; + + const result = accumulateProviderMetadata(existing, step); + expect(result).toEqual({ + anthropic: { cacheCreationInputTokens: 150, cacheReadInputTokens: 200 }, + openai: { reasoningTokens: 100 }, // From step, not accumulated + }); + }); + + test("accumulates across multiple steps (simulating multi-step tool calls)", () => { + let cumulative: Record | undefined = undefined; + + // Step 1: Initial cache creation + cumulative = accumulateProviderMetadata(cumulative, { + anthropic: { cacheCreationInputTokens: 1000, cacheReadInputTokens: 0 }, + }); + expect( + (cumulative?.anthropic as { cacheCreationInputTokens: number }).cacheCreationInputTokens + ).toBe(1000); + + // Step 2: More cache creation + cumulative = accumulateProviderMetadata(cumulative, { + anthropic: { cacheCreationInputTokens: 500, cacheReadInputTokens: 800 }, + }); + expect( + (cumulative?.anthropic as { cacheCreationInputTokens: number }).cacheCreationInputTokens + ).toBe(1500); + + // Step 3: No cache creation (reading from cache) + cumulative = accumulateProviderMetadata(cumulative, { + anthropic: { cacheCreationInputTokens: 0, cacheReadInputTokens: 1200 }, + }); + // Total should still be 1500 (0 + existing 1500) + expect( + (cumulative?.anthropic as { cacheCreationInputTokens: number 
}).cacheCreationInputTokens + ).toBe(1500); + }); + + test("handles missing anthropic field in existing", () => { + const existing = { someOtherProvider: { field: "value" } }; + const step = { anthropic: { cacheCreationInputTokens: 50 } }; + + const result = accumulateProviderMetadata(existing, step); + expect(result).toEqual({ + anthropic: { cacheCreationInputTokens: 50 }, + }); + }); + + test("handles missing anthropic field in step (merges existing cache tokens)", () => { + const existing = { anthropic: { cacheCreationInputTokens: 100 } }; + const step = { someOtherProvider: { field: "value" } }; + + // Step has no anthropic metadata, so stepCacheCreate resolves to 0 + // while existingCacheCreate is 100; the total (100) is non-zero, + // so the helper merges it into step rather than returning step as-is + const result = accumulateProviderMetadata(existing, step); + expect(result).toEqual({ + someOtherProvider: { field: "value" }, + anthropic: { cacheCreationInputTokens: 100 }, + }); + }); +}); diff --git a/src/common/utils/tokens/usageHelpers.ts b/src/common/utils/tokens/usageHelpers.ts new file mode 100644 index 0000000000..d8ad3b5aab --- /dev/null +++ b/src/common/utils/tokens/usageHelpers.ts @@ -0,0 +1,65 @@ +/** + * Helper functions for accumulating usage and provider metadata across multi-step tool calls. + * + * For multi-step tool calls, the AI SDK reports usage per-step. We need to: + * - Sum usage across all steps for cost calculation + * - Track last step's usage for context window display (inputTokens = actual context size) + * - Accumulate provider-specific metadata (e.g., Anthropic cache creation tokens) + */ + +import type { LanguageModelV2Usage } from "@ai-sdk/provider"; + +/** + * Add two LanguageModelV2Usage values together. + * Handles undefined first argument and undefined fields within usage objects. + */ +export function addUsage( + a: LanguageModelV2Usage | undefined, + b: LanguageModelV2Usage +): LanguageModelV2Usage { + return { + inputTokens: (a?.inputTokens ?? 0) + (b.inputTokens ?? 0), + outputTokens: (a?.outputTokens ?? 0) + (b.outputTokens ?? 0), + totalTokens: (a?.totalTokens ?? 0) + (b.totalTokens ?? 0), + cachedInputTokens: (a?.cachedInputTokens ?? 0) + (b.cachedInputTokens ?? 0), + reasoningTokens: (a?.reasoningTokens ?? 0) + (b.reasoningTokens ?? 0), + }; +} + +/** + * Accumulate provider metadata across steps, specifically for cache creation tokens. + * + * For Anthropic, cache creation tokens are reported per-step and need to be summed. + * Other provider metadata is taken from the latest step. + */ +export function accumulateProviderMetadata( + existing: Record | undefined, + step: Record | undefined +): Record | undefined { + if (!step) return existing; + if (!existing) return step; + + // Extract cache creation tokens from both + const existingCacheCreate = + (existing.anthropic as { cacheCreationInputTokens?: number } | undefined) + ?.cacheCreationInputTokens ?? 0; + const stepCacheCreate = + (step.anthropic as { cacheCreationInputTokens?: number } | undefined) + ?.cacheCreationInputTokens ?? 
0; + + const totalCacheCreate = existingCacheCreate + stepCacheCreate; + + // If no cache creation tokens to aggregate, just return step's metadata + if (totalCacheCreate === 0) { + return step; + } + + // Merge with accumulated cache creation tokens + return { + ...step, + anthropic: { + ...(step.anthropic as Record | undefined), + cacheCreationInputTokens: totalCacheCreate, + }, + }; +} diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index c09d2ec716..c6912c1c51 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -28,6 +28,7 @@ import type { SendMessageError, StreamErrorType } from "@/common/types/errors"; import type { MuxMetadata, MuxMessage } from "@/common/types/message"; import type { PartialService } from "./partialService"; import type { HistoryService } from "./historyService"; +import { addUsage, accumulateProviderMetadata } from "@/common/utils/tokens/usageHelpers"; import { AsyncMutex } from "@/node/utils/concurrency/asyncMutex"; import type { ToolPolicy } from "@/common/utils/tools/toolPolicy"; import { StreamingTokenTracker } from "@/node/utils/main/StreamingTokenTracker"; @@ -116,6 +117,14 @@ interface WorkspaceStreamInfo { runtimeTempDir: string; // Runtime for temp directory cleanup runtime: Runtime; + // Cumulative usage across all steps (for live cost display during streaming) + cumulativeUsage: LanguageModelV2Usage; + // Cumulative provider metadata across all steps (for live cost display with cache tokens) + cumulativeProviderMetadata?: Record; + // Last step's usage (for context window display during streaming) + lastStepUsage?: LanguageModelV2Usage; + // Last step's provider metadata (for context window cache display) + lastStepProviderMetadata?: Record; } /** @@ -326,8 +335,10 @@ export class StreamManager extends EventEmitter { let usage = undefined; try { // Race usage retrieval against timeout to prevent hanging on abort + // CRITICAL: Use totalUsage (sum of all steps) not usage (last step only) + // For multi-step tool calls, usage would severely undercount actual token consumption usage = await Promise.race([ - streamInfo.streamResult.usage, + streamInfo.streamResult.totalUsage, new Promise((resolve) => setTimeout(() => resolve(undefined), timeoutMs)), ]); } catch (error) { @@ -340,6 +351,66 @@ export class StreamManager extends EventEmitter { }; } + /** + * Aggregate provider metadata across all steps. + * + * CRITICAL: For multi-step tool calls, cache creation tokens are reported per-step. + * streamResult.providerMetadata only contains the LAST step's metadata, missing + * cache creation tokens from earlier steps. We must sum across all steps. 
+ */ + private async getAggregatedProviderMetadata( + streamInfo: WorkspaceStreamInfo, + timeoutMs = 1000 + ): Promise | undefined> { + try { + const steps = await Promise.race([ + streamInfo.streamResult.steps, + new Promise((resolve) => setTimeout(() => resolve(undefined), timeoutMs)), + ]); + + if (!steps || steps.length === 0) { + // Fall back to last step's provider metadata + return await streamInfo.streamResult.providerMetadata; + } + + // If only one step, no aggregation needed + if (steps.length === 1) { + return steps[0].providerMetadata; + } + + // Aggregate cache creation tokens across all steps + let totalCacheCreationTokens = 0; + let lastStepMetadata: Record | undefined; + + for (const step of steps) { + lastStepMetadata = step.providerMetadata; + const anthropicMeta = step.providerMetadata?.anthropic as + | { cacheCreationInputTokens?: number } + | undefined; + if (anthropicMeta?.cacheCreationInputTokens) { + totalCacheCreationTokens += anthropicMeta.cacheCreationInputTokens; + } + } + + // If no cache creation tokens found, just return last step's metadata + if (totalCacheCreationTokens === 0) { + return lastStepMetadata; + } + + // Merge aggregated cache creation tokens into the last step's metadata + return { + ...lastStepMetadata, + anthropic: { + ...(lastStepMetadata?.anthropic as Record | undefined), + cacheCreationInputTokens: totalCacheCreationTokens, + }, + }; + } catch (error) { + log.debug("Could not aggregate provider metadata:", error); + return undefined; + } + } + /** * Safely cancels an existing stream with proper cleanup * @@ -557,6 +628,9 @@ export class StreamManager extends EventEmitter { processingPromise: Promise.resolve(), // Placeholder, overwritten in startStream runtimeTempDir, // Stream-scoped temp directory for tool outputs runtime, // Runtime for temp directory cleanup + // Initialize cumulative tracking for multi-step streams + cumulativeUsage: { inputTokens: 0, outputTokens: 0, totalTokens: 0 }, + cumulativeProviderMetadata: undefined, }; // Atomically register the stream @@ -865,13 +939,27 @@ export class StreamManager extends EventEmitter { const finishStepPart = part as { type: "finish-step"; usage: LanguageModelV2Usage; + providerMetadata?: Record; }; + // Update cumulative totals for this stream + streamInfo.cumulativeUsage = addUsage(streamInfo.cumulativeUsage, finishStepPart.usage); + streamInfo.cumulativeProviderMetadata = accumulateProviderMetadata( + streamInfo.cumulativeProviderMetadata, + finishStepPart.providerMetadata + ); + + // Track last step's data for context window display + streamInfo.lastStepUsage = finishStepPart.usage; + streamInfo.lastStepProviderMetadata = finishStepPart.providerMetadata; + const usageEvent: UsageDeltaEvent = { type: "usage-delta", workspaceId: workspaceId as string, messageId: streamInfo.messageId, - usage: finishStepPart.usage, + usage: finishStepPart.usage, // For context window display + cumulativeUsage: streamInfo.cumulativeUsage, // For live cost display + cumulativeProviderMetadata: streamInfo.cumulativeProviderMetadata, // For live cache costs }; this.emit("usage-delta", usageEvent); break; @@ -896,8 +984,15 @@ export class StreamManager extends EventEmitter { // Check if stream completed successfully if (!streamInfo.abortController.signal.aborted) { // Get usage, duration, and provider metadata from stream result + // CRITICAL: Use totalUsage (via getStreamMetadata) and aggregated providerMetadata + // to correctly account for all steps in multi-tool-call conversations const { usage, duration 
} = await this.getStreamMetadata(streamInfo); - const providerMetadata = await streamInfo.streamResult.providerMetadata; + const providerMetadata = await this.getAggregatedProviderMetadata(streamInfo); + + // For context window display, use last step's usage (inputTokens = current context size) + // This is stored in streamInfo during finish-step handling + const contextUsage = streamInfo.lastStepUsage; + const contextProviderMetadata = streamInfo.lastStepProviderMetadata; // Emit stream end event with parts preserved in temporal order const streamEndEvent: StreamEndEvent = { @@ -907,8 +1002,10 @@ export class StreamManager extends EventEmitter { metadata: { ...streamInfo.initialMetadata, // AIService-provided metadata (systemMessageTokens, etc) model: streamInfo.model, - usage, // AI SDK normalized usage - providerMetadata, // Raw provider metadata + usage, // Total across all steps (for cost calculation) + contextUsage, // Last step only (for context window display) + providerMetadata, // Aggregated (for cost calculation) + contextProviderMetadata, // Last step (for context window display) duration, }, parts: streamInfo.parts, // Parts array with temporal ordering (includes reasoning)
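
A minimal consumer-side sketch, not part of the patch, of how the split fields on WorkspaceUsageState are meant to be read after this change, condensed from the CostsTab and RightSidebar hunks above. Types come from the modules touched in this diff; the import path for sumUsageHistory and its return type (the summed ChatUsageDisplay, as its use in CostsTab suggests) are assumptions.

import { sumUsageHistory } from "@/common/utils/tokens/usageAggregator"; // assumed location
import type { ChatUsageDisplay } from "@/common/utils/tokens/usageAggregator";
import type { WorkspaceUsageState } from "@/browser/stores/WorkspaceStore";

// Context-window meters read the last step only: its inputTokens is the actual context size.
function contextUsageFor(usage: WorkspaceUsageState): ChatUsageDisplay | undefined {
  return usage.liveUsage ?? usage.lastContextUsage;
}

// Cost meters read the total across all steps, folding in cumulative live usage while streaming.
function costUsageFor(usage: WorkspaceUsageState): ChatUsageDisplay | undefined {
  const historicalSum = sumUsageHistory(usage.usageHistory);
  if (!usage.liveCostUsage) return historicalSum;
  if (!historicalSum) return usage.liveCostUsage;
  return sumUsageHistory([historicalSum, usage.liveCostUsage]);
}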
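
A small worked example, also not part of the patch, of the per-step versus cumulative split that streamManager now emits in usage-delta events on each finish-step. addUsage and its import path are taken from this diff; the figures reuse the aggregator tests above, and the event shapes are abbreviated in the comments.

import { addUsage } from "@/common/utils/tokens/usageHelpers";
import type { LanguageModelV2Usage } from "@ai-sdk/provider";

const step1: LanguageModelV2Usage = { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 };
const step2: LanguageModelV2Usage = { inputTokens: 1500, outputTokens: 100, totalTokens: 1600 };

let cumulative: LanguageModelV2Usage | undefined;
// finish-step #1 emits usage-delta { usage: step1, cumulativeUsage: { 1000, 50, 1050, ... } }
cumulative = addUsage(cumulative, step1);
// finish-step #2 emits usage-delta { usage: step2, cumulativeUsage: { 2500, 150, 2650, ... } }
cumulative = addUsage(cumulative, step2);
// The context-window display reads step2.inputTokens (1500, the real context size);
// the cost display reads cumulative.inputTokens (2500, everything billed so far).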
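
For the cached-token normalization in createDisplayUsage, a condensed restatement of the convention the new tests pin down: OpenAI and Google report inputTokens inclusive of cachedInputTokens, so the uncached share is 108200 - 71600 = 36600, while Anthropic reports the two separately. The sketch below reuses the test figures and is illustrative only.

import { createDisplayUsage } from "@/common/utils/tokens/displayUsage";

const openAi = createDisplayUsage(
  { inputTokens: 108200, outputTokens: 227, totalTokens: 108427, cachedInputTokens: 71600 },
  "mux-gateway:openai/gpt-5.1" // gateway ids are normalized before provider detection
);
// openAi?.input.tokens === 36600, openAi?.cached.tokens === 71600

const anthropic = createDisplayUsage(
  { inputTokens: 36600, outputTokens: 227, totalTokens: 108427, cachedInputTokens: 71600 },
  "anthropic:claude-sonnet-4-5"
);
// anthropic?.input.tokens === 36600 (no subtraction), anthropic?.cached.tokens === 71600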