From 3a5787a36bf0f5de1e9471eaf3417773752f19a1 Mon Sep 17 00:00:00 2001 From: Kulvir Date: Wed, 4 Mar 2026 11:12:50 -0800 Subject: [PATCH] feat: add CLI observability telemetry with two-tier auth and command tracking - Buffered fire-and-forget telemetry client flushing every 5s to `POST /api/observability/ingest` - Two-tier init: authenticated users via `Control.account()` with Bearer token, anonymous users via `ALTIMATE_TELEMETRY_URL` env var - Track 7 event types: `session_start`, `session_end`, `generation`, `tool_call`, `bridge_call`, `error`, `command` - Instrument `processor.ts` with generation, tool call, and error tracking - Instrument `prompt.ts` with session lifecycle and slash command tracking - Instrument `bridge/client.ts` with bridge call duration tracking - Conditional Bearer header and 401 retry only for authenticated sessions - Ring buffer (200 max), 10s timeout, `unref()` timer, silent failure Co-Authored-By: Kai (Claude Opus 4.6) --- packages/altimate-code/src/bridge/client.ts | 40 +++- .../altimate-code/src/session/processor.ts | 59 ++++- packages/altimate-code/src/session/prompt.ts | 45 ++++ packages/altimate-code/src/telemetry/index.ts | 225 ++++++++++++++++++ 4 files changed, 365 insertions(+), 4 deletions(-) create mode 100644 packages/altimate-code/src/telemetry/index.ts diff --git a/packages/altimate-code/src/bridge/client.ts b/packages/altimate-code/src/bridge/client.ts index 20c06d5ecb..e6365cac69 100644 --- a/packages/altimate-code/src/bridge/client.ts +++ b/packages/altimate-code/src/bridge/client.ts @@ -11,6 +11,7 @@ import { existsSync } from "fs" import path from "path" import { ensureEngine, enginePythonPath } from "./engine" import type { BridgeMethod, BridgeMethods } from "./protocol" +import { Telemetry } from "@/telemetry" /** Resolve the Python interpreter to use for the engine sidecar. * Exported for testing — not part of the public API. */ @@ -48,6 +49,7 @@ export namespace Bridge { method: M, params: (typeof BridgeMethods)[M] extends { params: infer P } ? P : never, ): Promise<(typeof BridgeMethods)[M] extends { result: infer R } ? R : never> { + const startTime = Date.now() if (!child || child.exitCode !== null) { if (restartCount >= MAX_RESTARTS) throw new Error("Python bridge failed after max restarts") await start() @@ -55,13 +57,47 @@ export namespace Bridge { const id = ++requestId const request = JSON.stringify({ jsonrpc: "2.0", method, params, id }) return new Promise((resolve, reject) => { - pending.set(id, { resolve, reject }) + pending.set(id, { + resolve: (value: any) => { + Telemetry.track({ + type: "bridge_call", + timestamp: Date.now(), + session_id: Telemetry.getContext().sessionId, + method, + status: "success", + duration_ms: Date.now() - startTime, + }) + resolve(value) + }, + reject: (reason: any) => { + Telemetry.track({ + type: "bridge_call", + timestamp: Date.now(), + session_id: Telemetry.getContext().sessionId, + method, + status: "error", + duration_ms: Date.now() - startTime, + error: String(reason).slice(0, 500), + }) + reject(reason) + }, + }) child!.stdin!.write(request + "\n") setTimeout(() => { if (pending.has(id)) { pending.delete(id) - reject(new Error(`Bridge timeout: ${method} (${CALL_TIMEOUT_MS}ms)`)) + const error = new Error(`Bridge timeout: ${method} (${CALL_TIMEOUT_MS}ms)`) + Telemetry.track({ + type: "bridge_call", + timestamp: Date.now(), + session_id: Telemetry.getContext().sessionId, + method, + status: "error", + duration_ms: Date.now() - startTime, + error: error.message, + }) + reject(error) } }, CALL_TIMEOUT_MS) }) diff --git a/packages/altimate-code/src/session/processor.ts b/packages/altimate-code/src/session/processor.ts index e7532d2007..4140536913 100644 --- a/packages/altimate-code/src/session/processor.ts +++ b/packages/altimate-code/src/session/processor.ts @@ -15,6 +15,7 @@ import { Config } from "@/config/config" import { SessionCompaction } from "./compaction" import { PermissionNext } from "@/permission/next" import { Question } from "@/question" +import { Telemetry } from "@/telemetry" export namespace SessionProcessor { const DOOM_LOOP_THRESHOLD = 3 @@ -34,11 +35,16 @@ export namespace SessionProcessor { let blocked = false let attempt = 0 let needsCompaction = false + let stepStartTime = Date.now() + let toolCallCounter = 0 const result = { get message() { return input.assistantMessage }, + get toolCallCount() { + return toolCallCounter + }, partFromToolCall(toolCallID: string) { return toolcalls[toolCallID] }, @@ -195,7 +201,17 @@ export namespace SessionProcessor { attachments: value.output.attachments, }, }) - + toolCallCounter++ + Telemetry.track({ + type: "tool_call", + timestamp: Date.now(), + session_id: input.sessionID, + message_id: input.assistantMessage.id, + tool_name: match.tool, + tool_type: match.tool.startsWith("mcp__") ? "mcp" : "standard", + status: "success", + duration_ms: Date.now() - match.state.time.start, + }) delete toolcalls[value.toolCallId] } break @@ -216,7 +232,18 @@ export namespace SessionProcessor { }, }, }) - + toolCallCounter++ + Telemetry.track({ + type: "tool_call", + timestamp: Date.now(), + session_id: input.sessionID, + message_id: input.assistantMessage.id, + tool_name: match.tool, + tool_type: match.tool.startsWith("mcp__") ? "mcp" : "standard", + status: "error", + duration_ms: Date.now() - match.state.time.start, + error: (value.error as any).toString().slice(0, 500), + }) if ( value.error instanceof PermissionNext.RejectedError || value.error instanceof Question.RejectedError @@ -231,6 +258,7 @@ export namespace SessionProcessor { throw value.error case "start-step": + stepStartTime = Date.now() snapshot = await Snapshot.track() await Session.updatePart({ id: Identifier.ascending("part"), @@ -261,6 +289,25 @@ export namespace SessionProcessor { cost: usage.cost, }) await Session.updateMessage(input.assistantMessage) + Telemetry.track({ + type: "generation", + timestamp: Date.now(), + session_id: input.sessionID, + message_id: input.assistantMessage.id, + model_id: input.model.id, + provider_id: input.model.providerID, + agent: input.assistantMessage.agent ?? "", + finish_reason: value.finishReason, + tokens: { + input: usage.tokens.input, + output: usage.tokens.output, + reasoning: usage.tokens.reasoning, + cache_read: usage.tokens.cache.read, + cache_write: usage.tokens.cache.write, + }, + cost: usage.cost, + duration_ms: Date.now() - stepStartTime, + }) if (snapshot) { const patch = await Snapshot.patch(snapshot) if (patch.files.length) { @@ -352,6 +399,14 @@ export namespace SessionProcessor { error: e, stack: JSON.stringify(e.stack), }) + Telemetry.track({ + type: "error", + timestamp: Date.now(), + session_id: input.sessionID, + error_name: e?.name ?? "UnknownError", + error_message: (e?.message ?? String(e)).slice(0, 1000), + context: "processor", + }) const error = MessageV2.fromError(e, { providerID: input.model.providerID }) if (MessageV2.ContextOverflowError.isInstance(error)) { // TODO: Handle context overflow error diff --git a/packages/altimate-code/src/session/prompt.ts b/packages/altimate-code/src/session/prompt.ts index 393142e922..0377635b1f 100644 --- a/packages/altimate-code/src/session/prompt.ts +++ b/packages/altimate-code/src/session/prompt.ts @@ -45,6 +45,7 @@ import { LLM } from "./llm" import { iife } from "@/util/iife" import { Shell } from "@/shell/shell" import { Truncate } from "@/tool/truncation" +import { Telemetry } from "@/telemetry" // @ts-ignore globalThis.AI_SDK_LOG_WARNINGS = false @@ -290,7 +291,14 @@ export namespace SessionPrompt { let structuredOutput: unknown | undefined let step = 0 + const sessionStartTime = Date.now() + let sessionTotalCost = 0 + let sessionTotalTokens = 0 + let toolCallCount = 0 const session = await Session.get(sessionID) + await Telemetry.init() + Telemetry.setContext({ sessionId: sessionID, projectId: Instance.project?.id ?? "" }) + try { while (true) { SessionStatus.set(sessionID, { type: "busy" }) log.info("loop", { step, sessionID }) @@ -624,6 +632,15 @@ export namespace SessionPrompt { sessionID: sessionID, messageID: lastUser.id, }) + Telemetry.track({ + type: "session_start", + timestamp: Date.now(), + session_id: sessionID, + model_id: model.id, + provider_id: model.providerID, + agent: lastUser.agent, + project_id: Instance.project?.id ?? "", + }) } // Ephemerally wrap queued user messages with a reminder to stay on track @@ -680,6 +697,13 @@ export namespace SessionPrompt { toolChoice: format.type === "json_schema" ? "required" : undefined, }) + sessionTotalCost += processor.message.cost + sessionTotalTokens += + (processor.message.tokens?.input ?? 0) + + (processor.message.tokens?.output ?? 0) + + (processor.message.tokens?.reasoning ?? 0) + toolCallCount += processor.toolCallCount + // If structured output was captured, save it and exit immediately // This takes priority because the StructuredOutput tool was called successfully if (structuredOutput !== undefined) { @@ -716,6 +740,18 @@ export namespace SessionPrompt { continue } SessionCompaction.prune({ sessionID }) + } finally { + Telemetry.track({ + type: "session_end", + timestamp: Date.now(), + session_id: sessionID, + total_cost: sessionTotalCost, + total_tokens: sessionTotalTokens, + tool_call_count: toolCallCount, + duration_ms: Date.now() - sessionStartTime, + }) + await Telemetry.shutdown() + } for await (const item of MessageV2.stream(sessionID)) { if (item.info.role === "user") continue const queued = state()[sessionID]?.callbacks ?? [] @@ -1886,6 +1922,15 @@ NOTE: At any point in time through this workflow you should feel free to ask the messageID: result.info.id, }) + Telemetry.track({ + type: "command", + timestamp: Date.now(), + session_id: input.sessionID, + command_name: input.command, + command_source: command.source ?? "unknown", + message_id: result.info.id, + }) + return result } diff --git a/packages/altimate-code/src/telemetry/index.ts b/packages/altimate-code/src/telemetry/index.ts new file mode 100644 index 0000000000..8750c0d0f5 --- /dev/null +++ b/packages/altimate-code/src/telemetry/index.ts @@ -0,0 +1,225 @@ +import { Control } from "@/control" +import { Installation } from "@/installation" +import { Log } from "@/util/log" + +const log = Log.create({ service: "telemetry" }) + +export namespace Telemetry { + const FLUSH_INTERVAL_MS = 5_000 + const MAX_BUFFER_SIZE = 200 + const REQUEST_TIMEOUT_MS = 10_000 + + export type TokensPayload = { + input: number + output: number + reasoning: number + cache_read: number + cache_write: number + } + + export type Event = + | { + type: "session_start" + timestamp: number + session_id: string + model_id: string + provider_id: string + agent: string + project_id: string + } + | { + type: "session_end" + timestamp: number + session_id: string + total_cost: number + total_tokens: number + tool_call_count: number + duration_ms: number + } + | { + type: "generation" + timestamp: number + session_id: string + message_id: string + model_id: string + provider_id: string + agent: string + finish_reason: string + tokens: TokensPayload + cost: number + duration_ms: number + } + | { + type: "tool_call" + timestamp: number + session_id: string + message_id: string + tool_name: string + tool_type: "standard" | "mcp" + status: "success" | "error" + duration_ms: number + error?: string + } + | { + type: "bridge_call" + timestamp: number + session_id: string + method: string + status: "success" | "error" + duration_ms: number + error?: string + } + | { + type: "error" + timestamp: number + session_id: string + error_name: string + error_message: string + context: string + } + | { + type: "command" + timestamp: number + session_id: string + command_name: string + command_source: "command" | "mcp" | "skill" | "unknown" + message_id: string + } + + type Batch = { + session_id: string + cli_version: string + user_email: string + project_id: string + timestamp: number + events: Event[] + } + + let enabled = false + let authenticated = false + let buffer: Event[] = [] + let flushTimer: ReturnType | undefined + let accountUrl = "" + let cachedToken = "" + let userEmail = "" + let sessionId = "" + let projectId = "" + + export async function init() { + if (enabled || flushTimer) return + try { + const account = Control.account() + if (account) { + const token = await Control.token() + if (token) { + accountUrl = account.url + cachedToken = token + userEmail = account.email + authenticated = true + } + } + + // Fall back to env var for anonymous users + if (!accountUrl) { + const envUrl = process.env.ALTIMATE_TELEMETRY_URL + if (!envUrl) { + enabled = false + return + } + accountUrl = envUrl + } + + enabled = true + + const timer = setInterval(flush, FLUSH_INTERVAL_MS) + if (typeof timer === "object" && timer && "unref" in timer) (timer as any).unref() + flushTimer = timer + + log.info("telemetry initialized", { authenticated }) + } catch { + enabled = false + } + } + + export function setContext(opts: { sessionId: string; projectId: string }) { + sessionId = opts.sessionId + projectId = opts.projectId + } + + export function getContext() { + return { sessionId, projectId } + } + + export function track(event: Event) { + if (!enabled) return + buffer.push(event) + if (buffer.length > MAX_BUFFER_SIZE) { + buffer.shift() + } + } + + export async function flush() { + if (!enabled || buffer.length === 0) return + + const events = buffer.splice(0, buffer.length) + const batch: Batch = { + session_id: sessionId, + cli_version: Installation.VERSION, + user_email: userEmail, + project_id: projectId, + timestamp: Date.now(), + events, + } + + try { + const headers: Record = { "Content-Type": "application/json" } + if (authenticated && cachedToken) { + headers["Authorization"] = `Bearer ${cachedToken}` + } + + const controller = new AbortController() + const timeout = setTimeout(() => controller.abort(), REQUEST_TIMEOUT_MS) + + const response = await fetch(`${accountUrl}/api/observability/ingest`, { + method: "POST", + headers, + body: JSON.stringify(batch), + signal: controller.signal, + }) + clearTimeout(timeout) + + if (authenticated && response.status === 401) { + const newToken = await Control.token() + if (!newToken) return + cachedToken = newToken + const retryController = new AbortController() + const retryTimeout = setTimeout(() => retryController.abort(), REQUEST_TIMEOUT_MS) + await fetch(`${accountUrl}/api/observability/ingest`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${cachedToken}`, + }, + body: JSON.stringify(batch), + signal: retryController.signal, + }) + clearTimeout(retryTimeout) + } + } catch { + // Silently drop on failure — telemetry must never break the CLI + } + } + + export async function shutdown() { + if (flushTimer) { + clearInterval(flushTimer) + flushTimer = undefined + } + await flush() + enabled = false + authenticated = false + buffer = [] + sessionId = "" + projectId = "" + } +}