diff --git a/ee/enterprise/conversation-api.ts b/ee/enterprise/conversation-api.ts
index 80a5574..ce8ab31 100644
--- a/ee/enterprise/conversation-api.ts
+++ b/ee/enterprise/conversation-api.ts
@@ -7,7 +7,7 @@ import type { ChatUIContext } from '../../server/utils/agent-types'
 import { toAITools } from '../../server/utils/agent-types'
 import { classifyIntent } from '../../server/utils/agent-context'
 import { deriveProjectPhase } from '../../server/utils/agent-state-machine'
-import { buildSystemPrompt } from '../../server/utils/agent-system-prompt'
+import { buildSystemPromptBlocks, toSystemBlocks } from '../../server/utils/agent-system-prompt'
 import { STUDIO_TOOLS, filterToolsByPermissions } from '../../server/utils/agent-tools'
 import { buildContentIndex, getOrBuildBrainCache } from '../../server/utils/brain-cache'
 import { createContentEngine } from '../../server/utils/content-engine'
@@ -252,26 +252,30 @@ async function runConversationMessage(
   const phase = deriveProjectPhase(projectConfig, pendingBranches, project.status ?? 'active')
   const intent = classifyIntent(body.message, uiContext, phase)

-  let systemPrompt = buildSystemPrompt(
+  const contentIndex = buildContentIndex(brain)
+  const promptBlocks = buildSystemPromptBlocks(
     projectConfig,
     models,
     permissions,
     { initialized: !!projectConfig, pendingBranches, projectStatus: project.status ?? 'active', phase, contentContext },
     uiContext,
     intent,
+    contentIndex || null,
     vocabulary,
     plan,
     keyData.customInstructions,
   )
-
-  const contentIndex = buildContentIndex(brain)
-  if (contentIndex)
-    systemPrompt += `\n\n${contentIndex}`
+  const systemPrompt = toSystemBlocks(promptBlocks)

   const permissionFiltered = filterToolsByPermissions(STUDIO_TOOLS, permissions.availableTools) as typeof STUDIO_TOOLS
   const phaseFiltered = permissionFiltered.filter(tool => tool.requiredPhase.includes(phase))
   const aiTools = toAITools(phaseFiltered)

+  // Cache the tools array — same rationale as the Studio chat path.
+  if (aiTools.length > 0) {
+    aiTools[aiTools.length - 1]!.cacheControl = { type: 'ephemeral' }
+  }
+
   const runtimeConfig = useRuntimeConfig()
   const apiKey = runtimeConfig.anthropic.apiKey
   if (!apiKey)
@@ -312,6 +316,8 @@ async function runConversationMessage(
   let responseText = ''
   let totalInputTokens = 0
   let totalOutputTokens = 0
+  let totalCacheCreationInputTokens = 0
+  let totalCacheReadInputTokens = 0
   let lastAssistantContent: AIContentBlock[] = []

   for await (const evt of runConversationLoop(
@@ -346,26 +352,32 @@ async function runConversationMessage(
       case 'tool_result':
         toolResults.push({ id: evt.id as string, name: evt.name as string, result: evt.result })
         break
-      case 'done':
-        totalInputTokens = (evt.usage as { inputTokens: number })?.inputTokens ?? 0
-        totalOutputTokens = (evt.usage as { outputTokens: number })?.outputTokens ?? 0
+      case 'done': {
+        const u = evt.usage as { inputTokens?: number, outputTokens?: number, cacheCreationInputTokens?: number, cacheReadInputTokens?: number } | undefined
+        totalInputTokens = u?.inputTokens ?? 0
+        totalOutputTokens = u?.outputTokens ?? 0
+        totalCacheCreationInputTokens = u?.cacheCreationInputTokens ?? 0
+        totalCacheReadInputTokens = u?.cacheReadInputTokens ?? 0
         lastAssistantContent = (evt.lastContent as AIContentBlock[]) ?? []
         break
+      }
     }
   }

-  await saveApiChatResult(
+  await saveApiChatResult({
     conversationId,
-    body.message,
-    responseText,
-    lastAssistantContent,
+    userMessage: body.message,
+    assistantText: responseText,
+    assistantContent: lastAssistantContent,
     model,
-    totalInputTokens,
-    totalOutputTokens,
-    keyData.workspaceId,
-    keyData.keyId,
+    inputTokens: totalInputTokens,
+    outputTokens: totalOutputTokens,
+    cacheCreationInputTokens: totalCacheCreationInputTokens,
+    cacheReadInputTokens: totalCacheReadInputTokens,
+    workspaceId: keyData.workspaceId,
+    apiKeyId: keyData.keyId,
     usageMonth,
-  )
+  })

   return {
     conversationId,
@@ -374,6 +386,8 @@ async function runConversationMessage(
     usage: {
       inputTokens: totalInputTokens,
       outputTokens: totalOutputTokens,
+      cacheCreationInputTokens: totalCacheCreationInputTokens,
+      cacheReadInputTokens: totalCacheReadInputTokens,
     },
   }
 }
diff --git a/server/api/workspaces/[workspaceId]/projects/[projectId]/chat.post.ts b/server/api/workspaces/[workspaceId]/projects/[projectId]/chat.post.ts
index 56d03c2..4b70fbd 100644
--- a/server/api/workspaces/[workspaceId]/projects/[projectId]/chat.post.ts
+++ b/server/api/workspaces/[workspaceId]/projects/[projectId]/chat.post.ts
@@ -189,19 +189,32 @@ export default defineEventHandler(async (event) => {
     contentContext,
   }

-  let systemPrompt = buildSystemPrompt(projectConfig, models, permissions, projectState, uiContext, intent, vocabulary, plan)
-
-  // Append content index from brain cache (compact summary of all content)
+  // Build the system prompt as cache-aware blocks: the static body
+  // (role, architecture, schema, vocab, permissions, base rules,
+  // custom instructions) and the brain content index get their own
+  // `cache_control` markers; the dynamic block (UI context, intent,
+  // state) follows uncached so request-by-request changes never
+  // invalidate the cached prefix.
   const contentIndex = buildContentIndex(brain)
-  if (contentIndex) {
-    systemPrompt += `\n\n${contentIndex}`
-  }
+  const promptBlocks = buildSystemPromptBlocks(
+    projectConfig, models, permissions, projectState, uiContext, intent,
+    contentIndex || null,
+    vocabulary, plan,
+  )
+  const systemPrompt = toSystemBlocks(promptBlocks)

   // === FILTER TOOLS by permissions + phase ===
   const permissionFiltered = filterToolsByPermissions(STUDIO_TOOLS, permissions.availableTools) as StudioTool[]
   const phaseFiltered = permissionFiltered.filter(t => t.requiredPhase.includes(phase))
   const aiTools = toAITools(phaseFiltered)

+  // Mark the last tool with cache_control so Anthropic caches the
+  // entire tools block. Tools rarely change within a session — this
+  // is a high-hit-rate third breakpoint after the two system blocks.
+  if (aiTools.length > 0) {
+    aiTools[aiTools.length - 1]!.cacheControl = { type: 'ephemeral' }
+  }
+
   // Workflow: plans without review feature always auto-merge regardless of config
   const configWorkflow = projectConfig?.workflow ?? 'auto-merge'
   const workflow = hasFeature(plan, 'workflow.review') ? configWorkflow : 'auto-merge'
@@ -216,6 +229,8 @@ export default defineEventHandler(async (event) => {

   let totalInputTokens = 0
   let totalOutputTokens = 0
+  let totalCacheCreationInputTokens = 0
+  let totalCacheReadInputTokens = 0
   let lastAssistantContent: AIContentBlock[] = []

   try {
@@ -238,9 +253,15 @@ export default defineEventHandler(async (event) => {

       // Forward all events to SSE stream
       if (evt.type === 'done') {
-        // Extract final state from done event before forwarding
-        totalInputTokens = (evt.usage as { inputTokens: number })?.inputTokens ?? 0
-        totalOutputTokens = (evt.usage as { outputTokens: number })?.outputTokens ?? 0
+        // Extract final state from done event before forwarding.
+        // The engine accumulates all four token buckets (regular
+        // input/output + cache creation + cache read) across
+        // streaming and non-streaming iterations.
+        const u = evt.usage as { inputTokens?: number, outputTokens?: number, cacheCreationInputTokens?: number, cacheReadInputTokens?: number } | undefined
+        totalInputTokens = u?.inputTokens ?? 0
+        totalOutputTokens = u?.outputTokens ?? 0
+        totalCacheCreationInputTokens = u?.cacheCreationInputTokens ?? 0
+        totalCacheReadInputTokens = u?.cacheReadInputTokens ?? 0
         lastAssistantContent = (evt.lastContent as AIContentBlock[]) ?? []

         // Forward the done event without lastContent (not needed by client)
@@ -261,11 +282,21 @@ export default defineEventHandler(async (event) => {
       .map(b => (b as { text: string }).text)
       .join('')

-    await saveChatResult(
-      conversationId, body.message, assistantText,
-      lastAssistantContent, model, totalInputTokens, totalOutputTokens,
-      workspaceId, session.user.id, usageSource, usageMonth,
-    )
+    await saveChatResult({
+      conversationId,
+      userMessage: body.message,
+      assistantText,
+      assistantContent: lastAssistantContent,
+      model,
+      inputTokens: totalInputTokens,
+      outputTokens: totalOutputTokens,
+      cacheCreationInputTokens: totalCacheCreationInputTokens,
+      cacheReadInputTokens: totalCacheReadInputTokens,
+      workspaceId,
+      userId: session.user.id,
+      usageSource,
+      usageMonth,
+    })

     // Webhook events are now emitted from conversation-engine.ts per tool execution
 }
diff --git a/server/providers/ai.ts b/server/providers/ai.ts
index d1d69b1..40ce3cd 100644
--- a/server/providers/ai.ts
+++ b/server/providers/ai.ts
@@ -13,6 +13,12 @@ export interface AITool {
   name: string
   description: string
   inputSchema: Record<string, unknown> // JSON Schema
+  /**
+   * Provider-agnostic prompt cache marker. When set on the LAST tool
+   * in the array, supporting providers (Anthropic) cache the full
+   * tools block. Unsupported providers ignore the field.
+   */
+  cacheControl?: AICacheControl
 }

 export interface AIMessage {
@@ -25,6 +31,43 @@ export type AIContentBlock
   | { type: 'tool_use', id: string, name: string, input: unknown }
   | { type: 'tool_result', toolUseId: string, content: string }

+/**
+ * Prompt cache marker. `ephemeral` is Anthropic's 5-minute TTL bucket.
+ * Provider-agnostic shape — providers that don't support prompt
+ * caching ignore the marker entirely and the request still works
+ * (just without cache benefits).
+ */
+export interface AICacheControl {
+  type: 'ephemeral'
+}
+
+/**
+ * Structured system prompt block. Use the array form of
+ * `AICompletionRequest.system` to place cache breakpoints between
+ * blocks. A plain `string` is equivalent to a single uncached block
+ * and stays accepted for backward compatibility.
+ */
+export interface AISystemBlock {
+  type: 'text'
+  text: string
+  cacheControl?: AICacheControl
+}
+
+/**
+ * Token usage on a single completion. Anthropic reports four disjoint
+ * buckets — `input_tokens` means "non-cached input only" — so existing
+ * dashboards that sum it keep their meaning after this change.
+ * Total billable input cost (Contentrain-side) is approximately:
+ *   inputTokens * 1x + cacheCreationInputTokens * 1.25x + cacheReadInputTokens * 0.1x
+ * at the base per-MTok price for the model.
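+ *
+ * Worked example with illustrative numbers only (a hypothetical
+ * $3/MTok base price is assumed here — substitute the model's
+ * actual price sheet): a turn with 1,000 regular input tokens,
+ * 2,000 cache-creation tokens and 20,000 cache-read tokens costs
+ * roughly (1,000 * 3.00 + 2,000 * 3.75 + 20,000 * 0.30) / 1e6
+ * ≈ $0.0165, versus $0.069 if all 23,000 input tokens were uncached.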
+ */
+export interface AIUsage {
+  inputTokens: number
+  outputTokens: number
+  cacheCreationInputTokens: number
+  cacheReadInputTokens: number
+}
+
 export interface AIStreamEvent {
   type: 'text' | 'tool_use_start' | 'tool_use_input' | 'tool_use_end' | 'message_end' | 'error'
   // text
@@ -35,14 +78,20 @@ export interface AIStreamEvent {
   toolInput?: unknown
   // message_end
   stopReason?: 'end_turn' | 'tool_use' | 'max_tokens'
-  usage?: { inputTokens: number, outputTokens: number }
+  usage?: AIUsage
   // error
   error?: string
 }

 export interface AICompletionRequest {
   model: string
-  system: string
+  /**
+   * String form is treated as a single uncached system block. Array
+   * form lets callers place cache breakpoints between blocks; up to
+   * 4 cache_control markers are honored per request (Anthropic
+   * limit), shared across system + tools + messages.
+   */
+  system: string | AISystemBlock[]
   messages: AIMessage[]
   tools: AITool[]
   maxTokens: number
@@ -52,7 +101,7 @@
 export interface AICompletionResponse {
   content: AIContentBlock[]
   stopReason: 'end_turn' | 'tool_use' | 'max_tokens'
-  usage: { inputTokens: number, outputTokens: number }
+  usage: AIUsage
 }

 export interface AIProvider {
diff --git a/server/providers/anthropic-ai.ts b/server/providers/anthropic-ai.ts
index 4c9e45b..16e0abc 100644
--- a/server/providers/anthropic-ai.ts
+++ b/server/providers/anthropic-ai.ts
@@ -5,6 +5,9 @@ import type {
   AIContentBlock,
   AIProvider,
   AIStreamEvent,
+  AISystemBlock,
+  AITool,
+  AIUsage,
 } from './ai'

 /**
@@ -12,6 +15,11 @@ import type {
  *
  * Uses @anthropic-ai/sdk for streaming and tool use.
  * Normalizes Anthropic-specific formats to Studio's standard events.
+ *
+ * Prompt cache: `AISystemBlock.cacheControl` and `AITool.cacheControl`
+ * map to Anthropic's `cache_control: { type: 'ephemeral' }`. The SDK
+ * returns three input-token buckets in `usage` (input, cache_creation,
+ * cache_read); all four are forwarded through `AIUsage`.
  */
 export function createAnthropicProvider(): AIProvider {
   return {
@@ -23,7 +31,7 @@ export function createAnthropicProvider(): AIProvider {

       const stream = client.messages.stream({
         model: request.model,
-        system: request.system,
+        system: toAnthropicSystem(request.system),
         messages: toAnthropicMessages(request.messages),
         tools: toAnthropicTools(request.tools),
         max_tokens: request.maxTokens,
@@ -32,9 +40,36 @@ export function createAnthropicProvider(): AIProvider {
       let currentToolId: string | undefined
       let currentToolName: string | undefined
       let currentToolInput = ''
+      // `message_start` carries the initial usage snapshot (input,
+      // cache_creation, cache_read). `message_delta` then carries the
+      // final output_tokens and any final usage adjustments. We
+      // accumulate as events arrive and emit a single normalized
+      // usage payload at message_end.
+      const usage: AIUsage = {
+        inputTokens: 0,
+        outputTokens: 0,
+        cacheCreationInputTokens: 0,
+        cacheReadInputTokens: 0,
+      }
+      let stopReason: AIStreamEvent['stopReason']

       for await (const event of stream) {
         switch (event.type) {
+          case 'message_start':
+            if ('message' in event && event.message?.usage) {
+              const m = event.message.usage as {
+                input_tokens?: number
+                output_tokens?: number
+                cache_creation_input_tokens?: number
+                cache_read_input_tokens?: number
+              }
+              usage.inputTokens = m.input_tokens ?? 0
+              usage.outputTokens = m.output_tokens ?? 0
+              usage.cacheCreationInputTokens = m.cache_creation_input_tokens ?? 0
+              usage.cacheReadInputTokens = m.cache_read_input_tokens ?? 0
+            }
+            break
+
           case 'content_block_start':
             if (event.content_block.type === 'text') {
               // Text block starting — nothing to emit yet
@@ -86,19 +121,25 @@ export function createAnthropicProvider(): AIProvider {
             }
             break

-          case 'message_stop':
-            break
-
           case 'message_delta':
             if ('usage' in event) {
-              yield {
-                type: 'message_end',
-                stopReason: (event.delta as { stop_reason?: string }).stop_reason as AIStreamEvent['stopReason'],
-                usage: {
-                  inputTokens: (event.usage as { input_tokens?: number }).input_tokens ?? 0,
-                  outputTokens: (event.usage as { output_tokens?: number }).output_tokens ?? 0,
-                },
-              }
+              const d = event.usage as { output_tokens?: number, input_tokens?: number, cache_creation_input_tokens?: number, cache_read_input_tokens?: number }
+              // Anthropic spec: message_delta carries cumulative usage
+              // for the message — output_tokens plus, occasionally,
+              // refined input buckets. Overwrite the message_start
+              // snapshot with these final values when present.
+              if (typeof d.output_tokens === 'number') usage.outputTokens = d.output_tokens
+              if (typeof d.input_tokens === 'number') usage.inputTokens = d.input_tokens
+              if (typeof d.cache_creation_input_tokens === 'number') usage.cacheCreationInputTokens = d.cache_creation_input_tokens
+              if (typeof d.cache_read_input_tokens === 'number') usage.cacheReadInputTokens = d.cache_read_input_tokens
+            }
+            stopReason = (event.delta as { stop_reason?: string }).stop_reason as AIStreamEvent['stopReason']
+            break
+
+          case 'message_stop':
+            yield {
+              type: 'message_end',
+              stopReason,
+              usage,
+            }
             break
         }
@@ -113,24 +154,46 @@ export function createAnthropicProvider(): AIProvider {

       const response = await client.messages.create({
         model: request.model,
-        system: request.system,
+        system: toAnthropicSystem(request.system),
         messages: toAnthropicMessages(request.messages),
         tools: toAnthropicTools(request.tools),
         max_tokens: request.maxTokens,
       }, { signal: request.abortSignal })

+      const respUsage = response.usage as {
+        input_tokens: number
+        output_tokens: number
+        cache_creation_input_tokens?: number
+        cache_read_input_tokens?: number
+      }
+
       return {
         content: fromAnthropicContent(response.content),
         stopReason: response.stop_reason as AICompletionResponse['stopReason'],
         usage: {
-          inputTokens: response.usage.input_tokens,
-          outputTokens: response.usage.output_tokens,
+          inputTokens: respUsage.input_tokens,
+          outputTokens: respUsage.output_tokens,
+          cacheCreationInputTokens: respUsage.cache_creation_input_tokens ?? 0,
+          cacheReadInputTokens: respUsage.cache_read_input_tokens ?? 0,
         },
       }
     },
   }
 }

+/**
+ * Convert Studio system input (string or block array) to Anthropic's
+ * `string | TextBlockParam[]`. Cache markers map directly.
+ */
+function toAnthropicSystem(system: string | AISystemBlock[]): string | Anthropic.TextBlockParam[] {
+  if (typeof system === 'string') return system
+  return system.map(block => ({
+    type: 'text' as const,
+    text: block.text,
+    ...(block.cacheControl ? { cache_control: { type: 'ephemeral' as const } } : {}),
+  }))
+}
+
 /**
  * Convert Studio messages to Anthropic format.
  */
@@ -159,13 +222,18 @@ function toAnthropicMessages(messages: AICompletionRequest['messages']): Anthrop
 }

 /**
- * Convert Studio tools to Anthropic format.
+ * Convert Studio tools to Anthropic format. The optional `cacheControl`
+ * on a tool maps to Anthropic's `cache_control`; by convention Studio
+ * callers set it only on the last tool — placing it earlier would
+ * split the cached prefix and cost extra creation writes.
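+ *
+ * Sketch of the mapped output for a cache-marked tool (shape only;
+ * the names and values below are placeholders, not real config):
+ *   { name: 'get_content', description: '…',
+ *     input_schema: { type: 'object', properties: {} },
+ *     cache_control: { type: 'ephemeral' } }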
 */
-function toAnthropicTools(tools: AICompletionRequest['tools']): Anthropic.Tool[] {
-  return tools.map(tool => ({
+function toAnthropicTools(tools: AITool[]): Anthropic.Tool[] {
+  return tools.map((tool): Anthropic.Tool => ({
     name: tool.name,
     description: tool.description,
     input_schema: tool.inputSchema as Anthropic.Tool['input_schema'],
+    ...(tool.cacheControl ? { cache_control: { type: 'ephemeral' as const } } : {}),
   }))
 }
diff --git a/server/providers/database.ts b/server/providers/database.ts
index fa704a3..4f85c80 100644
--- a/server/providers/database.ts
+++ b/server/providers/database.ts
@@ -285,6 +285,8 @@ export interface DatabaseProvider {
     toolCalls?: unknown[] | null
     tokenCountInput?: number
     tokenCountOutput?: number
+    cacheCreationInputTokens?: number
+    cacheReadInputTokens?: number
     model?: string
   }) => Promise<void>

@@ -318,7 +320,13 @@ export interface DatabaseProvider {
     limit: number
   }) => Promise<{ allowed: boolean, currentCount: number }>

-  /** Update token counts on an existing agent_usage row (after chat completes). */
+  /**
+   * Update token counts on an existing `agent_usage` row after the
+   * chat completes. Cache token fields are required (callers pass 0
+   * when the provider didn't report cache usage) so the call shape
+   * stays stable and the underlying `_v2` RPC always receives all
+   * four values.
+   */
   updateAgentUsageTokens: (input: {
     workspaceId: string
     userId: string
@@ -326,6 +334,8 @@ export interface DatabaseProvider {
     source: string
     inputTokens: number
     outputTokens: number
+    cacheCreationInputTokens: number
+    cacheReadInputTokens: number
   }) => Promise<void>

  /**
@@ -366,13 +376,19 @@ export interface DatabaseProvider {
     current: number
   }>

-  /** Increment token counters on the `api_message_usage` row after the AI call settles. */
+  /**
+   * Increment token counters on the `api_message_usage` row after the
+   * AI call settles. Cache token fields are required for the same
+   * stability reason as `updateAgentUsageTokens`.
+   */
   updateAPIUsageTokens: (input: {
     workspaceId: string
     apiKeyId: string
     month: string
     inputTokens: number
     outputTokens: number
+    cacheCreationInputTokens: number
+    cacheReadInputTokens: number
   }) => Promise<void>

  /**
diff --git a/server/providers/supabase-db/conversations.ts b/server/providers/supabase-db/conversations.ts
index f09fc08..f06dd6b 100644
--- a/server/providers/supabase-db/conversations.ts
+++ b/server/providers/supabase-db/conversations.ts
@@ -134,6 +134,8 @@ export function conversationMethods(): ConversationMethods {
     if (input.toolCalls) row.tool_calls = input.toolCalls
     if (input.tokenCountInput) row.token_count_input = input.tokenCountInput
     if (input.tokenCountOutput) row.token_count_output = input.tokenCountOutput
+    if (input.cacheCreationInputTokens) row.cache_creation_input_tokens = input.cacheCreationInputTokens
+    if (input.cacheReadInputTokens) row.cache_read_input_tokens = input.cacheReadInputTokens
     if (input.model) row.model = input.model

     await admin.from('messages').insert(row)
@@ -236,14 +238,19 @@ export function conversationMethods(): ConversationMethods {
   async updateAgentUsageTokens(input) {
     const admin = getAdmin()

-    // Use RPC for atomic token increment to prevent concurrent overwrites
-    await admin.rpc('increment_agent_usage_tokens', {
+    // The _v2 RPC carries the cache token columns alongside the base
+    // input/output counters; the legacy _v1 entry point stays
+    // registered for rolling-deploy safety but is no longer called
+    // from app code.
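+    // Illustrative follow-up once every deploy is on v2 (the exact
+    // v1 signature below is assumed from its call site, not confirmed):
+    //   DROP FUNCTION public.increment_agent_usage_tokens(
+    //     uuid, uuid, text, text, bigint, bigint);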
+    await admin.rpc('increment_agent_usage_tokens_v2', {
       p_workspace_id: input.workspaceId,
       p_user_id: input.userId,
       p_month: input.month,
       p_source: input.source,
       p_input_tokens: input.inputTokens,
       p_output_tokens: input.outputTokens,
+      p_cache_creation_input_tokens: input.cacheCreationInputTokens,
+      p_cache_read_input_tokens: input.cacheReadInputTokens,
     })
   },

@@ -282,12 +289,14 @@ export function conversationMethods(): ConversationMethods {
   async updateAPIUsageTokens(input) {
     const admin = getAdmin()

-    await admin.rpc('increment_api_usage_tokens', {
+    await admin.rpc('increment_api_usage_tokens_v2', {
       p_workspace_id: input.workspaceId,
       p_api_key_id: input.apiKeyId,
       p_month: input.month,
       p_input_tokens: input.inputTokens,
       p_output_tokens: input.outputTokens,
+      p_cache_creation_input_tokens: input.cacheCreationInputTokens,
+      p_cache_read_input_tokens: input.cacheReadInputTokens,
     })
   },

diff --git a/server/utils/agent-system-prompt.ts b/server/utils/agent-system-prompt.ts
index ed3a6b9..47b9541 100644
--- a/server/utils/agent-system-prompt.ts
+++ b/server/utils/agent-system-prompt.ts
@@ -1,4 +1,5 @@
 import type { ModelDefinition, ContentrainConfig, FieldDef } from '@contentrain/types'
+import type { AISystemBlock } from '../providers/ai'
 import type { Branch } from '../providers/git'
 import type { AgentPermissions } from './agent-permissions'
 import type { ChatUIContext, ClassifiedIntent, ProjectPhase } from './agent-types'
@@ -6,8 +7,22 @@ import type { ChatUIContext, ClassifiedIntent, ProjectPhase } from './agent-type
 /**
  * Bounded Task Executor system prompt.
  *
- * Structure: Role → Contentrain Architecture → UI Context → Intent → State → Schema → Permissions → Rules
- * Each section is purpose-built to constrain the agent's behavior.
+ * Two ways to consume:
+ *
+ * - `buildSystemPrompt(...)` returns a single concatenated string,
+ *   preserved for callers that don't want prompt-cache markers
+ *   (legacy paths, tests, alternative providers).
+ *
+ * - `buildSystemPromptBlocks(...)` returns a `static` /
+ *   `contentIndex` / `dynamic` split shaped for Anthropic's
+ *   `cache_control` breakpoints. The caller assembles the final
+ *   `AISystemBlock[]` and places markers on the first two blocks.
+ *
+ * Static/dynamic split rule: a section is "static" only if its
+ * content is byte-identical across requests within the same project
+ * (modulo brain refreshes). Anything keyed on `uiContext`, `intent`,
+ * or `state` lives in the dynamic block so a single-character change
+ * doesn't invalidate the cached prefix.
 */

 export interface ProjectState {
@@ -19,29 +34,103 @@ export interface ProjectState {
   contentContext?: Record<string, unknown> | null
 }

-export function buildSystemPrompt(
+/**
+ * Static prompt body — content that does NOT vary with `uiContext`,
+ * `intent`, or `state`. Safe to wrap in a cached system block.
+ */
+function buildStaticBody(
   config: ContentrainConfig | null,
   models: ModelDefinition[],
   permissions: AgentPermissions,
-  state: ProjectState,
-  uiContext: ChatUIContext,
-  intent: ClassifiedIntent,
   vocabulary?: Record<string, Record<string, string>> | null,
   plan?: import('./license').Plan,
   customInstructions?: string | null,
 ): string {
   const sections: string[] = []

-  // 1. ROLE — strict, bounded
+  // ROLE
   sections.push(agentPrompt('role.definition'))

-  // 2. CONTENTRAIN ARCHITECTURE — the agent must know this to work correctly
+  // CONTENTRAIN ARCHITECTURE
   sections.push(buildArchitectureSection())

-  // 3. UI CONTEXT — what the user is looking at RIGHT NOW
+  // CONFIG
+  if (config) {
+    sections.push(`## Configuration
+- Stack: ${config.stack}
+- Locales: ${config.locales.supported.join(', ')} (default: ${config.locales.default})
+- Domains: ${config.domains.join(', ')}
+- Workflow: ${config.workflow}`)
+  }
+
+  // SCHEMA — model list without active-model marker
+  if (models.length > 0) {
+    sections.push(buildSchemaSection(models))
+  }
+
+  // RELATION GRAPH
+  const relationGraph = buildRelationGraph(models)
+  if (relationGraph) {
+    sections.push(relationGraph)
+  }
+
+  // VOCABULARY
+  if (vocabulary && Object.keys(vocabulary).length > 0) {
+    const termCount = Object.keys(vocabulary).length
+    const sampleTerms = Object.entries(vocabulary).slice(0, 10)
+    const termLines = sampleTerms.map(([key, translations]) => {
+      const locales = Object.entries(translations).map(([l, v]) => `${l}: "${v}"`).join(', ')
+      return `  - ${key}: ${locales}`
+    })
+    let vocabSection = `## Vocabulary (${termCount} terms)\nShared terminology from .contentrain/vocabulary.json:\n${termLines.join('\n')}`
+    if (termCount > 10) {
+      vocabSection += `\n  ... and ${termCount - 10} more terms`
+    }
+    sections.push(vocabSection)
+  }
+
+  // PERMISSIONS
+  const roleDisplay = permissions.projectRole
+    ? `${permissions.workspaceRole} / ${permissions.projectRole}`
+    : permissions.workspaceRole
+
+  sections.push(`## Permissions
+- Role: ${roleDisplay}
+- Available tools: ${permissions.availableTools.join(', ')}${
+  permissions.specificModels
+    ? `\n- Model access restricted to: ${permissions.allowedModels.join(', ')}`
+    : ''
+}`)
+
+  // BASE RULES — intent-independent
+  sections.push(buildBaseRulesSection(config, permissions, plan))
+
+  // CUSTOM INSTRUCTIONS (per Conversation API key, stable across the key's lifetime)
+  if (customInstructions) {
+    sections.push(`### Custom Instructions (from project admin)\n${customInstructions}`)
+  }
+
+  return sections.join('\n\n')
+}
+
+/**
+ * Dynamic prompt body — UI context, intent, state, intent-specific
+ * rules. Anything that changes per request lives here so the cache
+ * marker on the static block stays valid.
+ */
+function buildDynamicBody(
+  models: ModelDefinition[],
+  state: ProjectState,
+  uiContext: ChatUIContext,
+  intent: ClassifiedIntent,
+  config: ContentrainConfig | null,
+): string {
+  const sections: string[] = []
+
+  // UI CONTEXT — what the user is looking at RIGHT NOW (includes active model annotation)
   sections.push(buildContextSection(uiContext, models, config))

-  // 4. INFERRED INTENT
+  // INFERRED INTENT
   if (intent.category !== 'out_of_scope') {
     const inferredLines: string[] = [`## Inferred Intent: ${intent.category}`]
     if (intent.inferred.modelId) inferredLines.push(`Default model: ${intent.inferred.modelId}`)
@@ -53,7 +142,7 @@
     sections.push(inferredLines.join('\n'))
   }

-  // 5. PROJECT STATE
+  // PROJECT STATE
   const stateLines: string[] = ['## Project State']
   stateLines.push(`- Phase: ${state.phase}`)
   stateLines.push(`- Initialized: ${state.initialized ? 'YES' : 'NO'}`)
@@ -65,7 +154,6 @@
     }
   }

-  // Context.json — last operation tracking
   if (state.contentContext) {
     const lastOp = state.contentContext.lastOperation as { tool?: string, model?: string, locale?: string, timestamp?: string } | undefined
     const stats = state.contentContext.stats as { models?: number, entries?: number, locales?: string[] } | undefined
@@ -86,63 +174,95 @@
   sections.push(stateLines.join('\n'))

-  // 6. PROJECT CONFIG
-  if (config) {
-    sections.push(`## Configuration
-- Stack: ${config.stack}
-- Locales: ${config.locales.supported.join(', ')} (default: ${config.locales.default})
-- Domains: ${config.domains.join(', ')}
-- Workflow: ${config.workflow}`)
-  }
+  // INTENT-SPECIFIC RULES (out-of-scope, etc.)
+  const intentRules = buildIntentRulesSection(intent)
+  if (intentRules) sections.push(intentRules)

-  // 7. SCHEMA — full detail for all models with relation graph
-  if (models.length > 0) {
-    sections.push(buildSchemaSection(models, uiContext))
-  }
+  return sections.join('\n\n')
+}

-  // 8. RELATION GRAPH — cross-model references
-  const relationGraph = buildRelationGraph(models)
-  if (relationGraph) {
-    sections.push(relationGraph)
-  }
+/**
+ * Structured system prompt split into prompt-cache friendly blocks.
+ *
+ * - `static`: stable across requests; safe behind a `cache_control`
+ *   marker (Block 1).
+ * - `contentIndex`: brain content index, refreshes on its own TTL;
+ *   gets its own cache breakpoint (Block 2) so a schema change
+ *   doesn't invalidate the content cache and vice versa.
+ * - `dynamic`: UI context / intent / state — must come after the
+ *   cache breakpoints so request-by-request changes don't break the
+ *   cached prefix.
+ *
+ * Callers compose `AISystemBlock[]` and place markers on the first
+ * two blocks. The `contentIndex` field is `null` when the brain
+ * hasn't produced one.
+ */
+export interface SystemPromptBlocks {
+  static: string
+  contentIndex: string | null
+  dynamic: string
+}

-  // 9. VOCABULARY — shared terminology across locales
-  if (vocabulary && Object.keys(vocabulary).length > 0) {
-    const termCount = Object.keys(vocabulary).length
-    const sampleTerms = Object.entries(vocabulary).slice(0, 10)
-    const termLines = sampleTerms.map(([key, translations]) => {
-      const locales = Object.entries(translations).map(([l, v]) => `${l}: "${v}"`).join(', ')
-      return `  - ${key}: ${locales}`
-    })
-    let vocabSection = `## Vocabulary (${termCount} terms)\nShared terminology from .contentrain/vocabulary.json:\n${termLines.join('\n')}`
-    if (termCount > 10) {
-      vocabSection += `\n  ... and ${termCount - 10} more terms`
-    }
-    sections.push(vocabSection)
+export function buildSystemPromptBlocks(
+  config: ContentrainConfig | null,
+  models: ModelDefinition[],
+  permissions: AgentPermissions,
+  state: ProjectState,
+  uiContext: ChatUIContext,
+  intent: ClassifiedIntent,
+  contentIndex: string | null,
+  vocabulary?: Record<string, Record<string, string>> | null,
+  plan?: import('./license').Plan,
+  customInstructions?: string | null,
+): SystemPromptBlocks {
+  return {
+    static: buildStaticBody(config, models, permissions, vocabulary, plan, customInstructions),
+    contentIndex: contentIndex && contentIndex.trim() ? contentIndex : null,
+    dynamic: buildDynamicBody(models, state, uiContext, intent, config),
   }
+}

-  // 9. PERMISSIONS
-  const roleDisplay = permissions.projectRole
-    ? `${permissions.workspaceRole} / ${permissions.projectRole}`
-    : permissions.workspaceRole
-
-  sections.push(`## Permissions
-- Role: ${roleDisplay}
-- Available tools: ${permissions.availableTools.join(', ')}${
-  permissions.specificModels
-    ? `\n- Model access restricted to: ${permissions.allowedModels.join(', ')}`
-    : ''
-}`)
-
-  // 10. RULES — hardened, workflow-aware, architecture-aware, role-aware, plan-aware
-  sections.push(buildRulesSection(config, intent, permissions, plan))
-
-  // Custom instructions (Conversation API keys)
-  if (customInstructions) {
-    sections.push(`### Custom Instructions (from project admin)\n${customInstructions}`)
+/**
+ * Materialize the cache-aware blocks as an `AISystemBlock[]` ready
+ * to hand to `AIProvider`. The first two blocks (static + brain
+ * content index) get `cache_control` markers; the dynamic block
+ * stays uncached so request-level changes don't poison the prefix.
+ */
+export function toSystemBlocks(prompt: SystemPromptBlocks): AISystemBlock[] {
+  const blocks: AISystemBlock[] = []
+  blocks.push({ type: 'text', text: prompt.static, cacheControl: { type: 'ephemeral' } })
+  if (prompt.contentIndex) {
+    blocks.push({ type: 'text', text: prompt.contentIndex, cacheControl: { type: 'ephemeral' } })
   }
+  if (prompt.dynamic.trim()) {
+    blocks.push({ type: 'text', text: prompt.dynamic })
+  }
+  return blocks
+}

-  return sections.join('\n\n')
+/**
+ * Legacy single-string composition, preserved for callers that don't
+ * want cache markers (alternative providers, certain test paths).
+ * Equivalent to `buildSystemPromptBlocks(...)` concatenated without
+ * cache_control markers — semantically identical to the old function.
+ */
+export function buildSystemPrompt(
+  config: ContentrainConfig | null,
+  models: ModelDefinition[],
+  permissions: AgentPermissions,
+  state: ProjectState,
+  uiContext: ChatUIContext,
+  intent: ClassifiedIntent,
+  vocabulary?: Record<string, Record<string, string>> | null,
+  plan?: import('./license').Plan,
+  customInstructions?: string | null,
+): string {
+  const blocks = buildSystemPromptBlocks(
+    config, models, permissions, state, uiContext, intent,
+    null, // contentIndex appended by caller via `${prompt}\n\n${contentIndex}`
+    vocabulary, plan, customInstructions,
+  )
+  return [blocks.static, blocks.dynamic].filter(Boolean).join('\n\n')
 }

 // ─── Architecture Section ───
@@ -162,19 +282,18 @@ function buildArchitectureSection(): string {

 // ─── Schema Section ───

-function buildSchemaSection(models: ModelDefinition[], uiContext: ChatUIContext): string {
-  const activeModel = uiContext.activeModelId
-    ? models.find(m => m.id === uiContext.activeModelId)
-    : null
-
+/**
+ * Pure model-list rendering — no active-model marker. The active
+ * model annotation lives in the dynamic UI Context block so the
+ * schema itself stays byte-identical across requests and the
+ * cache_control marker placed on the static system block can
+ * actually hit.
+ */
+function buildSchemaSection(models: ModelDefinition[]): string {
   const lines: string[] = ['## Content Schema']

-  // Show ALL models with full field details (not just active one)
   for (const model of models) {
-    const isActive = model.id === activeModel?.id
-    const prefix = isActive ? '### ▶ ' : '### '
-
-    lines.push(`${prefix}${model.name} (\`${model.id}\`)`)
+    lines.push(`### ${model.name} (\`${model.id}\`)`)
     lines.push(`Kind: ${model.kind}, domain: ${model.domain}, i18n: ${model.i18n}`)

     if (model.fields && Object.keys(model.fields).length > 0) {
@@ -334,7 +453,12 @@ function buildContextSection(

 // ─── Rules Section ───

-function buildRulesSection(config: ContentrainConfig | null, intent: ClassifiedIntent, permissions: AgentPermissions, plan?: import('./license').Plan): string {
+/**
+ * Base rules block — depends on `config` (project-stable),
+ * `permissions` (role-stable per request), and `plan` (workspace-
+ * stable). No intent dependency, so safe for the cached prefix.
+ */
+function buildBaseRulesSection(config: ContentrainConfig | null, permissions: AgentPermissions, plan?: import('./license').Plan): string {
   const effectivePlan = plan ?? 'starter'
   const workflow = config?.workflow ?? 'auto-merge'
   const isPrivileged = permissions.workspaceRole === 'owner' || permissions.workspaceRole === 'admin'
@@ -412,10 +536,17 @@
   }
   rules.push(agentPrompt('plan.tiers', tierParams))

-  // Out of scope
+  return `## Rules\n${rules.map(r => `- ${r}`).join('\n')}`
+}
+
+/**
+ * Intent-dependent rules. Returns `null` when no intent-specific rule
+ * applies (the common case), so the caller can omit the section
+ * entirely and keep the dynamic block minimal.
+ */
+function buildIntentRulesSection(intent: ClassifiedIntent): string | null {
   if (intent.category === 'out_of_scope') {
-    rules.push(agentPrompt('rules.off_topic'))
+    return `## Additional Rules\n- ${agentPrompt('rules.off_topic')}`
   }
-
-  return `## Rules\n${rules.map(r => `- ${r}`).join('\n')}`
+  return null
 }
diff --git a/server/utils/conversation-engine.ts b/server/utils/conversation-engine.ts
index c4b259d..7fe3254 100644
--- a/server/utils/conversation-engine.ts
+++ b/server/utils/conversation-engine.ts
@@ -1,5 +1,5 @@
 import type { ModelDefinition } from '@contentrain/types'
-import type { AIMessage, AIContentBlock, AITool } from '~~/server/providers/ai'
+import type { AIMessage, AIContentBlock, AISystemBlock, AITool } from '~~/server/providers/ai'
 import type { ChatUIContext, AffectedResources, ProjectPhase } from '~~/server/utils/agent-types'
 import type { AgentPermissions } from '~~/server/utils/agent-permissions'
@@ -29,7 +29,13 @@ export interface ConversationEvent {
 export interface ConversationConfig {
   model: string
   apiKey: string
-  systemPrompt: string
+  /**
+   * Either a plain string (single uncached system block) or an array
+   * of `AISystemBlock`s with optional cache markers. The engine
+   * forwards the value verbatim to the provider — callers that want
+   * prompt-cache hits build their blocks via `buildSystemPromptBlocks`.
+   */
+  systemPrompt: string | AISystemBlock[]
   messages: AIMessage[]
   tools: AITool[]
   maxToolIterations?: number
@@ -84,6 +90,8 @@ export async function* runConversationLoop(

   let totalInputTokens = 0
   let totalOutputTokens = 0
+  let totalCacheCreationInputTokens = 0
+  let totalCacheReadInputTokens = 0
   let lastAssistantContent: AIContentBlock[] = []
   let accumulatedAffected: AffectedResources = emptyAffected()

@@ -142,6 +150,8 @@
             flushText()
             totalInputTokens += streamEvent.usage?.inputTokens ?? 0
             totalOutputTokens += streamEvent.usage?.outputTokens ?? 0
+            totalCacheCreationInputTokens += streamEvent.usage?.cacheCreationInputTokens ?? 0
+            totalCacheReadInputTokens += streamEvent.usage?.cacheReadInputTokens ?? 0
             stopReason = streamEvent.stopReason
             break
           case 'error':
@@ -157,6 +167,8 @@
       )
       totalInputTokens += response.usage.inputTokens
       totalOutputTokens += response.usage.outputTokens
+      totalCacheCreationInputTokens += response.usage.cacheCreationInputTokens ?? 0
+      totalCacheReadInputTokens += response.usage.cacheReadInputTokens ?? 0
       stopReason = response.stopReason

       for (const block of response.content) {
@@ -224,7 +236,12 @@
   // === DONE with affected resources ===
   yield {
     type: 'done',
-    usage: { inputTokens: totalInputTokens, outputTokens: totalOutputTokens },
+    usage: {
+      inputTokens: totalInputTokens,
+      outputTokens: totalOutputTokens,
+      cacheCreationInputTokens: totalCacheCreationInputTokens,
+      cacheReadInputTokens: totalCacheReadInputTokens,
+    },
     affected: accumulatedAffected,
     lastContent: lastAssistantContent,
   }
diff --git a/server/utils/db.ts b/server/utils/db.ts
index 866be47..d98b798 100644
--- a/server/utils/db.ts
+++ b/server/utils/db.ts
@@ -227,6 +227,8 @@ async function persistChatMessages(input: {
   model: string
   inputTokens: number
   outputTokens: number
+  cacheCreationInputTokens: number
+  cacheReadInputTokens: number
 }) {
   const db = useDatabaseProvider()
   await db.insertMessage({ conversationId: input.conversationId, role: 'user', content: input.userMessage })
@@ -238,6 +240,8 @@ async function persistChatMessages(input: {
     toolCalls: input.assistantContent.length > 0 ? input.assistantContent : null,
     tokenCountInput: input.inputTokens,
     tokenCountOutput: input.outputTokens,
+    cacheCreationInputTokens: input.cacheCreationInputTokens,
+    cacheReadInputTokens: input.cacheReadInputTokens,
     model: input.model,
   })
 }
@@ -258,41 +262,47 @@
 * Conversation API has its own actor model (key-keyed, not user-keyed)
 * and a dedicated `api_message_usage` table — use `saveApiChatResult`.
 */
-export async function saveChatResult(
-  conversationId: string,
-  userMessage: string,
-  assistantText: string,
-  assistantContent: unknown[],
-  model: string,
-  inputTokens: number,
-  outputTokens: number,
-  workspaceId: string,
-  userId: string,
-  usageSource: 'byoa' | 'studio',
-  usageMonth: string,
-) {
+export async function saveChatResult(input: {
+  conversationId: string
+  userMessage: string
+  assistantText: string
+  assistantContent: unknown[]
+  model: string
+  inputTokens: number
+  outputTokens: number
+  cacheCreationInputTokens: number
+  cacheReadInputTokens: number
+  workspaceId: string
+  userId: string
+  usageSource: 'byoa' | 'studio'
+  usageMonth: string
+}) {
   const db = useDatabaseProvider()

   await persistChatMessages({
-    conversationId,
-    userMessage,
-    assistantText,
-    assistantContent,
-    model,
-    inputTokens,
-    outputTokens,
+    conversationId: input.conversationId,
+    userMessage: input.userMessage,
+    assistantText: input.assistantText,
+    assistantContent: input.assistantContent,
+    model: input.model,
+    inputTokens: input.inputTokens,
+    outputTokens: input.outputTokens,
+    cacheCreationInputTokens: input.cacheCreationInputTokens,
+    cacheReadInputTokens: input.cacheReadInputTokens,
   })

   await db.updateAgentUsageTokens({
-    workspaceId,
-    userId,
-    month: usageMonth,
-    source: usageSource,
-    inputTokens,
-    outputTokens,
+    workspaceId: input.workspaceId,
+    userId: input.userId,
+    month: input.usageMonth,
+    source: input.usageSource,
+    inputTokens: input.inputTokens,
+    outputTokens: input.outputTokens,
+    cacheCreationInputTokens: input.cacheCreationInputTokens,
+    cacheReadInputTokens: input.cacheReadInputTokens,
  })

-  await db.updateConversationTimestamp(conversationId)
+  await db.updateConversationTimestamp(input.conversationId)
 }

 /**
@@ -300,37 +310,43 @@ export async function saveChatResult(
 * chat. The actor here is an API key (no workspace member identity),
 * so usage flows through `api_message_usage` instead of `agent_usage`.
 */
-export async function saveApiChatResult(
-  conversationId: string,
-  userMessage: string,
-  assistantText: string,
-  assistantContent: unknown[],
-  model: string,
-  inputTokens: number,
-  outputTokens: number,
-  workspaceId: string,
-  apiKeyId: string,
-  usageMonth: string,
-) {
+export async function saveApiChatResult(input: {
+  conversationId: string
+  userMessage: string
+  assistantText: string
+  assistantContent: unknown[]
+  model: string
+  inputTokens: number
+  outputTokens: number
+  cacheCreationInputTokens: number
+  cacheReadInputTokens: number
+  workspaceId: string
+  apiKeyId: string
+  usageMonth: string
+}) {
   const db = useDatabaseProvider()

   await persistChatMessages({
-    conversationId,
-    userMessage,
-    assistantText,
-    assistantContent,
-    model,
-    inputTokens,
-    outputTokens,
+    conversationId: input.conversationId,
+    userMessage: input.userMessage,
+    assistantText: input.assistantText,
+    assistantContent: input.assistantContent,
+    model: input.model,
+    inputTokens: input.inputTokens,
+    outputTokens: input.outputTokens,
+    cacheCreationInputTokens: input.cacheCreationInputTokens,
+    cacheReadInputTokens: input.cacheReadInputTokens,
  })

   await db.updateAPIUsageTokens({
-    workspaceId,
-    apiKeyId,
-    month: usageMonth,
-    inputTokens,
-    outputTokens,
+    workspaceId: input.workspaceId,
+    apiKeyId: input.apiKeyId,
+    month: input.usageMonth,
+    inputTokens: input.inputTokens,
+    outputTokens: input.outputTokens,
+    cacheCreationInputTokens: input.cacheCreationInputTokens,
+    cacheReadInputTokens: input.cacheReadInputTokens,
  })

-  await db.updateConversationTimestamp(conversationId)
+  await db.updateConversationTimestamp(input.conversationId)
 }
diff --git a/supabase/migrations/008_cache_token_accounting.sql b/supabase/migrations/008_cache_token_accounting.sql
new file mode 100644
index 0000000..81ddcf2
--- /dev/null
+++ b/supabase/migrations/008_cache_token_accounting.sql
@@ -0,0 +1,100 @@
+-- Anthropic prompt cache token accounting.
+--
+-- Anthropic's prompt cache splits an input call into three token
+-- buckets in `response.usage`:
+--
+--   - input_tokens                  non-cached input, 1.0x base price
+--   - cache_creation_input_tokens   written to cache this turn, 1.25x
+--   - cache_read_input_tokens       served from cache this turn, 0.1x
+--
+-- The Studio billing model stays message-based (`ai.messages_per_month`,
+-- `api.messages_per_month`) — cache tokens are observability for
+-- Contentrain's cost-of-goods analysis, never customer-facing quota.
+-- A workspace that benefits from cache hits does NOT earn extra
+-- messages; gross margin improves silently.
+--
+-- Schema choices:
+--   1. Separate columns mirror the existing `input_tokens`/`output_tokens`
+--      pattern. Trivial to query, no jsonb gymnastics.
+--   2. The `input_tokens` semantics are UNCHANGED — still "non-cached
+--      input tokens." Anthropic returns the buckets disjoint, so existing
+--      dashboards built on `input_tokens` keep their meaning.
+--   3. RPCs ship as `_v2` rather than mutating signatures in place.
+--      Mid-deploy, the old app version + new RPC schema (or vice
+--      versa) is a real failure mode on Supabase / PostgREST. The v1
+--      RPCs stay registered and unused for safety; a follow-up
+--      cleanup migration can drop them once every deploy is on v2.

+-- ─────────────────────────────────────────────────────────────────
+-- 1. Columns
+-- ─────────────────────────────────────────────────────────────────

+ALTER TABLE public.agent_usage
+  ADD COLUMN cache_creation_input_tokens bigint NOT NULL DEFAULT 0,
+  ADD COLUMN cache_read_input_tokens bigint NOT NULL DEFAULT 0;

+ALTER TABLE public.api_message_usage
+  ADD COLUMN cache_creation_input_tokens bigint NOT NULL DEFAULT 0,
+  ADD COLUMN cache_read_input_tokens bigint NOT NULL DEFAULT 0;

+ALTER TABLE public.messages
+  ADD COLUMN cache_creation_input_tokens integer NOT NULL DEFAULT 0,
+  ADD COLUMN cache_read_input_tokens integer NOT NULL DEFAULT 0;

+-- ─────────────────────────────────────────────────────────────────
+-- 2. v2 token increment RPCs
+-- ─────────────────────────────────────────────────────────────────

+CREATE FUNCTION public.increment_agent_usage_tokens_v2(
+  p_workspace_id uuid,
+  p_user_id uuid,
+  p_month text,
+  p_source text,
+  p_input_tokens bigint,
+  p_output_tokens bigint,
+  p_cache_creation_input_tokens bigint,
+  p_cache_read_input_tokens bigint
+) RETURNS void
+LANGUAGE plpgsql SECURITY DEFINER
+SET search_path TO ''
+AS $$
+BEGIN
+  UPDATE public.agent_usage
+  SET
+    input_tokens = input_tokens + p_input_tokens,
+    output_tokens = output_tokens + p_output_tokens,
+    cache_creation_input_tokens = cache_creation_input_tokens + p_cache_creation_input_tokens,
+    cache_read_input_tokens = cache_read_input_tokens + p_cache_read_input_tokens,
+    updated_at = now()
+  WHERE workspace_id = p_workspace_id
+    AND user_id = p_user_id
+    AND month = p_month
+    AND source = p_source;
+END;
+$$;

+CREATE FUNCTION public.increment_api_usage_tokens_v2(
+  p_workspace_id uuid,
+  p_api_key_id uuid,
+  p_month text,
+  p_input_tokens bigint,
+  p_output_tokens bigint,
+  p_cache_creation_input_tokens bigint,
+  p_cache_read_input_tokens bigint
+) RETURNS void
+LANGUAGE plpgsql SECURITY DEFINER
+SET search_path TO ''
+AS $$
+BEGIN
+  UPDATE public.api_message_usage
+  SET
+    input_tokens = input_tokens + p_input_tokens,
+    output_tokens = output_tokens + p_output_tokens,
+    cache_creation_input_tokens = cache_creation_input_tokens + p_cache_creation_input_tokens,
+    cache_read_input_tokens = cache_read_input_tokens + p_cache_read_input_tokens,
+    updated_at = now()
+  WHERE workspace_id = p_workspace_id
+    AND api_key_id = p_api_key_id
+    AND month = p_month;
+END;
+$$;
diff --git a/tests/integration/chat-route.integration.test.ts b/tests/integration/chat-route.integration.test.ts
index 7b6b6e4..5af805c 100644
--- a/tests/integration/chat-route.integration.test.ts
+++ b/tests/integration/chat-route.integration.test.ts
@@ -4,6 +4,7 @@ import { withTestServer } from '../helpers/http'
 vi.mock('~~/server/utils/agent-types', async () => await import('../../server/utils/agent-types'))
 vi.mock('~~/server/utils/agent-state-machine', async () => await import('../../server/utils/agent-state-machine'))
 vi.mock('~~/server/utils/agent-context', async () => await import('../../server/utils/agent-context'))
+vi.mock('~~/server/utils/agent-system-prompt', async () => await import('../../server/utils/agent-system-prompt'))
 vi.mock('~~/server/utils/conversation-engine', async () => await import('../../server/utils/conversation-engine'))
 vi.mock('~~/server/utils/conversation-history', async () => await import('../../server/utils/conversation-history'))

@@ -149,7 +150,8 @@ describe('chat route integration', () => {
     vi.stubGlobal('hasFeature', vi.fn().mockReturnValue(false))
     vi.stubGlobal('saveChatResult', saveChatResult)
     vi.stubGlobal('createContentEngine', vi.fn().mockReturnValue({}))
-    vi.stubGlobal('buildSystemPrompt', vi.fn().mockReturnValue('system'))
+    vi.stubGlobal('buildSystemPromptBlocks', vi.fn().mockReturnValue({ static: 'system', contentIndex: null, dynamic: '' }))
+    vi.stubGlobal('toSystemBlocks', vi.fn().mockReturnValue([{ type: 'text', text: 'system' }]))
     vi.stubGlobal('buildContentIndex', vi.fn().mockReturnValue(''))
     vi.stubGlobal('getOrBuildBrainCache', vi.fn().mockResolvedValue({
       config: null,
@@ -195,19 +197,19 @@ describe('chat route integration', () => {
     // budget math is covered by conversation-history.test.ts. Here we only
     // check that the handler asked for some bounded history slice.
     expect(mockLoadMessages).toHaveBeenCalledWith('conversation-new', expect.any(Number))
-    expect(saveChatResult).toHaveBeenCalledWith(
-      'conversation-new',
-      'hello',
-      'Hello from the agent.',
-      [{ type: 'text', text: 'Hello from the agent.' }],
-      expect.any(String),
-      12,
-      24,
-      'workspace-1',
-      'user-1',
-      'studio',
-      expect.any(String),
-    )
+    expect(saveChatResult).toHaveBeenCalledWith(expect.objectContaining({
+      conversationId: 'conversation-new',
+      userMessage: 'hello',
+      assistantText: 'Hello from the agent.',
+      assistantContent: [{ type: 'text', text: 'Hello from the agent.' }],
+      model: expect.any(String),
+      inputTokens: 12,
+      outputTokens: 24,
+      workspaceId: 'workspace-1',
+      userId: 'user-1',
+      usageSource: 'studio',
+      usageMonth: expect.any(String),
+    }))
   })
 })

@@ -334,7 +336,8 @@ describe('chat route integration', () => {
     vi.stubGlobal('hasFeature', vi.fn().mockReturnValue(false))
     vi.stubGlobal('saveChatResult', saveChatResult)
     vi.stubGlobal('createContentEngine', vi.fn().mockReturnValue({}))
-    vi.stubGlobal('buildSystemPrompt', vi.fn().mockReturnValue('system'))
+    vi.stubGlobal('buildSystemPromptBlocks', vi.fn().mockReturnValue({ static: 'system', contentIndex: null, dynamic: '' }))
+    vi.stubGlobal('toSystemBlocks', vi.fn().mockReturnValue([{ type: 'text', text: 'system' }]))
     vi.stubGlobal('buildContentIndex', vi.fn().mockReturnValue(''))
     vi.stubGlobal('getOrBuildBrainCache', vi.fn().mockResolvedValue({
       config: null,
@@ -418,7 +421,8 @@ describe('chat route integration', () => {
     vi.stubGlobal('hasFeature', vi.fn().mockReturnValue(false))
     vi.stubGlobal('saveChatResult', vi.fn().mockResolvedValue(undefined))
     vi.stubGlobal('createContentEngine', vi.fn().mockReturnValue({}))
-    vi.stubGlobal('buildSystemPrompt', vi.fn().mockReturnValue('system'))
+    vi.stubGlobal('buildSystemPromptBlocks', vi.fn().mockReturnValue({ static: 'system', contentIndex: null, dynamic: '' }))
+    vi.stubGlobal('toSystemBlocks', vi.fn().mockReturnValue([{ type: 'text', text: 'system' }]))
     vi.stubGlobal('buildContentIndex', vi.fn().mockReturnValue(''))
     vi.stubGlobal('getOrBuildBrainCache', vi.fn().mockResolvedValue({
       config: null,
@@ -501,7 +505,8 @@ describe('chat route integration', () => {
     vi.stubGlobal('hasFeature', vi.fn().mockReturnValue(false))
     vi.stubGlobal('saveChatResult', vi.fn().mockResolvedValue(undefined))
     vi.stubGlobal('createContentEngine', vi.fn().mockReturnValue({}))
-    vi.stubGlobal('buildSystemPrompt', vi.fn().mockReturnValue('system'))
+    vi.stubGlobal('buildSystemPromptBlocks', vi.fn().mockReturnValue({ static: 'system', contentIndex: null, dynamic: '' }))
+    vi.stubGlobal('toSystemBlocks', vi.fn().mockReturnValue([{ type: 'text', text: 'system' }]))
     vi.stubGlobal('buildContentIndex', vi.fn().mockReturnValue(''))
     vi.stubGlobal('getOrBuildBrainCache', vi.fn().mockResolvedValue({
       config: null,
diff --git a/tests/integration/overage-soft-cap.integration.test.ts b/tests/integration/overage-soft-cap.integration.test.ts
index f1620a8..54f9d35 100644
--- a/tests/integration/overage-soft-cap.integration.test.ts
+++ b/tests/integration/overage-soft-cap.integration.test.ts
@@ -4,6 +4,7 @@ import { withTestServer } from '../helpers/http'
 vi.mock('~~/server/utils/agent-types', async () => await import('../../server/utils/agent-types'))
 vi.mock('~~/server/utils/agent-state-machine', async () => await import('../../server/utils/agent-state-machine'))
 vi.mock('~~/server/utils/agent-context', async () => await import('../../server/utils/agent-context'))
+vi.mock('~~/server/utils/agent-system-prompt', async () => await import('../../server/utils/agent-system-prompt'))
 vi.mock('~~/server/utils/conversation-engine', async () => await import('../../server/utils/conversation-engine'))
 vi.mock('~~/server/utils/conversation-history', async () => await import('../../server/utils/conversation-history'))
diff --git a/tests/unit/agent-system-prompt-cache.test.ts b/tests/unit/agent-system-prompt-cache.test.ts
new file mode 100644
index 0000000..f37e12e
--- /dev/null
+++ b/tests/unit/agent-system-prompt-cache.test.ts
@@ -0,0 +1,214 @@
+import type { ContentrainConfig, ModelDefinition } from '@contentrain/types'
+import { beforeAll, describe, expect, it, vi } from 'vitest'
+import { agentPrompt } from '../../server/utils/content-strings'
+import { getPlanParams, getUpgradeParams, PLAN_PRICING } from '../../shared/utils/license'
+import type { AgentPermissions } from '../../server/utils/agent-permissions'
+import { buildSystemPromptBlocks, toSystemBlocks } from '../../server/utils/agent-system-prompt'
+import type { ChatUIContext, ClassifiedIntent } from '../../server/utils/agent-types'
+
+// `agent-system-prompt.ts` uses these via Nuxt auto-imports; for the
+// unit test we hoist them onto the global so the module's internal
+// calls resolve. Real Nuxt server runtime already exposes them.
+beforeAll(() => {
+  vi.stubGlobal('agentPrompt', agentPrompt)
+  vi.stubGlobal('getPlanParams', getPlanParams)
+  vi.stubGlobal('getUpgradeParams', getUpgradeParams)
+  vi.stubGlobal('PLAN_PRICING', PLAN_PRICING)
+})
+
+const baseModels: ModelDefinition[] = [
+  { id: 'posts', name: 'Posts', kind: 'collection', domain: 'public', i18n: 'translated', fields: { title: { type: 'string', required: true } } } as ModelDefinition,
+  { id: 'pages', name: 'Pages', kind: 'document', domain: 'public', i18n: 'translated', fields: { slug: { type: 'string', required: true } } } as ModelDefinition,
+]
+
+const baseConfig: ContentrainConfig = {
+  stack: 'nuxt',
+  domains: ['public'],
+  locales: { default: 'en', supported: ['en'] },
+  workflow: 'auto-merge',
+} as ContentrainConfig
+
+const basePermissions: AgentPermissions = {
+  workspaceRole: 'admin',
+  projectRole: null,
+  specificModels: false,
+  allowedModels: [],
+  allowedLocales: [],
+  availableTools: ['get_content'],
+}
+
+const baseState = {
+  initialized: true,
+  pendingBranches: [],
+  projectStatus: 'active',
+  phase: 'active' as const,
+  contentContext: null,
+}
+
+const baseIntent: ClassifiedIntent = {
+  category: 'navigation',
+  confidence: 'high',
+  inferred: {},
+}
+
+describe('buildSystemPromptBlocks — static/dynamic separation', () => {
+  it('keeps the active model marker out of the static block', () => {
+    // Same project, two different UI contexts — only `activeModelId`
+    // changes. If the marker leaks into the static block the cached
+    // prefix breaks and we'd pay for cache_creation every turn.
+    const uiA: ChatUIContext = {
+      activeModelId: 'posts',
+      activeLocale: 'en',
+      activeEntryId: null,
+      panelState: 'overview',
+      activeBranch: null,
+    }
+    const uiB: ChatUIContext = {
+      activeModelId: 'pages',
+      activeLocale: 'en',
+      activeEntryId: null,
+      panelState: 'overview',
+      activeBranch: null,
+    }
+    const a = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, baseState, uiA, baseIntent, null)
+    const b = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, baseState, uiB, baseIntent, null)
+
+    expect(a.static).toBe(b.static)
+    expect(a.static).not.toContain('▶')
+    expect(a.dynamic).not.toBe(b.dynamic)
+  })
+
+  it('keeps the intent out of the static block', () => {
+    // Off-topic intent activates an additional rule; that rule must
+    // not bleed into the static body or every intent change would
+    // bust the cache.
+    const ui: ChatUIContext = {
+      activeModelId: null,
+      activeLocale: 'en',
+      activeEntryId: null,
+      panelState: 'overview',
+      activeBranch: null,
+    }
+    const onTopic: ClassifiedIntent = { category: 'navigation', confidence: 'high', inferred: {} }
+    const offTopic: ClassifiedIntent = { category: 'out_of_scope', confidence: 'high', inferred: {} }
+
+    const a = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, baseState, ui, onTopic, null)
+    const b = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, baseState, ui, offTopic, null)
+
+    expect(a.static).toBe(b.static)
+    expect(b.dynamic).toMatch(/off|out.?of.?scope|topic/i)
+  })
+
+  it('keeps the project state out of the static block', () => {
+    const ui: ChatUIContext = {
+      activeModelId: null,
+      activeLocale: 'en',
+      activeEntryId: null,
+      panelState: 'overview',
+      activeBranch: null,
+    }
+    const stateA = { ...baseState, pendingBranches: [] }
+    const stateB = { ...baseState, pendingBranches: [{ name: 'cr/test', sha: 'abc', protected: false }] }
+
+    const a = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, stateA, ui, baseIntent, null)
+    const b = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, stateB, ui, baseIntent, null)
+
+    expect(a.static).toBe(b.static)
+    expect(a.dynamic).not.toBe(b.dynamic)
+  })
+
+  it('emits a non-empty static body with role, schema, and base rules', () => {
+    const ui: ChatUIContext = {
+      activeModelId: null,
+      activeLocale: 'en',
+      activeEntryId: null,
+      panelState: 'overview',
+      activeBranch: null,
+    }
+    const blocks = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, baseState, ui, baseIntent, null)
+    // Schema model names should appear in the static block
+    expect(blocks.static).toContain('Posts')
+    expect(blocks.static).toContain('Pages')
+    // Permissions section appears in static
+    expect(blocks.static).toContain('Permissions')
+    // Rules header appears in static
+    expect(blocks.static).toContain('## Rules')
+  })
+
+  it('returns contentIndex separately and preserves it when non-empty', () => {
+    const ui: ChatUIContext = {
+      activeModelId: null,
+      activeLocale: 'en',
+      activeEntryId: null,
+      panelState: 'overview',
+      activeBranch: null,
+    }
+    const a = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, baseState, ui, baseIntent, 'index payload')
+    const b = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, baseState, ui, baseIntent, null)
+    expect(a.contentIndex).toBe('index payload')
+    expect(b.contentIndex).toBeNull()
+    // The static block does NOT inline the index — it lives in its own cacheable block.
+ expect(a.static).not.toContain('index payload') + }) + + it('treats whitespace-only contentIndex as absent', () => { + const ui: ChatUIContext = { + activeModelId: null, + activeLocale: 'en', + activeEntryId: null, + panelState: 'overview', + activeBranch: null, + } + const blocks = buildSystemPromptBlocks(baseConfig, baseModels, basePermissions, baseState, ui, baseIntent, ' \n ') + expect(blocks.contentIndex).toBeNull() + }) +}) + +describe('toSystemBlocks — cache breakpoint assembly', () => { + it('places cache_control on the static and contentIndex blocks and leaves dynamic uncached', () => { + const blocks = toSystemBlocks({ + static: 'STATIC_BODY', + contentIndex: 'INDEX_BODY', + dynamic: 'DYNAMIC_BODY', + }) + expect(blocks).toEqual([ + { type: 'text', text: 'STATIC_BODY', cacheControl: { type: 'ephemeral' } }, + { type: 'text', text: 'INDEX_BODY', cacheControl: { type: 'ephemeral' } }, + { type: 'text', text: 'DYNAMIC_BODY' }, + ]) + }) + + it('omits the contentIndex block when null', () => { + const blocks = toSystemBlocks({ + static: 'STATIC_BODY', + contentIndex: null, + dynamic: 'DYNAMIC_BODY', + }) + expect(blocks).toEqual([ + { type: 'text', text: 'STATIC_BODY', cacheControl: { type: 'ephemeral' } }, + { type: 'text', text: 'DYNAMIC_BODY' }, + ]) + }) + + it('omits the dynamic block when empty', () => { + const blocks = toSystemBlocks({ + static: 'STATIC_BODY', + contentIndex: 'INDEX_BODY', + dynamic: '', + }) + expect(blocks).toEqual([ + { type: 'text', text: 'STATIC_BODY', cacheControl: { type: 'ephemeral' } }, + { type: 'text', text: 'INDEX_BODY', cacheControl: { type: 'ephemeral' } }, + ]) + }) + + it('produces at most 2 cached blocks (within Anthropic 4-breakpoint limit, leaving room for tools)', () => { + const blocks = toSystemBlocks({ + static: 'a', + contentIndex: 'b', + dynamic: 'c', + }) + const cached = blocks.filter(b => b.cacheControl?.type === 'ephemeral') + expect(cached.length).toBeLessThanOrEqual(2) + }) +}) diff --git a/tests/unit/anthropic-ai.test.ts b/tests/unit/anthropic-ai.test.ts index f768824..dbf9eb5 100644 --- a/tests/unit/anthropic-ai.test.ts +++ b/tests/unit/anthropic-ai.test.ts @@ -34,7 +34,22 @@ describe('anthropic provider', () => { }) it('normalizes Anthropic stream events into Studio stream events', async () => { + // Real Anthropic stream order: message_start → content blocks → + // message_delta (stop_reason + final output_tokens) → message_stop. + // The provider emits a single normalized `message_end` once + // message_stop fires, with all four token buckets in usage. 
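+    // Concretely, the accounting the fixture below exercises:
+    // input_tokens and both cache buckets arrive on message_start,
+    // the final output_tokens arrives on message_delta, and the
+    // provider folds them into one usage object on message_end.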
anthropicState.stream.mockReturnValue((async function* () {
+      yield {
+        type: 'message_start',
+        message: {
+          usage: {
+            input_tokens: 10,
+            output_tokens: 0,
+            cache_creation_input_tokens: 0,
+            cache_read_input_tokens: 0,
+          },
+        },
+      }
       yield { type: 'content_block_start', content_block: { type: 'tool_use', id: 'tool-1', name: 'save_content' } }
       yield { type: 'content_block_delta', delta: { type: 'input_json_delta', partial_json: '{"title":"Hello"}' } }
       yield { type: 'content_block_stop' }
@@ -42,8 +57,9 @@ describe('anthropic provider', () => {
       yield {
         type: 'message_delta',
         delta: { stop_reason: 'end_turn' },
-        usage: { input_tokens: 10, output_tokens: 20 },
+        usage: { output_tokens: 20 },
       }
+      yield { type: 'message_stop' }
     })())

     const { createAnthropicProvider } = await import('../../server/providers/anthropic-ai')
@@ -73,11 +89,63 @@
       {
         type: 'message_end',
         stopReason: 'end_turn',
-        usage: { inputTokens: 10, outputTokens: 20 },
+        usage: {
+          inputTokens: 10,
+          outputTokens: 20,
+          cacheCreationInputTokens: 0,
+          cacheReadInputTokens: 0,
+        },
       },
     ])
   })

+  it('captures prompt cache token buckets reported by the provider', async () => {
+    // Second turn of a cached conversation: most input is served
+    // from cache, a small slice is newly written to cache this turn,
+    // and only a small remainder is billed as base input. The
+    // provider must surface all three input buckets disjointly.
+    anthropicState.stream.mockReturnValue((async function* () {
+      yield {
+        type: 'message_start',
+        message: {
+          usage: {
+            input_tokens: 50,
+            output_tokens: 0,
+            cache_creation_input_tokens: 200,
+            cache_read_input_tokens: 8000,
+          },
+        },
+      }
+      yield { type: 'content_block_delta', delta: { type: 'text_delta', text: 'ok' } }
+      yield {
+        type: 'message_delta',
+        delta: { stop_reason: 'end_turn' },
+        usage: { output_tokens: 5 },
+      }
+      yield { type: 'message_stop' }
+    })())
+
+    const { createAnthropicProvider } = await import('../../server/providers/anthropic-ai')
+    const provider = createAnthropicProvider()
+    const events: { type: string, usage?: unknown }[] = []
+    for await (const event of provider.streamCompletion({
+      model: 'claude-sonnet-4-20250514',
+      system: [{ type: 'text', text: 'cached block', cacheControl: { type: 'ephemeral' } }],
+      messages: [{ role: 'user', content: 'hello' }],
+      tools: [],
+      maxTokens: 256,
+    }, 'api-key')) {
+      events.push(event)
+    }
+
+    const messageEnd = events.find(e => e.type === 'message_end')
+    expect(messageEnd?.usage).toEqual({
+      inputTokens: 50,
+      outputTokens: 5,
+      cacheCreationInputTokens: 200,
+      cacheReadInputTokens: 8000,
+    })
+  })
+
   it('normalizes full Anthropic completions into Studio content blocks', async () => {
     anthropicState.create.mockResolvedValue({
       content: [
@@ -88,6 +156,8 @@
       usage: {
         input_tokens: 11,
         output_tokens: 22,
+        cache_creation_input_tokens: 100,
+        cache_read_input_tokens: 4000,
       },
     })

@@ -106,7 +176,48 @@
         { type: 'tool_use', id: 'tool-2', name: 'brain_query', input: { model: 'posts' } },
       ],
       stopReason: 'tool_use',
-      usage: { inputTokens: 11, outputTokens: 22 },
+      usage: {
+        inputTokens: 11,
+        outputTokens: 22,
+        cacheCreationInputTokens: 100,
+        cacheReadInputTokens: 4000,
+      },
+    })
+  })
+
+  it('maps system block array with cache_control to Anthropic SDK shape', async () => {
+    // Provider should pass blocks through with `cache_control` markers
+    // preserved and tools' last entry's cacheControl mapped too.
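+    // The only expected transform is a key rename: camelCase
+    // `cacheControl` in our types becomes snake_case `cache_control`
+    // in the SDK payload. Block order and text pass through untouched.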
+ anthropicState.create.mockResolvedValue({ + content: [{ type: 'text', text: 'ok' }], + stop_reason: 'end_turn', + usage: { input_tokens: 1, output_tokens: 1 }, }) + + const { createAnthropicProvider } = await import('../../server/providers/anthropic-ai') + const provider = createAnthropicProvider() + await provider.createCompletion({ + model: 'claude-sonnet-4-20250514', + system: [ + { type: 'text', text: 'static body', cacheControl: { type: 'ephemeral' } }, + { type: 'text', text: 'brain index', cacheControl: { type: 'ephemeral' } }, + { type: 'text', text: 'dynamic body' }, + ], + messages: [{ role: 'user', content: 'hello' }], + tools: [ + { name: 'first', description: 'a', inputSchema: {} }, + { name: 'last', description: 'b', inputSchema: {}, cacheControl: { type: 'ephemeral' } }, + ], + maxTokens: 256, + }, 'api-key') + + const call = anthropicState.create.mock.calls[0]![0] + expect(call.system).toEqual([ + { type: 'text', text: 'static body', cache_control: { type: 'ephemeral' } }, + { type: 'text', text: 'brain index', cache_control: { type: 'ephemeral' } }, + { type: 'text', text: 'dynamic body' }, + ]) + expect(call.tools[0]).not.toHaveProperty('cache_control') + expect(call.tools[1]).toMatchObject({ cache_control: { type: 'ephemeral' } }) }) }) diff --git a/tests/unit/db.test.ts b/tests/unit/db.test.ts index a0af57d..17155c4 100644 --- a/tests/unit/db.test.ts +++ b/tests/unit/db.test.ts @@ -106,19 +106,21 @@ describe('db helpers', () => { it('saves chat results via provider methods', async () => { const { saveChatResult } = await loadDbModule() - await saveChatResult( - 'conv-1', - 'Hello', - 'World', - [{ type: 'text', text: 'World' }], - 'claude-sonnet-4-20250514', - 7, - 3, - 'workspace-1', - 'user-1', - 'studio', - '2026-04', - ) + await saveChatResult({ + conversationId: 'conv-1', + userMessage: 'Hello', + assistantText: 'World', + assistantContent: [{ type: 'text', text: 'World' }], + model: 'claude-sonnet-4-20250514', + inputTokens: 7, + outputTokens: 3, + cacheCreationInputTokens: 0, + cacheReadInputTokens: 0, + workspaceId: 'workspace-1', + userId: 'user-1', + usageSource: 'studio', + usageMonth: '2026-04', + }) expect(mockDb.insertMessage).toHaveBeenCalledTimes(2) expect(mockDb.insertMessage).toHaveBeenCalledWith(expect.objectContaining({ role: 'user', content: 'Hello' })) @@ -130,25 +132,29 @@ describe('db helpers', () => { source: 'studio', inputTokens: 7, outputTokens: 3, + cacheCreationInputTokens: 0, + cacheReadInputTokens: 0, })) expect(mockDb.updateConversationTimestamp).toHaveBeenCalledWith('conv-1') }) it('updates token counts on the usage row reserved by the atomic limit check', async () => { const { saveChatResult } = await loadDbModule() - await saveChatResult( - 'conv-1', - 'Hello', - '', - [], - 'claude-haiku-4-5-20251001', - 4, - 2, - 'workspace-1', - 'user-1', - 'byoa', - '2026-04', - ) + await saveChatResult({ + conversationId: 'conv-1', + userMessage: 'Hello', + assistantText: '', + assistantContent: [], + model: 'claude-haiku-4-5-20251001', + inputTokens: 4, + outputTokens: 2, + cacheCreationInputTokens: 0, + cacheReadInputTokens: 0, + workspaceId: 'workspace-1', + userId: 'user-1', + usageSource: 'byoa', + usageMonth: '2026-04', + }) expect(mockDb.updateAgentUsageTokens).toHaveBeenCalledWith(expect.objectContaining({ source: 'byoa', @@ -158,20 +164,60 @@ describe('db helpers', () => { })) }) + it('propagates cache token buckets through saveChatResult', async () => { + // Second turn of a cached conversation — most input is cache_read + // (cheap), 
a small slice is cache_creation, base input/output are + // small. All four buckets must land on agent_usage row + message row. + const { saveChatResult } = await loadDbModule() + await saveChatResult({ + conversationId: 'conv-1', + userMessage: 'follow up', + assistantText: 'short', + assistantContent: [{ type: 'text', text: 'short' }], + model: 'claude-sonnet-4-5', + inputTokens: 80, + outputTokens: 12, + cacheCreationInputTokens: 150, + cacheReadInputTokens: 9000, + workspaceId: 'workspace-1', + userId: 'user-1', + usageSource: 'studio', + usageMonth: '2026-04', + }) + + expect(mockDb.updateAgentUsageTokens).toHaveBeenCalledWith({ + workspaceId: 'workspace-1', + userId: 'user-1', + month: '2026-04', + source: 'studio', + inputTokens: 80, + outputTokens: 12, + cacheCreationInputTokens: 150, + cacheReadInputTokens: 9000, + }) + expect(mockDb.insertMessage).toHaveBeenCalledWith(expect.objectContaining({ + role: 'assistant', + cacheCreationInputTokens: 150, + cacheReadInputTokens: 9000, + })) + }) + it('saveApiChatResult writes to api_message_usage, not agent_usage', async () => { const { saveApiChatResult } = await loadDbModule() - await saveApiChatResult( - 'conv-2', - 'Hello', - 'World', - [{ type: 'text', text: 'World' }], - 'claude-sonnet-4-5', - 11, - 5, - 'workspace-1', - 'key-abc', - '2026-04', - ) + await saveApiChatResult({ + conversationId: 'conv-2', + userMessage: 'Hello', + assistantText: 'World', + assistantContent: [{ type: 'text', text: 'World' }], + model: 'claude-sonnet-4-5', + inputTokens: 11, + outputTokens: 5, + cacheCreationInputTokens: 0, + cacheReadInputTokens: 0, + workspaceId: 'workspace-1', + apiKeyId: 'key-abc', + usageMonth: '2026-04', + }) // Messages persist into the shared messages table the same way as // the Studio path; the difference is only in which usage table the @@ -186,6 +232,8 @@ describe('db helpers', () => { month: '2026-04', inputTokens: 11, outputTokens: 5, + cacheCreationInputTokens: 0, + cacheReadInputTokens: 0, }) // Critical: must NOT touch the user-keyed agent_usage path. expect(mockDb.updateAgentUsageTokens).not.toHaveBeenCalled()
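+
+    // Why the buckets must stay disjoint: cache reads and cache writes
+    // are billed at different rates than base input (at the time of
+    // writing, roughly 0.1x base for reads and 1.25x for 5-minute-TTL
+    // writes), so folding them into inputTokens would misstate cost.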