From fd8a296e92e5be53919ff3bc7490a26c0d946201 Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 3 Dec 2025 14:54:44 +1100 Subject: [PATCH 1/2] =?UTF-8?q?=F0=9F=A4=96=20fix:=20include=20step=20prov?= =?UTF-8?q?iderMetadata=20in=20liveUsage=20for=20cache=20creation=20displa?= =?UTF-8?q?y?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit During streaming, liveUsage (context window display) showed 0 cache creation tokens because it wasn't receiving providerMetadata. The cache creation token count is Anthropic-specific and only available in providerMetadata.anthropic.cacheCreationInputTokens, not in the standard LanguageModelV2Usage. Thread step-level providerMetadata from backend through IPC to frontend, mirroring the existing pattern for cumulative provider metadata. _Generated with mux_ --- src/browser/stores/WorkspaceStore.ts | 7 ++++++- .../utils/messages/StreamingMessageAggregator.ts | 14 ++++++++++++++ src/common/types/stream.ts | 2 ++ src/node/services/streamManager.ts | 1 + 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index c7b2c86c8b..64cf82a990 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -493,8 +493,13 @@ export class WorkspaceStore { const rawContextUsage = activeStreamId ? aggregator.getActiveStreamUsage(activeStreamId) : undefined; + const rawStepProviderMetadata = activeStreamId + ? aggregator.getActiveStreamStepProviderMetadata(activeStreamId) + : undefined; const liveUsage = - rawContextUsage && model ? createDisplayUsage(rawContextUsage, model) : undefined; + rawContextUsage && model + ?
createDisplayUsage(rawContextUsage, model, rawStepProviderMetadata) + : undefined; // Live cost usage (cumulative across all steps, with accumulated cache creation tokens) const rawCumulativeUsage = activeStreamId diff --git a/src/browser/utils/messages/StreamingMessageAggregator.ts b/src/browser/utils/messages/StreamingMessageAggregator.ts index 03fe5c0b2f..7e73e793e6 100644 --- a/src/browser/utils/messages/StreamingMessageAggregator.ts +++ b/src/browser/utils/messages/StreamingMessageAggregator.ts @@ -78,6 +78,8 @@ export class StreamingMessageAggregator { // Active stream step usage (updated on each stream-step event) // Tracks last step's usage for context window display private activeStreamStepUsage = new Map(); + // Tracks step provider metadata for context window cache display + private activeStreamStepProviderMetadata = new Map>(); // Tracks cumulative usage across all steps for live cost display private activeStreamCumulativeUsage = new Map(); // Tracks cumulative provider metadata for live cost display (with cache creation tokens) @@ -1061,6 +1063,7 @@ export class StreamingMessageAggregator { clearTokenState(messageId: string): void { this.deltaHistory.delete(messageId); this.activeStreamStepUsage.delete(messageId); + this.activeStreamStepProviderMetadata.delete(messageId); this.activeStreamCumulativeUsage.delete(messageId); this.activeStreamCumulativeProviderMetadata.delete(messageId); } @@ -1071,6 +1074,10 @@ export class StreamingMessageAggregator { handleUsageDelta(data: UsageDeltaEvent): void { // Store last step's usage for context window display this.activeStreamStepUsage.set(data.messageId, data.usage); + // Store step provider metadata for context window cache display + if (data.providerMetadata) { + this.activeStreamStepProviderMetadata.set(data.messageId, data.providerMetadata); + } // Store cumulative usage for cost display this.activeStreamCumulativeUsage.set(data.messageId, data.cumulativeUsage); // Store cumulative provider metadata for 
live cost display (with cache creation tokens) @@ -1104,4 +1111,11 @@ export class StreamingMessageAggregator { ): Record | undefined { return this.activeStreamCumulativeProviderMetadata.get(messageId); } + + /** + * Get step provider metadata for context window cache display + */ + getActiveStreamStepProviderMetadata(messageId: string): Record | undefined { + return this.activeStreamStepProviderMetadata.get(messageId); + } } diff --git a/src/common/types/stream.ts b/src/common/types/stream.ts index 6407ced589..db1d59cbc9 100644 --- a/src/common/types/stream.ts +++ b/src/common/types/stream.ts @@ -148,6 +148,8 @@ export interface UsageDeltaEvent { cumulativeUsage: LanguageModelV2Usage; // Cumulative provider metadata across all steps (for live cost display with cache tokens) cumulativeProviderMetadata?: Record; + // This step's provider metadata (for context window cache display) + providerMetadata?: Record; } export type AIServiceEvent = diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index cae73395d0..7500cd21db 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -972,6 +972,7 @@ export class StreamManager extends EventEmitter { usage: finishStepPart.usage, // For context window display cumulativeUsage: streamInfo.cumulativeUsage, // For live cost display cumulativeProviderMetadata: streamInfo.cumulativeProviderMetadata, // For live cache costs + providerMetadata: finishStepPart.providerMetadata, // For context window cache display }; this.emit("usage-delta", usageEvent); break; From 038b236c3161a1db3b3978d1f7e329921dfe5e91 Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 3 Dec 2025 14:59:54 +1100 Subject: [PATCH 2/2] =?UTF-8?q?=F0=9F=A4=96=20test:=20add=20regression=20t?= =?UTF-8?q?ests=20for=20step=20providerMetadata=20and=20cache=20creation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cover the fix to prevent regression: - 
StreamingMessageAggregator: test step providerMetadata storage/retrieval/clear - displayUsage: test cacheCreationInputTokens extraction from providerMetadata _Generated with mux_ --- scripts/bump_tag.sh | 4 +- .../StreamingMessageAggregator.test.ts | 37 ++++++++++ .../messages/StreamingMessageAggregator.ts | 67 ++++++++----------- src/common/types/stream.ts | 10 +-- src/common/utils/tokens/displayUsage.test.ts | 64 ++++++++++++++++++ src/node/services/streamManager.ts | 10 +-- 6 files changed, 143 insertions(+), 49 deletions(-) diff --git a/scripts/bump_tag.sh b/scripts/bump_tag.sh index 069cf9fddd..db16e557a0 100755 --- a/scripts/bump_tag.sh +++ b/scripts/bump_tag.sh @@ -18,7 +18,7 @@ if [[ -z "$CURRENT_VERSION" || "$CURRENT_VERSION" == "null" ]]; then fi # Parse semver components -IFS='.' read -r MAJOR MINOR_V PATCH <<< "$CURRENT_VERSION" +IFS='.' read -r MAJOR MINOR_V PATCH <<<"$CURRENT_VERSION" # Calculate new version if [[ "$MINOR" == "true" ]]; then @@ -30,7 +30,7 @@ fi echo "Bumping version: $CURRENT_VERSION -> $NEW_VERSION" # Update package.json -jq --arg v "$NEW_VERSION" '.version = $v' package.json > package.json.tmp +jq --arg v "$NEW_VERSION" '.version = $v' package.json >package.json.tmp mv package.json.tmp package.json # Commit and tag diff --git a/src/browser/utils/messages/StreamingMessageAggregator.test.ts b/src/browser/utils/messages/StreamingMessageAggregator.test.ts index 2e239a626f..4477c3b08f 100644 --- a/src/browser/utils/messages/StreamingMessageAggregator.test.ts +++ b/src/browser/utils/messages/StreamingMessageAggregator.test.ts @@ -517,6 +517,40 @@ describe("StreamingMessageAggregator", () => { expect(aggregator.getActiveStreamCumulativeProviderMetadata("msg-1")).toBeUndefined(); }); + test("stores and retrieves step providerMetadata for cache creation display", () => { + const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT); + + aggregator.handleUsageDelta({ + type: "usage-delta", + workspaceId: "ws-1", + messageId: "msg-1", 
+ usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, + cumulativeUsage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, + providerMetadata: { + anthropic: { cacheCreationInputTokens: 800 }, + }, + }); + + expect(aggregator.getActiveStreamStepProviderMetadata("msg-1")).toEqual({ + anthropic: { cacheCreationInputTokens: 800 }, + }); + }); + + test("step providerMetadata is undefined when not provided", () => { + const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT); + + aggregator.handleUsageDelta({ + type: "usage-delta", + workspaceId: "ws-1", + messageId: "msg-1", + usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, + cumulativeUsage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, + // No providerMetadata + }); + + expect(aggregator.getActiveStreamStepProviderMetadata("msg-1")).toBeUndefined(); + }); + test("clearTokenState clears all usage tracking (step, cumulative, metadata)", () => { const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT); @@ -526,11 +560,13 @@ describe("StreamingMessageAggregator", () => { messageId: "msg-1", usage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, cumulativeUsage: { inputTokens: 1000, outputTokens: 50, totalTokens: 1050 }, + providerMetadata: { anthropic: { cacheCreationInputTokens: 300 } }, cumulativeProviderMetadata: { anthropic: { cacheCreationInputTokens: 500 } }, }); // All should be defined expect(aggregator.getActiveStreamUsage("msg-1")).toBeDefined(); + expect(aggregator.getActiveStreamStepProviderMetadata("msg-1")).toBeDefined(); expect(aggregator.getActiveStreamCumulativeUsage("msg-1")).toBeDefined(); expect(aggregator.getActiveStreamCumulativeProviderMetadata("msg-1")).toBeDefined(); @@ -538,6 +574,7 @@ describe("StreamingMessageAggregator", () => { // All should be cleared expect(aggregator.getActiveStreamUsage("msg-1")).toBeUndefined(); + expect(aggregator.getActiveStreamStepProviderMetadata("msg-1")).toBeUndefined(); 
expect(aggregator.getActiveStreamCumulativeUsage("msg-1")).toBeUndefined(); expect(aggregator.getActiveStreamCumulativeProviderMetadata("msg-1")).toBeUndefined(); }); diff --git a/src/browser/utils/messages/StreamingMessageAggregator.ts b/src/browser/utils/messages/StreamingMessageAggregator.ts index 7e73e793e6..3d2ba48f19 100644 --- a/src/browser/utils/messages/StreamingMessageAggregator.ts +++ b/src/browser/utils/messages/StreamingMessageAggregator.ts @@ -75,15 +75,17 @@ export class StreamingMessageAggregator { // Delta history for token counting and TPS calculation private deltaHistory = new Map(); - // Active stream step usage (updated on each stream-step event) - // Tracks last step's usage for context window display - private activeStreamStepUsage = new Map(); - // Tracks step provider metadata for context window cache display - private activeStreamStepProviderMetadata = new Map>(); - // Tracks cumulative usage across all steps for live cost display - private activeStreamCumulativeUsage = new Map(); - // Tracks cumulative provider metadata for live cost display (with cache creation tokens) - private activeStreamCumulativeProviderMetadata = new Map>(); + // Active stream usage tracking (updated on each usage-delta event) + // Consolidates step-level (context window) and cumulative (cost) usage by messageId + private activeStreamUsage = new Map< + string, + { + // Step-level: this step only (for context window display) + step: { usage: LanguageModelV2Usage; providerMetadata?: Record }; + // Cumulative: sum across all steps (for live cost display) + cumulative: { usage: LanguageModelV2Usage; providerMetadata?: Record }; + } + >(); // Current TODO list (updated when todo_write succeeds, cleared on stream end) // Stream-scoped: automatically reset when stream completes @@ -1062,45 +1064,41 @@ export class StreamingMessageAggregator { */ clearTokenState(messageId: string): void { this.deltaHistory.delete(messageId); - this.activeStreamStepUsage.delete(messageId); 
- this.activeStreamStepProviderMetadata.delete(messageId); - this.activeStreamCumulativeUsage.delete(messageId); - this.activeStreamCumulativeProviderMetadata.delete(messageId); + this.activeStreamUsage.delete(messageId); } /** * Handle usage-delta event: update usage tracking for active stream */ handleUsageDelta(data: UsageDeltaEvent): void { - // Store last step's usage for context window display - this.activeStreamStepUsage.set(data.messageId, data.usage); - // Store step provider metadata for context window cache display - if (data.providerMetadata) { - this.activeStreamStepProviderMetadata.set(data.messageId, data.providerMetadata); - } - // Store cumulative usage for cost display - this.activeStreamCumulativeUsage.set(data.messageId, data.cumulativeUsage); - // Store cumulative provider metadata for live cost display (with cache creation tokens) - if (data.cumulativeProviderMetadata) { - this.activeStreamCumulativeProviderMetadata.set( - data.messageId, - data.cumulativeProviderMetadata - ); - } + this.activeStreamUsage.set(data.messageId, { + step: { usage: data.usage, providerMetadata: data.providerMetadata }, + cumulative: { + usage: data.cumulativeUsage, + providerMetadata: data.cumulativeProviderMetadata, + }, + }); } /** * Get active stream usage for context window display (last step's inputTokens = context size) */ getActiveStreamUsage(messageId: string): LanguageModelV2Usage | undefined { - return this.activeStreamStepUsage.get(messageId); + return this.activeStreamUsage.get(messageId)?.step.usage; + } + + /** + * Get step provider metadata for context window cache display + */ + getActiveStreamStepProviderMetadata(messageId: string): Record | undefined { + return this.activeStreamUsage.get(messageId)?.step.providerMetadata; } /** * Get active stream cumulative usage for cost display (sum of all steps) */ getActiveStreamCumulativeUsage(messageId: string): LanguageModelV2Usage | undefined { - return this.activeStreamCumulativeUsage.get(messageId); + 
return this.activeStreamUsage.get(messageId)?.cumulative.usage; } /** @@ -1109,13 +1107,6 @@ export class StreamingMessageAggregator { getActiveStreamCumulativeProviderMetadata( messageId: string ): Record | undefined { - return this.activeStreamCumulativeProviderMetadata.get(messageId); - } - - /** - * Get step provider metadata for context window cache display - */ - getActiveStreamStepProviderMetadata(messageId: string): Record | undefined { - return this.activeStreamStepProviderMetadata.get(messageId); + return this.activeStreamUsage.get(messageId)?.cumulative.providerMetadata; } } diff --git a/src/common/types/stream.ts b/src/common/types/stream.ts index db1d59cbc9..bedb9bebbf 100644 --- a/src/common/types/stream.ts +++ b/src/common/types/stream.ts @@ -142,14 +142,14 @@ export interface UsageDeltaEvent { type: "usage-delta"; workspaceId: string; messageId: string; - // This step's usage (inputTokens = current context size, for context window display) + + // Step-level: this step only (for context window display) usage: LanguageModelV2Usage; - // Cumulative usage across all steps so far (for live cost display) + providerMetadata?: Record; + + // Cumulative: sum across all steps (for live cost display) cumulativeUsage: LanguageModelV2Usage; - // Cumulative provider metadata across all steps (for live cost display with cache tokens) cumulativeProviderMetadata?: Record; - // This step's provider metadata (for context window cache display) - providerMetadata?: Record; } export type AIServiceEvent = diff --git a/src/common/utils/tokens/displayUsage.test.ts b/src/common/utils/tokens/displayUsage.test.ts index 9031cec55a..0dabf37ca7 100644 --- a/src/common/utils/tokens/displayUsage.test.ts +++ b/src/common/utils/tokens/displayUsage.test.ts @@ -276,4 +276,68 @@ describe("createDisplayUsage", () => { expect(result!.input.tokens).toBe(1000); expect(result!.cached.tokens).toBe(0); }); + + describe("Anthropic cache creation tokens from providerMetadata", () => { + // Cache 
creation tokens are Anthropic-specific and only available in + // providerMetadata.anthropic.cacheCreationInputTokens, not in LanguageModelV2Usage. + // This is critical for liveUsage display during streaming. + + test("extracts cacheCreationInputTokens from providerMetadata", () => { + const usage: LanguageModelV2Usage = { + inputTokens: 1000, + outputTokens: 50, + totalTokens: 1050, + }; + + const result = createDisplayUsage(usage, "anthropic:claude-sonnet-4-20250514", { + anthropic: { cacheCreationInputTokens: 800 }, + }); + + expect(result).toBeDefined(); + expect(result!.cacheCreate.tokens).toBe(800); + }); + + test("cacheCreate is 0 when providerMetadata is undefined", () => { + const usage: LanguageModelV2Usage = { + inputTokens: 1000, + outputTokens: 50, + totalTokens: 1050, + }; + + const result = createDisplayUsage(usage, "anthropic:claude-sonnet-4-20250514"); + + expect(result).toBeDefined(); + expect(result!.cacheCreate.tokens).toBe(0); + }); + + test("cacheCreate is 0 when anthropic metadata lacks cacheCreationInputTokens", () => { + const usage: LanguageModelV2Usage = { + inputTokens: 1000, + outputTokens: 50, + totalTokens: 1050, + }; + + const result = createDisplayUsage(usage, "anthropic:claude-sonnet-4-20250514", { + anthropic: { someOtherField: 123 }, + }); + + expect(result).toBeDefined(); + expect(result!.cacheCreate.tokens).toBe(0); + }); + + test("handles gateway Anthropic model with cache creation", () => { + const usage: LanguageModelV2Usage = { + inputTokens: 2000, + outputTokens: 100, + totalTokens: 2100, + }; + + const result = createDisplayUsage(usage, "mux-gateway:anthropic/claude-sonnet-4-5", { + anthropic: { cacheCreationInputTokens: 1500 }, + }); + + expect(result).toBeDefined(); + expect(result!.cacheCreate.tokens).toBe(1500); + }); + }); }); diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index 7500cd21db..4fdd60add5 100644 --- a/src/node/services/streamManager.ts +++ 
b/src/node/services/streamManager.ts @@ -969,10 +969,12 @@ export class StreamManager extends EventEmitter { type: "usage-delta", workspaceId: workspaceId as string, messageId: streamInfo.messageId, - usage: finishStepPart.usage, // For context window display - cumulativeUsage: streamInfo.cumulativeUsage, // For live cost display - cumulativeProviderMetadata: streamInfo.cumulativeProviderMetadata, // For live cache costs - providerMetadata: finishStepPart.providerMetadata, // For context window cache display + // Step-level (for context window display) + usage: finishStepPart.usage, + providerMetadata: finishStepPart.providerMetadata, + // Cumulative (for live cost display) + cumulativeUsage: streamInfo.cumulativeUsage, + cumulativeProviderMetadata: streamInfo.cumulativeProviderMetadata, }; this.emit("usage-delta", usageEvent); break;