Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions packages/altimate-code/src/bridge/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { existsSync } from "fs"
import path from "path"
import { ensureEngine, enginePythonPath } from "./engine"
import type { BridgeMethod, BridgeMethods } from "./protocol"
import { Telemetry } from "@/telemetry"

/** Resolve the Python interpreter to use for the engine sidecar.
* Exported for testing — not part of the public API. */
Expand Down Expand Up @@ -48,20 +49,55 @@ export namespace Bridge {
method: M,
params: (typeof BridgeMethods)[M] extends { params: infer P } ? P : never,
): Promise<(typeof BridgeMethods)[M] extends { result: infer R } ? R : never> {
const startTime = Date.now()
if (!child || child.exitCode !== null) {
if (restartCount >= MAX_RESTARTS) throw new Error("Python bridge failed after max restarts")
await start()
}
const id = ++requestId
const request = JSON.stringify({ jsonrpc: "2.0", method, params, id })
return new Promise((resolve, reject) => {
pending.set(id, { resolve, reject })
pending.set(id, {
resolve: (value: any) => {
Telemetry.track({
type: "bridge_call",
timestamp: Date.now(),
session_id: Telemetry.getContext().sessionId,
method,
status: "success",
duration_ms: Date.now() - startTime,
})
resolve(value)
},
reject: (reason: any) => {
Telemetry.track({
type: "bridge_call",
timestamp: Date.now(),
session_id: Telemetry.getContext().sessionId,
method,
status: "error",
duration_ms: Date.now() - startTime,
error: String(reason).slice(0, 500),
})
reject(reason)
},
})
child!.stdin!.write(request + "\n")

setTimeout(() => {
if (pending.has(id)) {
pending.delete(id)
reject(new Error(`Bridge timeout: ${method} (${CALL_TIMEOUT_MS}ms)`))
const error = new Error(`Bridge timeout: ${method} (${CALL_TIMEOUT_MS}ms)`)
Telemetry.track({
type: "bridge_call",
timestamp: Date.now(),
session_id: Telemetry.getContext().sessionId,
method,
status: "error",
duration_ms: Date.now() - startTime,
error: error.message,
})
reject(error)
}
}, CALL_TIMEOUT_MS)
})
Expand Down
59 changes: 57 additions & 2 deletions packages/altimate-code/src/session/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { Config } from "@/config/config"
import { SessionCompaction } from "./compaction"
import { PermissionNext } from "@/permission/next"
import { Question } from "@/question"
import { Telemetry } from "@/telemetry"

export namespace SessionProcessor {
const DOOM_LOOP_THRESHOLD = 3
Expand All @@ -34,11 +35,16 @@ export namespace SessionProcessor {
let blocked = false
let attempt = 0
let needsCompaction = false
let stepStartTime = Date.now()
let toolCallCounter = 0

const result = {
get message() {
return input.assistantMessage
},
get toolCallCount() {
return toolCallCounter
},
partFromToolCall(toolCallID: string) {
return toolcalls[toolCallID]
},
Expand Down Expand Up @@ -195,7 +201,17 @@ export namespace SessionProcessor {
attachments: value.output.attachments,
},
})

toolCallCounter++
Telemetry.track({
type: "tool_call",
timestamp: Date.now(),
session_id: input.sessionID,
message_id: input.assistantMessage.id,
tool_name: match.tool,
tool_type: match.tool.startsWith("mcp__") ? "mcp" : "standard",
status: "success",
duration_ms: Date.now() - match.state.time.start,
})
delete toolcalls[value.toolCallId]
}
break
Expand All @@ -216,7 +232,18 @@ export namespace SessionProcessor {
},
},
})

