From 520a55a6796566bb12298737ad17d8739d139dd3 Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 10 Dec 2025 17:36:00 +1100 Subject: [PATCH 1/6] feat: add session-usage.json for cumulative cost tracking Introduces SessionUsageService that persists per-model usage to ~/.mux/sessions/{workspaceId}/session-usage.json. Costs are accumulated on stream-end and immune to message deletion/truncation/editing. - SessionUsageService with recordUsage(), getSessionUsage(), rebuild - Frontend WorkspaceStore mirrors accumulation for instant UI updates - CostsTab displays sessionTotal from persisted data - workspace.getSessionUsage oRPC endpoint - Copies session-usage.json on workspace fork - Rebuilds from messages if file missing/corrupted --- .../components/RightSidebar/CostsTab.tsx | 20 +- .../contexts/WorkspaceContext.test.tsx | 1 + src/browser/stores/WorkspaceStore.test.ts | 3 + src/browser/stores/WorkspaceStore.ts | 111 ++++++--- .../compaction/autoCompactionCheck.test.ts | 39 ++-- src/cli/cli.test.ts | 1 + src/cli/server.test.ts | 1 + src/cli/server.ts | 1 + src/common/orpc/schemas.ts | 1 + src/common/orpc/schemas/api.ts | 10 +- src/common/orpc/schemas/chatStats.ts | 16 ++ src/desktop/main.ts | 1 + src/node/orpc/context.ts | 2 + src/node/orpc/router.ts | 10 +- src/node/services/aiService.ts | 6 +- src/node/services/serviceContainer.ts | 6 +- src/node/services/sessionUsageService.test.ts | 210 ++++++++++++++++++ src/node/services/sessionUsageService.ts | 201 +++++++++++++++++ src/node/services/streamManager.ts | 24 +- src/node/services/workspaceService.ts | 10 + tests/ipc/setup.ts | 1 + 21 files changed, 595 insertions(+), 80 deletions(-) create mode 100644 src/node/services/sessionUsageService.test.ts create mode 100644 src/node/services/sessionUsageService.ts diff --git a/src/browser/components/RightSidebar/CostsTab.tsx b/src/browser/components/RightSidebar/CostsTab.tsx index 1fe9a88890..ea88455f59 100644 --- a/src/browser/components/RightSidebar/CostsTab.tsx +++ b/src/browser/components/RightSidebar/CostsTab.tsx @@ -1,7 +1,7 @@ import React from "react"; import { useWorkspaceUsage, useWorkspaceConsumers } from "@/browser/stores/WorkspaceStore"; import { getModelStats } from "@/common/utils/tokens/modelStats"; -import { sumUsageHistory } from "@/common/utils/tokens/usageAggregator"; +import { sumUsageHistory, type ChatUsageDisplay } from "@/common/utils/tokens/usageAggregator"; import { usePersistedState } from "@/browser/hooks/usePersistedState"; import { ToggleGroup, type ToggleOption } from "../ToggleGroup"; import { useProviderOptions } from "@/browser/hooks/useProviderOptions"; @@ -83,17 +83,17 @@ const CostsTabComponent: React.FC = ({ workspaceId }) => { useAutoCompactionSettings(workspaceId, currentModel); // Session usage for cost calculation - // Uses usageHistory (total across all steps) + liveCostUsage (cumulative during streaming) + // Uses sessionTotal (pre-computed) + liveCostUsage (cumulative during streaming) const sessionUsage = React.useMemo(() => { - const historicalSum = sumUsageHistory(usage.usageHistory); - if (!usage.liveCostUsage) return historicalSum; - if (!historicalSum) return usage.liveCostUsage; - return sumUsageHistory([historicalSum, usage.liveCostUsage]); - }, [usage.usageHistory, usage.liveCostUsage]); + const parts: ChatUsageDisplay[] = []; + if (usage.sessionTotal) parts.push(usage.sessionTotal); + if (usage.liveCostUsage) parts.push(usage.liveCostUsage); + return parts.length > 0 ? sumUsageHistory(parts) : undefined; + }, [usage.sessionTotal, usage.liveCostUsage]); const hasUsageData = usage && - (usage.usageHistory.length > 0 || + (usage.sessionTotal !== undefined || usage.lastContextUsage !== undefined || usage.liveUsage !== undefined); const hasConsumerData = consumers && (consumers.totalTokens > 0 || consumers.isCalculating); @@ -111,8 +111,8 @@ const CostsTabComponent: React.FC = ({ workspaceId }) => { ); } - // Last Request (for Cost section): always the last completed request - const lastRequestUsage = usage.usageHistory[usage.usageHistory.length - 1]; + // Last Request (for Cost section): from persisted data + const lastRequestUsage = usage.lastRequest?.usage; // Cost and Details table use viewMode const displayUsage = viewMode === "last-request" ? lastRequestUsage : sessionUsage; diff --git a/src/browser/contexts/WorkspaceContext.test.tsx b/src/browser/contexts/WorkspaceContext.test.tsx index e11b79818f..b4c95a7024 100644 --- a/src/browser/contexts/WorkspaceContext.test.tsx +++ b/src/browser/contexts/WorkspaceContext.test.tsx @@ -605,6 +605,7 @@ function createMockAPI(options: MockAPIOptions = {}) { ); }) ), + getSessionUsage: mock(options.workspace?.getSessionUsage ?? (() => Promise.resolve(undefined))), onChat: mock( options.workspace?.onChat ?? (async () => { diff --git a/src/browser/stores/WorkspaceStore.test.ts b/src/browser/stores/WorkspaceStore.test.ts index cd10c34513..d38891fc5d 100644 --- a/src/browser/stores/WorkspaceStore.test.ts +++ b/src/browser/stores/WorkspaceStore.test.ts @@ -11,9 +11,12 @@ const mockOnChat = mock(async function* (): AsyncGenerator Promise.resolve(undefined)); + const mockClient = { workspace: { onChat: mockOnChat, + getSessionUsage: mockGetSessionUsage, }, }; diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index 4b7e4bd735..5c4ffde176 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -20,10 +20,14 @@ import { } from "@/common/orpc/types"; import type { StreamEndEvent, StreamAbortEvent } from "@/common/types/stream"; import { MapStore } from "./MapStore"; -import { collectUsageHistory, createDisplayUsage } from "@/common/utils/tokens/displayUsage"; +import { createDisplayUsage } from "@/common/utils/tokens/displayUsage"; import { WorkspaceConsumerManager } from "./WorkspaceConsumerManager"; import type { ChatUsageDisplay } from "@/common/utils/tokens/usageAggregator"; +import { sumUsageHistory } from "@/common/utils/tokens/usageAggregator"; import type { TokenConsumer } from "@/common/types/chatStats"; +import { normalizeGatewayModel } from "@/common/utils/ai/models"; +import type { z } from "zod"; +import type { SessionUsageFileSchema } from "@/common/orpc/schemas/chatStats"; import type { LanguageModelV2Usage } from "@ai-sdk/provider"; import { createFreshRetryState } from "@/browser/utils/messages/retryState"; import { trackStreamCompleted } from "@/common/telemetry"; @@ -65,12 +69,19 @@ type DerivedState = Record; * Updates instantly when usage metadata arrives. * * For multi-step tool calls, cost and context usage differ: - * - usageHistory: Total usage per message (sum of all steps) for cost calculation + * - sessionTotal: Pre-computed sum of all models from session-usage.json + * - lastRequest: Last completed request (persisted for app restart) * - lastContextUsage: Last step's usage for context window display (inputTokens = actual context size) */ export interface WorkspaceUsageState { - /** Usage history for cost calculation (total across all steps per message) */ - usageHistory: ChatUsageDisplay[]; + /** Pre-computed session total (sum of all models) */ + sessionTotal?: ChatUsageDisplay; + /** Last completed request (persisted) */ + lastRequest?: { + model: string; + usage: ChatUsageDisplay; + timestamp: number; + }; /** Last message's context usage (last step only, for context window display) */ lastContextUsage?: ChatUsageDisplay; totalTokens: number; @@ -124,6 +135,8 @@ export class WorkspaceStore { private pendingStreamEvents = new Map(); private workspaceMetadata = new Map(); // Store metadata for name lookup private queuedMessages = new Map(); // Cached queued messages + // Cumulative session usage (from session-usage.json) + private sessionUsage = new Map>(); // Idle callback handles for high-frequency delta events to reduce re-renders during streaming. // Data is always updated immediately in the aggregator; only UI notification is scheduled. @@ -172,6 +185,26 @@ export class WorkspaceStore { // Reset retry state on successful stream completion updatePersistedState(getRetryStateKey(workspaceId), createFreshRetryState()); + // Update local session usage (mirrors backend's addUsage) + const model = streamEndData.metadata?.model; + const rawUsage = streamEndData.metadata?.usage; + const providerMetadata = streamEndData.metadata?.providerMetadata; + if (model && rawUsage) { + const usage = createDisplayUsage(rawUsage, model, providerMetadata); + if (usage) { + const normalizedModel = normalizeGatewayModel(model); + const current = this.sessionUsage.get(workspaceId) ?? { + byModel: {}, + version: 1 as const, + }; + const existing = current.byModel[normalizedModel]; + // CRITICAL: Accumulate, don't overwrite (same logic as backend) + current.byModel[normalizedModel] = existing ? sumUsageHistory([existing, usage])! : usage; + current.lastRequest = { model: normalizedModel, usage, timestamp: Date.now() }; + this.sessionUsage.set(workspaceId, current); + } + } + // Flush any pending debounced bump before final bump to avoid double-bump this.cancelPendingIdleBump(workspaceId); this.states.bump(workspaceId); @@ -548,8 +581,7 @@ export class WorkspaceStore { } /** - * Extract usage from messages (no tokenization). - * Each usage entry calculated with its own model for accurate costs. + * Extract usage from session-usage.json (no tokenization or message iteration). * * Returns empty state if workspace doesn't exist (e.g., creation mode). */ @@ -557,39 +589,37 @@ export class WorkspaceStore { return this.usageStore.get(workspaceId, () => { const aggregator = this.aggregators.get(workspaceId); if (!aggregator) { - return { usageHistory: [], totalTokens: 0 }; + return { totalTokens: 0 }; } - const messages = aggregator.getAllMessages(); const model = aggregator.getCurrentModel(); + const sessionData = this.sessionUsage.get(workspaceId); - // Collect usage history for cost calculation (total across all steps per message) - const usageHistory = collectUsageHistory(messages, model); - - // Calculate total from usage history (now includes historical) - const totalTokens = usageHistory.reduce( - (sum, u) => - sum + - u.input.tokens + - u.cached.tokens + - u.cacheCreate.tokens + - u.output.tokens + - u.reasoning.tokens, - 0 - ); + // Session total: sum all models from persisted data + const sessionTotal = + sessionData && Object.keys(sessionData.byModel).length > 0 + ? sumUsageHistory(Object.values(sessionData.byModel)) + : undefined; + + // Last request from persisted data + const lastRequest = sessionData?.lastRequest; - // Get last message's context usage for context window display - // Uses contextUsage (last step) if available, falls back to usage for old messages - // Skips compacted messages - their usage reflects pre-compaction context, not current + // Calculate total tokens from session total + const totalTokens = sessionTotal + ? sessionTotal.input.tokens + + sessionTotal.cached.tokens + + sessionTotal.cacheCreate.tokens + + sessionTotal.output.tokens + + sessionTotal.reasoning.tokens + : 0; + + // Get last message's context usage (unchanged from before) + const messages = aggregator.getAllMessages(); const lastContextUsage = (() => { for (let i = messages.length - 1; i >= 0; i--) { const msg = messages[i]; if (msg.role === "assistant") { - // Skip compacted messages - their usage is from pre-compaction context - // and doesn't reflect current context window size - if (msg.metadata?.compacted) { - continue; - } + if (msg.metadata?.compacted) continue; const rawUsage = msg.metadata?.contextUsage; const providerMeta = msg.metadata?.contextProviderMetadata ?? msg.metadata?.providerMetadata; @@ -602,10 +632,8 @@ export class WorkspaceStore { return undefined; })(); - // Include active stream usage if currently streaming + // Live streaming data (unchanged) const activeStreamId = aggregator.getActiveStreamMessageId(); - - // Live context usage (last step's inputTokens = current context window) const rawContextUsage = activeStreamId ? aggregator.getActiveStreamUsage(activeStreamId) : undefined; @@ -617,7 +645,6 @@ export class WorkspaceStore { ? createDisplayUsage(rawContextUsage, model, rawStepProviderMetadata) : undefined; - // Live cost usage (cumulative across all steps, with accumulated cache creation tokens) const rawCumulativeUsage = activeStreamId ? aggregator.getActiveStreamCumulativeUsage(activeStreamId) : undefined; @@ -629,7 +656,7 @@ export class WorkspaceStore { ? createDisplayUsage(rawCumulativeUsage, model, rawCumulativeProviderMetadata) : undefined; - return { usageHistory, lastContextUsage, totalTokens, liveUsage, liveCostUsage }; + return { sessionTotal, lastRequest, lastContextUsage, totalTokens, liveUsage, liveCostUsage }; }); } @@ -793,6 +820,19 @@ export class WorkspaceStore { })(); this.ipcUnsubscribers.set(workspaceId, () => controller.abort()); + + // Fetch persisted session usage (fire-and-forget) + this.client.workspace + .getSessionUsage({ workspaceId }) + .then((data) => { + if (data) { + this.sessionUsage.set(workspaceId, data); + this.usageStore.bump(workspaceId); + } + }) + .catch((error) => { + console.warn(`Failed to fetch session usage for ${workspaceId}:`, error); + }); } else { console.warn(`[WorkspaceStore] No ORPC client available for workspace ${workspaceId}`); } @@ -831,6 +871,7 @@ export class WorkspaceStore { this.previousSidebarValues.delete(workspaceId); this.sidebarStateCache.delete(workspaceId); this.workspaceCreatedAt.delete(workspaceId); + this.sessionUsage.delete(workspaceId); } /** diff --git a/src/browser/utils/compaction/autoCompactionCheck.test.ts b/src/browser/utils/compaction/autoCompactionCheck.test.ts index 9900992e14..bd5670aed0 100644 --- a/src/browser/utils/compaction/autoCompactionCheck.test.ts +++ b/src/browser/utils/compaction/autoCompactionCheck.test.ts @@ -27,23 +27,14 @@ const createUsageEntry = ( // Helper to create mock WorkspaceUsageState const createMockUsage = ( lastEntryTokens: number, - historicalTokens?: number, + _historicalTokens?: number, // Kept for backward compat but unused (session-usage.json handles historical) model: string = KNOWN_MODELS.SONNET.id, liveUsage?: ChatUsageDisplay ): WorkspaceUsageState => { - const usageHistory: ChatUsageDisplay[] = []; + // Create lastContextUsage representing the most recent context window state + const lastContextUsage = createUsageEntry(lastEntryTokens, model); - if (historicalTokens !== undefined) { - // Add historical usage (from compaction) - usageHistory.push(createUsageEntry(historicalTokens, "historical-model")); - } - - // Add recent usage - const recentUsage = createUsageEntry(lastEntryTokens, model); - usageHistory.push(recentUsage); - - // lastContextUsage is the most recent context window state - return { usageHistory, lastContextUsage: recentUsage, totalTokens: 0, liveUsage }; + return { lastContextUsage, totalTokens: 0, liveUsage }; }; describe("checkAutoCompaction", () => { @@ -60,8 +51,8 @@ describe("checkAutoCompaction", () => { expect(result.thresholdPercentage).toBe(70); }); - test("returns false when usage history is empty", () => { - const usage: WorkspaceUsageState = { usageHistory: [], totalTokens: 0 }; + test("returns false when no context usage data", () => { + const usage: WorkspaceUsageState = { totalTokens: 0 }; const result = checkAutoCompaction(usage, KNOWN_MODELS.SONNET.id, false); expect(result.shouldShowWarning).toBe(false); @@ -146,7 +137,6 @@ describe("checkAutoCompaction", () => { model: KNOWN_MODELS.SONNET.id, }; const usage: WorkspaceUsageState = { - usageHistory: [usageEntry], lastContextUsage: usageEntry, totalTokens: 0, }; @@ -195,8 +185,8 @@ describe("checkAutoCompaction", () => { }); describe("Edge Cases", () => { - test("empty usageHistory array returns safe defaults", () => { - const usage: WorkspaceUsageState = { usageHistory: [], totalTokens: 0 }; + test("missing context usage returns safe defaults", () => { + const usage: WorkspaceUsageState = { totalTokens: 0 }; const result = checkAutoCompaction(usage, KNOWN_MODELS.SONNET.id, false); expect(result.shouldShowWarning).toBe(false); @@ -204,7 +194,7 @@ describe("checkAutoCompaction", () => { expect(result.thresholdPercentage).toBe(70); }); - test("single entry in usageHistory works correctly", () => { + test("single context usage entry works correctly", () => { const usage = createMockUsage(140_000); const result = checkAutoCompaction(usage, KNOWN_MODELS.SONNET.id, false); @@ -242,7 +232,6 @@ describe("checkAutoCompaction", () => { model: KNOWN_MODELS.SONNET.id, }; const usage: WorkspaceUsageState = { - usageHistory: [zeroEntry], lastContextUsage: zeroEntry, totalTokens: 0, }; @@ -356,24 +345,22 @@ describe("checkAutoCompaction", () => { expect(result.shouldForceCompact).toBe(true); }); - test("shouldForceCompact triggers with empty history but liveUsage at force threshold", () => { + test("shouldForceCompact triggers with liveUsage at force threshold (no lastContextUsage)", () => { const liveUsage = createUsageEntry(150_000); // 75% const usage: WorkspaceUsageState = { - usageHistory: [], totalTokens: 0, liveUsage, }; const result = checkAutoCompaction(usage, KNOWN_MODELS.SONNET.id, false); expect(result.shouldForceCompact).toBe(true); - expect(result.usagePercentage).toBe(75); // usagePercentage reflects live even with empty history + expect(result.usagePercentage).toBe(75); // usagePercentage reflects live }); - test("shouldShowWarning uses live usage when no history exists", () => { - // No lastUsage, liveUsage at 65% - should show warning (65% >= 60%) + test("shouldShowWarning uses live usage when no lastContextUsage exists", () => { + // No lastContextUsage, liveUsage at 65% - should show warning (65% >= 60%) const liveUsage = createUsageEntry(130_000); // 65% const usage: WorkspaceUsageState = { - usageHistory: [], totalTokens: 0, liveUsage, }; diff --git a/src/cli/cli.test.ts b/src/cli/cli.test.ts index 578f18899b..c825c5b927 100644 --- a/src/cli/cli.test.ts +++ b/src/cli/cli.test.ts @@ -73,6 +73,7 @@ async function createTestServer(authToken?: string): Promise { menuEventService: services.menuEventService, voiceService: services.voiceService, telemetryService: services.telemetryService, + sessionUsageService: services.sessionUsageService, }; // Use the actual createOrpcServer function diff --git a/src/cli/server.test.ts b/src/cli/server.test.ts index ce3be52355..1883036ac9 100644 --- a/src/cli/server.test.ts +++ b/src/cli/server.test.ts @@ -76,6 +76,7 @@ async function createTestServer(): Promise { menuEventService: services.menuEventService, voiceService: services.voiceService, telemetryService: services.telemetryService, + sessionUsageService: services.sessionUsageService, }; // Use the actual createOrpcServer function diff --git a/src/cli/server.ts b/src/cli/server.ts index 90ac44a5a2..3651520cde 100644 --- a/src/cli/server.ts +++ b/src/cli/server.ts @@ -91,6 +91,7 @@ const mockWindow: BrowserWindow = { mcpServerManager: serviceContainer.mcpServerManager, voiceService: serviceContainer.voiceService, telemetryService: serviceContainer.telemetryService, + sessionUsageService: serviceContainer.sessionUsageService, }; const server = await createOrpcServer({ diff --git a/src/common/orpc/schemas.ts b/src/common/orpc/schemas.ts index 6e76877657..53a58f1f9f 100644 --- a/src/common/orpc/schemas.ts +++ b/src/common/orpc/schemas.ts @@ -23,6 +23,7 @@ export { ChatStatsSchema, ChatUsageComponentSchema, ChatUsageDisplaySchema, + SessionUsageFileSchema, TokenConsumerSchema, } from "./schemas/chatStats"; diff --git a/src/common/orpc/schemas/api.ts b/src/common/orpc/schemas/api.ts index 8dc885a6cf..5a6185605d 100644 --- a/src/common/orpc/schemas/api.ts +++ b/src/common/orpc/schemas/api.ts @@ -1,6 +1,6 @@ import { eventIterator } from "@orpc/server"; import { z } from "zod"; -import { ChatStatsSchema } from "./chatStats"; +import { ChatStatsSchema, SessionUsageFileSchema } from "./chatStats"; import { SendMessageErrorSchema } from "./errors"; import { BranchListResultSchema, ImagePartSchema, MuxMessageSchema } from "./message"; import { ProjectConfigSchema } from "./project"; @@ -323,7 +323,7 @@ export const workspace = { ), }, }, - /** +/** * Get the current plan file content for a workspace. * Used by UI to refresh plan display when file is edited externally. */ @@ -366,7 +366,7 @@ export const workspace = { output: ResultSchema(z.void(), z.string()), }, }, - /** +/** * Get post-compaction context state for a workspace. * Returns plan path (if exists) and tracked file paths that will be injected. */ @@ -390,6 +390,10 @@ export const workspace = { }), output: ResultSchema(z.void(), z.string()), }, + getSessionUsage: { + input: z.object({ workspaceId: z.string() }), + output: SessionUsageFileSchema.optional(), + }, }; export type WorkspaceSendMessageOutput = z.infer; diff --git a/src/common/orpc/schemas/chatStats.ts b/src/common/orpc/schemas/chatStats.ts index 7c0fb621cd..3c902303a8 100644 --- a/src/common/orpc/schemas/chatStats.ts +++ b/src/common/orpc/schemas/chatStats.ts @@ -37,3 +37,19 @@ export const ChatStatsSchema = z.object({ .array(ChatUsageDisplaySchema) .meta({ description: "Ordered array of actual usage statistics from API responses" }), }); + +/** + * Cumulative session usage file format. + * Stored in ~/.mux/sessions/{workspaceId}/session-usage.json + */ +export const SessionUsageFileSchema = z.object({ + byModel: z.record(z.string(), ChatUsageDisplaySchema), + lastRequest: z + .object({ + model: z.string(), + usage: ChatUsageDisplaySchema, + timestamp: z.number(), + }) + .optional(), + version: z.literal(1), +}); diff --git a/src/desktop/main.ts b/src/desktop/main.ts index 1a4311cf9c..f4873fee4a 100644 --- a/src/desktop/main.ts +++ b/src/desktop/main.ts @@ -338,6 +338,7 @@ async function loadServices(): Promise { menuEventService: services.menuEventService, voiceService: services.voiceService, telemetryService: services.telemetryService, + sessionUsageService: services.sessionUsageService, }; electronIpcMain.on("start-orpc-server", (event) => { diff --git a/src/node/orpc/context.ts b/src/node/orpc/context.ts index 9d22148ab0..b61efd9034 100644 --- a/src/node/orpc/context.ts +++ b/src/node/orpc/context.ts @@ -15,6 +15,7 @@ import type { VoiceService } from "@/node/services/voiceService"; import type { MCPConfigService } from "@/node/services/mcpConfigService"; import type { MCPServerManager } from "@/node/services/mcpServerManager"; import type { TelemetryService } from "@/node/services/telemetryService"; +import type { SessionUsageService } from "@/node/services/sessionUsageService"; export interface ORPCContext { config: Config; @@ -33,5 +34,6 @@ export interface ORPCContext { mcpConfigService: MCPConfigService; mcpServerManager: MCPServerManager; telemetryService: TelemetryService; + sessionUsageService: SessionUsageService; headers?: IncomingHttpHeaders; } diff --git a/src/node/orpc/router.ts b/src/node/orpc/router.ts index b264331ead..e261c9ba75 100644 --- a/src/node/orpc/router.ts +++ b/src/node/orpc/router.ts @@ -594,7 +594,7 @@ export const router = (authToken?: string) => { } }), }, - getPlanContent: t +getPlanContent: t .input(schemas.workspace.getPlanContent.input) .output(schemas.workspace.getPlanContent.output) .handler(async ({ context, input }) => { @@ -677,7 +677,7 @@ export const router = (authToken?: string) => { return { success: true, data: undefined }; }), }, - getPostCompactionState: t +getPostCompactionState: t .input(schemas.workspace.getPostCompactionState.input) .output(schemas.workspace.getPostCompactionState.output) .handler(({ context, input }) => { @@ -693,6 +693,12 @@ export const router = (authToken?: string) => { input.excluded ); }), + getSessionUsage: t + .input(schemas.workspace.getSessionUsage.input) + .output(schemas.workspace.getSessionUsage.output) + .handler(async ({ context, input }) => { + return context.sessionUsageService.getSessionUsage(input.workspaceId); + }), }, window: { setTitle: t diff --git a/src/node/services/aiService.ts b/src/node/services/aiService.ts index b47fc44b46..bd74ead3ef 100644 --- a/src/node/services/aiService.ts +++ b/src/node/services/aiService.ts @@ -42,6 +42,7 @@ import type { PostCompactionAttachment } from "@/common/types/attachment"; import { applyCacheControl } from "@/common/utils/ai/cacheStrategy"; import type { HistoryService } from "./historyService"; import type { PartialService } from "./partialService"; +import type { SessionUsageService } from "./sessionUsageService"; import { buildSystemMessage, readToolInstructions } from "./systemMessage"; import { getTokenizerForModel } from "@/node/utils/main/tokenizer"; import type { MCPServerManager } from "@/node/services/mcpServerManager"; @@ -284,7 +285,8 @@ export class AIService extends EventEmitter { historyService: HistoryService, partialService: PartialService, initStateManager: InitStateManager, - backgroundProcessManager?: BackgroundProcessManager + backgroundProcessManager?: BackgroundProcessManager, + sessionUsageService?: SessionUsageService ) { super(); // Increase max listeners to accommodate multiple concurrent workspace listeners @@ -295,7 +297,7 @@ export class AIService extends EventEmitter { this.partialService = partialService; this.initStateManager = initStateManager; this.backgroundProcessManager = backgroundProcessManager; - this.streamManager = new StreamManager(historyService, partialService); + this.streamManager = new StreamManager(historyService, partialService, sessionUsageService); void this.ensureSessionsDir(); this.setupStreamEventForwarding(); this.mockModeEnabled = process.env.MUX_MOCK_AI === "1"; diff --git a/src/node/services/serviceContainer.ts b/src/node/services/serviceContainer.ts index 8807c754e7..d3927ed7a5 100644 --- a/src/node/services/serviceContainer.ts +++ b/src/node/services/serviceContainer.ts @@ -23,6 +23,7 @@ import { TelemetryService } from "@/node/services/telemetryService"; import { BackgroundProcessManager } from "@/node/services/backgroundProcessManager"; import { MCPConfigService } from "@/node/services/mcpConfigService"; import { MCPServerManager } from "@/node/services/mcpServerManager"; +import { SessionUsageService } from "@/node/services/sessionUsageService"; /** * ServiceContainer - Central dependency container for all backend services. @@ -49,6 +50,7 @@ export class ServiceContainer { public readonly mcpConfigService: MCPConfigService; public readonly mcpServerManager: MCPServerManager; public readonly telemetryService: TelemetryService; + public readonly sessionUsageService: SessionUsageService; private readonly initStateManager: InitStateManager; private readonly extensionMetadata: ExtensionMetadataService; private readonly ptyService: PTYService; @@ -68,12 +70,14 @@ export class ServiceContainer { path.join(os.tmpdir(), "mux-bashes") ); this.mcpServerManager = new MCPServerManager(this.mcpConfigService); + this.sessionUsageService = new SessionUsageService(config, this.historyService); this.aiService = new AIService( config, this.historyService, this.partialService, this.initStateManager, - this.backgroundProcessManager + this.backgroundProcessManager, + this.sessionUsageService ); this.aiService.setMCPServerManager(this.mcpServerManager); this.workspaceService = new WorkspaceService( diff --git a/src/node/services/sessionUsageService.test.ts b/src/node/services/sessionUsageService.test.ts new file mode 100644 index 0000000000..820155d3de --- /dev/null +++ b/src/node/services/sessionUsageService.test.ts @@ -0,0 +1,210 @@ +import { describe, it, expect, beforeEach, afterEach, mock } from "bun:test"; +import { SessionUsageService } from "./sessionUsageService"; +import type { HistoryService } from "./historyService"; +import { Config } from "@/node/config"; +import { createMuxMessage } from "@/common/types/message"; +import type { ChatUsageDisplay } from "@/common/utils/tokens/usageAggregator"; +import { Ok } from "@/common/types/result"; +import * as fs from "fs/promises"; +import * as path from "path"; +import * as os from "os"; + +function createMockHistoryService( + messages: Array> = [] +): HistoryService { + return { + getHistory: mock(() => Promise.resolve(Ok(messages))), + appendToHistory: mock(() => Promise.resolve(Ok(undefined))), + updateHistory: mock(() => Promise.resolve(Ok(undefined))), + truncateAfterMessage: mock(() => Promise.resolve(Ok(undefined))), + clearHistory: mock(() => Promise.resolve(Ok([]))), + } as unknown as HistoryService; +} + +function createUsage(input: number, output: number): ChatUsageDisplay { + return { + input: { tokens: input }, + output: { tokens: output }, + cached: { tokens: 0 }, + cacheCreate: { tokens: 0 }, + reasoning: { tokens: 0 }, + }; +} + +describe("SessionUsageService", () => { + let service: SessionUsageService; + let config: Config; + let tempDir: string; + let mockHistoryService: HistoryService; + + beforeEach(async () => { + tempDir = path.join(os.tmpdir(), `mux-session-usage-test-${Date.now()}-${Math.random()}`); + await fs.mkdir(tempDir, { recursive: true }); + config = new Config(tempDir); + mockHistoryService = createMockHistoryService(); + service = new SessionUsageService(config, mockHistoryService); + }); + + afterEach(async () => { + try { + await fs.rm(tempDir, { recursive: true, force: true }); + } catch { + // Ignore cleanup errors + } + }); + + describe("recordUsage", () => { + it("should accumulate usage for same model (not overwrite)", async () => { + const workspaceId = "test-workspace"; + const model = "claude-sonnet-4-20250514"; + const usage1 = createUsage(100, 50); + const usage2 = createUsage(200, 75); + + await service.recordUsage(workspaceId, model, usage1); + await service.recordUsage(workspaceId, model, usage2); + + const result = await service.getSessionUsage(workspaceId); + expect(result).toBeDefined(); + expect(result!.byModel[model].input.tokens).toBe(300); // 100 + 200 + expect(result!.byModel[model].output.tokens).toBe(125); // 50 + 75 + }); + + it("should track separate usage per model", async () => { + const workspaceId = "test-workspace"; + const sonnet = createUsage(100, 50); + const opus = createUsage(500, 200); + + await service.recordUsage(workspaceId, "claude-sonnet-4-20250514", sonnet); + await service.recordUsage(workspaceId, "claude-opus-4-20250514", opus); + + const result = await service.getSessionUsage(workspaceId); + expect(result).toBeDefined(); + expect(result!.byModel["claude-sonnet-4-20250514"].input.tokens).toBe(100); + expect(result!.byModel["claude-opus-4-20250514"].input.tokens).toBe(500); + }); + + it("should update lastRequest with each recordUsage call", async () => { + const workspaceId = "test-workspace"; + const usage1 = createUsage(100, 50); + const usage2 = createUsage(200, 75); + + await service.recordUsage(workspaceId, "claude-sonnet-4-20250514", usage1); + let result = await service.getSessionUsage(workspaceId); + expect(result?.lastRequest?.model).toBe("claude-sonnet-4-20250514"); + expect(result?.lastRequest?.usage.input.tokens).toBe(100); + + await service.recordUsage(workspaceId, "claude-opus-4-20250514", usage2); + result = await service.getSessionUsage(workspaceId); + expect(result?.lastRequest?.model).toBe("claude-opus-4-20250514"); + expect(result?.lastRequest?.usage.input.tokens).toBe(200); + }); + }); + + describe("getSessionUsage", () => { + it("should rebuild from messages when file missing (ENOENT)", async () => { + const workspaceId = "test-workspace"; + const messages = [ + createMuxMessage("msg1", "assistant", "Hello", { + model: "claude-sonnet-4-20250514", + usage: { inputTokens: 100, outputTokens: 50, totalTokens: 150 }, + }), + createMuxMessage("msg2", "assistant", "World", { + model: "claude-sonnet-4-20250514", + usage: { inputTokens: 200, outputTokens: 75, totalTokens: 275 }, + }), + ]; + mockHistoryService = createMockHistoryService(messages); + service = new SessionUsageService(config, mockHistoryService); + + // Create session dir but NOT the session-usage.json file + await fs.mkdir(config.getSessionDir(workspaceId), { recursive: true }); + + const result = await service.getSessionUsage(workspaceId); + + expect(result).toBeDefined(); + // Should have rebuilt and summed the usage + expect(result!.byModel["claude-sonnet-4-20250514"]).toBeDefined(); + }); + }); + + describe("rebuildFromMessages", () => { + it("should rebuild from messages when file is corrupted JSON", async () => { + const workspaceId = "test-workspace"; + const messages = [ + createMuxMessage("msg1", "assistant", "Hello", { + model: "claude-sonnet-4-20250514", + usage: { inputTokens: 100, outputTokens: 50, totalTokens: 150 }, + }), + ]; + mockHistoryService = createMockHistoryService(messages); + service = new SessionUsageService(config, mockHistoryService); + + // Create session dir with corrupted JSON + const sessionDir = config.getSessionDir(workspaceId); + await fs.mkdir(sessionDir, { recursive: true }); + await fs.writeFile(path.join(sessionDir, "session-usage.json"), "{ invalid json"); + + const result = await service.getSessionUsage(workspaceId); + + expect(result).toBeDefined(); + // Should have rebuilt from messages + expect(result!.byModel["claude-sonnet-4-20250514"]).toBeDefined(); + expect(result!.byModel["claude-sonnet-4-20250514"].input.tokens).toBe(100); + }); + + it("should capture historicalUsage from compaction summary under 'pre-compaction' key", async () => { + const workspaceId = "test-workspace"; + const historicalUsage = createUsage(1000, 500); // Pre-compaction costs + + const messages = [ + createMuxMessage("summary", "assistant", "Compaction summary", { + compacted: true, + model: "claude-sonnet-4-20250514", + usage: { inputTokens: 50, outputTokens: 25, totalTokens: 75 }, + historicalUsage, // This preserves pre-compaction costs + }), + ]; + + await service.rebuildFromMessages(workspaceId, messages); + const result = await service.getSessionUsage(workspaceId); + + expect(result).toBeDefined(); + expect(result!.byModel["pre-compaction"]).toBeDefined(); + expect(result!.byModel["pre-compaction"].input.tokens).toBe(1000); + expect(result!.byModel["pre-compaction"].output.tokens).toBe(500); + }); + + it("should use LAST historicalUsage when multiple compactions exist (each is cumulative)", async () => { + const workspaceId = "test-workspace"; + const firstHistorical = createUsage(500, 250); + const secondHistorical = createUsage(1500, 750); // Cumulative - includes first + + const messages = [ + createMuxMessage("summary1", "assistant", "First compaction", { + compacted: true, + model: "claude-sonnet-4-20250514", + usage: { inputTokens: 50, outputTokens: 25, totalTokens: 75 }, + historicalUsage: firstHistorical, + }), + createMuxMessage("msg1", "assistant", "Regular message", { + model: "claude-sonnet-4-20250514", + usage: { inputTokens: 100, outputTokens: 50, totalTokens: 150 }, + }), + createMuxMessage("summary2", "assistant", "Second compaction", { + compacted: true, + model: "claude-sonnet-4-20250514", + usage: { inputTokens: 50, outputTokens: 25, totalTokens: 75 }, + historicalUsage: secondHistorical, // This is cumulative, should replace first + }), + ]; + + await service.rebuildFromMessages(workspaceId, messages); + const result = await service.getSessionUsage(workspaceId); + + expect(result).toBeDefined(); + // Should have the LAST historicalUsage, not sum of both + expect(result!.byModel["pre-compaction"].input.tokens).toBe(1500); + expect(result!.byModel["pre-compaction"].output.tokens).toBe(750); + }); + }); +}); diff --git a/src/node/services/sessionUsageService.ts b/src/node/services/sessionUsageService.ts new file mode 100644 index 0000000000..4773691279 --- /dev/null +++ b/src/node/services/sessionUsageService.ts @@ -0,0 +1,201 @@ +import * as fs from "fs/promises"; +import * as path from "path"; +import writeFileAtomic from "write-file-atomic"; +import type { Config } from "@/node/config"; +import type { HistoryService } from "./historyService"; +import { workspaceFileLocks } from "@/node/utils/concurrency/workspaceFileLocks"; +import type { ChatUsageDisplay } from "@/common/utils/tokens/usageAggregator"; +import { sumUsageHistory } from "@/common/utils/tokens/usageAggregator"; +import { createDisplayUsage } from "@/common/utils/tokens/displayUsage"; +import { normalizeGatewayModel } from "@/common/utils/ai/models"; +import type { MuxMessage } from "@/common/types/message"; +import { log } from "./log"; + +export interface SessionUsageFile { + byModel: Record; + lastRequest?: { + model: string; + usage: ChatUsageDisplay; + timestamp: number; + }; + version: 1; +} + +/** + * Service for managing cumulative session usage tracking. + * + * Replaces O(n) message iteration with a persistent JSON file that stores + * per-model usage breakdowns. Usage is accumulated on stream-end, never + * subtracted, making costs immune to message deletion. + */ +export class SessionUsageService { + private readonly SESSION_USAGE_FILE = "session-usage.json"; + private readonly fileLocks = workspaceFileLocks; + private readonly config: Config; + private readonly historyService: HistoryService; + + constructor(config: Config, historyService: HistoryService) { + this.config = config; + this.historyService = historyService; + } + + private getFilePath(workspaceId: string): string { + return path.join(this.config.getSessionDir(workspaceId), this.SESSION_USAGE_FILE); + } + + private async readFile(workspaceId: string): Promise { + try { + const data = await fs.readFile(this.getFilePath(workspaceId), "utf-8"); + return JSON.parse(data) as SessionUsageFile; + } catch (error) { + if (error && typeof error === "object" && "code" in error && error.code === "ENOENT") { + return { byModel: {}, version: 1 }; + } + throw error; + } + } + + private async writeFile(workspaceId: string, data: SessionUsageFile): Promise { + const filePath = this.getFilePath(workspaceId); + await fs.mkdir(path.dirname(filePath), { recursive: true }); + await writeFileAtomic(filePath, JSON.stringify(data, null, 2)); + } + + /** + * Record usage from a completed stream. Accumulates with existing usage + * AND updates lastRequest in a single atomic write. + * Model should already be normalized via normalizeGatewayModel(). + */ + async recordUsage(workspaceId: string, model: string, usage: ChatUsageDisplay): Promise { + return this.fileLocks.withLock(workspaceId, async () => { + const current = await this.readFile(workspaceId); + const existing = current.byModel[model]; + // CRITICAL: Accumulate, don't overwrite + current.byModel[model] = existing ? sumUsageHistory([existing, usage])! : usage; + current.lastRequest = { model, usage, timestamp: Date.now() }; + await this.writeFile(workspaceId, current); + }); + } + + /** + * Read current session usage. Returns undefined if file missing/corrupted + * and no messages to rebuild from. + */ + async getSessionUsage(workspaceId: string): Promise { + return this.fileLocks.withLock(workspaceId, async () => { + try { + const filePath = this.getFilePath(workspaceId); + const data = await fs.readFile(filePath, "utf-8"); + return JSON.parse(data) as SessionUsageFile; + } catch (error) { + // File missing or corrupted - try to rebuild from messages + if (error && typeof error === "object" && "code" in error && error.code === "ENOENT") { + const historyResult = await this.historyService.getHistory(workspaceId); + if (historyResult.success && historyResult.data.length > 0) { + await this.rebuildFromMessagesInternal(workspaceId, historyResult.data); + return this.readFile(workspaceId); + } + return undefined; // Truly empty session + } + // Parse error - try rebuild + log.warn(`session-usage.json corrupted for ${workspaceId}, rebuilding`); + const historyResult = await this.historyService.getHistory(workspaceId); + if (historyResult.success && historyResult.data.length > 0) { + await this.rebuildFromMessagesInternal(workspaceId, historyResult.data); + return this.readFile(workspaceId); + } + return undefined; + } + }); + } + + /** + * Rebuild session usage from messages (for migration/recovery). + * Internal version - called within lock. + * + * Handles historicalUsage from compaction summaries - this is a cumulative + * ChatUsageDisplay representing all pre-compaction costs (model-agnostic). + * We store it under the special "pre-compaction" key so it's included in totals. + */ + private async rebuildFromMessagesInternal( + workspaceId: string, + messages: MuxMessage[] + ): Promise { + const result: SessionUsageFile = { byModel: {}, version: 1 }; + let lastAssistantUsage: { model: string; usage: ChatUsageDisplay } | undefined; + // Track the last historicalUsage seen (each compaction summary contains + // cumulative history, so only the last one matters) + let lastHistoricalUsage: ChatUsageDisplay | undefined; + + for (const msg of messages) { + if (msg.role === "assistant") { + // Check for historicalUsage from compaction summaries + // This preserves costs from messages deleted during compaction + if (msg.metadata?.historicalUsage) { + lastHistoricalUsage = msg.metadata.historicalUsage; + } + + // Extract current message's usage + if (msg.metadata?.usage) { + const rawModel = msg.metadata.model ?? "unknown"; + const model = normalizeGatewayModel(rawModel); + const usage = createDisplayUsage( + msg.metadata.usage, + rawModel, + msg.metadata.providerMetadata + ); + + if (usage) { + const existing = result.byModel[model]; + result.byModel[model] = existing ? sumUsageHistory([existing, usage])! : usage; + lastAssistantUsage = { model, usage }; + } + } + } + } + + // Include historical usage from compaction under a special key + // This ensures pre-compaction costs are included in session totals + if (lastHistoricalUsage) { + const existing = result.byModel["pre-compaction"]; + result.byModel["pre-compaction"] = existing + ? sumUsageHistory([existing, lastHistoricalUsage])! + : lastHistoricalUsage; + } + + if (lastAssistantUsage) { + result.lastRequest = { + model: lastAssistantUsage.model, + usage: lastAssistantUsage.usage, + timestamp: Date.now(), + }; + } + + await this.writeFile(workspaceId, result); + log.info(`Rebuilt session-usage.json for ${workspaceId} from ${messages.length} messages`); + } + + /** + * Public rebuild method (acquires lock). + */ + async rebuildFromMessages(workspaceId: string, messages: MuxMessage[]): Promise { + return this.fileLocks.withLock(workspaceId, async () => { + await this.rebuildFromMessagesInternal(workspaceId, messages); + }); + } + + /** + * Delete session usage file (when workspace is deleted). + */ + async deleteSessionUsage(workspaceId: string): Promise { + return this.fileLocks.withLock(workspaceId, async () => { + try { + await fs.unlink(this.getFilePath(workspaceId)); + } catch (error) { + if (!(error && typeof error === "object" && "code" in error && error.code === "ENOENT")) { + throw error; + } + } + }); + } +} diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index 815d8d0210..44f40dc918 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -38,6 +38,9 @@ import { createCachedSystemMessage, applyCacheControlToTools, } from "@/common/utils/ai/cacheStrategy"; +import type { SessionUsageService } from "./sessionUsageService"; +import { createDisplayUsage } from "@/common/utils/tokens/displayUsage"; +import { normalizeGatewayModel } from "@/common/utils/ai/models"; // Type definitions for stream parts with extended properties interface ReasoningDeltaPart { @@ -144,16 +147,22 @@ export class StreamManager extends EventEmitter { private readonly PARTIAL_WRITE_THROTTLE_MS = 500; private readonly historyService: HistoryService; private readonly partialService: PartialService; + private readonly sessionUsageService?: SessionUsageService; // Token tracker for live streaming statistics private tokenTracker = new StreamingTokenTracker(); // Track OpenAI previousResponseIds that have been invalidated // When frontend retries, buildProviderOptions will omit these IDs private lostResponseIds = new Set(); - constructor(historyService: HistoryService, partialService: PartialService) { + constructor( + historyService: HistoryService, + partialService: PartialService, + sessionUsageService?: SessionUsageService + ) { super(); this.historyService = historyService; this.partialService = partialService; + this.sessionUsageService = sessionUsageService; } /** @@ -1113,6 +1122,19 @@ export class StreamManager extends EventEmitter { // Update the placeholder message in chat.jsonl with final content await this.historyService.updateHistory(workspaceId as string, finalAssistantMessage); + + // Update cumulative session usage (if service is available) + if (this.sessionUsageService && totalUsage) { + const messageUsage = createDisplayUsage(totalUsage, streamInfo.model, providerMetadata); + if (messageUsage) { + const normalizedModel = normalizeGatewayModel(streamInfo.model); + await this.sessionUsageService.recordUsage( + workspaceId as string, + normalizedModel, + messageUsage + ); + } + } } } } catch (error) { diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts index ee670f2c14..1330afebb1 100644 --- a/src/node/services/workspaceService.ts +++ b/src/node/services/workspaceService.ts @@ -857,6 +857,16 @@ export class WorkspaceService extends EventEmitter { throw error; } } + + const sourceUsagePath = path.join(sourceSessionDir, "session-usage.json"); + const newUsagePath = path.join(newSessionDir, "session-usage.json"); + try { + await fsPromises.copyFile(sourceUsagePath, newUsagePath); + } catch (error) { + if (!(error && typeof error === "object" && "code" in error && error.code === "ENOENT")) { + throw error; + } + } } catch (copyError) { await runtime.deleteWorkspace(foundProjectPath, newName, true); try { diff --git a/tests/ipc/setup.ts b/tests/ipc/setup.ts index 8cd1908c1d..a98f1dbb58 100644 --- a/tests/ipc/setup.ts +++ b/tests/ipc/setup.ts @@ -85,6 +85,7 @@ export async function createTestEnvironment(): Promise { menuEventService: services.menuEventService, voiceService: services.voiceService, telemetryService: services.telemetryService, + sessionUsageService: services.sessionUsageService, }; const orpc = createOrpcTestClient(orpcContext); From 52e7fd64bb3d84a880f9ac27b2217fbce0e860fa Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 10 Dec 2025 17:36:07 +1100 Subject: [PATCH 2/6] refactor: remove historicalUsage dead code Now that session-usage.json accumulates costs on every stream-end, historicalUsage in compaction summaries is redundant. The new system makes costs immune to compaction without needing this field. - Remove historicalUsage from MuxMetadata type and schema - Remove historicalUsage calculation from compactionHandler - Remove historicalUsage handling from sessionUsageService rebuild - Remove historicalUsage handling from collectUsageHistory - Remove related tests --- src/common/orpc/schemas/api.ts | 2 +- src/common/orpc/schemas/message.ts | 2 - src/common/types/message.ts | 2 - src/common/utils/tokens/displayUsage.test.ts | 56 +------------------ src/common/utils/tokens/displayUsage.ts | 13 ----- src/node/orpc/router.ts | 2 +- src/node/services/compactionHandler.test.ts | 42 -------------- src/node/services/compactionHandler.ts | 22 +++----- src/node/services/sessionUsageService.test.ts | 55 ------------------ src/node/services/sessionUsageService.ts | 22 -------- 10 files changed, 11 insertions(+), 207 deletions(-) diff --git a/src/common/orpc/schemas/api.ts b/src/common/orpc/schemas/api.ts index 5a6185605d..8133d75c95 100644 --- a/src/common/orpc/schemas/api.ts +++ b/src/common/orpc/schemas/api.ts @@ -323,7 +323,7 @@ export const workspace = { ), }, }, -/** + /** * Get the current plan file content for a workspace. * Used by UI to refresh plan display when file is edited externally. */ diff --git a/src/common/orpc/schemas/message.ts b/src/common/orpc/schemas/message.ts index 927eb3d011..2f9689409a 100644 --- a/src/common/orpc/schemas/message.ts +++ b/src/common/orpc/schemas/message.ts @@ -1,5 +1,4 @@ import { z } from "zod"; -import { ChatUsageDisplaySchema } from "./chatStats"; import { StreamErrorTypeSchema } from "./errors"; export const ImagePartSchema = z.object({ @@ -90,7 +89,6 @@ export const MuxMessageSchema = z.object({ synthetic: z.boolean().optional(), error: z.string().optional(), errorType: StreamErrorTypeSchema.optional(), - historicalUsage: ChatUsageDisplaySchema.optional(), }) .optional(), }); diff --git a/src/common/types/message.ts b/src/common/types/message.ts index 78617aba5b..f8af4dd71c 100644 --- a/src/common/types/message.ts +++ b/src/common/types/message.ts @@ -2,7 +2,6 @@ import type { UIMessage } from "ai"; import type { LanguageModelV2Usage } from "@ai-sdk/provider"; import type { StreamErrorType } from "./errors"; import type { ToolPolicy } from "@/common/utils/tools/toolPolicy"; -import type { ChatUsageDisplay } from "@/common/utils/tokens/usageAggregator"; import type { ImagePart, MuxToolPartSchema } from "@/common/orpc/schemas"; import type { z } from "zod"; import { type ReviewNoteData, formatReviewForModel } from "./review"; @@ -116,7 +115,6 @@ export interface MuxMetadata { mode?: string; // The mode (plan/exec/etc) active when this message was sent (assistant messages only) cmuxMetadata?: MuxFrontendMetadata; // Frontend-defined metadata, backend treats as black-box muxMetadata?: MuxFrontendMetadata; // Frontend-defined metadata, backend treats as black-box - historicalUsage?: ChatUsageDisplay; // Cumulative usage from all messages before this compaction (only present on compaction summaries) } // Extended tool part type that supports interrupted tool calls (input-available state) diff --git a/src/common/utils/tokens/displayUsage.test.ts b/src/common/utils/tokens/displayUsage.test.ts index e0cc3f5b74..0b0a788036 100644 --- a/src/common/utils/tokens/displayUsage.test.ts +++ b/src/common/utils/tokens/displayUsage.test.ts @@ -1,21 +1,15 @@ import { describe, test, expect } from "bun:test"; import { collectUsageHistory, createDisplayUsage } from "./displayUsage"; -import { sumUsageHistory, type ChatUsageDisplay } from "./usageAggregator"; +import { sumUsageHistory } from "./usageAggregator"; import { createMuxMessage, type MuxMessage } from "@/common/types/message"; import type { LanguageModelV2Usage } from "@ai-sdk/provider"; // Helper to create assistant message with usage -const createAssistant = ( - id: string, - usage?: LanguageModelV2Usage, - model?: string, - historicalUsage?: ChatUsageDisplay -): MuxMessage => { +const createAssistant = (id: string, usage?: LanguageModelV2Usage, model?: string): MuxMessage => { const msg = createMuxMessage(id, "assistant", "Response", { historySequence: 0, usage, model, - historicalUsage, }); return msg; }; @@ -87,52 +81,6 @@ describe("collectUsageHistory", () => { expect(result[0].model).toBe("unknown"); }); - test("prepends historical usage from compaction summary", () => { - const historicalUsage: ChatUsageDisplay = { - input: { tokens: 500, cost_usd: 0.01 }, - output: { tokens: 250, cost_usd: 0.02 }, - cached: { tokens: 0 }, - cacheCreate: { tokens: 0 }, - reasoning: { tokens: 0 }, - model: "historical-model", - }; - - const msg = createAssistant("a1", basicUsage, "claude-sonnet-4-5", historicalUsage); - const result = collectUsageHistory([msg]); - - expect(result).toHaveLength(2); - expect(result[0]).toBe(historicalUsage); // Historical comes first - expect(result[1].model).toBe("claude-sonnet-4-5"); // Current message second - }); - - test("uses latest historical usage when multiple messages have it", () => { - const historical1: ChatUsageDisplay = { - input: { tokens: 100 }, - output: { tokens: 50 }, - cached: { tokens: 0 }, - cacheCreate: { tokens: 0 }, - reasoning: { tokens: 0 }, - model: "first", - }; - - const historical2: ChatUsageDisplay = { - input: { tokens: 200 }, - output: { tokens: 100 }, - cached: { tokens: 0 }, - cacheCreate: { tokens: 0 }, - reasoning: { tokens: 0 }, - model: "second", - }; - - const msg1 = createAssistant("a1", basicUsage, "model-1", historical1); - const msg2 = createAssistant("a2", basicUsage, "model-2", historical2); - const result = collectUsageHistory([msg1, msg2]); - - expect(result).toHaveLength(3); // historical2 + msg1 + msg2 - expect(result[0]).toBe(historical2); // Latest historical usage wins - expect(result[0].model).toBe("second"); - }); - test("handles mixed message order correctly", () => { const userMsg = createMuxMessage("u1", "user", "Hello", { historySequence: 0 }); const assistantMsg1 = createAssistant("a1", basicUsage, "model-1"); diff --git a/src/common/utils/tokens/displayUsage.ts b/src/common/utils/tokens/displayUsage.ts index 72c0aa1bba..e00964dc6d 100644 --- a/src/common/utils/tokens/displayUsage.ts +++ b/src/common/utils/tokens/displayUsage.ts @@ -111,16 +111,9 @@ export function collectUsageHistory( ): ChatUsageDisplay[] { // Extract usage from assistant messages const usageHistory: ChatUsageDisplay[] = []; - let cumulativeHistorical: ChatUsageDisplay | undefined; for (const msg of messages) { if (msg.role === "assistant") { - // Check for historical usage from compaction summaries - // This preserves costs from messages deleted during compaction - if (msg.metadata?.historicalUsage) { - cumulativeHistorical = msg.metadata.historicalUsage; - } - // Extract current message's usage (total across all steps) if (msg.metadata?.usage) { // Use the model from this specific message (not global) @@ -134,11 +127,5 @@ export function collectUsageHistory( } } - // If we have historical usage from a compaction, prepend it to history - // This ensures costs from pre-compaction messages are included in totals - if (cumulativeHistorical) { - usageHistory.unshift(cumulativeHistorical); - } - return usageHistory; } diff --git a/src/node/orpc/router.ts b/src/node/orpc/router.ts index e261c9ba75..0be2dfdeda 100644 --- a/src/node/orpc/router.ts +++ b/src/node/orpc/router.ts @@ -594,7 +594,7 @@ export const router = (authToken?: string) => { } }), }, -getPlanContent: t + getPlanContent: t .input(schemas.workspace.getPlanContent.input) .output(schemas.workspace.getPlanContent.output) .handler(async ({ context, input }) => { diff --git a/src/node/services/compactionHandler.test.ts b/src/node/services/compactionHandler.test.ts index 8e498ae7ed..db2b0201d6 100644 --- a/src/node/services/compactionHandler.test.ts +++ b/src/node/services/compactionHandler.test.ts @@ -6,7 +6,6 @@ import type { EventEmitter } from "events"; import { createMuxMessage, type MuxMessage } from "@/common/types/message"; import type { StreamEndEvent } from "@/common/types/stream"; import { Ok, Err, type Result } from "@/common/types/result"; -import type { LanguageModelV2Usage } from "@ai-sdk/provider"; interface EmittedEvent { event: string; @@ -86,23 +85,6 @@ const createCompactionRequest = (id = "req-1"): MuxMessage => muxMetadata: { type: "compaction-request", rawCommand: "/compact", parsed: {} }, }); -const createAssistantMessage = ( - content: string, - options: { - id?: string; - historySequence?: number; - model?: string; - usage?: { inputTokens: number; outputTokens: number; totalTokens?: number }; - duration?: number; - } = {} -): MuxMessage => - createMuxMessage(options.id ?? "asst-1", "assistant", content, { - historySequence: options.historySequence ?? 1, - model: options.model ?? "claude-3-5-sonnet-20241022", - usage: options.usage as LanguageModelV2Usage | undefined, - duration: options.duration, - }); - const createStreamEndEvent = ( summary: string, metadata?: Record @@ -334,30 +316,6 @@ describe("CompactionHandler", () => { expect(streamMsg.metadata.duration).toBe(1234); }); - it("should calculate historicalUsage from all messages using collectUsageHistory and sumUsageHistory", async () => { - const compactionReq = createCompactionRequest(); - const msg1 = createAssistantMessage("Response 1", { - historySequence: 1, - usage: { inputTokens: 50, outputTokens: 25, totalTokens: 75 }, - }); - const msg2 = createAssistantMessage("Response 2", { - historySequence: 2, - usage: { inputTokens: 100, outputTokens: 50, totalTokens: 150 }, - }); - mockHistoryService.mockGetHistory(Ok([compactionReq, msg1, msg2])); - mockHistoryService.mockClearHistory(Ok([0, 1, 2])); - mockHistoryService.mockAppendToHistory(Ok(undefined)); - - const event = createStreamEndEvent("Summary"); - await handler.handleCompletion(event); - - const appendedMsg = mockHistoryService.appendToHistory.mock.calls[0][1] as MuxMessage; - expect(appendedMsg.metadata?.historicalUsage).toBeDefined(); - // historicalUsage is ChatUsageDisplay with input/output/cached/reasoning components - expect(appendedMsg.metadata?.historicalUsage).toHaveProperty("input"); - expect(appendedMsg.metadata?.historicalUsage).toHaveProperty("output"); - }); - it("should set compacted: true in summary metadata", async () => { const compactionReq = createCompactionRequest(); mockHistoryService.mockGetHistory(Ok([compactionReq])); diff --git a/src/node/services/compactionHandler.ts b/src/node/services/compactionHandler.ts index 05206813bc..6aa5f1217e 100644 --- a/src/node/services/compactionHandler.ts +++ b/src/node/services/compactionHandler.ts @@ -6,8 +6,7 @@ import type { WorkspaceChatMessage, DeleteMessage } from "@/common/orpc/types"; import type { Result } from "@/common/types/result"; import { Ok, Err } from "@/common/types/result"; import type { LanguageModelV2Usage } from "@ai-sdk/provider"; -import { collectUsageHistory } from "@/common/utils/tokens/displayUsage"; -import { sumUsageHistory } from "@/common/utils/tokens/usageAggregator"; + import { createMuxMessage, type MuxMessage } from "@/common/types/message"; import { log } from "@/node/services/log"; import { @@ -124,15 +123,14 @@ export class CompactionHandler { * Perform history compaction by replacing all messages with a summary * * Steps: - * 1. Calculate cumulative usage from all messages (for historicalUsage field) - * 2. Clear entire history and get deleted sequence numbers - * 3. Append summary message with metadata - * 4. Emit delete event for old messages - * 5. Emit summary message to frontend + * 1. Clear entire history and get deleted sequence numbers + * 2. Append summary message with metadata + * 3. Emit delete event for old messages + * 4. Emit summary message to frontend */ private async performCompaction( summary: string, - messages: MuxMessage[], + _messages: MuxMessage[], metadata: { model: string; usage?: LanguageModelV2Usage; @@ -141,10 +139,6 @@ export class CompactionHandler { systemMessageTokens?: number; } ): Promise> { - const usageHistory = collectUsageHistory(messages, undefined); - - const historicalUsage = usageHistory.length > 0 ? sumUsageHistory(usageHistory) : undefined; - // CRITICAL: Delete partial.json BEFORE clearing history // This prevents a race condition where: // 1. CompactionHandler clears history and appends summary @@ -169,8 +163,7 @@ export class CompactionHandler { // Create summary message with metadata. // We omit providerMetadata because it contains cacheCreationInputTokens from the - // pre-compaction context, which inflates context usage display. The historicalUsage - // field preserves full cost accounting from pre-compaction messages. + // pre-compaction context, which inflates context usage display. const summaryMessage = createMuxMessage( `summary-${Date.now()}-${Math.random().toString(36).substring(2, 11)}`, "assistant", @@ -180,7 +173,6 @@ export class CompactionHandler { compacted: true, model: metadata.model, usage: metadata.usage, - historicalUsage, duration: metadata.duration, systemMessageTokens: metadata.systemMessageTokens, muxMetadata: { type: "normal" }, diff --git a/src/node/services/sessionUsageService.test.ts b/src/node/services/sessionUsageService.test.ts index 820155d3de..981383ec0f 100644 --- a/src/node/services/sessionUsageService.test.ts +++ b/src/node/services/sessionUsageService.test.ts @@ -151,60 +151,5 @@ describe("SessionUsageService", () => { expect(result!.byModel["claude-sonnet-4-20250514"]).toBeDefined(); expect(result!.byModel["claude-sonnet-4-20250514"].input.tokens).toBe(100); }); - - it("should capture historicalUsage from compaction summary under 'pre-compaction' key", async () => { - const workspaceId = "test-workspace"; - const historicalUsage = createUsage(1000, 500); // Pre-compaction costs - - const messages = [ - createMuxMessage("summary", "assistant", "Compaction summary", { - compacted: true, - model: "claude-sonnet-4-20250514", - usage: { inputTokens: 50, outputTokens: 25, totalTokens: 75 }, - historicalUsage, // This preserves pre-compaction costs - }), - ]; - - await service.rebuildFromMessages(workspaceId, messages); - const result = await service.getSessionUsage(workspaceId); - - expect(result).toBeDefined(); - expect(result!.byModel["pre-compaction"]).toBeDefined(); - expect(result!.byModel["pre-compaction"].input.tokens).toBe(1000); - expect(result!.byModel["pre-compaction"].output.tokens).toBe(500); - }); - - it("should use LAST historicalUsage when multiple compactions exist (each is cumulative)", async () => { - const workspaceId = "test-workspace"; - const firstHistorical = createUsage(500, 250); - const secondHistorical = createUsage(1500, 750); // Cumulative - includes first - - const messages = [ - createMuxMessage("summary1", "assistant", "First compaction", { - compacted: true, - model: "claude-sonnet-4-20250514", - usage: { inputTokens: 50, outputTokens: 25, totalTokens: 75 }, - historicalUsage: firstHistorical, - }), - createMuxMessage("msg1", "assistant", "Regular message", { - model: "claude-sonnet-4-20250514", - usage: { inputTokens: 100, outputTokens: 50, totalTokens: 150 }, - }), - createMuxMessage("summary2", "assistant", "Second compaction", { - compacted: true, - model: "claude-sonnet-4-20250514", - usage: { inputTokens: 50, outputTokens: 25, totalTokens: 75 }, - historicalUsage: secondHistorical, // This is cumulative, should replace first - }), - ]; - - await service.rebuildFromMessages(workspaceId, messages); - const result = await service.getSessionUsage(workspaceId); - - expect(result).toBeDefined(); - // Should have the LAST historicalUsage, not sum of both - expect(result!.byModel["pre-compaction"].input.tokens).toBe(1500); - expect(result!.byModel["pre-compaction"].output.tokens).toBe(750); - }); }); }); diff --git a/src/node/services/sessionUsageService.ts b/src/node/services/sessionUsageService.ts index 4773691279..932149da61 100644 --- a/src/node/services/sessionUsageService.ts +++ b/src/node/services/sessionUsageService.ts @@ -112,10 +112,6 @@ export class SessionUsageService { /** * Rebuild session usage from messages (for migration/recovery). * Internal version - called within lock. - * - * Handles historicalUsage from compaction summaries - this is a cumulative - * ChatUsageDisplay representing all pre-compaction costs (model-agnostic). - * We store it under the special "pre-compaction" key so it's included in totals. */ private async rebuildFromMessagesInternal( workspaceId: string, @@ -123,18 +119,9 @@ export class SessionUsageService { ): Promise { const result: SessionUsageFile = { byModel: {}, version: 1 }; let lastAssistantUsage: { model: string; usage: ChatUsageDisplay } | undefined; - // Track the last historicalUsage seen (each compaction summary contains - // cumulative history, so only the last one matters) - let lastHistoricalUsage: ChatUsageDisplay | undefined; for (const msg of messages) { if (msg.role === "assistant") { - // Check for historicalUsage from compaction summaries - // This preserves costs from messages deleted during compaction - if (msg.metadata?.historicalUsage) { - lastHistoricalUsage = msg.metadata.historicalUsage; - } - // Extract current message's usage if (msg.metadata?.usage) { const rawModel = msg.metadata.model ?? "unknown"; @@ -154,15 +141,6 @@ export class SessionUsageService { } } - // Include historical usage from compaction under a special key - // This ensures pre-compaction costs are included in session totals - if (lastHistoricalUsage) { - const existing = result.byModel["pre-compaction"]; - result.byModel["pre-compaction"] = existing - ? sumUsageHistory([existing, lastHistoricalUsage])! - : lastHistoricalUsage; - } - if (lastAssistantUsage) { result.lastRequest = { model: lastAssistantUsage.model, From 630a422680a90b3094becab5fbcfb7191e64622a Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 10 Dec 2025 18:04:27 +1100 Subject: [PATCH 3/6] refactor: remove collectUsageHistory and unused _messages param --- src/common/utils/tokens/displayUsage.test.ts | 170 +------------------ src/common/utils/tokens/displayUsage.ts | 30 ---- src/node/services/compactionHandler.ts | 5 +- 3 files changed, 3 insertions(+), 202 deletions(-) diff --git a/src/common/utils/tokens/displayUsage.test.ts b/src/common/utils/tokens/displayUsage.test.ts index 0b0a788036..a11015938c 100644 --- a/src/common/utils/tokens/displayUsage.test.ts +++ b/src/common/utils/tokens/displayUsage.test.ts @@ -1,100 +1,7 @@ import { describe, test, expect } from "bun:test"; -import { collectUsageHistory, createDisplayUsage } from "./displayUsage"; -import { sumUsageHistory } from "./usageAggregator"; -import { createMuxMessage, type MuxMessage } from "@/common/types/message"; +import { createDisplayUsage } from "./displayUsage"; import type { LanguageModelV2Usage } from "@ai-sdk/provider"; -// Helper to create assistant message with usage -const createAssistant = (id: string, usage?: LanguageModelV2Usage, model?: string): MuxMessage => { - const msg = createMuxMessage(id, "assistant", "Response", { - historySequence: 0, - usage, - model, - }); - return msg; -}; - -describe("collectUsageHistory", () => { - const basicUsage: LanguageModelV2Usage = { - inputTokens: 100, - outputTokens: 50, - totalTokens: 150, - }; - - test("returns empty array for empty messages", () => { - expect(collectUsageHistory([])).toEqual([]); - }); - - test("returns empty array when no assistant messages", () => { - const userMsg = createMuxMessage("u1", "user", "Hello", { historySequence: 0 }); - expect(collectUsageHistory([userMsg])).toEqual([]); - }); - - test("extracts usage from single assistant message", () => { - const msg = createAssistant("a1", basicUsage, "claude-sonnet-4-5"); - const result = collectUsageHistory([msg]); - - expect(result).toHaveLength(1); - expect(result[0].model).toBe("claude-sonnet-4-5"); - expect(result[0].input.tokens).toBe(100); - expect(result[0].output.tokens).toBe(50); - }); - - test("extracts usage from multiple assistant messages", () => { - const msg1 = createAssistant("a1", basicUsage, "claude-sonnet-4-5"); - const msg2 = createAssistant("a2", { ...basicUsage, inputTokens: 200 }, "claude-sonnet-4-5"); - const result = collectUsageHistory([msg1, msg2]); - - expect(result).toHaveLength(2); - expect(result[0].input.tokens).toBe(100); - expect(result[1].input.tokens).toBe(200); - }); - - test("skips assistant messages without usage", () => { - const msg1 = createAssistant("a1", basicUsage, "claude-sonnet-4-5"); - const msg2 = createAssistant("a2", undefined, "claude-sonnet-4-5"); // No usage - const msg3 = createAssistant("a3", basicUsage, "claude-sonnet-4-5"); - const result = collectUsageHistory([msg1, msg2, msg3]); - - expect(result).toHaveLength(2); // msg2 excluded - }); - - test("filters out user messages", () => { - const userMsg = createMuxMessage("u1", "user", "Hello", { historySequence: 0 }); - const assistantMsg = createAssistant("a1", basicUsage, "claude-sonnet-4-5"); - const result = collectUsageHistory([userMsg, assistantMsg]); - - expect(result).toHaveLength(1); - }); - - test("uses fallbackModel when message has no model", () => { - const msg = createAssistant("a1", basicUsage, undefined); - const result = collectUsageHistory([msg], "fallback-model"); - - expect(result[0].model).toBe("fallback-model"); - }); - - test("defaults to 'unknown' when no model provided", () => { - const msg = createAssistant("a1", basicUsage, undefined); - const result = collectUsageHistory([msg]); - - expect(result[0].model).toBe("unknown"); - }); - - test("handles mixed message order correctly", () => { - const userMsg = createMuxMessage("u1", "user", "Hello", { historySequence: 0 }); - const assistantMsg1 = createAssistant("a1", basicUsage, "model-1"); - const userMsg2 = createMuxMessage("u2", "user", "More", { historySequence: 2 }); - const assistantMsg2 = createAssistant("a2", basicUsage, "model-2"); - - const result = collectUsageHistory([userMsg, assistantMsg1, userMsg2, assistantMsg2]); - - expect(result).toHaveLength(2); - expect(result[0].model).toBe("model-1"); - expect(result[1].model).toBe("model-2"); - }); -}); - describe("createDisplayUsage", () => { describe("Provider-specific cached token handling", () => { // OpenAI reports inputTokens INCLUSIVE of cachedInputTokens @@ -289,78 +196,3 @@ describe("createDisplayUsage", () => { }); }); }); - -describe("multi-model cost calculation", () => { - test("calculates correct total cost across different models", () => { - // Create messages with different models and raw token counts - const claudeMsg = createAssistant( - "a1", - { - inputTokens: 10000, - outputTokens: 1000, - totalTokens: 11000, - }, - "anthropic:claude-sonnet-4-5" - ); - - const gptMsg = createAssistant( - "a2", - { - inputTokens: 20000, - outputTokens: 2000, - totalTokens: 22000, - }, - "openai:gpt-4o" - ); - - // Run through full pipeline - const usageHistory = collectUsageHistory([claudeMsg, gptMsg]); - const total = sumUsageHistory(usageHistory); - - // Verify per-model costs are calculated correctly - // Claude: $3/M input, $15/M output - expect(usageHistory[0].input.cost_usd).toBeCloseTo(0.03); // 10k × $0.000003 - expect(usageHistory[0].output.cost_usd).toBeCloseTo(0.015); // 1k × $0.000015 - - // GPT-4o: $2.50/M input, $10/M output - expect(usageHistory[1].input.cost_usd).toBeCloseTo(0.05); // 20k × $0.0000025 - expect(usageHistory[1].output.cost_usd).toBeCloseTo(0.02); // 2k × $0.00001 - - // Verify total sums correctly - expect(total?.input.cost_usd).toBeCloseTo(0.08); // $0.03 + $0.05 - expect(total?.output.cost_usd).toBeCloseTo(0.035); // $0.015 + $0.02 - expect(total?.hasUnknownCosts).toBeFalsy(); - }); - - test("flags hasUnknownCosts when one model has no pricing", () => { - const claudeMsg = createAssistant( - "a1", - { - inputTokens: 10000, - outputTokens: 1000, - totalTokens: 11000, - }, - "anthropic:claude-sonnet-4-5" - ); - - const unknownMsg = createAssistant( - "a2", - { - inputTokens: 5000, - outputTokens: 500, - totalTokens: 5500, - }, - "unknown:custom-model" - ); - - const usageHistory = collectUsageHistory([claudeMsg, unknownMsg]); - const total = sumUsageHistory(usageHistory); - - // Claude costs should still be included - expect(total?.input.cost_usd).toBeCloseTo(0.03); - expect(total?.output.cost_usd).toBeCloseTo(0.015); - - // Flag indicates incomplete total - expect(total?.hasUnknownCosts).toBe(true); - }); -}); diff --git a/src/common/utils/tokens/displayUsage.ts b/src/common/utils/tokens/displayUsage.ts index e00964dc6d..f936d67926 100644 --- a/src/common/utils/tokens/displayUsage.ts +++ b/src/common/utils/tokens/displayUsage.ts @@ -8,7 +8,6 @@ import type { LanguageModelV2Usage } from "@ai-sdk/provider"; import { getModelStats } from "./modelStats"; import type { ChatUsageDisplay } from "./usageAggregator"; -import type { MuxMessage } from "@/common/types/message"; import { normalizeGatewayModel } from "../ai/models"; /** @@ -100,32 +99,3 @@ export function createDisplayUsage( model, // Include model for display purposes }; } - -/** - * Collect usage history for cost calculation. - * Uses totalUsage (sum of all steps) for accurate cost reporting. - */ -export function collectUsageHistory( - messages: MuxMessage[], - fallbackModel?: string -): ChatUsageDisplay[] { - // Extract usage from assistant messages - const usageHistory: ChatUsageDisplay[] = []; - - for (const msg of messages) { - if (msg.role === "assistant") { - // Extract current message's usage (total across all steps) - if (msg.metadata?.usage) { - // Use the model from this specific message (not global) - const model = msg.metadata.model ?? fallbackModel ?? "unknown"; - const usage = createDisplayUsage(msg.metadata.usage, model, msg.metadata.providerMetadata); - - if (usage) { - usageHistory.push(usage); - } - } - } - } - - return usageHistory; -} diff --git a/src/node/services/compactionHandler.ts b/src/node/services/compactionHandler.ts index 6aa5f1217e..861a1f30c4 100644 --- a/src/node/services/compactionHandler.ts +++ b/src/node/services/compactionHandler.ts @@ -7,7 +7,7 @@ import type { Result } from "@/common/types/result"; import { Ok, Err } from "@/common/types/result"; import type { LanguageModelV2Usage } from "@ai-sdk/provider"; -import { createMuxMessage, type MuxMessage } from "@/common/types/message"; +import { createMuxMessage } from "@/common/types/message"; import { log } from "@/node/services/log"; import { extractEditedFileDiffs, @@ -108,7 +108,7 @@ export class CompactionHandler { // Mark as processed before performing compaction this.processedCompactionRequestIds.add(lastUserMsg.id); - const result = await this.performCompaction(summary, messages, event.metadata); + const result = await this.performCompaction(summary, event.metadata); if (!result.success) { log.error("Compaction failed:", result.error); return false; @@ -130,7 +130,6 @@ export class CompactionHandler { */ private async performCompaction( summary: string, - _messages: MuxMessage[], metadata: { model: string; usage?: LanguageModelV2Usage; From 5b8a85febcfa9b5cdea4535e6f5c9fedd9ef98b8 Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 10 Dec 2025 18:24:36 +1100 Subject: [PATCH 4/6] fix: read historicalUsage from legacy compaction summaries during rebuild --- src/node/services/sessionUsageService.test.ts | 44 +++++++++++++++++++ src/node/services/sessionUsageService.ts | 12 +++++ 2 files changed, 56 insertions(+) diff --git a/src/node/services/sessionUsageService.test.ts b/src/node/services/sessionUsageService.test.ts index 981383ec0f..a1183e7846 100644 --- a/src/node/services/sessionUsageService.test.ts +++ b/src/node/services/sessionUsageService.test.ts @@ -151,5 +151,49 @@ describe("SessionUsageService", () => { expect(result!.byModel["claude-sonnet-4-20250514"]).toBeDefined(); expect(result!.byModel["claude-sonnet-4-20250514"].input.tokens).toBe(100); }); + + it("should include historicalUsage from legacy compaction summaries", async () => { + const workspaceId = "test-workspace"; + + // Create a compaction summary with historicalUsage (legacy format) + const compactionSummary = createMuxMessage("summary-1", "assistant", "Compacted summary", { + historySequence: 1, + compacted: true, + model: "anthropic:claude-sonnet-4-5", + usage: { inputTokens: 100, outputTokens: 50, totalTokens: 150 }, + }); + + // Add historicalUsage - this field was removed from MuxMetadata type + // but may still exist in persisted data from before the change + (compactionSummary.metadata as Record).historicalUsage = createUsage( + 5000, + 1000 + ); + + // Add a post-compaction message + const postCompactionMsg = createMuxMessage("msg2", "assistant", "New response", { + historySequence: 2, + model: "anthropic:claude-sonnet-4-5", + usage: { inputTokens: 200, outputTokens: 75, totalTokens: 275 }, + }); + + mockHistoryService = createMockHistoryService([compactionSummary, postCompactionMsg]); + service = new SessionUsageService(config, mockHistoryService); + + // Create session dir but NOT the session-usage.json file (triggers rebuild) + await fs.mkdir(config.getSessionDir(workspaceId), { recursive: true }); + + const result = await service.getSessionUsage(workspaceId); + + expect(result).toBeDefined(); + // Should include historical usage under "historical" key + expect(result!.byModel.historical).toBeDefined(); + expect(result!.byModel.historical.input.tokens).toBe(5000); + expect(result!.byModel.historical.output.tokens).toBe(1000); + + // Should also include current model usage (compaction summary + post-compaction) + expect(result!.byModel["anthropic:claude-sonnet-4-5"]).toBeDefined(); + expect(result!.byModel["anthropic:claude-sonnet-4-5"].input.tokens).toBe(300); // 100 + 200 + }); }); }); diff --git a/src/node/services/sessionUsageService.ts b/src/node/services/sessionUsageService.ts index 932149da61..170e589e12 100644 --- a/src/node/services/sessionUsageService.ts +++ b/src/node/services/sessionUsageService.ts @@ -122,6 +122,18 @@ export class SessionUsageService { for (const msg of messages) { if (msg.role === "assistant") { + // Include historicalUsage from legacy compaction summaries. + // This field was removed from MuxMetadata but may exist in persisted data. + // It's a ChatUsageDisplay representing all pre-compaction costs (model-agnostic). + const historicalUsage = (msg.metadata as { historicalUsage?: ChatUsageDisplay }) + ?.historicalUsage; + if (historicalUsage) { + const existing = result.byModel.historical; + result.byModel.historical = existing + ? sumUsageHistory([existing, historicalUsage])! + : historicalUsage; + } + // Extract current message's usage if (msg.metadata?.usage) { const rawModel = msg.metadata.model ?? "unknown"; From 04b31df6b116d0638837a4df2a0360c3a83d9959 Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 10 Dec 2025 18:26:07 +1100 Subject: [PATCH 5/6] fix: add getSessionUsage to Storybook mock client --- .storybook/mocks/orpc.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/.storybook/mocks/orpc.ts b/.storybook/mocks/orpc.ts index 837c49ccfc..a1b5fee1da 100644 --- a/.storybook/mocks/orpc.ts +++ b/.storybook/mocks/orpc.ts @@ -207,6 +207,7 @@ export function createMockORPCClient(options: MockORPCClientOptions = {}): APICl terminate: async () => ({ success: true, data: undefined }), sendToBackground: async () => ({ success: true, data: undefined }), }, + getSessionUsage: async () => undefined, }, window: { setTitle: async () => undefined, From 503fc83fd835675669a59e63263c7489abc9ea31 Mon Sep 17 00:00:00 2001 From: ethan Date: Thu, 11 Dec 2025 11:18:40 +1100 Subject: [PATCH 6/6] docs: clarify historicalUsage removal in compaction handler --- src/common/orpc/schemas/api.ts | 2 +- src/node/orpc/router.ts | 2 +- src/node/services/compactionHandler.ts | 11 ++++++++--- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/common/orpc/schemas/api.ts b/src/common/orpc/schemas/api.ts index 8133d75c95..a0c1daf95a 100644 --- a/src/common/orpc/schemas/api.ts +++ b/src/common/orpc/schemas/api.ts @@ -366,7 +366,7 @@ export const workspace = { output: ResultSchema(z.void(), z.string()), }, }, -/** + /** * Get post-compaction context state for a workspace. * Returns plan path (if exists) and tracked file paths that will be injected. */ diff --git a/src/node/orpc/router.ts b/src/node/orpc/router.ts index 0be2dfdeda..ad59086dd5 100644 --- a/src/node/orpc/router.ts +++ b/src/node/orpc/router.ts @@ -677,7 +677,7 @@ export const router = (authToken?: string) => { return { success: true, data: undefined }; }), }, -getPostCompactionState: t + getPostCompactionState: t .input(schemas.workspace.getPostCompactionState.input) .output(schemas.workspace.getPostCompactionState.output) .handler(({ context, input }) => { diff --git a/src/node/services/compactionHandler.ts b/src/node/services/compactionHandler.ts index 861a1f30c4..dfdd02d345 100644 --- a/src/node/services/compactionHandler.ts +++ b/src/node/services/compactionHandler.ts @@ -7,7 +7,7 @@ import type { Result } from "@/common/types/result"; import { Ok, Err } from "@/common/types/result"; import type { LanguageModelV2Usage } from "@ai-sdk/provider"; -import { createMuxMessage } from "@/common/types/message"; +import { createMuxMessage, type MuxMessage } from "@/common/types/message"; import { log } from "@/node/services/log"; import { extractEditedFileDiffs, @@ -108,7 +108,7 @@ export class CompactionHandler { // Mark as processed before performing compaction this.processedCompactionRequestIds.add(lastUserMsg.id); - const result = await this.performCompaction(summary, event.metadata); + const result = await this.performCompaction(summary, event.metadata, messages); if (!result.success) { log.error("Compaction failed:", result.error); return false; @@ -136,7 +136,8 @@ export class CompactionHandler { duration?: number; providerMetadata?: Record; systemMessageTokens?: number; - } + }, + messages: MuxMessage[] ): Promise> { // CRITICAL: Delete partial.json BEFORE clearing history // This prevents a race condition where: @@ -163,6 +164,10 @@ export class CompactionHandler { // Create summary message with metadata. // We omit providerMetadata because it contains cacheCreationInputTokens from the // pre-compaction context, which inflates context usage display. + // Note: We no longer store historicalUsage here. Cumulative costs are tracked in + // session-usage.json, which is updated on every stream-end. If that file is deleted + // or corrupted, pre-compaction costs are lost - this is acceptable since manual + // file deletion is out of scope for data recovery. const summaryMessage = createMuxMessage( `summary-${Date.now()}-${Math.random().toString(36).substring(2, 11)}`, "assistant",