From 0bd4faf978501429cf1810e9f35f95485ba667f9 Mon Sep 17 00:00:00 2001 From: Contentrain Date: Sat, 16 May 2026 01:10:10 +0300 Subject: [PATCH] perf(ai): anthropic prompt cache + cache token accounting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit System prompt + tools were rebuilt and re-sent uncached on every request. Brain content index alone can grow past 10K tokens, so a typical 10-turn session was paying for the same prefix ten times. Anthropic's prompt cache cuts that prefix to ~10% of base input price when reused within 5 minutes; this PR wires the markers up. The naive shape — wrap the existing `buildSystemPrompt(...)` string in a cached block — would have miscached because `buildSchemaSection` embeds the active-model marker and `buildRulesSection` injects the out-of-scope rule based on intent. Cache markers placed over content that varies per request bust the prefix and pay the 1.25x creation penalty on every turn. The prompt builder is now actually split: - `buildStaticBody` role, architecture, config, schema (NO active-model marker), relations, vocab, permissions, base rules, custom instructions - `buildDynamicBody` UI context (active-model annotation lives here), inferred intent, project state, intent-specific rules (off-topic, etc.) - `buildContentIndex` already separate (brain cache); rendered as its own cached block `buildSystemPromptBlocks(...)` returns the three pieces; `toSystemBlocks(...)` materializes them as an `AISystemBlock[]` with `cache_control` markers on the static body and the content index (2 of Anthropic's 4 explicit breakpoints). The Studio chat handler and the Conversation API handler both compose the prompt this way and additionally tag the last AITool with `cache_control`, so the tools array gets the third breakpoint — tools rarely change within a session, very high hit rate. `AIProvider.system` accepts `string | AISystemBlock[]`; string callers (legacy paths, tests) get a single uncached block automatically. The Anthropic provider also captures `cache_creation_input_tokens` and `cache_read_input_tokens` from `message_start`/`message_delta` and surfaces them through a 4-field `AIUsage` shape. The engine accumulates all four buckets across the tool loop and forwards them on the `done` event. Persistence: Migration 008 ships as additive `_v2` RPCs and new columns: agent_usage / api_message_usage / messages + cache_creation_input_tokens + cache_read_input_tokens increment_agent_usage_tokens_v2 increment_api_usage_tokens_v2 `_v1` RPCs stay registered so a rolling deploy doesn't have to coordinate schema-and-app cutover. App code calls `_v2` exclusively; `_v1` becomes legacy and can be dropped in a future cleanup migration. `saveChatResult` / `saveApiChatResult` switched to object-form arguments — the positional list had grown unwieldy with four token fields plus ten other params. Business semantic preserved: cache is a Contentrain-side cost win. Plan quotas stay message-based; cache_read tokens DO NOT earn extra messages. `input_tokens` semantic is unchanged (= non-cached input), so existing dashboard queries summing it stay correct. Tests: - anthropic-ai: three-bucket stream-event capture; system-block + tools `cache_control` mapping to SDK shape. - agent-system-prompt-cache: static body is byte-identical across UI-context / intent / state changes (the actual cache-hit invariant); contentIndex separates; `toSystemBlocks` emits 2 cached blocks max so tools breakpoint stays available. - db: cache tokens propagate through `saveChatResult` to both `agent_usage` and the `messages` row. - chat-route / overage-soft-cap integration mocks updated to the new helper names and object-form save signature. Out of scope (separate follow-ups): - Message-level cache breakpoints (history mutates per turn — needs prefix-stability analysis). - 1-hour cache TTL beta. - Cache hit-rate dashboard UI. - History budget increase — defer until we have observed hit rates from this PR's accounting. --- ee/enterprise/conversation-api.ts | 50 +-- .../projects/[projectId]/chat.post.ts | 59 +++- server/providers/ai.ts | 55 +++- server/providers/anthropic-ai.ts | 104 +++++-- server/providers/database.ts | 20 +- server/providers/supabase-db/conversations.ts | 15 +- server/utils/agent-system-prompt.ts | 285 +++++++++++++----- server/utils/conversation-engine.ts | 23 +- server/utils/db.ts | 120 ++++---- .../migrations/008_cache_token_accounting.sql | 100 ++++++ .../chat-route.integration.test.ts | 39 +-- .../overage-soft-cap.integration.test.ts | 1 + tests/unit/agent-system-prompt-cache.test.ts | 214 +++++++++++++ tests/unit/anthropic-ai.test.ts | 117 ++++++- tests/unit/db.test.ts | 124 +++++--- 15 files changed, 1078 insertions(+), 248 deletions(-) create mode 100644 supabase/migrations/008_cache_token_accounting.sql create mode 100644 tests/unit/agent-system-prompt-cache.test.ts diff --git a/ee/enterprise/conversation-api.ts b/ee/enterprise/conversation-api.ts index 80a5574..ce8ab31 100644 --- a/ee/enterprise/conversation-api.ts +++ b/ee/enterprise/conversation-api.ts @@ -7,7 +7,7 @@ import type { ChatUIContext } from '../../server/utils/agent-types' import { toAITools } from '../../server/utils/agent-types' import { classifyIntent } from '../../server/utils/agent-context' import { deriveProjectPhase } from '../../server/utils/agent-state-machine' -import { buildSystemPrompt } from '../../server/utils/agent-system-prompt' +import { buildSystemPromptBlocks, toSystemBlocks } from '../../server/utils/agent-system-prompt' import { STUDIO_TOOLS, filterToolsByPermissions } from '../../server/utils/agent-tools' import { buildContentIndex, getOrBuildBrainCache } from '../../server/utils/brain-cache' import { createContentEngine } from '../../server/utils/content-engine' @@ -252,26 +252,30 @@ async function runConversationMessage( const phase = deriveProjectPhase(projectConfig, pendingBranches, project.status ?? 'active') const intent = classifyIntent(body.message, uiContext, phase) - let systemPrompt = buildSystemPrompt( + const contentIndex = buildContentIndex(brain) + const promptBlocks = buildSystemPromptBlocks( projectConfig, models, permissions, { initialized: !!projectConfig, pendingBranches, projectStatus: project.status ?? 'active', phase, contentContext }, uiContext, intent, + contentIndex || null, vocabulary, plan, keyData.customInstructions, ) - - const contentIndex = buildContentIndex(brain) - if (contentIndex) - systemPrompt += `\n\n${contentIndex}` + const systemPrompt = toSystemBlocks(promptBlocks) const permissionFiltered = filterToolsByPermissions(STUDIO_TOOLS, permissions.availableTools) as typeof STUDIO_TOOLS const phaseFiltered = permissionFiltered.filter(tool => tool.requiredPhase.includes(phase)) const aiTools = toAITools(phaseFiltered) + // Cache the tools array — same rationale as the Studio chat path. + if (aiTools.length > 0) { + aiTools[aiTools.length - 1]!.cacheControl = { type: 'ephemeral' } + } + const runtimeConfig = useRuntimeConfig() const apiKey = runtimeConfig.anthropic.apiKey if (!apiKey) @@ -312,6 +316,8 @@ async function runConversationMessage( let responseText = '' let totalInputTokens = 0 let totalOutputTokens = 0 + let totalCacheCreationInputTokens = 0 + let totalCacheReadInputTokens = 0 let lastAssistantContent: AIContentBlock[] = [] for await (const evt of runConversationLoop( @@ -346,26 +352,32 @@ async function runConversationMessage( case 'tool_result': toolResults.push({ id: evt.id as string, name: evt.name as string, result: evt.result }) break - case 'done': - totalInputTokens = (evt.usage as { inputTokens: number })?.inputTokens ?? 0 - totalOutputTokens = (evt.usage as { outputTokens: number })?.outputTokens ?? 0 + case 'done': { + const u = evt.usage as { inputTokens?: number, outputTokens?: number, cacheCreationInputTokens?: number, cacheReadInputTokens?: number } | undefined + totalInputTokens = u?.inputTokens ?? 0 + totalOutputTokens = u?.outputTokens ?? 0 + totalCacheCreationInputTokens = u?.cacheCreationInputTokens ?? 0 + totalCacheReadInputTokens = u?.cacheReadInputTokens ?? 0 lastAssistantContent = (evt.lastContent as AIContentBlock[]) ?? [] break + } } } - await saveApiChatResult( + await saveApiChatResult({ conversationId, - body.message, - responseText, - lastAssistantContent, + userMessage: body.message, + assistantText: responseText, + assistantContent: lastAssistantContent, model, - totalInputTokens, - totalOutputTokens, - keyData.workspaceId, - keyData.keyId, + inputTokens: totalInputTokens, + outputTokens: totalOutputTokens, + cacheCreationInputTokens: totalCacheCreationInputTokens, + cacheReadInputTokens: totalCacheReadInputTokens, + workspaceId: keyData.workspaceId, + apiKeyId: keyData.keyId, usageMonth, - ) + }) return { conversationId, @@ -374,6 +386,8 @@ async function runConversationMessage( usage: { inputTokens: totalInputTokens, outputTokens: totalOutputTokens, + cacheCreationInputTokens: totalCacheCreationInputTokens, + cacheReadInputTokens: totalCacheReadInputTokens, }, } } diff --git a/server/api/workspaces/[workspaceId]/projects/[projectId]/chat.post.ts b/server/api/workspaces/[workspaceId]/projects/[projectId]/chat.post.ts index 56d03c2..4b70fbd 100644 --- a/server/api/workspaces/[workspaceId]/projects/[projectId]/chat.post.ts +++ b/server/api/workspaces/[workspaceId]/projects/[projectId]/chat.post.ts @@ -189,19 +189,32 @@ export default defineEventHandler(async (event) => { contentContext, } - let systemPrompt = buildSystemPrompt(projectConfig, models, permissions, projectState, uiContext, intent, vocabulary, plan) - - // Append content index from brain cache (compact summary of all content) + // Build the system prompt as cache-aware blocks: the static body + // (role, architecture, schema, vocab, permissions, base rules, + // custom instructions) and the brain content index get their own + // `cache_control` markers; the dynamic block (UI context, intent, + // state) follows uncached so request-by-request changes never + // invalidate the cached prefix. const contentIndex = buildContentIndex(brain) - if (contentIndex) { - systemPrompt += `\n\n${contentIndex}` - } + const promptBlocks = buildSystemPromptBlocks( + projectConfig, models, permissions, projectState, uiContext, intent, + contentIndex || null, + vocabulary, plan, + ) + const systemPrompt = toSystemBlocks(promptBlocks) // === FILTER TOOLS by permissions + phase === const permissionFiltered = filterToolsByPermissions(STUDIO_TOOLS, permissions.availableTools) as StudioTool[] const phaseFiltered = permissionFiltered.filter(t => t.requiredPhase.includes(phase)) const aiTools = toAITools(phaseFiltered) + // Mark the last tool with cache_control so Anthropic caches the + // entire tools block. Tools rarely change within a session — this + // is a high-hit-rate third breakpoint after the two system blocks. + if (aiTools.length > 0) { + aiTools[aiTools.length - 1]!.cacheControl = { type: 'ephemeral' } + } + // Workflow: plans without review feature always auto-merge regardless of config const configWorkflow = projectConfig?.workflow ?? 'auto-merge' const workflow = hasFeature(plan, 'workflow.review') ? configWorkflow : 'auto-merge' @@ -216,6 +229,8 @@ export default defineEventHandler(async (event) => { let totalInputTokens = 0 let totalOutputTokens = 0 + let totalCacheCreationInputTokens = 0 + let totalCacheReadInputTokens = 0 let lastAssistantContent: AIContentBlock[] = [] try { @@ -238,9 +253,15 @@ export default defineEventHandler(async (event) => { // Forward all events to SSE stream if (evt.type === 'done') { - // Extract final state from done event before forwarding - totalInputTokens = (evt.usage as { inputTokens: number })?.inputTokens ?? 0 - totalOutputTokens = (evt.usage as { outputTokens: number })?.outputTokens ?? 0 + // Extract final state from done event before forwarding. + // Engine accumulates all four token buckets (regular + // input/output + cache creation + cache read) across + // streaming and non-streaming iterations. + const u = evt.usage as { inputTokens?: number, outputTokens?: number, cacheCreationInputTokens?: number, cacheReadInputTokens?: number } | undefined + totalInputTokens = u?.inputTokens ?? 0 + totalOutputTokens = u?.outputTokens ?? 0 + totalCacheCreationInputTokens = u?.cacheCreationInputTokens ?? 0 + totalCacheReadInputTokens = u?.cacheReadInputTokens ?? 0 lastAssistantContent = (evt.lastContent as AIContentBlock[]) ?? [] // Forward the done event without lastContent (not needed by client) @@ -261,11 +282,21 @@ export default defineEventHandler(async (event) => { .map(b => (b as { text: string }).text) .join('') - await saveChatResult( - conversationId, body.message, assistantText, - lastAssistantContent, model, totalInputTokens, totalOutputTokens, - workspaceId, session.user.id, usageSource, usageMonth, - ) + await saveChatResult({ + conversationId, + userMessage: body.message, + assistantText, + assistantContent: lastAssistantContent, + model, + inputTokens: totalInputTokens, + outputTokens: totalOutputTokens, + cacheCreationInputTokens: totalCacheCreationInputTokens, + cacheReadInputTokens: totalCacheReadInputTokens, + workspaceId, + userId: session.user.id, + usageSource, + usageMonth, + }) // Webhook events are now emitted from conversation-engine.ts per tool execution } diff --git a/server/providers/ai.ts b/server/providers/ai.ts index d1d69b1..40ce3cd 100644 --- a/server/providers/ai.ts +++ b/server/providers/ai.ts @@ -13,6 +13,12 @@ export interface AITool { name: string description: string inputSchema: Record // JSON Schema + /** + * Provider-agnostic prompt cache marker. When set on the LAST tool + * in the array, supporting providers (Anthropic) cache the full + * tools block. Unsupported providers ignore the field. + */ + cacheControl?: AICacheControl } export interface AIMessage { @@ -25,6 +31,43 @@ export type AIContentBlock | { type: 'tool_use', id: string, name: string, input: unknown } | { type: 'tool_result', toolUseId: string, content: string } +/** + * Prompt cache marker. `ephemeral` is Anthropic's 5-minute TTL bucket. + * Provider-agnostic shape — providers that don't support prompt + * caching ignore the marker entirely and the request still works + * (just without cache benefits). + */ +export interface AICacheControl { + type: 'ephemeral' +} + +/** + * Structured system prompt block. Use the array form of + * `AICompletionRequest.system` to place cache breakpoints between + * blocks. A plain `string` is equivalent to a single uncached block + * and stays accepted for backward compatibility. + */ +export interface AISystemBlock { + type: 'text' + text: string + cacheControl?: AICacheControl +} + +/** + * Token usage on a single completion. Anthropic returns four disjoint + * buckets — `input_tokens` semantic is "non-cached input only," so + * the existing dashboards that sum it stay correct after this change. + * Total billable input cost (Contentrain-side) is approximately: + * inputTokens * 1x + cacheCreationInputTokens * 1.25x + cacheReadInputTokens * 0.1x + * at the base per-MTok price for the model. + */ +export interface AIUsage { + inputTokens: number + outputTokens: number + cacheCreationInputTokens: number + cacheReadInputTokens: number +} + export interface AIStreamEvent { type: 'text' | 'tool_use_start' | 'tool_use_input' | 'tool_use_end' | 'message_end' | 'error' // text @@ -35,14 +78,20 @@ export interface AIStreamEvent { toolInput?: unknown // message_end stopReason?: 'end_turn' | 'tool_use' | 'max_tokens' - usage?: { inputTokens: number, outputTokens: number } + usage?: AIUsage // error error?: string } export interface AICompletionRequest { model: string - system: string + /** + * String form is treated as a single uncached system block. Array + * form lets callers place cache breakpoints between blocks; up to + * 4 cache_control markers are honored per request (Anthropic + * limit), shared across system + tools + messages. + */ + system: string | AISystemBlock[] messages: AIMessage[] tools: AITool[] maxTokens: number @@ -52,7 +101,7 @@ export interface AICompletionRequest { export interface AICompletionResponse { content: AIContentBlock[] stopReason: 'end_turn' | 'tool_use' | 'max_tokens' - usage: { inputTokens: number, outputTokens: number } + usage: AIUsage } export interface AIProvider { diff --git a/server/providers/anthropic-ai.ts b/server/providers/anthropic-ai.ts index 4c9e45b..16e0abc 100644 --- a/server/providers/anthropic-ai.ts +++ b/server/providers/anthropic-ai.ts @@ -5,6 +5,9 @@ import type { AIContentBlock, AIProvider, AIStreamEvent, + AISystemBlock, + AITool, + AIUsage, } from './ai' /** @@ -12,6 +15,11 @@ import type { * * Uses @anthropic-ai/sdk for streaming and tool use. * Normalizes Anthropic-specific formats to Studio's standard events. + * + * Prompt cache: `AISystemBlock.cacheControl` and `AITool.cacheControl` + * map to Anthropic's `cache_control: { type: 'ephemeral' }`. The SDK + * returns three input-token buckets in `usage` (input, cache_creation, + * cache_read); all four are forwarded through `AIUsage`. */ export function createAnthropicProvider(): AIProvider { return { @@ -23,7 +31,7 @@ export function createAnthropicProvider(): AIProvider { const stream = client.messages.stream({ model: request.model, - system: request.system, + system: toAnthropicSystem(request.system), messages: toAnthropicMessages(request.messages), tools: toAnthropicTools(request.tools), max_tokens: request.maxTokens, @@ -32,9 +40,36 @@ export function createAnthropicProvider(): AIProvider { let currentToolId: string | undefined let currentToolName: string | undefined let currentToolInput = '' + // `message_start` carries the initial usage snapshot (input, + // cache_creation, cache_read). `message_delta` then carries the + // final output_tokens and any final usage adjustments. We + // accumulate as events arrive and emit a single normalized + // usage payload at message_end. + const usage: AIUsage = { + inputTokens: 0, + outputTokens: 0, + cacheCreationInputTokens: 0, + cacheReadInputTokens: 0, + } + let stopReason: AIStreamEvent['stopReason'] for await (const event of stream) { switch (event.type) { + case 'message_start': + if ('message' in event && event.message?.usage) { + const m = event.message.usage as { + input_tokens?: number + output_tokens?: number + cache_creation_input_tokens?: number + cache_read_input_tokens?: number + } + usage.inputTokens = m.input_tokens ?? 0 + usage.outputTokens = m.output_tokens ?? 0 + usage.cacheCreationInputTokens = m.cache_creation_input_tokens ?? 0 + usage.cacheReadInputTokens = m.cache_read_input_tokens ?? 0 + } + break + case 'content_block_start': if (event.content_block.type === 'text') { // Text block starting — nothing to emit yet @@ -86,19 +121,25 @@ export function createAnthropicProvider(): AIProvider { } break - case 'message_stop': - break - case 'message_delta': if ('usage' in event) { - yield { - type: 'message_end', - stopReason: (event.delta as { stop_reason?: string }).stop_reason as AIStreamEvent['stopReason'], - usage: { - inputTokens: (event.usage as { input_tokens?: number }).input_tokens ?? 0, - outputTokens: (event.usage as { output_tokens?: number }).output_tokens ?? 0, - }, - } + const d = event.usage as { output_tokens?: number, input_tokens?: number, cache_creation_input_tokens?: number, cache_read_input_tokens?: number } + // Anthropic spec: message_delta carries output_tokens + // (and sometimes a refined input bucket). Add to what + // message_start already captured rather than overwriting. + if (typeof d.output_tokens === 'number') usage.outputTokens = d.output_tokens + if (typeof d.input_tokens === 'number') usage.inputTokens = d.input_tokens + if (typeof d.cache_creation_input_tokens === 'number') usage.cacheCreationInputTokens = d.cache_creation_input_tokens + if (typeof d.cache_read_input_tokens === 'number') usage.cacheReadInputTokens = d.cache_read_input_tokens + } + stopReason = (event.delta as { stop_reason?: string }).stop_reason as AIStreamEvent['stopReason'] + break + + case 'message_stop': + yield { + type: 'message_end', + stopReason, + usage, } break } @@ -113,24 +154,46 @@ export function createAnthropicProvider(): AIProvider { const response = await client.messages.create({ model: request.model, - system: request.system, + system: toAnthropicSystem(request.system), messages: toAnthropicMessages(request.messages), tools: toAnthropicTools(request.tools), max_tokens: request.maxTokens, }, { signal: request.abortSignal }) + const respUsage = response.usage as { + input_tokens: number + output_tokens: number + cache_creation_input_tokens?: number + cache_read_input_tokens?: number + } + return { content: fromAnthropicContent(response.content), stopReason: response.stop_reason as AICompletionResponse['stopReason'], usage: { - inputTokens: response.usage.input_tokens, - outputTokens: response.usage.output_tokens, + inputTokens: respUsage.input_tokens, + outputTokens: respUsage.output_tokens, + cacheCreationInputTokens: respUsage.cache_creation_input_tokens ?? 0, + cacheReadInputTokens: respUsage.cache_read_input_tokens ?? 0, }, } }, } } +/** + * Convert Studio system input (string or block array) to Anthropic's + * `string | TextBlockParam[]`. Cache markers map directly. + */ +function toAnthropicSystem(system: string | AISystemBlock[]): string | Anthropic.TextBlockParam[] { + if (typeof system === 'string') return system + return system.map(block => ({ + type: 'text' as const, + text: block.text, + ...(block.cacheControl ? { cache_control: { type: 'ephemeral' as const } } : {}), + })) +} + /** * Convert Studio messages to Anthropic format. */ @@ -159,13 +222,18 @@ function toAnthropicMessages(messages: AICompletionRequest['messages']): Anthrop } /** - * Convert Studio tools to Anthropic format. + * Convert Studio tools to Anthropic format. The optional `cacheControl` + * on a tool maps to Anthropic's `cache_control` and is honored only on + * the last tool — placing it earlier would split the cached prefix and + * cost extra creation writes. Studio callers conventionally mark only + * the final tool. */ -function toAnthropicTools(tools: AICompletionRequest['tools']): Anthropic.Tool[] { - return tools.map(tool => ({ +function toAnthropicTools(tools: AITool[]): Anthropic.Tool[] { + return tools.map((tool): Anthropic.Tool => ({ name: tool.name, description: tool.description, input_schema: tool.inputSchema as Anthropic.Tool['input_schema'], + ...(tool.cacheControl ? { cache_control: { type: 'ephemeral' as const } } : {}), })) } diff --git a/server/providers/database.ts b/server/providers/database.ts index fa704a3..4f85c80 100644 --- a/server/providers/database.ts +++ b/server/providers/database.ts @@ -285,6 +285,8 @@ export interface DatabaseProvider { toolCalls?: unknown[] | null tokenCountInput?: number tokenCountOutput?: number + cacheCreationInputTokens?: number + cacheReadInputTokens?: number model?: string }) => Promise @@ -318,7 +320,13 @@ export interface DatabaseProvider { limit: number }) => Promise<{ allowed: boolean, currentCount: number }> - /** Update token counts on an existing agent_usage row (after chat completes). */ + /** + * Update token counts on an existing `agent_usage` row after the + * chat completes. Cache token fields are required (callers pass 0 + * when the provider didn't report cache usage) so the call shape + * stays stable and the underlying `_v2` RPC always receives all + * four values. + */ updateAgentUsageTokens: (input: { workspaceId: string userId: string @@ -326,6 +334,8 @@ export interface DatabaseProvider { source: string inputTokens: number outputTokens: number + cacheCreationInputTokens: number + cacheReadInputTokens: number }) => Promise /** @@ -366,13 +376,19 @@ export interface DatabaseProvider { current: number }> - /** Increment token counters on the `api_message_usage` row after the AI call settles. */ + /** + * Increment token counters on the `api_message_usage` row after the + * AI call settles. Cache token fields are required for the same + * stability reason as `updateAgentUsageTokens`. + */ updateAPIUsageTokens: (input: { workspaceId: string apiKeyId: string month: string inputTokens: number outputTokens: number + cacheCreationInputTokens: number + cacheReadInputTokens: number }) => Promise /** diff --git a/server/providers/supabase-db/conversations.ts b/server/providers/supabase-db/conversations.ts index f09fc08..f06dd6b 100644 --- a/server/providers/supabase-db/conversations.ts +++ b/server/providers/supabase-db/conversations.ts @@ -134,6 +134,8 @@ export function conversationMethods(): ConversationMethods { if (input.toolCalls) row.tool_calls = input.toolCalls if (input.tokenCountInput) row.token_count_input = input.tokenCountInput if (input.tokenCountOutput) row.token_count_output = input.tokenCountOutput + if (input.cacheCreationInputTokens) row.cache_creation_input_tokens = input.cacheCreationInputTokens + if (input.cacheReadInputTokens) row.cache_read_input_tokens = input.cacheReadInputTokens if (input.model) row.model = input.model await admin.from('messages').insert(row) @@ -236,14 +238,19 @@ export function conversationMethods(): ConversationMethods { async updateAgentUsageTokens(input) { const admin = getAdmin() - // Use RPC for atomic token increment to prevent concurrent overwrites - await admin.rpc('increment_agent_usage_tokens', { + // _v2 RPC carries the cache token columns alongside the base + // input/output counters; the legacy _v1 entry point stays + // registered for rolling-deploy safety but is no longer called + // from app code. + await admin.rpc('increment_agent_usage_tokens_v2', { p_workspace_id: input.workspaceId, p_user_id: input.userId, p_month: input.month, p_source: input.source, p_input_tokens: input.inputTokens, p_output_tokens: input.outputTokens, + p_cache_creation_input_tokens: input.cacheCreationInputTokens, + p_cache_read_input_tokens: input.cacheReadInputTokens, }) }, @@ -282,12 +289,14 @@ export function conversationMethods(): ConversationMethods { async updateAPIUsageTokens(input) { const admin = getAdmin() - await admin.rpc('increment_api_usage_tokens', { + await admin.rpc('increment_api_usage_tokens_v2', { p_workspace_id: input.workspaceId, p_api_key_id: input.apiKeyId, p_month: input.month, p_input_tokens: input.inputTokens, p_output_tokens: input.outputTokens, + p_cache_creation_input_tokens: input.cacheCreationInputTokens, + p_cache_read_input_tokens: input.cacheReadInputTokens, }) }, diff --git a/server/utils/agent-system-prompt.ts b/server/utils/agent-system-prompt.ts index ed3a6b9..47b9541 100644 --- a/server/utils/agent-system-prompt.ts +++ b/server/utils/agent-system-prompt.ts @@ -1,4 +1,5 @@ import type { ModelDefinition, ContentrainConfig, FieldDef } from '@contentrain/types' +import type { AISystemBlock } from '../providers/ai' import type { Branch } from '../providers/git' import type { AgentPermissions } from './agent-permissions' import type { ChatUIContext, ClassifiedIntent, ProjectPhase } from './agent-types' @@ -6,8 +7,22 @@ import type { ChatUIContext, ClassifiedIntent, ProjectPhase } from './agent-type /** * Bounded Task Executor system prompt. * - * Structure: Role → Contentrain Architecture → UI Context → Intent → State → Schema → Permissions → Rules - * Each section is purpose-built to constrain the agent's behavior. + * Two ways to consume: + * + * - `buildSystemPrompt(...)` returns a single concatenated string, + * preserved for callers that don't want prompt-cache markers + * (legacy paths, tests, alternative providers). + * + * - `buildSystemPromptBlocks(...)` returns a `static` / + * `contentIndex` / `dynamic` split shaped for Anthropic's + * `cache_control` breakpoints. The caller assembles the final + * `AISystemBlock[]` and places markers on the first two blocks. + * + * Static/dynamic split rule: a section is "static" only if its + * content is byte-identical across requests within the same project + * (modulo brain refreshes). Anything keyed on `uiContext`, `intent`, + * or `state` lives in the dynamic block so a single-character change + * doesn't invalidate the cached prefix. */ export interface ProjectState { @@ -19,29 +34,103 @@ export interface ProjectState { contentContext?: Record | null } -export function buildSystemPrompt( +/** + * Static prompt body — content that does NOT vary with `uiContext`, + * `intent`, or `state`. Safe to wrap in a cached system block. + */ +function buildStaticBody( config: ContentrainConfig | null, models: ModelDefinition[], permissions: AgentPermissions, - state: ProjectState, - uiContext: ChatUIContext, - intent: ClassifiedIntent, vocabulary?: Record> | null, plan?: import('./license').Plan, customInstructions?: string | null, ): string { const sections: string[] = [] - // 1. ROLE — strict, bounded + // ROLE sections.push(agentPrompt('role.definition')) - // 2. CONTENTRAIN ARCHITECTURE — the agent must know this to work correctly + // CONTENTRAIN ARCHITECTURE sections.push(buildArchitectureSection()) - // 3. UI CONTEXT — what the user is looking at RIGHT NOW + // CONFIG + if (config) { + sections.push(`## Configuration +- Stack: ${config.stack} +- Locales: ${config.locales.supported.join(', ')} (default: ${config.locales.default}) +- Domains: ${config.domains.join(', ')} +- Workflow: ${config.workflow}`) + } + + // SCHEMA — model list without active-model marker + if (models.length > 0) { + sections.push(buildSchemaSection(models)) + } + + // RELATION GRAPH + const relationGraph = buildRelationGraph(models) + if (relationGraph) { + sections.push(relationGraph) + } + + // VOCABULARY + if (vocabulary && Object.keys(vocabulary).length > 0) { + const termCount = Object.keys(vocabulary).length + const sampleTerms = Object.entries(vocabulary).slice(0, 10) + const termLines = sampleTerms.map(([key, translations]) => { + const locales = Object.entries(translations).map(([l, v]) => `${l}: "${v}"`).join(', ') + return ` - ${key}: ${locales}` + }) + let vocabSection = `## Vocabulary (${termCount} terms)\nShared terminology from .contentrain/vocabulary.json:\n${termLines.join('\n')}` + if (termCount > 10) { + vocabSection += `\n ... and ${termCount - 10} more terms` + } + sections.push(vocabSection) + } + + // PERMISSIONS + const roleDisplay = permissions.projectRole + ? `${permissions.workspaceRole} / ${permissions.projectRole}` + : permissions.workspaceRole + + sections.push(`## Permissions +- Role: ${roleDisplay} +- Available tools: ${permissions.availableTools.join(', ')}${ + permissions.specificModels + ? `\n- Model access restricted to: ${permissions.allowedModels.join(', ')}` + : '' +}`) + + // BASE RULES — intent-independent + sections.push(buildBaseRulesSection(config, permissions, plan)) + + // CUSTOM INSTRUCTIONS (per Conversation API key, stable across the key's lifetime) + if (customInstructions) { + sections.push(`### Custom Instructions (from project admin)\n${customInstructions}`) + } + + return sections.join('\n\n') +} + +/** + * Dynamic prompt body — UI context, intent, state, intent-specific + * rules. Anything that changes per request lives here so the cache + * marker on the static block stays valid. + */ +function buildDynamicBody( + models: ModelDefinition[], + state: ProjectState, + uiContext: ChatUIContext, + intent: ClassifiedIntent, + config: ContentrainConfig | null, +): string { + const sections: string[] = [] + + // UI CONTEXT — what the user is looking at RIGHT NOW (includes active model annotation) sections.push(buildContextSection(uiContext, models, config)) - // 4. INFERRED INTENT + // INFERRED INTENT if (intent.category !== 'out_of_scope') { const inferredLines: string[] = [`## Inferred Intent: ${intent.category}`] if (intent.inferred.modelId) inferredLines.push(`Default model: ${intent.inferred.modelId}`) @@ -53,7 +142,7 @@ export function buildSystemPrompt( sections.push(inferredLines.join('\n')) } - // 5. PROJECT STATE + // PROJECT STATE const stateLines: string[] = ['## Project State'] stateLines.push(`- Phase: ${state.phase}`) stateLines.push(`- Initialized: ${state.initialized ? 'YES' : 'NO'}`) @@ -65,7 +154,6 @@ export function buildSystemPrompt( } } - // Context.json — last operation tracking if (state.contentContext) { const lastOp = state.contentContext.lastOperation as { tool?: string, model?: string, locale?: string, timestamp?: string } | undefined const stats = state.contentContext.stats as { models?: number, entries?: number, locales?: string[] } | undefined @@ -86,63 +174,95 @@ export function buildSystemPrompt( sections.push(stateLines.join('\n')) - // 6. PROJECT CONFIG - if (config) { - sections.push(`## Configuration -- Stack: ${config.stack} -- Locales: ${config.locales.supported.join(', ')} (default: ${config.locales.default}) -- Domains: ${config.domains.join(', ')} -- Workflow: ${config.workflow}`) - } + // INTENT-SPECIFIC RULES (out-of-scope, etc.) + const intentRules = buildIntentRulesSection(intent) + if (intentRules) sections.push(intentRules) - // 7. SCHEMA — full detail for all models with relation graph - if (models.length > 0) { - sections.push(buildSchemaSection(models, uiContext)) - } + return sections.join('\n\n') +} - // 8. RELATION GRAPH — cross-model references - const relationGraph = buildRelationGraph(models) - if (relationGraph) { - sections.push(relationGraph) - } +/** + * Structured system prompt split into prompt-cache friendly blocks. + * + * - `static`: stable across requests; safe behind a `cache_control` + * marker (Block 1). + * - `contentIndex`: brain content index, refreshes on its own TTL; + * gets its own cache breakpoint (Block 2) so a schema change + * doesn't invalidate the content cache and vice versa. + * - `dynamic`: UI context / intent / state — must come after the + * cache breakpoints so request-by-request changes don't break the + * cached prefix. + * + * Callers compose `AISystemBlock[]` and place markers on the first + * two blocks. The `contentIndex` field is `null` when the brain + * hasn't produced one. + */ +export interface SystemPromptBlocks { + static: string + contentIndex: string | null + dynamic: string +} - // 9. VOCABULARY — shared terminology across locales - if (vocabulary && Object.keys(vocabulary).length > 0) { - const termCount = Object.keys(vocabulary).length - const sampleTerms = Object.entries(vocabulary).slice(0, 10) - const termLines = sampleTerms.map(([key, translations]) => { - const locales = Object.entries(translations).map(([l, v]) => `${l}: "${v}"`).join(', ') - return ` - ${key}: ${locales}` - }) - let vocabSection = `## Vocabulary (${termCount} terms)\nShared terminology from .contentrain/vocabulary.json:\n${termLines.join('\n')}` - if (termCount > 10) { - vocabSection += `\n ... and ${termCount - 10} more terms` - } - sections.push(vocabSection) +export function buildSystemPromptBlocks( + config: ContentrainConfig | null, + models: ModelDefinition[], + permissions: AgentPermissions, + state: ProjectState, + uiContext: ChatUIContext, + intent: ClassifiedIntent, + contentIndex: string | null, + vocabulary?: Record> | null, + plan?: import('./license').Plan, + customInstructions?: string | null, +): SystemPromptBlocks { + return { + static: buildStaticBody(config, models, permissions, vocabulary, plan, customInstructions), + contentIndex: contentIndex && contentIndex.trim() ? contentIndex : null, + dynamic: buildDynamicBody(models, state, uiContext, intent, config), } +} - // 9. PERMISSIONS - const roleDisplay = permissions.projectRole - ? `${permissions.workspaceRole} / ${permissions.projectRole}` - : permissions.workspaceRole - - sections.push(`## Permissions -- Role: ${roleDisplay} -- Available tools: ${permissions.availableTools.join(', ')}${ - permissions.specificModels - ? `\n- Model access restricted to: ${permissions.allowedModels.join(', ')}` - : '' -}`) - - // 10. RULES — hardened, workflow-aware, architecture-aware, role-aware, plan-aware - sections.push(buildRulesSection(config, intent, permissions, plan)) - - // Custom instructions (Conversation API keys) - if (customInstructions) { - sections.push(`### Custom Instructions (from project admin)\n${customInstructions}`) +/** + * Materialize the cache-aware blocks as an `AISystemBlock[]` ready + * to hand to `AIProvider`. The first two blocks (static + brain + * content index) get `cache_control` markers; the dynamic block + * stays uncached so request-level changes don't poison the prefix. + */ +export function toSystemBlocks(prompt: SystemPromptBlocks): AISystemBlock[] { + const blocks: AISystemBlock[] = [] + blocks.push({ type: 'text', text: prompt.static, cacheControl: { type: 'ephemeral' } }) + if (prompt.contentIndex) { + blocks.push({ type: 'text', text: prompt.contentIndex, cacheControl: { type: 'ephemeral' } }) } + if (prompt.dynamic.trim()) { + blocks.push({ type: 'text', text: prompt.dynamic }) + } + return blocks +} - return sections.join('\n\n') +/** + * Legacy single-string composition, preserved for callers that don't + * want cache markers (alternative providers, certain test paths). + * Equivalent to `buildSystemPromptBlocks(...)` concatenated, no + * cache_control markers — semantically identical to the old function. + */ +export function buildSystemPrompt( + config: ContentrainConfig | null, + models: ModelDefinition[], + permissions: AgentPermissions, + state: ProjectState, + uiContext: ChatUIContext, + intent: ClassifiedIntent, + vocabulary?: Record> | null, + plan?: import('./license').Plan, + customInstructions?: string | null, +): string { + const blocks = buildSystemPromptBlocks( + config, models, permissions, state, uiContext, intent, + null, // contentIndex appended by caller via `${prompt}\n\n${contentIndex}` + vocabulary, plan, customInstructions, + ) + return [blocks.static, blocks.dynamic].filter(Boolean).join('\n\n') } // ─── Architecture Section ─── @@ -162,19 +282,18 @@ function buildArchitectureSection(): string { // ─── Schema Section ─── -function buildSchemaSection(models: ModelDefinition[], uiContext: ChatUIContext): string { - const activeModel = uiContext.activeModelId - ? models.find(m => m.id === uiContext.activeModelId) - : null - +/** + * Pure model-list rendering — no active-model marker. The active + * model annotation lives in the dynamic UI Context block so the + * schema itself stays byte-identical across requests and the + * cache_control marker placed on the static system block can + * actually hit. + */ +function buildSchemaSection(models: ModelDefinition[]): string { const lines: string[] = ['## Content Schema'] - // Show ALL models with full field details (not just active one) for (const model of models) { - const isActive = model.id === activeModel?.id - const prefix = isActive ? '### ▶ ' : '### ' - - lines.push(`${prefix}${model.name} (\`${model.id}\`)`) + lines.push(`### ${model.name} (\`${model.id}\`)`) lines.push(`Kind: ${model.kind}, domain: ${model.domain}, i18n: ${model.i18n}`) if (model.fields && Object.keys(model.fields).length > 0) { @@ -334,7 +453,12 @@ function buildContextSection( // ─── Rules Section ─── -function buildRulesSection(config: ContentrainConfig | null, intent: ClassifiedIntent, permissions: AgentPermissions, plan?: import('./license').Plan): string { +/** + * Base rules block — depends on `config` (project-stable), + * `permissions` (role-stable per request), and `plan` (workspace- + * stable). No intent dependency, so safe for the cached prefix. + */ +function buildBaseRulesSection(config: ContentrainConfig | null, permissions: AgentPermissions, plan?: import('./license').Plan): string { const effectivePlan = plan ?? 'starter' const workflow = config?.workflow ?? 'auto-merge' const isPrivileged = permissions.workspaceRole === 'owner' || permissions.workspaceRole === 'admin' @@ -412,10 +536,17 @@ function buildRulesSection(config: ContentrainConfig | null, intent: ClassifiedI } rules.push(agentPrompt('plan.tiers', tierParams)) - // Out of scope + return `## Rules\n${rules.map(r => `- ${r}`).join('\n')}` +} + +/** + * Intent-dependent rules. Returns `null` when no intent-specific rule + * applies (the common case), so the caller can omit the section + * entirely and keep the dynamic block minimal. + */ +function buildIntentRulesSection(intent: ClassifiedIntent): string | null { if (intent.category === 'out_of_scope') { - rules.push(agentPrompt('rules.off_topic')) + return `## Additional Rules\n- ${agentPrompt('rules.off_topic')}` } - - return `## Rules\n${rules.map(r => `- ${r}`).join('\n')}` + return null } diff --git a/server/utils/conversation-engine.ts b/server/utils/conversation-engine.ts index c4b259d..7fe3254 100644 --- a/server/utils/conversation-engine.ts +++ b/server/utils/conversation-engine.ts @@ -1,5 +1,5 @@ import type { ModelDefinition } from '@contentrain/types' -import type { AIMessage, AIContentBlock, AITool } from '~~/server/providers/ai' +import type { AIMessage, AIContentBlock, AISystemBlock, AITool } from '~~/server/providers/ai' import type { ChatUIContext, AffectedResources, ProjectPhase } from '~~/server/utils/agent-types' import type { AgentPermissions } from '~~/server/utils/agent-permissions' @@ -29,7 +29,13 @@ export interface ConversationEvent { export interface ConversationConfig { model: string apiKey: string - systemPrompt: string + /** + * Either a plain string (single uncached system block) or an array + * of `AISystemBlock`s with optional cache markers. The engine + * forwards the value verbatim to the provider — callers that want + * prompt-cache hits build their blocks via `buildSystemPromptBlocks`. + */ + systemPrompt: string | AISystemBlock[] messages: AIMessage[] tools: AITool[] maxToolIterations?: number @@ -84,6 +90,8 @@ export async function* runConversationLoop( let totalInputTokens = 0 let totalOutputTokens = 0 + let totalCacheCreationInputTokens = 0 + let totalCacheReadInputTokens = 0 let lastAssistantContent: AIContentBlock[] = [] let accumulatedAffected: AffectedResources = emptyAffected() @@ -142,6 +150,8 @@ export async function* runConversationLoop( flushText() totalInputTokens += streamEvent.usage?.inputTokens ?? 0 totalOutputTokens += streamEvent.usage?.outputTokens ?? 0 + totalCacheCreationInputTokens += streamEvent.usage?.cacheCreationInputTokens ?? 0 + totalCacheReadInputTokens += streamEvent.usage?.cacheReadInputTokens ?? 0 stopReason = streamEvent.stopReason break case 'error': @@ -157,6 +167,8 @@ export async function* runConversationLoop( ) totalInputTokens += response.usage.inputTokens totalOutputTokens += response.usage.outputTokens + totalCacheCreationInputTokens += response.usage.cacheCreationInputTokens ?? 0 + totalCacheReadInputTokens += response.usage.cacheReadInputTokens ?? 0 stopReason = response.stopReason for (const block of response.content) { @@ -224,7 +236,12 @@ export async function* runConversationLoop( // === DONE with affected resources === yield { type: 'done', - usage: { inputTokens: totalInputTokens, outputTokens: totalOutputTokens }, + usage: { + inputTokens: totalInputTokens, + outputTokens: totalOutputTokens, + cacheCreationInputTokens: totalCacheCreationInputTokens, + cacheReadInputTokens: totalCacheReadInputTokens, + }, affected: accumulatedAffected, lastContent: lastAssistantContent, } diff --git a/server/utils/db.ts b/server/utils/db.ts index 866be47..d98b798 100644 --- a/server/utils/db.ts +++ b/server/utils/db.ts @@ -227,6 +227,8 @@ async function persistChatMessages(input: { model: string inputTokens: number outputTokens: number + cacheCreationInputTokens: number + cacheReadInputTokens: number }) { const db = useDatabaseProvider() await db.insertMessage({ conversationId: input.conversationId, role: 'user', content: input.userMessage }) @@ -238,6 +240,8 @@ async function persistChatMessages(input: { toolCalls: input.assistantContent.length > 0 ? input.assistantContent : null, tokenCountInput: input.inputTokens, tokenCountOutput: input.outputTokens, + cacheCreationInputTokens: input.cacheCreationInputTokens, + cacheReadInputTokens: input.cacheReadInputTokens, model: input.model, }) } @@ -258,41 +262,47 @@ async function persistChatMessages(input: { * Conversation API has its own actor model (key-keyed, not user-keyed) * and a dedicated `api_message_usage` table — use `saveApiChatResult`. */ -export async function saveChatResult( - conversationId: string, - userMessage: string, - assistantText: string, - assistantContent: unknown[], - model: string, - inputTokens: number, - outputTokens: number, - workspaceId: string, - userId: string, - usageSource: 'byoa' | 'studio', - usageMonth: string, -) { +export async function saveChatResult(input: { + conversationId: string + userMessage: string + assistantText: string + assistantContent: unknown[] + model: string + inputTokens: number + outputTokens: number + cacheCreationInputTokens: number + cacheReadInputTokens: number + workspaceId: string + userId: string + usageSource: 'byoa' | 'studio' + usageMonth: string +}) { const db = useDatabaseProvider() await persistChatMessages({ - conversationId, - userMessage, - assistantText, - assistantContent, - model, - inputTokens, - outputTokens, + conversationId: input.conversationId, + userMessage: input.userMessage, + assistantText: input.assistantText, + assistantContent: input.assistantContent, + model: input.model, + inputTokens: input.inputTokens, + outputTokens: input.outputTokens, + cacheCreationInputTokens: input.cacheCreationInputTokens, + cacheReadInputTokens: input.cacheReadInputTokens, }) await db.updateAgentUsageTokens({ - workspaceId, - userId, - month: usageMonth, - source: usageSource, - inputTokens, - outputTokens, + workspaceId: input.workspaceId, + userId: input.userId, + month: input.usageMonth, + source: input.usageSource, + inputTokens: input.inputTokens, + outputTokens: input.outputTokens, + cacheCreationInputTokens: input.cacheCreationInputTokens, + cacheReadInputTokens: input.cacheReadInputTokens, }) - await db.updateConversationTimestamp(conversationId) + await db.updateConversationTimestamp(input.conversationId) } /** @@ -300,37 +310,43 @@ export async function saveChatResult( * chat. The actor here is an API key (no workspace member identity), * so usage flows through `api_message_usage` instead of `agent_usage`. */ -export async function saveApiChatResult( - conversationId: string, - userMessage: string, - assistantText: string, - assistantContent: unknown[], - model: string, - inputTokens: number, - outputTokens: number, - workspaceId: string, - apiKeyId: string, - usageMonth: string, -) { +export async function saveApiChatResult(input: { + conversationId: string + userMessage: string + assistantText: string + assistantContent: unknown[] + model: string + inputTokens: number + outputTokens: number + cacheCreationInputTokens: number + cacheReadInputTokens: number + workspaceId: string + apiKeyId: string + usageMonth: string +}) { const db = useDatabaseProvider() await persistChatMessages({ - conversationId, - userMessage, - assistantText, - assistantContent, - model, - inputTokens, - outputTokens, + conversationId: input.conversationId, + userMessage: input.userMessage, + assistantText: input.assistantText, + assistantContent: input.assistantContent, + model: input.model, + inputTokens: input.inputTokens, + outputTokens: input.outputTokens, + cacheCreationInputTokens: input.cacheCreationInputTokens, + cacheReadInputTokens: input.cacheReadInputTokens, }) await db.updateAPIUsageTokens({ - workspaceId, - apiKeyId, - month: usageMonth, - inputTokens, - outputTokens, + workspaceId: input.workspaceId, + apiKeyId: input.apiKeyId, + month: input.usageMonth, + inputTokens: input.inputTokens, + outputTokens: input.outputTokens, + cacheCreationInputTokens: input.cacheCreationInputTokens, + cacheReadInputTokens: input.cacheReadInputTokens, }) - await db.updateConversationTimestamp(conversationId) + await db.updateConversationTimestamp(input.conversationId) } diff --git a/supabase/migrations/008_cache_token_accounting.sql b/supabase/migrations/008_cache_token_accounting.sql new file mode 100644 index 0000000..81ddcf2 --- /dev/null +++ b/supabase/migrations/008_cache_token_accounting.sql @@ -0,0 +1,100 @@ +-- Anthropic prompt cache token accounting. +-- +-- Anthropic's prompt cache splits an input call into three token +-- buckets in `response.usage`: +-- +-- - input_tokens non-cached input, 1.0x base price +-- - cache_creation_input_tokens written to cache this turn, 1.25x +-- - cache_read_input_tokens served from cache this turn, 0.1x +-- +-- The Studio billing model stays message-based (`ai.messages_per_month`, +-- `api.messages_per_month`) — cache tokens are observability for +-- Contentrain's cost-of-goods analysis, never customer-facing quota. +-- A workspace that benefits from cache hits does NOT earn extra +-- messages; gross margin improves silently. +-- +-- Schema choices: +-- 1. Separate columns mirror the existing `input_tokens`/`output_tokens` +-- pattern. Trivial to query, no jsonb gymnastics. +-- 2. `input_tokens` semantic is UNCHANGED — still "non-cached input +-- tokens." Anthropic returns the buckets disjoint, so existing +-- dashboards built on `input_tokens` keep their meaning. +-- 3. RPCs ship as `_v2` rather than mutating signatures in place. +-- Mid-deploy, the old app version + new RPC schema (or vice +-- versa) is a real failure mode on Supabase / PostgREST. The v1 +-- RPCs stay registered and unused for safety; a follow-up +-- cleanup migration can drop them once every deploy is on v2. + +-- ───────────────────────────────────────────────────────────────── +-- 1. Columns +-- ───────────────────────────────────────────────────────────────── + +ALTER TABLE public.agent_usage + ADD COLUMN cache_creation_input_tokens bigint NOT NULL DEFAULT 0, + ADD COLUMN cache_read_input_tokens bigint NOT NULL DEFAULT 0; + +ALTER TABLE public.api_message_usage + ADD COLUMN cache_creation_input_tokens bigint NOT NULL DEFAULT 0, + ADD COLUMN cache_read_input_tokens bigint NOT NULL DEFAULT 0; + +ALTER TABLE public.messages + ADD COLUMN cache_creation_input_tokens integer NOT NULL DEFAULT 0, + ADD COLUMN cache_read_input_tokens integer NOT NULL DEFAULT 0; + +-- ───────────────────────────────────────────────────────────────── +-- 2. v2 token increment RPCs +-- ───────────────────────────────────────────────────────────────── + +CREATE FUNCTION public.increment_agent_usage_tokens_v2( + p_workspace_id uuid, + p_user_id uuid, + p_month text, + p_source text, + p_input_tokens bigint, + p_output_tokens bigint, + p_cache_creation_input_tokens bigint, + p_cache_read_input_tokens bigint +) RETURNS void +LANGUAGE plpgsql SECURITY DEFINER +SET search_path TO '' +AS $$ +BEGIN + UPDATE public.agent_usage + SET + input_tokens = input_tokens + p_input_tokens, + output_tokens = output_tokens + p_output_tokens, + cache_creation_input_tokens = cache_creation_input_tokens + p_cache_creation_input_tokens, + cache_read_input_tokens = cache_read_input_tokens + p_cache_read_input_tokens, + updated_at = now() + WHERE workspace_id = p_workspace_id + AND user_id = p_user_id + AND month = p_month + AND source = p_source; +END; +$$; + +CREATE FUNCTION public.increment_api_usage_tokens_v2( + p_workspace_id uuid, + p_api_key_id uuid, + p_month text, + p_input_tokens bigint, + p_output_tokens bigint, + p_cache_creation_input_tokens bigint, + p_cache_read_input_tokens bigint +) RETURNS void +LANGUAGE plpgsql SECURITY DEFINER +SET search_path TO '' +AS $$ +BEGIN + UPDATE public.api_message_usage + SET + input_tokens = input_tokens + p_input_tokens, + output_tokens = output_tokens + p_output_tokens, + cache_creation_input_tokens = cache_creation_input_tokens + p_cache_creation_input_tokens, + cache_read_input_tokens = cache_read_input_tokens + p_cache_read_input_tokens, + updated_at = now() + WHERE workspace_id = p_workspace_id + AND api_key_id = p_api_key_id + AND month = p_month; +END; +$$; diff --git a/tests/integration/chat-route.integration.test.ts b/tests/integration/chat-route.integration.test.ts index 7b6b6e4..5af805c 100644 --- a/tests/integration/chat-route.integration.test.ts +++ b/tests/integration/chat-route.integration.test.ts @@ -4,6 +4,7 @@ import { withTestServer } from '../helpers/http' vi.mock('~~/server/utils/agent-types', async () => await import('../../server/utils/agent-types')) vi.mock('~~/server/utils/agent-state-machine', async () => await import('../../server/utils/agent-state-machine')) vi.mock('~~/server/utils/agent-context', async () => await import('../../server/utils/agent-context')) +vi.mock('~~/server/utils/agent-system-prompt', async () => await import('../../server/utils/agent-system-prompt')) vi.mock('~~/server/utils/conversation-engine', async () => await import('../../server/utils/conversation-engine')) vi.mock('~~/server/utils/conversation-history', async () => await import('../../server/utils/conversation-history')) @@ -149,7 +150,8 @@ describe('chat route integration', () => { vi.stubGlobal('hasFeature', vi.fn().mockReturnValue(false)) vi.stubGlobal('saveChatResult', saveChatResult) vi.stubGlobal('createContentEngine', vi.fn().mockReturnValue({})) - vi.stubGlobal('buildSystemPrompt', vi.fn().mockReturnValue('system')) + vi.stubGlobal('buildSystemPromptBlocks', vi.fn().mockReturnValue({ static: 'system', contentIndex: null, dynamic: '' })) + vi.stubGlobal('toSystemBlocks', vi.fn().mockReturnValue([{ type: 'text', text: 'system' }])) vi.stubGlobal('buildContentIndex', vi.fn().mockReturnValue('')) vi.stubGlobal('getOrBuildBrainCache', vi.fn().mockResolvedValue({ config: null, @@ -195,19 +197,19 @@ describe('chat route integration', () => { // budget math is covered by conversation-history.test.ts. Here we only // check that the handler asked for some bounded history slice. expect(mockLoadMessages).toHaveBeenCalledWith('conversation-new', expect.any(Number)) - expect(saveChatResult).toHaveBeenCalledWith( - 'conversation-new', - 'hello', - 'Hello from the agent.', - [{ type: 'text', text: 'Hello from the agent.' }], - expect.any(String), - 12, - 24, - 'workspace-1', - 'user-1', - 'studio', - expect.any(String), - ) + expect(saveChatResult).toHaveBeenCalledWith(expect.objectContaining({ + conversationId: 'conversation-new', + userMessage: 'hello', + assistantText: 'Hello from the agent.', + assistantContent: [{ type: 'text', text: 'Hello from the agent.' }], + model: expect.any(String), + inputTokens: 12, + outputTokens: 24, + workspaceId: 'workspace-1', + userId: 'user-1', + usageSource: 'studio', + usageMonth: expect.any(String), + })) }) }) @@ -334,7 +336,8 @@ describe('chat route integration', () => { vi.stubGlobal('hasFeature', vi.fn().mockReturnValue(false)) vi.stubGlobal('saveChatResult', saveChatResult) vi.stubGlobal('createContentEngine', vi.fn().mockReturnValue({})) - vi.stubGlobal('buildSystemPrompt', vi.fn().mockReturnValue('system')) + vi.stubGlobal('buildSystemPromptBlocks', vi.fn().mockReturnValue({ static: 'system', contentIndex: null, dynamic: '' })) + vi.stubGlobal('toSystemBlocks', vi.fn().mockReturnValue([{ type: 'text', text: 'system' }])) vi.stubGlobal('buildContentIndex', vi.fn().mockReturnValue('')) vi.stubGlobal('getOrBuildBrainCache', vi.fn().mockResolvedValue({ config: null, @@ -418,7 +421,8 @@ describe('chat route integration', () => { vi.stubGlobal('hasFeature', vi.fn().mockReturnValue(false)) vi.stubGlobal('saveChatResult', vi.fn().mockResolvedValue(undefined)) vi.stubGlobal('createContentEngine', vi.fn().mockReturnValue({})) - vi.stubGlobal('buildSystemPrompt', vi.fn().mockReturnValue('system')) + vi.stubGlobal('buildSystemPromptBlocks', vi.fn().mockReturnValue({ static: 'system', contentIndex: null, dynamic: '' })) + vi.stubGlobal('toSystemBlocks', vi.fn().mockReturnValue([{ type: 'text', text: 'system' }])) vi.stubGlobal('buildContentIndex', vi.fn().mockReturnValue('')) vi.stubGlobal('getOrBuildBrainCache', vi.fn().mockResolvedValue({ config: null, @@ -501,7 +505,8 @@ describe('chat route integration', () => { vi.stubGlobal('hasFeature', vi.fn().mockReturnValue(false)) vi.stubGlobal('saveChatResult', vi.fn().mockResolvedValue(undefined)) vi.stubGlobal('createContentEngine', vi.fn().mockReturnValue({})) - vi.stubGlobal('buildSystemPrompt', vi.fn().mockReturnValue('system')) + vi.stubGlobal('buildSystemPromptBlocks', vi.fn().mockReturnValue({ static: 'system', contentIndex: null, dynamic: '' })) + vi.stubGlobal('toSystemBlocks', vi.fn().mockReturnValue([{ type: 'text', text: 'system' }])) vi.stubGlobal('buildContentIndex', vi.fn().mockReturnValue('')) vi.stubGlobal('getOrBuildBrainCache', vi.fn().mockResolvedValue({ config: null, diff --git a/tests/integration/overage-soft-cap.integration.test.ts b/tests/integration/overage-soft-cap.integration.test.ts index f1620a8..54f9d35 100644 --- a/tests/integration/overage-soft-cap.integration.test.ts +++ b/tests/integration/overage-soft-cap.integration.test.ts @@ -4,6 +4,7 @@ import { withTestServer } from '../helpers/http' vi.mock('~~/server/utils/agent-types', async () => await import('../../server/utils/agent-types')) vi.mock('~~/server/utils/agent-state-machine', async () => await import('../../server/utils/agent-state-machine')) vi.mock('~~/server/utils/agent-context', async () => await import('../../server/utils/agent-context')) +vi.mock('~~/server/utils/agent-system-prompt', async () => await import('../../server/utils/agent-system-prompt')) vi.mock('~~/server/utils/conversation-engine', async () => await import('../../server/utils/conversation-engine')) vi.mock('~~/server/utils/conversation-history', async () => await import('../../server/utils/conversation-history')) diff --git a/tests/unit/agent-system-prompt-cache.test.ts b/tests/unit/agent-system-prompt-cache.test.ts new file mode 100644 index 0000000..f37e12e --- /dev/null +++ b/tests/unit/agent-system-prompt-cache.test.ts @@ -0,0 +1,214 @@ +import type { ContentrainConfig, ModelDefinition } from '@contentrain/types' +import { beforeAll, describe, expect, it, vi } from 'vitest' +import { agentPrompt } from '../../server/utils/content-strings' +import { getPlanParams, getUpgradeParams, PLAN_PRICING } from '../../shared/utils/license' +import type { AgentPermissions } from '../../server/utils/agent-permissions' +import { buildSystemPromptBlocks, toSystemBlocks } from '../../server/utils/agent-system-prompt' +import type { ChatUIContext, ClassifiedIntent } from '../../server/utils/agent-types' + +// `agent-system-prompt.ts` uses these via Nuxt auto-imports; for the +// unit test we hoist them onto the global so the module's internal +// calls resolve. Real Nuxt server runtime already exposes them. +beforeAll(() => { + vi.stubGlobal('agentPrompt', agentPrompt) + vi.stubGlobal('getPlanParams', getPlanParams) + vi.stubGlobal('getUpgradeParams', getUpgradeParams) + vi.stubGlobal('PLAN_PRICING', PLAN_PRICING) +}) + +const baseModels: ModelDefinition[] = [ + { id: 'posts', name: 'Posts', kind: 'collection', domain: 'public', i18n: 'translated', fields: { title: { type: 'string', required: true } } } as ModelDefinition, + { id: 'pages', name: 'Pages', kind: 'document', domain: 'public', i18n: 'translated', fields: { slug: { type: 'string', required: true } } } as ModelDefinition, +] + +const baseConfig: ContentrainConfig = { + stack: 'nuxt', + domains: ['public'], + locales: { default: 'en', supported: ['en'] }, + workflow: 'auto-merge', +} as ContentrainConfig + +const basePermissions: AgentPermissions = { + workspaceRole: 'admin', + projectRole: null, + specificModels: false, + allowedModels: [], + allowedLocales: [], + availableTools: ['get_content'], +} + +const baseState = { + initialized: true, + pendingBranches: [], + projectStatus: 'active', + phase: 'active' as const, + contentContext: null, +} + +const baseIntent: ClassifiedIntent = { + category: 'navigation', + confidence: 'high', + inferred: {}, +} + +describe('buildSystemPromptBlocks — static/dynamic separation', () => { + it('keeps the active model marker out of the static block', () => { + // Same project, two different UI contexts — only `activeModelId` + // changes. If the marker leaks into the static block the cached + // prefix breaks and we'd pay for cache_creation every turn. + const uiA: ChatUIContext = { + activeModelId: 'posts', + activeLocale: 'en', + activeEntryId: null, + panelState: 'overview', + activeBranch: null, + } + const uiB: ChatUIContext = { + activeModelId: 'pages', + activeLocale: 'en', + activeEntryId: null, + panelState: 'overview', + activeBranch: null, + } + const a = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, baseState, uiA, baseIntent, null) + const b = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, baseState, uiB, baseIntent, null) + + expect(a.static).toBe(b.static) + expect(a.static).not.toContain('▶') + expect(a.dynamic).not.toBe(b.dynamic) + }) + + it('keeps the intent out of the static block', () => { + // Off-topic intent activates an additional rule; that rule must + // not bleed into the static body or every intent change would + // bust the cache. + const ui: ChatUIContext = { + activeModelId: null, + activeLocale: 'en', + activeEntryId: null, + panelState: 'overview', + activeBranch: null, + } + const onTopic: ClassifiedIntent = { category: 'navigation', confidence: 'high', inferred: {} } + const offTopic: ClassifiedIntent = { category: 'out_of_scope', confidence: 'high', inferred: {} } + + const a = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, baseState, ui, onTopic, null) + const b = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, baseState, ui, offTopic, null) + + expect(a.static).toBe(b.static) + expect(b.dynamic).toMatch(/off|out.?of.?scope|topic/i) + }) + + it('keeps the project state out of the static block', () => { + const ui: ChatUIContext = { + activeModelId: null, + activeLocale: 'en', + activeEntryId: null, + panelState: 'overview', + activeBranch: null, + } + const stateA = { ...baseState, pendingBranches: [] } + const stateB = { ...baseState, pendingBranches: [{ name: 'cr/test', sha: 'abc', protected: false }] } + + const a = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, stateA, ui, baseIntent, null) + const b = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, stateB, ui, baseIntent, null) + + expect(a.static).toBe(b.static) + expect(a.dynamic).not.toBe(b.dynamic) + }) + + it('emits a non-empty static body with role, schema, and base rules', () => { + const ui: ChatUIContext = { + activeModelId: null, + activeLocale: 'en', + activeEntryId: null, + panelState: 'overview', + activeBranch: null, + } + const blocks = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, baseState, ui, baseIntent, null) + // Schema model names should appear in the static block + expect(blocks.static).toContain('Posts') + expect(blocks.static).toContain('Pages') + // Permissions section appears in static + expect(blocks.static).toContain('Permissions') + // Rules header appears in static + expect(blocks.static).toContain('## Rules') + }) + + it('returns contentIndex separately and is preserved when non-empty', () => { + const ui: ChatUIContext = { + activeModelId: null, + activeLocale: 'en', + activeEntryId: null, + panelState: 'overview', + activeBranch: null, + } + const a = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, baseState, ui, baseIntent, 'index payload') + const b = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, baseState, ui, baseIntent, null) + expect(a.contentIndex).toBe('index payload') + expect(b.contentIndex).toBeNull() + // The static block does NOT inline the index — it lives in its own cacheable block. + expect(a.static).not.toContain('index payload') + }) + + it('treats whitespace-only contentIndex as absent', () => { + const ui: ChatUIContext = { + activeModelId: null, + activeLocale: 'en', + activeEntryId: null, + panelState: 'overview', + activeBranch: null, + } + const blocks = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, baseState, ui, baseIntent, ' \n ') + expect(blocks.contentIndex).toBeNull() + }) +}) + +describe('toSystemBlocks — cache breakpoint assembly', () => { + it('places cache_control on the static and contentIndex blocks and leaves dynamic uncached', () => { + const blocks = toSystemBlocks({ + static: 'STATIC_BODY', + contentIndex: 'INDEX_BODY', + dynamic: 'DYNAMIC_BODY', + }) + expect(blocks).toEqual([ + { type: 'text', text: 'STATIC_BODY', cacheControl: { type: 'ephemeral' } }, + { type: 'text', text: 'INDEX_BODY', cacheControl: { type: 'ephemeral' } }, + { type: 'text', text: 'DYNAMIC_BODY' }, + ]) + }) + + it('omits the contentIndex block when null', () => { + const blocks = toSystemBlocks({ + static: 'STATIC_BODY', + contentIndex: null, + dynamic: 'DYNAMIC_BODY', + }) + expect(blocks).toEqual([ + { type: 'text', text: 'STATIC_BODY', cacheControl: { type: 'ephemeral' } }, + { type: 'text', text: 'DYNAMIC_BODY' }, + ]) + }) + + it('omits the dynamic block when empty', () => { + const blocks = toSystemBlocks({ + static: 'STATIC_BODY', + contentIndex: 'INDEX_BODY', + dynamic: '', + }) + expect(blocks).toEqual([ + { type: 'text', text: 'STATIC_BODY', cacheControl: { type: 'ephemeral' } }, + { type: 'text', text: 'INDEX_BODY', cacheControl: { type: 'ephemeral' } }, + ]) + }) + + it('produces at most 2 cached blocks (within Anthropic 4-breakpoint limit, leaving room for tools)', () => { + const blocks = toSystemBlocks({ + static: 'a', + contentIndex: 'b', + dynamic: 'c', + }) + const cached = blocks.filter(b => b.cacheControl?.type === 'ephemeral') + expect(cached.length).toBeLessThanOrEqual(2) + }) +}) diff --git a/tests/unit/anthropic-ai.test.ts b/tests/unit/anthropic-ai.test.ts index f768824..dbf9eb5 100644 --- a/tests/unit/anthropic-ai.test.ts +++ b/tests/unit/anthropic-ai.test.ts @@ -34,7 +34,22 @@ describe('anthropic provider', () => { }) it('normalizes Anthropic stream events into Studio stream events', async () => { + // Real Anthropic stream order: message_start → content blocks → + // message_delta (stop_reason + final output_tokens) → message_stop. + // The provider emits a single normalized `message_end` once + // message_stop fires, with all four token buckets in usage. anthropicState.stream.mockReturnValue((async function* () { + yield { + type: 'message_start', + message: { + usage: { + input_tokens: 10, + output_tokens: 0, + cache_creation_input_tokens: 0, + cache_read_input_tokens: 0, + }, + }, + } yield { type: 'content_block_start', content_block: { type: 'tool_use', id: 'tool-1', name: 'save_content' } } yield { type: 'content_block_delta', delta: { type: 'input_json_delta', partial_json: '{"title":"Hello"}' } } yield { type: 'content_block_stop' } @@ -42,8 +57,9 @@ describe('anthropic provider', () => { yield { type: 'message_delta', delta: { stop_reason: 'end_turn' }, - usage: { input_tokens: 10, output_tokens: 20 }, + usage: { output_tokens: 20 }, } + yield { type: 'message_stop' } })()) const { createAnthropicProvider } = await import('../../server/providers/anthropic-ai') @@ -73,11 +89,63 @@ describe('anthropic provider', () => { { type: 'message_end', stopReason: 'end_turn', - usage: { inputTokens: 10, outputTokens: 20 }, + usage: { + inputTokens: 10, + outputTokens: 20, + cacheCreationInputTokens: 0, + cacheReadInputTokens: 0, + }, }, ]) }) + it('captures prompt cache token buckets reported by the provider', async () => { + // Second turn of a cached conversation: most input is served + // from cache, a small slice was newly cached, no creation this + // turn. The provider must surface all three buckets disjointly. + anthropicState.stream.mockReturnValue((async function* () { + yield { + type: 'message_start', + message: { + usage: { + input_tokens: 50, + output_tokens: 0, + cache_creation_input_tokens: 200, + cache_read_input_tokens: 8000, + }, + }, + } + yield { type: 'content_block_delta', delta: { type: 'text_delta', text: 'ok' } } + yield { + type: 'message_delta', + delta: { stop_reason: 'end_turn' }, + usage: { output_tokens: 5 }, + } + yield { type: 'message_stop' } + })()) + + const { createAnthropicProvider } = await import('../../server/providers/anthropic-ai') + const provider = createAnthropicProvider() + const events: { type: string, usage?: unknown }[] = [] + for await (const event of provider.streamCompletion({ + model: 'claude-sonnet-4-20250514', + system: [{ type: 'text', text: 'cached block', cacheControl: { type: 'ephemeral' } }], + messages: [{ role: 'user', content: 'hello' }], + tools: [], + maxTokens: 256, + }, 'api-key')) { + events.push(event) + } + + const messageEnd = events.find(e => e.type === 'message_end') + expect(messageEnd?.usage).toEqual({ + inputTokens: 50, + outputTokens: 5, + cacheCreationInputTokens: 200, + cacheReadInputTokens: 8000, + }) + }) + it('normalizes full Anthropic completions into Studio content blocks', async () => { anthropicState.create.mockResolvedValue({ content: [ @@ -88,6 +156,8 @@ describe('anthropic provider', () => { usage: { input_tokens: 11, output_tokens: 22, + cache_creation_input_tokens: 100, + cache_read_input_tokens: 4000, }, }) @@ -106,7 +176,48 @@ describe('anthropic provider', () => { { type: 'tool_use', id: 'tool-2', name: 'brain_query', input: { model: 'posts' } }, ], stopReason: 'tool_use', - usage: { inputTokens: 11, outputTokens: 22 }, + usage: { + inputTokens: 11, + outputTokens: 22, + cacheCreationInputTokens: 100, + cacheReadInputTokens: 4000, + }, + }) + }) + + it('maps system block array with cache_control to Anthropic SDK shape', async () => { + // Provider should pass blocks through with `cache_control` markers + // preserved and tools' last entry's cacheControl mapped too. + anthropicState.create.mockResolvedValue({ + content: [{ type: 'text', text: 'ok' }], + stop_reason: 'end_turn', + usage: { input_tokens: 1, output_tokens: 1 }, }) + + const { createAnthropicProvider } = await import('../../server/providers/anthropic-ai') + const provider = createAnthropicProvider() + await provider.createCompletion({ + model: 'claude-sonnet-4-20250514', + system: [ + { type: 'text', text: 'static body', cacheControl: { type: 'ephemeral' } }, + { type: 'text', text: 'brain index', cacheControl: { type: 'ephemeral' } }, + { type: 'text', text: 'dynamic body' }, + ], + messages: [{ role: 'user', content: 'hello' }], + tools: [ + { name: 'first', description: 'a', inputSchema: {} }, + { name: 'last', description: 'b', inputSchema: {}, cacheControl: { type: 'ephemeral' } }, + ], + maxTokens: 256, + }, 'api-key') + + const call = anthropicState.create.mock.calls[0]![0] + expect(call.system).toEqual([ + { type: 'text', text: 'static body', cache_control: { type: 'ephemeral' } }, + { type: 'text', text: 'brain index', cache_control: { type: 'ephemeral' } }, + { type: 'text', text: 'dynamic body' }, + ]) + expect(call.tools[0]).not.toHaveProperty('cache_control') + expect(call.tools[1]).toMatchObject({ cache_control: { type: 'ephemeral' } }) }) }) diff --git a/tests/unit/db.test.ts b/tests/unit/db.test.ts index a0af57d..17155c4 100644 --- a/tests/unit/db.test.ts +++ b/tests/unit/db.test.ts @@ -106,19 +106,21 @@ describe('db helpers', () => { it('saves chat results via provider methods', async () => { const { saveChatResult } = await loadDbModule() - await saveChatResult( - 'conv-1', - 'Hello', - 'World', - [{ type: 'text', text: 'World' }], - 'claude-sonnet-4-20250514', - 7, - 3, - 'workspace-1', - 'user-1', - 'studio', - '2026-04', - ) + await saveChatResult({ + conversationId: 'conv-1', + userMessage: 'Hello', + assistantText: 'World', + assistantContent: [{ type: 'text', text: 'World' }], + model: 'claude-sonnet-4-20250514', + inputTokens: 7, + outputTokens: 3, + cacheCreationInputTokens: 0, + cacheReadInputTokens: 0, + workspaceId: 'workspace-1', + userId: 'user-1', + usageSource: 'studio', + usageMonth: '2026-04', + }) expect(mockDb.insertMessage).toHaveBeenCalledTimes(2) expect(mockDb.insertMessage).toHaveBeenCalledWith(expect.objectContaining({ role: 'user', content: 'Hello' })) @@ -130,25 +132,29 @@ describe('db helpers', () => { source: 'studio', inputTokens: 7, outputTokens: 3, + cacheCreationInputTokens: 0, + cacheReadInputTokens: 0, })) expect(mockDb.updateConversationTimestamp).toHaveBeenCalledWith('conv-1') }) it('updates token counts on the usage row reserved by the atomic limit check', async () => { const { saveChatResult } = await loadDbModule() - await saveChatResult( - 'conv-1', - 'Hello', - '', - [], - 'claude-haiku-4-5-20251001', - 4, - 2, - 'workspace-1', - 'user-1', - 'byoa', - '2026-04', - ) + await saveChatResult({ + conversationId: 'conv-1', + userMessage: 'Hello', + assistantText: '', + assistantContent: [], + model: 'claude-haiku-4-5-20251001', + inputTokens: 4, + outputTokens: 2, + cacheCreationInputTokens: 0, + cacheReadInputTokens: 0, + workspaceId: 'workspace-1', + userId: 'user-1', + usageSource: 'byoa', + usageMonth: '2026-04', + }) expect(mockDb.updateAgentUsageTokens).toHaveBeenCalledWith(expect.objectContaining({ source: 'byoa', @@ -158,20 +164,60 @@ describe('db helpers', () => { })) }) + it('propagates cache token buckets through saveChatResult', async () => { + // Second turn of a cached conversation — most input is cache_read + // (cheap), a small slice is cache_creation, base input/output are + // small. All four buckets must land on agent_usage row + message row. + const { saveChatResult } = await loadDbModule() + await saveChatResult({ + conversationId: 'conv-1', + userMessage: 'follow up', + assistantText: 'short', + assistantContent: [{ type: 'text', text: 'short' }], + model: 'claude-sonnet-4-5', + inputTokens: 80, + outputTokens: 12, + cacheCreationInputTokens: 150, + cacheReadInputTokens: 9000, + workspaceId: 'workspace-1', + userId: 'user-1', + usageSource: 'studio', + usageMonth: '2026-04', + }) + + expect(mockDb.updateAgentUsageTokens).toHaveBeenCalledWith({ + workspaceId: 'workspace-1', + userId: 'user-1', + month: '2026-04', + source: 'studio', + inputTokens: 80, + outputTokens: 12, + cacheCreationInputTokens: 150, + cacheReadInputTokens: 9000, + }) + expect(mockDb.insertMessage).toHaveBeenCalledWith(expect.objectContaining({ + role: 'assistant', + cacheCreationInputTokens: 150, + cacheReadInputTokens: 9000, + })) + }) + it('saveApiChatResult writes to api_message_usage, not agent_usage', async () => { const { saveApiChatResult } = await loadDbModule() - await saveApiChatResult( - 'conv-2', - 'Hello', - 'World', - [{ type: 'text', text: 'World' }], - 'claude-sonnet-4-5', - 11, - 5, - 'workspace-1', - 'key-abc', - '2026-04', - ) + await saveApiChatResult({ + conversationId: 'conv-2', + userMessage: 'Hello', + assistantText: 'World', + assistantContent: [{ type: 'text', text: 'World' }], + model: 'claude-sonnet-4-5', + inputTokens: 11, + outputTokens: 5, + cacheCreationInputTokens: 0, + cacheReadInputTokens: 0, + workspaceId: 'workspace-1', + apiKeyId: 'key-abc', + usageMonth: '2026-04', + }) // Messages persist into the shared messages table the same way as // the Studio path; the difference is only in which usage table the @@ -186,6 +232,8 @@ describe('db helpers', () => { month: '2026-04', inputTokens: 11, outputTokens: 5, + cacheCreationInputTokens: 0, + cacheReadInputTokens: 0, }) // Critical: must NOT touch the user-keyed agent_usage path. expect(mockDb.updateAgentUsageTokens).not.toHaveBeenCalled()