toolCallCounter++
Telemetry.track({
type: "tool_call",
timestamp: Date.now(),
session_id: input.sessionID,
message_id: input.assistantMessage.id,
tool_name: match.tool,
tool_type: match.tool.startsWith("mcp__") ? "mcp" : "standard",
status: "error",
duration_ms: Date.now() - match.state.time.start,
error: (value.error as any).toString().slice(0, 500),
})
if (
value.error instanceof PermissionNext.RejectedError ||
value.error instanceof Question.RejectedError
Expand All @@ -231,6 +258,7 @@ export namespace SessionProcessor {
throw value.error

case "start-step":
stepStartTime = Date.now()
snapshot = await Snapshot.track()
await Session.updatePart({
id: Identifier.ascending("part"),
Expand Down Expand Up @@ -261,6 +289,25 @@ export namespace SessionProcessor {
cost: usage.cost,
})
await Session.updateMessage(input.assistantMessage)
Telemetry.track({
type: "generation",
timestamp: Date.now(),
session_id: input.sessionID,
message_id: input.assistantMessage.id,
model_id: input.model.id,
provider_id: input.model.providerID,
agent: input.assistantMessage.agent ?? "",
finish_reason: value.finishReason,
tokens: {
input: usage.tokens.input,
output: usage.tokens.output,
reasoning: usage.tokens.reasoning,
cache_read: usage.tokens.cache.read,
cache_write: usage.tokens.cache.write,
},
cost: usage.cost,
duration_ms: Date.now() - stepStartTime,
})
if (snapshot) {
const patch = await Snapshot.patch(snapshot)
if (patch.files.length) {
Expand Down Expand Up @@ -352,6 +399,14 @@ export namespace SessionProcessor {
error: e,
stack: JSON.stringify(e.stack),
})
Telemetry.track({
type: "error",
timestamp: Date.now(),
session_id: input.sessionID,
error_name: e?.name ?? "UnknownError",
error_message: (e?.message ?? String(e)).slice(0, 1000),
context: "processor",
})
const error = MessageV2.fromError(e, { providerID: input.model.providerID })
if (MessageV2.ContextOverflowError.isInstance(error)) {
// TODO: Handle context overflow error
Expand Down
45 changes: 45 additions & 0 deletions packages/altimate-code/src/session/prompt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import { LLM } from "./llm"
import { iife } from "@/util/iife"
import { Shell } from "@/shell/shell"
import { Truncate } from "@/tool/truncation"
import { Telemetry } from "@/telemetry"

// @ts-ignore
globalThis.AI_SDK_LOG_WARNINGS = false
Expand Down Expand Up @@ -290,7 +291,14 @@ export namespace SessionPrompt {
let structuredOutput: unknown | undefined

let step = 0
const sessionStartTime = Date.now()
let sessionTotalCost = 0
let sessionTotalTokens = 0
let toolCallCount = 0
const session = await Session.get(sessionID)
await Telemetry.init()
Telemetry.setContext({ sessionId: sessionID, projectId: Instance.project?.id ?? "" })
try {
while (true) {
SessionStatus.set(sessionID, { type: "busy" })
log.info("loop", { step, sessionID })
Expand Down Expand Up @@ -624,6 +632,15 @@ export namespace SessionPrompt {
sessionID: sessionID,
messageID: lastUser.id,
})
Telemetry.track({
type: "session_start",
timestamp: Date.now(),
session_id: sessionID,
model_id: model.id,
provider_id: model.providerID,
agent: lastUser.agent,
project_id: Instance.project?.id ?? "",
})
}

// Ephemerally wrap queued user messages with a reminder to stay on track
Expand Down Expand Up @@ -680,6 +697,13 @@ export namespace SessionPrompt {
toolChoice: format.type === "json_schema" ? "required" : undefined,
})

sessionTotalCost += processor.message.cost
sessionTotalTokens +=
(processor.message.tokens?.input ?? 0) +
(processor.message.tokens?.output ?? 0) +
(processor.message.tokens?.reasoning ?? 0)
toolCallCount += processor.toolCallCount

// If structured output was captured, save it and exit immediately
// This takes priority because the StructuredOutput tool was called successfully
if (structuredOutput !== undefined) {
Expand Down Expand Up @@ -716,6 +740,18 @@ export namespace SessionPrompt {
continue
}
SessionCompaction.prune({ sessionID })
} finally {
Telemetry.track({
type: "session_end",
timestamp: Date.now(),
session_id: sessionID,
total_cost: sessionTotalCost,
total_tokens: sessionTotalTokens,
tool_call_count: toolCallCount,
duration_ms: Date.now() - sessionStartTime,
})
await Telemetry.shutdown()
}
for await (const item of MessageV2.stream(sessionID)) {
if (item.info.role === "user") continue
const queued = state()[sessionID]?.callbacks ?? []
Expand Down Expand Up @@ -1886,6 +1922,15 @@ NOTE: At any point in time through this workflow you should feel free to ask the
messageID: result.info.id,
})

Telemetry.track({
type: "command",
timestamp: Date.now(),
session_id: input.sessionID,
command_name: input.command,
command_source: command.source ?? "unknown",
message_id: result.info.id,
})

return result
}

Expand Down
Loading