50 changes: 32 additions & 18 deletions ee/enterprise/conversation-api.ts
@@ -7,7 +7,7 @@ import type { ChatUIContext } from '../../server/utils/agent-types'
import { toAITools } from '../../server/utils/agent-types'
import { classifyIntent } from '../../server/utils/agent-context'
import { deriveProjectPhase } from '../../server/utils/agent-state-machine'
import { buildSystemPrompt } from '../../server/utils/agent-system-prompt'
import { buildSystemPromptBlocks, toSystemBlocks } from '../../server/utils/agent-system-prompt'
import { STUDIO_TOOLS, filterToolsByPermissions } from '../../server/utils/agent-tools'
import { buildContentIndex, getOrBuildBrainCache } from '../../server/utils/brain-cache'
import { createContentEngine } from '../../server/utils/content-engine'
@@ -252,26 +252,30 @@ async function runConversationMessage(
const phase = deriveProjectPhase(projectConfig, pendingBranches, project.status ?? 'active')
const intent = classifyIntent(body.message, uiContext, phase)

let systemPrompt = buildSystemPrompt(
const contentIndex = buildContentIndex(brain)
const promptBlocks = buildSystemPromptBlocks(
projectConfig,
models,
permissions,
{ initialized: !!projectConfig, pendingBranches, projectStatus: project.status ?? 'active', phase, contentContext },
uiContext,
intent,
contentIndex || null,
vocabulary,
plan,
keyData.customInstructions,
)

const contentIndex = buildContentIndex(brain)
if (contentIndex)
systemPrompt += `\n\n${contentIndex}`
const systemPrompt = toSystemBlocks(promptBlocks)

const permissionFiltered = filterToolsByPermissions(STUDIO_TOOLS, permissions.availableTools) as typeof STUDIO_TOOLS
const phaseFiltered = permissionFiltered.filter(tool => tool.requiredPhase.includes(phase))
const aiTools = toAITools(phaseFiltered)

// Cache the tools array — same rationale as the Studio chat path.
if (aiTools.length > 0) {
aiTools[aiTools.length - 1]!.cacheControl = { type: 'ephemeral' }
}

const runtimeConfig = useRuntimeConfig()
const apiKey = runtimeConfig.anthropic.apiKey
if (!apiKey)
@@ -312,6 +316,8 @@ async function runConversationMessage(
let responseText = ''
let totalInputTokens = 0
let totalOutputTokens = 0
let totalCacheCreationInputTokens = 0
let totalCacheReadInputTokens = 0
let lastAssistantContent: AIContentBlock[] = []

for await (const evt of runConversationLoop(
@@ -346,26 +352,32 @@ async function runConversationMessage(
case 'tool_result':
toolResults.push({ id: evt.id as string, name: evt.name as string, result: evt.result })
break
case 'done':
totalInputTokens = (evt.usage as { inputTokens: number })?.inputTokens ?? 0
totalOutputTokens = (evt.usage as { outputTokens: number })?.outputTokens ?? 0
case 'done': {
const u = evt.usage as { inputTokens?: number, outputTokens?: number, cacheCreationInputTokens?: number, cacheReadInputTokens?: number } | undefined
totalInputTokens = u?.inputTokens ?? 0
totalOutputTokens = u?.outputTokens ?? 0
totalCacheCreationInputTokens = u?.cacheCreationInputTokens ?? 0
totalCacheReadInputTokens = u?.cacheReadInputTokens ?? 0
lastAssistantContent = (evt.lastContent as AIContentBlock[]) ?? []
break
}
}
}

await saveApiChatResult(
await saveApiChatResult({
conversationId,
body.message,
responseText,
lastAssistantContent,
userMessage: body.message,
assistantText: responseText,
assistantContent: lastAssistantContent,
model,
totalInputTokens,
totalOutputTokens,
keyData.workspaceId,
keyData.keyId,
inputTokens: totalInputTokens,
outputTokens: totalOutputTokens,
cacheCreationInputTokens: totalCacheCreationInputTokens,
cacheReadInputTokens: totalCacheReadInputTokens,
workspaceId: keyData.workspaceId,
apiKeyId: keyData.keyId,
usageMonth,
)
})

return {
conversationId,
@@ -374,6 +386,8 @@ async function runConversationMessage(
usage: {
inputTokens: totalInputTokens,
outputTokens: totalOutputTokens,
cacheCreationInputTokens: totalCacheCreationInputTokens,
cacheReadInputTokens: totalCacheReadInputTokens,
},
}
}
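
To make the new shape concrete, here is a minimal sketch of what the block-building step plausibly produces. The block boundaries and placeholder texts are assumptions for illustration; the diff only shows the call sites of `buildSystemPromptBlocks` and `toSystemBlocks`, and the `AISystemBlock` type comes from `server/providers/ai.ts` below. The same construction appears again in the Studio chat handler in the next file.

```ts
import type { AISystemBlock } from '../../server/providers/ai'

// Illustrative output shape only; the real boundaries live inside
// buildSystemPromptBlocks / toSystemBlocks.
const systemPrompt: AISystemBlock[] = [
  // Static body: role, architecture, schema, vocabulary, permissions,
  // base rules, custom instructions. Marked for caching; it rarely changes.
  { type: 'text', text: '<static prompt body>', cacheControl: { type: 'ephemeral' } },
  // Brain content index. Cached as its own block so content edits
  // invalidate only this block, not the static body before it.
  { type: 'text', text: '<content index>', cacheControl: { type: 'ephemeral' } },
  // Dynamic tail: UI context, intent, project state. Left uncached so
  // per-request changes never invalidate the cached prefix.
  { type: 'text', text: '<ui context, intent, state>' },
]
```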
@@ -189,19 +189,32 @@ export default defineEventHandler(async (event) => {
contentContext,
}

let systemPrompt = buildSystemPrompt(projectConfig, models, permissions, projectState, uiContext, intent, vocabulary, plan)

// Append content index from brain cache (compact summary of all content)
// Build the system prompt as cache-aware blocks: the static body
// (role, architecture, schema, vocab, permissions, base rules,
// custom instructions) and the brain content index get their own
// `cache_control` markers; the dynamic block (UI context, intent,
// state) follows uncached so request-by-request changes never
// invalidate the cached prefix.
const contentIndex = buildContentIndex(brain)
if (contentIndex) {
systemPrompt += `\n\n${contentIndex}`
}
const promptBlocks = buildSystemPromptBlocks(
projectConfig, models, permissions, projectState, uiContext, intent,
contentIndex || null,
vocabulary, plan,
)
const systemPrompt = toSystemBlocks(promptBlocks)

// === FILTER TOOLS by permissions + phase ===
const permissionFiltered = filterToolsByPermissions(STUDIO_TOOLS, permissions.availableTools) as StudioTool[]
const phaseFiltered = permissionFiltered.filter(t => t.requiredPhase.includes(phase))
const aiTools = toAITools(phaseFiltered)

// Mark the last tool with cache_control so Anthropic caches the
// entire tools block. Tools rarely change within a session — this
// is a high-hit-rate third breakpoint after the two system blocks.
if (aiTools.length > 0) {
aiTools[aiTools.length - 1]!.cacheControl = { type: 'ephemeral' }
}

// Workflow: plans without review feature always auto-merge regardless of config
const configWorkflow = projectConfig?.workflow ?? 'auto-merge'
const workflow = hasFeature(plan, 'workflow.review') ? configWorkflow : 'auto-merge'
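
The comments above add up to three cache breakpoints: two on the system blocks and one on the last tool. A hypothetical provider-adapter step shows how those markers could map onto Anthropic's wire format. The `cache_control` placement follows Anthropic's documented prompt-caching API, but the function and its import path are a sketch, not code from this PR:

```ts
import type { AISystemBlock, AITool } from '../providers/ai' // path illustrative

// Sketch: translate provider-agnostic markers into Anthropic params.
// Anthropic caches the longest prefix ending at each marker, evaluated
// in prompt order: tools, then system, then messages.
function toAnthropicParams(system: AISystemBlock[], tools: AITool[]) {
  return {
    system: system.map(b => ({
      type: 'text' as const,
      text: b.text,
      // Only blocks carrying a marker get cache_control on the wire.
      ...(b.cacheControl ? { cache_control: { type: 'ephemeral' as const } } : {}),
    })),
    tools: tools.map(t => ({
      name: t.name,
      description: t.description,
      input_schema: t.inputSchema,
      ...(t.cacheControl ? { cache_control: { type: 'ephemeral' as const } } : {}),
    })),
  }
}
```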
@@ -216,6 +229,8 @@ export default defineEventHandler(async (event) => {

let totalInputTokens = 0
let totalOutputTokens = 0
let totalCacheCreationInputTokens = 0
let totalCacheReadInputTokens = 0
let lastAssistantContent: AIContentBlock[] = []

try {
@@ -238,9 +253,15 @@

// Forward all events to SSE stream
if (evt.type === 'done') {
// Extract final state from done event before forwarding
totalInputTokens = (evt.usage as { inputTokens: number })?.inputTokens ?? 0
totalOutputTokens = (evt.usage as { outputTokens: number })?.outputTokens ?? 0
// Extract final state from done event before forwarding.
// Engine accumulates all four token buckets (regular
// input/output + cache creation + cache read) across
// streaming and non-streaming iterations.
const u = evt.usage as { inputTokens?: number, outputTokens?: number, cacheCreationInputTokens?: number, cacheReadInputTokens?: number } | undefined
totalInputTokens = u?.inputTokens ?? 0
totalOutputTokens = u?.outputTokens ?? 0
totalCacheCreationInputTokens = u?.cacheCreationInputTokens ?? 0
totalCacheReadInputTokens = u?.cacheReadInputTokens ?? 0
lastAssistantContent = (evt.lastContent as AIContentBlock[]) ?? []

// Forward the done event without lastContent (not needed by client)
@@ -261,11 +282,21 @@
.map(b => (b as { text: string }).text)
.join('')

await saveChatResult(
conversationId, body.message, assistantText,
lastAssistantContent, model, totalInputTokens, totalOutputTokens,
workspaceId, session.user.id, usageSource, usageMonth,
)
await saveChatResult({
conversationId,
userMessage: body.message,
assistantText,
assistantContent: lastAssistantContent,
model,
inputTokens: totalInputTokens,
outputTokens: totalOutputTokens,
cacheCreationInputTokens: totalCacheCreationInputTokens,
cacheReadInputTokens: totalCacheReadInputTokens,
workspaceId,
userId: session.user.id,
usageSource,
usageMonth,
})

// Webhook events are now emitted from conversation-engine.ts per tool execution
}
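
The `done` handler above trusts conversation-engine.ts to have summed usage across iterations. Since the engine is not part of this diff, the accumulator below is an illustrative sketch of that step under the new four-bucket `AIUsage` shape:

```ts
import type { AIUsage } from '../providers/ai' // path illustrative

// Illustrative accumulator: one AIUsage summed across every model call
// in the tool-use loop, streaming or not.
function addUsage(total: AIUsage, step: AIUsage): AIUsage {
  return {
    inputTokens: total.inputTokens + step.inputTokens,
    outputTokens: total.outputTokens + step.outputTokens,
    cacheCreationInputTokens: total.cacheCreationInputTokens + step.cacheCreationInputTokens,
    cacheReadInputTokens: total.cacheReadInputTokens + step.cacheReadInputTokens,
  }
}
```

In a typical session the first iteration pays the cache write (`cacheCreationInputTokens` > 0) and later iterations hit it (`cacheReadInputTokens` > 0), which is why the per-loop totals are worth persisting separately.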
55 changes: 52 additions & 3 deletions server/providers/ai.ts
@@ -13,6 +13,12 @@ export interface AITool {
name: string
description: string
inputSchema: Record<string, unknown> // JSON Schema
/**
* Provider-agnostic prompt cache marker. When set on the LAST tool
* in the array, supporting providers (Anthropic) cache the full
* tools block. Unsupported providers ignore the field.
*/
cacheControl?: AICacheControl
}

export interface AIMessage {
@@ -25,6 +31,43 @@ export type AIContentBlock
| { type: 'tool_use', id: string, name: string, input: unknown }
| { type: 'tool_result', toolUseId: string, content: string }

/**
* Prompt cache marker. `ephemeral` is Anthropic's 5-minute TTL bucket.
* Provider-agnostic shape — providers that don't support prompt
* caching ignore the marker entirely and the request still works
* (just without cache benefits).
*/
export interface AICacheControl {
type: 'ephemeral'
}

/**
* Structured system prompt block. Use the array form of
* `AICompletionRequest.system` to place cache breakpoints between
* blocks. A plain `string` is equivalent to a single uncached block
* and remains accepted for backward compatibility.
*/
export interface AISystemBlock {
type: 'text'
text: string
cacheControl?: AICacheControl
}

/**
* Token usage for a single completion. Anthropic returns four disjoint
* buckets; `input_tokens` counts non-cached input only, so existing
* dashboards that sum it stay correct after this change.
* Total billable input cost (Contentrain-side) is approximately:
* inputTokens * 1x + cacheCreationInputTokens * 1.25x + cacheReadInputTokens * 0.1x
* at the base per-MTok price for the model.
*/
export interface AIUsage {
inputTokens: number
outputTokens: number
cacheCreationInputTokens: number
cacheReadInputTokens: number
}
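
The multipliers in the comment (1.25x for cache writes, 0.1x for reads) match Anthropic's published pricing for 5-minute ephemeral caching. A worked example, assuming `AIUsage` from this file is in scope and a $3 per MTok base input price picked purely for illustration:

```ts
// Approximate input-side cost in USD using the multipliers above.
// basePerMTok is an assumed example price, not a constant from this PR.
function inputCostUSD(u: AIUsage, basePerMTok: number): number {
  const weighted = u.inputTokens * 1.0
    + u.cacheCreationInputTokens * 1.25
    + u.cacheReadInputTokens * 0.1
  return weighted * basePerMTok / 1_000_000
}

// 2,000 uncached + 30,000 cache-write + 30,000 cache-read at $3/MTok:
// (2000 + 37500 + 3000) * 3 / 1e6 = ~$0.13, versus ~$0.19 for the same
// 62,000 tokens fully uncached.
```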

export interface AIStreamEvent {
type: 'text' | 'tool_use_start' | 'tool_use_input' | 'tool_use_end' | 'message_end' | 'error'
// text
@@ -35,14 +78,20 @@ export interface AIStreamEvent {
toolInput?: unknown
// message_end
stopReason?: 'end_turn' | 'tool_use' | 'max_tokens'
usage?: { inputTokens: number, outputTokens: number }
usage?: AIUsage
// error
error?: string
}

export interface AICompletionRequest {
model: string
system: string
/**
* String form is treated as a single uncached system block. Array
* form lets callers place cache breakpoints between blocks; up to
* 4 cache_control markers are honored per request (Anthropic
* limit), shared across system + tools + messages.
*/
system: string | AISystemBlock[]
messages: AIMessage[]
tools: AITool[]
maxTokens: number
@@ -52,7 +101,7 @@
export interface AICompletionResponse {
content: AIContentBlock[]
stopReason: 'end_turn' | 'tool_use' | 'max_tokens'
usage: { inputTokens: number, outputTokens: number }
usage: AIUsage
}
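
Given the four-marker budget noted on `system`, a cheap guard can verify a request stays inside it before dispatch. A hypothetical helper, not part of this PR:

```ts
// Count cache_control markers across system + tools. Message-level
// breakpoints, if ever added, would draw from the same budget of 4.
function cacheMarkerCount(req: AICompletionRequest): number {
  const systemMarks = typeof req.system === 'string'
    ? 0
    : req.system.filter(b => b.cacheControl !== undefined).length
  const toolMarks = req.tools.filter(t => t.cacheControl !== undefined).length
  return systemMarks + toolMarks
}

// The paths in this PR use at most three markers: two system blocks
// plus the last tool, leaving one breakpoint spare.
```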

export interface AIProvider {