From 6376ba54f50f8817f39de59234d23873ec551992 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Sun, 10 May 2026 12:17:10 -0400 Subject: [PATCH 1/3] refactor(session): consume native LLM events --- bun.lock | 1 + packages/opencode/package.json | 1 + packages/opencode/src/session/llm-ai-sdk.ts | 223 ++++++++++++ packages/opencode/src/session/llm.ts | 13 +- packages/opencode/src/session/processor.ts | 318 +++++++++++------- packages/opencode/src/session/prompt.ts | 3 +- packages/opencode/src/session/session.ts | 12 +- .../opencode/test/session/compaction.test.ts | 275 +++------------ 8 files changed, 485 insertions(+), 361 deletions(-) create mode 100644 packages/opencode/src/session/llm-ai-sdk.ts diff --git a/bun.lock b/bun.lock index 4268e5fb7d46..58a9aa8fc837 100644 --- a/bun.lock +++ b/bun.lock @@ -396,6 +396,7 @@ "@octokit/graphql": "9.0.2", "@octokit/rest": "catalog:", "@openauthjs/openauth": "catalog:", + "@opencode-ai/llm": "workspace:*", "@opencode-ai/plugin": "workspace:*", "@opencode-ai/script": "workspace:*", "@opencode-ai/sdk": "workspace:*", diff --git a/packages/opencode/package.json b/packages/opencode/package.json index e9b811fc5e1d..3f63a85bf158 100644 --- a/packages/opencode/package.json +++ b/packages/opencode/package.json @@ -105,6 +105,7 @@ "@octokit/graphql": "9.0.2", "@octokit/rest": "catalog:", "@openauthjs/openauth": "catalog:", + "@opencode-ai/llm": "workspace:*", "@opencode-ai/plugin": "workspace:*", "@opencode-ai/script": "workspace:*", "@opencode-ai/sdk": "workspace:*", diff --git a/packages/opencode/src/session/llm-ai-sdk.ts b/packages/opencode/src/session/llm-ai-sdk.ts new file mode 100644 index 000000000000..9d5407f975f9 --- /dev/null +++ b/packages/opencode/src/session/llm-ai-sdk.ts @@ -0,0 +1,223 @@ +import { ContentBlockID, FinishReason, LLMEvent, ProviderMetadata, ToolCallID, ToolResultValue, Usage } from "@opencode-ai/llm" +import { Effect, Schema } from "effect" +import { type streamText } from "ai" +import { errorMessage } from "@/util/error" + +type Result = Awaited> +type AISDKEvent = Result["fullStream"] extends AsyncIterable ? T : never + +export function adapterState() { + return { + step: 0, + text: 0, + reasoning: 0, + currentTextID: undefined as ContentBlockID | undefined, + currentReasoningID: undefined as ContentBlockID | undefined, + toolNames: {} as Record, + } +} + +const contentBlockID = (value: string) => ContentBlockID.make(value) +const toolCallID = (value: string) => ToolCallID.make(value) + +function finishReason(value: string | undefined): FinishReason { + return Schema.is(FinishReason)(value) ? value : "unknown" +} + +function providerMetadata(value: unknown): ProviderMetadata | undefined { + return Schema.is(ProviderMetadata)(value) ? value : undefined +} + +function usage(value: unknown): Usage | undefined { + if (!value || typeof value !== "object") return undefined + const item = value as { + inputTokens?: number + outputTokens?: number + totalTokens?: number + reasoningTokens?: number + cachedInputTokens?: number + inputTokenDetails?: { cacheReadTokens?: number; cacheWriteTokens?: number } + outputTokenDetails?: { reasoningTokens?: number } + } + const result = Object.fromEntries( + Object.entries({ + inputTokens: item.inputTokens, + outputTokens: item.outputTokens, + totalTokens: item.totalTokens, + reasoningTokens: item.outputTokenDetails?.reasoningTokens ?? item.reasoningTokens, + cacheReadInputTokens: item.inputTokenDetails?.cacheReadTokens ?? item.cachedInputTokens, + cacheWriteInputTokens: item.inputTokenDetails?.cacheWriteTokens, + }).filter((entry) => entry[1] !== undefined), + ) + return new Usage(result) +} + +export function toLLMEvents( + state: ReturnType, + event: AISDKEvent, +): Effect.Effect, unknown> { + switch (event.type) { + case "start": + return Effect.succeed([]) + + case "start-step": + return Effect.succeed([LLMEvent.stepStart({ index: state.step })]) + + case "finish-step": + return Effect.sync(() => [ + LLMEvent.stepFinish({ + index: state.step++, + reason: finishReason(event.finishReason), + usage: usage(event.usage), + providerMetadata: providerMetadata(event.providerMetadata), + }), + ]) + + case "finish": + return Effect.sync(() => { + state.toolNames = {} + return [ + LLMEvent.requestFinish({ + reason: finishReason(event.finishReason), + usage: usage(event.totalUsage), + }), + ] + }) + + case "text-start": + return Effect.sync(() => { + state.currentTextID = contentBlockID(event.id ?? `text-${state.text++}`) + return [ + LLMEvent.textStart({ + id: state.currentTextID, + providerMetadata: providerMetadata(event.providerMetadata), + }), + ] + }) + + case "text-delta": + return Effect.succeed([ + LLMEvent.textDelta({ + id: event.id ? contentBlockID(event.id) : (state.currentTextID ?? contentBlockID(`text-${state.text++}`)), + text: event.text, + }), + ]) + + case "text-end": + return Effect.succeed([ + LLMEvent.textEnd({ + id: event.id ? contentBlockID(event.id) : (state.currentTextID ?? contentBlockID(`text-${state.text++}`)), + providerMetadata: providerMetadata(event.providerMetadata), + }), + ]) + + case "reasoning-start": + return Effect.sync(() => { + state.currentReasoningID = contentBlockID(event.id) + return [ + LLMEvent.reasoningStart({ + id: state.currentReasoningID, + providerMetadata: providerMetadata(event.providerMetadata), + }), + ] + }) + + case "reasoning-delta": + return Effect.succeed([ + LLMEvent.reasoningDelta({ + id: event.id ? contentBlockID(event.id) : (state.currentReasoningID ?? contentBlockID(`reasoning-${state.reasoning++}`)), + text: event.text, + }), + ]) + + case "reasoning-end": + return Effect.sync(() => { + const id = contentBlockID(event.id) + state.currentReasoningID = undefined + return [ + LLMEvent.reasoningEnd({ + id, + providerMetadata: providerMetadata(event.providerMetadata), + }), + ] + }) + + case "tool-input-start": + return Effect.sync(() => { + state.toolNames[event.id] = event.toolName + return [ + LLMEvent.toolInputStart({ + id: toolCallID(event.id), + name: event.toolName, + providerMetadata: providerMetadata(event.providerMetadata), + }), + ] + }) + + case "tool-input-delta": + return Effect.succeed([ + LLMEvent.toolInputDelta({ + id: toolCallID(event.id), + name: state.toolNames[event.id] ?? "unknown", + text: event.delta ?? "", + }), + ]) + + case "tool-input-end": + return Effect.succeed([ + LLMEvent.toolInputEnd({ + id: toolCallID(event.id), + name: state.toolNames[event.id] ?? "unknown", + }), + ]) + + case "tool-call": + return Effect.sync(() => { + state.toolNames[event.toolCallId] = event.toolName + return [ + LLMEvent.toolCall({ + id: toolCallID(event.toolCallId), + name: event.toolName, + input: event.input, + providerExecuted: "providerExecuted" in event ? event.providerExecuted : undefined, + providerMetadata: providerMetadata(event.providerMetadata), + }), + ] + }) + + case "tool-result": + return Effect.sync(() => { + const name = state.toolNames[event.toolCallId] ?? "unknown" + delete state.toolNames[event.toolCallId] + return [ + LLMEvent.toolResult({ + id: toolCallID(event.toolCallId), + name, + result: ToolResultValue.make(event.output), + providerExecuted: "providerExecuted" in event ? event.providerExecuted : undefined, + }), + ] + }) + + case "tool-error": + return Effect.sync(() => { + const name = state.toolNames[event.toolCallId] ?? "unknown" + delete state.toolNames[event.toolCallId] + return [ + LLMEvent.toolError({ + id: toolCallID(event.toolCallId), + name, + message: errorMessage(event.error), + }), + ] + }) + + case "error": + return Effect.fail(event.error) + + default: + return Effect.succeed([]) + } +} + +export * as LLMAISDK from "./llm-ai-sdk" diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts index c7990d1b3539..28e05aba8f97 100644 --- a/packages/opencode/src/session/llm.ts +++ b/packages/opencode/src/session/llm.ts @@ -3,6 +3,7 @@ import * as Log from "@opencode-ai/core/util/log" import { Context, Effect, Layer, Record } from "effect" import * as Stream from "effect/Stream" import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai" +import type { LLMEvent } from "@opencode-ai/llm" import { mergeDeep } from "remeda" import { GitLabWorkflowLanguageModel } from "gitlab-ai-provider" import { ProviderTransform } from "@/provider/transform" @@ -24,10 +25,10 @@ import { InstallationVersion } from "@opencode-ai/core/installation/version" import { EffectBridge } from "@/effect/bridge" import * as Option from "effect/Option" import * as OtelTracer from "@effect/opentelemetry/Tracer" +import { LLMAISDK } from "./llm-ai-sdk" const log = Log.create({ service: "llm" }) export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX -type Result = Awaited> // Avoid re-instantiating remeda's deep merge types in this hot LLM path; the runtime behavior is still mergeDeep. const mergeOptions = (target: Record, source: Record | undefined): Record => @@ -52,10 +53,8 @@ export type StreamRequest = StreamInput & { abort: AbortSignal } -export type Event = Result["fullStream"] extends AsyncIterable ? T : never - export interface Interface { - readonly stream: (input: StreamInput) => Stream.Stream + readonly stream: (input: StreamInput) => Stream.Stream } export class Service extends Context.Service()("@opencode/LLM") {} @@ -427,7 +426,11 @@ const live: Layer.Layer< const result = yield* run({ ...input, abort: ctrl.signal }) - return Stream.fromAsyncIterable(result.fullStream, (e) => (e instanceof Error ? e : new Error(String(e)))) + const state = LLMAISDK.adapterState() + return Stream.fromAsyncIterable(result.fullStream, (e) => (e instanceof Error ? e : new Error(String(e)))).pipe( + Stream.mapEffect((event) => LLMAISDK.toLLMEvents(state, event)), + Stream.flatMap((events) => Stream.fromIterable(events)), + ) }), ), ) diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 579c4cc42c54..2d78dc70e882 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -1,4 +1,4 @@ -import { Cause, Deferred, Effect, Exit, Layer, Context, Scope } from "effect" +import { Cause, Deferred, Effect, Exit, Layer, Context, Scope, Schema } from "effect" import * as Stream from "effect/Stream" import { Agent } from "@/agent/agent" import { Bus } from "@/bus" @@ -26,14 +26,13 @@ import { SessionEvent } from "@/v2/session-event" import { Modelv2 } from "@/v2/model" import * as DateTime from "effect/DateTime" import { Flag } from "@opencode-ai/core/flag/flag" +import { Usage, type LLMEvent } from "@opencode-ai/llm" const DOOM_LOOP_THRESHOLD = 3 const log = Log.create({ service: "session.processor" }) export type Result = "compact" | "stop" | "continue" -export type Event = LLM.Event - export interface Handle { readonly message: MessageV2.Assistant readonly updateToolCall: ( @@ -67,6 +66,7 @@ type ToolCall = { messageID: MessageV2.ToolPart["messageID"] sessionID: MessageV2.ToolPart["sessionID"] done: Deferred.Deferred + inputEnded: boolean } interface ProcessorContext extends Input { @@ -79,7 +79,7 @@ interface ProcessorContext extends Input { reasoningMap: Record } -type StreamEvent = Event +type StreamEvent = LLMEvent export class Service extends Context.Service()("@opencode/SessionProcessor") {} @@ -223,9 +223,85 @@ export const layer: Layer.Layer< return true }) + const finishReasoning = Effect.fn("SessionProcessor.finishReasoning")(function* (reasoningID: string) { + if (!(reasoningID in ctx.reasoningMap)) return + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Reasoning.Ended.Sync, { + sessionID: ctx.sessionID, + reasoningID, + text: ctx.reasoningMap[reasoningID].text, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } + // oxlint-disable-next-line no-self-assign -- reactivity trigger + ctx.reasoningMap[reasoningID].text = ctx.reasoningMap[reasoningID].text + ctx.reasoningMap[reasoningID].time = { ...ctx.reasoningMap[reasoningID].time, end: Date.now() } + yield* session.updatePart(ctx.reasoningMap[reasoningID]) + delete ctx.reasoningMap[reasoningID] + }) + + const ensureToolCall = Effect.fn("SessionProcessor.ensureToolCall")(function* (input: { + id: string + name: string + providerExecuted?: boolean + }) { + const existing = yield* readToolCall(input.id) + if (existing) return existing + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Tool.Input.Started.Sync, { + sessionID: ctx.sessionID, + callID: input.id, + name: input.name, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } + const part = yield* session.updatePart({ + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "tool", + tool: input.name, + callID: input.id, + state: { status: "pending", input: {}, raw: "" }, + metadata: input.providerExecuted ? { providerExecuted: true } : undefined, + } satisfies MessageV2.ToolPart) + ctx.toolcalls[input.id] = { + done: yield* Deferred.make(), + partID: part.id, + messageID: part.messageID, + sessionID: part.sessionID, + inputEnded: false, + } + return { call: ctx.toolcalls[input.id], part } + }) + + const isFilePart = Schema.is(MessageV2.FilePart) + + const toolResultOutput = (value: Extract) => { + if (isRecord(value.result.value) && typeof value.result.value.output === "string") { + return { + title: typeof value.result.value.title === "string" ? value.result.value.title : value.name, + metadata: isRecord(value.result.value.metadata) ? value.result.value.metadata : {}, + output: value.result.value.output, + attachments: Array.isArray(value.result.value.attachments) + ? value.result.value.attachments.filter(isFilePart) + : undefined, + } + } + return { + title: value.name, + metadata: value.result.type === "json" && isRecord(value.result.value) ? value.result.value : {}, + output: typeof value.result.value === "string" ? value.result.value : (JSON.stringify(value.result.value) ?? ""), + } + } + + const toolInput = (value: unknown): Record => (isRecord(value) ? value : { value }) + const handleEvent = Effect.fnUntraced(function* (value: StreamEvent) { switch (value.type) { - case "start": + case "request-start": yield* status.set(ctx.sessionID, { type: "busy" }) return @@ -251,116 +327,132 @@ export const layer: Layer.Layer< yield* session.updatePart(ctx.reasoningMap[value.id]) return - case "reasoning-delta": - if (!(value.id in ctx.reasoningMap)) return - ctx.reasoningMap[value.id].text += value.text - if (value.providerMetadata) ctx.reasoningMap[value.id].metadata = value.providerMetadata + case "reasoning-delta": { + const reasoningID = value.id ?? "reasoning" + if (!(reasoningID in ctx.reasoningMap)) { + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Reasoning.Started.Sync, { + sessionID: ctx.sessionID, + reasoningID, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } + ctx.reasoningMap[reasoningID] = { + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "reasoning", + text: "", + time: { start: Date.now() }, + } + yield* session.updatePart(ctx.reasoningMap[reasoningID]) + } + ctx.reasoningMap[reasoningID].text += value.text yield* session.updatePartDelta({ - sessionID: ctx.reasoningMap[value.id].sessionID, - messageID: ctx.reasoningMap[value.id].messageID, - partID: ctx.reasoningMap[value.id].id, + sessionID: ctx.reasoningMap[reasoningID].sessionID, + messageID: ctx.reasoningMap[reasoningID].messageID, + partID: ctx.reasoningMap[reasoningID].id, field: "text", delta: value.text, }) return + } case "reasoning-end": - if (!(value.id in ctx.reasoningMap)) return - // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { - yield* sync.run(SessionEvent.Reasoning.Ended.Sync, { - sessionID: ctx.sessionID, - reasoningID: value.id, - text: ctx.reasoningMap[value.id].text, - timestamp: DateTime.makeUnsafe(Date.now()), - }) + if (value.providerMetadata && value.id in ctx.reasoningMap) { + ctx.reasoningMap[value.id].metadata = value.providerMetadata } - // oxlint-disable-next-line no-self-assign -- reactivity trigger - ctx.reasoningMap[value.id].text = ctx.reasoningMap[value.id].text - ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() } - if (value.providerMetadata) ctx.reasoningMap[value.id].metadata = value.providerMetadata - yield* session.updatePart(ctx.reasoningMap[value.id]) - delete ctx.reasoningMap[value.id] + yield* finishReasoning(value.id) return case "tool-input-start": if (ctx.assistantMessage.summary) { - throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`) - } - // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { - yield* sync.run(SessionEvent.Tool.Input.Started.Sync, { - sessionID: ctx.sessionID, - callID: value.id, - name: value.toolName, - timestamp: DateTime.makeUnsafe(Date.now()), - }) - } - const part = yield* session.updatePart({ - id: ctx.toolcalls[value.id]?.partID ?? PartID.ascending(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.assistantMessage.sessionID, - type: "tool", - tool: value.toolName, - callID: value.id, - state: { status: "pending", input: {}, raw: "" }, - metadata: value.providerExecuted ? { providerExecuted: true } : undefined, - } satisfies MessageV2.ToolPart) - ctx.toolcalls[value.id] = { - done: yield* Deferred.make(), - partID: part.id, - messageID: part.messageID, - sessionID: part.sessionID, + throw new Error(`Tool call not allowed while generating summary: ${value.name}`) } + yield* ensureToolCall(value) return - case "tool-input-delta": + case "tool-input-delta": { + if (ctx.assistantMessage.summary) { + throw new Error(`Tool call not allowed while generating summary: ${value.name}`) + } + yield* ensureToolCall(value) + if (value.text) { + yield* updateToolCall(value.id, (match) => ({ + ...match, + state: + match.state.status === "pending" + ? { ...match.state, raw: match.state.raw + value.text } + : match.state, + })) + } return + } case "tool-input-end": { + const toolCall = yield* ensureToolCall(value) // TODO(v2): Temporary dual-write while migrating session messages to v2 events. if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { yield* sync.run(SessionEvent.Tool.Input.Ended.Sync, { sessionID: ctx.sessionID, callID: value.id, - text: "", + text: toolCall.part.state.status === "pending" ? toolCall.part.state.raw : "", timestamp: DateTime.makeUnsafe(Date.now()), }) } + ctx.toolcalls[value.id] = { ...toolCall.call, inputEnded: true } return } case "tool-call": { if (ctx.assistantMessage.summary) { - throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`) + throw new Error(`Tool call not allowed while generating summary: ${value.name}`) + } + const toolCall = yield* ensureToolCall(value) + const input = toolInput(value.input) + const raw = toolCall.part.state.status === "pending" ? toolCall.part.state.raw : "" + if (!toolCall.call.inputEnded) { + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Tool.Input.Ended.Sync, { + sessionID: ctx.sessionID, + callID: value.id, + text: raw, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } } - const toolCall = yield* readToolCall(value.toolCallId) // TODO(v2): Temporary dual-write while migrating session messages to v2 events. if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { yield* sync.run(SessionEvent.Tool.Called.Sync, { sessionID: ctx.sessionID, - callID: value.toolCallId, - tool: value.toolName, - input: value.input, + callID: value.id, + tool: value.name, + input, provider: { - executed: toolCall?.part.metadata?.providerExecuted === true, + executed: toolCall.part.metadata?.providerExecuted === true, ...(value.providerMetadata ? { metadata: value.providerMetadata } : {}), }, timestamp: DateTime.makeUnsafe(Date.now()), }) } - yield* updateToolCall(value.toolCallId, (match) => ({ + yield* updateToolCall(value.id, (match) => ({ ...match, - tool: value.toolName, - state: { - ...match.state, - status: "running", - input: value.input, - time: { start: Date.now() }, + tool: value.name, + state: + match.state.status === "running" + ? { ...match.state, input } + : { + status: "running", + input, + time: { start: Date.now() }, + }, + metadata: { + ...match.metadata, + ...value.providerMetadata, + ...(match.metadata?.providerExecuted ? { providerExecuted: true } : {}), }, - metadata: match.metadata?.providerExecuted - ? { ...value.providerMetadata, providerExecuted: true } - : value.providerMetadata, })) const parts = MessageV2.parts(ctx.assistantMessage.id) @@ -371,9 +463,9 @@ export const layer: Layer.Layer< !recentParts.every( (part) => part.type === "tool" && - part.tool === value.toolName && + part.tool === value.name && part.state.status !== "pending" && - JSON.stringify(part.state.input) === JSON.stringify(value.input), + JSON.stringify(part.state.input) === JSON.stringify(input), ) ) { return @@ -382,27 +474,19 @@ export const layer: Layer.Layer< const agent = yield* agents.get(ctx.assistantMessage.agent) yield* permission.ask({ permission: "doom_loop", - patterns: [value.toolName], + patterns: [value.name], sessionID: ctx.assistantMessage.sessionID, - metadata: { tool: value.toolName, input: value.input }, - always: [value.toolName], + metadata: { tool: value.name, input }, + always: [value.name], ruleset: agent.permission, }) return } case "tool-result": { - const toolCall = yield* readToolCall(value.toolCallId) - const toolAttachments: MessageV2.FilePart[] = ( - Array.isArray(value.output.attachments) ? value.output.attachments : [] - ).filter( - (attachment: unknown): attachment is MessageV2.FilePart => - isRecord(attachment) && - attachment.type === "file" && - typeof attachment.mime === "string" && - typeof attachment.url === "string", - ) - const normalized = yield* Effect.forEach(toolAttachments, (attachment) => + const toolCall = yield* readToolCall(value.id) + const rawOutput = toolResultOutput(value) + const normalized = yield* Effect.forEach(rawOutput.attachments ?? [], (attachment) => attachment.mime.startsWith("image/") ? image.normalize(attachment).pipe(Effect.exit) : Effect.succeed(Exit.succeed(attachment)), @@ -410,18 +494,18 @@ export const layer: Layer.Layer< const omitted = normalized.filter(Exit.isFailure).length const attachments = normalized.filter(Exit.isSuccess).map((item) => item.value) const output = { - ...value.output, + ...rawOutput, output: omitted === 0 - ? value.output.output - : `${value.output.output}\n\n[${omitted} image${omitted === 1 ? "" : "s"} omitted: could not be resized below the inline image size limit.]`, - attachments: attachments?.length ? attachments : undefined, + ? rawOutput.output + : `${rawOutput.output}\n\n[${omitted} image${omitted === 1 ? "" : "s"} omitted: could not be resized below the inline image size limit.]`, + attachments: attachments.length ? attachments : undefined, } // TODO(v2): Temporary dual-write while migrating session messages to v2 events. if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { yield* sync.run(SessionEvent.Tool.Success.Sync, { sessionID: ctx.sessionID, - callID: value.toolCallId, + callID: value.id, structured: output.metadata, content: [ { @@ -429,32 +513,32 @@ export const layer: Layer.Layer< text: output.output, }, ...(output.attachments?.map((item: MessageV2.FilePart) => ({ - type: "file", + type: "file" as const, uri: item.url, mime: item.mime, name: item.filename, })) ?? []), ], provider: { - executed: toolCall?.part.metadata?.providerExecuted === true, + executed: value.providerExecuted === true || toolCall?.part.metadata?.providerExecuted === true, }, timestamp: DateTime.makeUnsafe(Date.now()), }) } - yield* completeToolCall(value.toolCallId, output) + yield* completeToolCall(value.id, output) return } case "tool-error": { - const toolCall = yield* readToolCall(value.toolCallId) + const toolCall = yield* readToolCall(value.id) // TODO(v2): Temporary dual-write while migrating session messages to v2 events. if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { yield* sync.run(SessionEvent.Tool.Failed.Sync, { sessionID: ctx.sessionID, - callID: value.toolCallId, + callID: value.id, error: { type: "unknown", - message: errorMessage(value.error), + message: value.message, }, provider: { executed: toolCall?.part.metadata?.providerExecuted === true, @@ -462,14 +546,14 @@ export const layer: Layer.Layer< timestamp: DateTime.makeUnsafe(Date.now()), }) } - yield* failToolCall(value.toolCallId, value.error) + yield* failToolCall(value.id, new Error(value.message)) return } - case "error": - throw value.error + case "provider-error": + throw new Error(value.message) - case "start-step": + case "step-start": if (!ctx.snapshot) ctx.snapshot = yield* snapshot.track() if (!ctx.assistantMessage.summary) { // TODO(v2): Temporary dual-write while migrating session messages to v2 events. @@ -496,19 +580,20 @@ export const layer: Layer.Layer< }) return - case "finish-step": { + case "step-finish": { const completedSnapshot = yield* snapshot.track() - const usage = Session.getUsage({ - model: ctx.model, - usage: value.usage, - metadata: value.providerMetadata, - }) + yield* Effect.forEach(Object.keys(ctx.reasoningMap), finishReasoning) + const usage = Session.getUsage({ + model: ctx.model, + usage: value.usage ?? new Usage({}), + metadata: value.providerMetadata, + }) if (!ctx.assistantMessage.summary) { // TODO(v2): Temporary dual-write while migrating session messages to v2 events. if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { yield* sync.run(SessionEvent.Step.Ended.Sync, { sessionID: ctx.sessionID, - finish: value.finishReason, + finish: value.reason, cost: usage.cost, tokens: usage.tokens, snapshot: completedSnapshot, @@ -516,12 +601,12 @@ export const layer: Layer.Layer< }) } } - ctx.assistantMessage.finish = value.finishReason + ctx.assistantMessage.finish = value.reason ctx.assistantMessage.cost += usage.cost ctx.assistantMessage.tokens = usage.tokens yield* session.updatePart({ id: PartID.ascending(), - reason: value.finishReason, + reason: value.reason, snapshot: completedSnapshot, messageID: ctx.assistantMessage.id, sessionID: ctx.assistantMessage.sessionID, @@ -584,7 +669,6 @@ export const layer: Layer.Layer< case "text-delta": if (!ctx.currentText) return ctx.currentText.text += value.text - if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata yield* session.updatePartDelta({ sessionID: ctx.currentText.sessionID, messageID: ctx.currentText.messageID, @@ -626,12 +710,9 @@ export const layer: Layer.Layer< ctx.currentText = undefined return - case "finish": + case "request-finish": return - default: - slog.info("unhandled", { event: value.type, value }) - return } }) @@ -733,6 +814,7 @@ export const layer: Layer.Layer< yield* Effect.gen(function* () { ctx.currentText = undefined ctx.reasoningMap = {} + yield* status.set(ctx.sessionID, { type: "busy" }) const stream = llm.stream(streamInput) yield* stream.pipe( @@ -816,12 +898,12 @@ export const defaultLayer = Layer.suspend(() => Layer.provide(LLM.defaultLayer), Layer.provide(Permission.defaultLayer), Layer.provide(Plugin.defaultLayer), + Layer.provide(Image.defaultLayer), Layer.provide(SessionSummary.defaultLayer), Layer.provide(SessionStatus.defaultLayer), - Layer.provide(Image.defaultLayer), + Layer.provide(SyncEvent.defaultLayer), Layer.provide(Bus.layer), Layer.provide(Config.defaultLayer), - Layer.provide(SyncEvent.defaultLayer), ), ) diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 15246dac394d..6fd0c5049c3c 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -60,6 +60,7 @@ import * as DateTime from "effect/DateTime" import { eq } from "@/storage/db" import * as Database from "@/storage/db" import { SessionTable } from "./session.sql" +import { LLMEvent } from "@opencode-ai/llm" // @ts-ignore globalThis.AI_SDK_LOG_WARNINGS = false @@ -359,7 +360,7 @@ export const layer = Layer.effect( messages: [{ role: "user", content: "Generate a title for this conversation:\n" }, ...msgs], }) .pipe( - Stream.filter((e): e is Extract => e.type === "text-delta"), + Stream.filter(LLMEvent.is.textDelta), Stream.map((e) => e.text), Stream.mkString, Effect.orDie, diff --git a/packages/opencode/src/session/session.ts b/packages/opencode/src/session/session.ts index 92b4329e6f1d..616a9deff4b6 100644 --- a/packages/opencode/src/session/session.ts +++ b/packages/opencode/src/session/session.ts @@ -3,7 +3,7 @@ import path from "path" import { BusEvent } from "@/bus/bus-event" import { Bus } from "@/bus" import { Decimal } from "decimal.js" -import { type ProviderMetadata, type LanguageModelUsage } from "ai" +import type { ProviderMetadata, Usage } from "@opencode-ai/llm" import { Flag } from "@opencode-ai/core/flag/flag" import { InstallationVersion } from "@opencode-ai/core/installation/version" @@ -341,21 +341,19 @@ export function plan(input: { slug: string; time: { created: number } }, instanc return path.join(base, [input.time.created, input.slug].join("-") + ".md") } -export const getUsage = (input: { model: Provider.Model; usage: LanguageModelUsage; metadata?: ProviderMetadata }) => { +export const getUsage = (input: { model: Provider.Model; usage: Usage; metadata?: ProviderMetadata }) => { const safe = (value: number) => { if (!Number.isFinite(value)) return 0 return Math.max(0, value) } const inputTokens = safe(input.usage.inputTokens ?? 0) const outputTokens = safe(input.usage.outputTokens ?? 0) - const reasoningTokens = safe(input.usage.outputTokenDetails?.reasoningTokens ?? input.usage.reasoningTokens ?? 0) + const reasoningTokens = safe(input.usage.reasoningTokens ?? 0) - const cacheReadInputTokens = safe( - input.usage.inputTokenDetails?.cacheReadTokens ?? input.usage.cachedInputTokens ?? 0, - ) + const cacheReadInputTokens = safe(input.usage.cacheReadInputTokens ?? 0) const cacheWriteInputTokens = safe( Number( - input.usage.inputTokenDetails?.cacheWriteTokens ?? + input.usage.cacheWriteInputTokens ?? input.metadata?.["anthropic"]?.["cacheCreationInputTokens"] ?? // google-vertex-anthropic returns metadata under "vertex" key // (AnthropicMessagesLanguageModel custom provider key from 'vertex.anthropic.messages') diff --git a/packages/opencode/test/session/compaction.test.ts b/packages/opencode/test/session/compaction.test.ts index c7f349d5cea0..fa77b22a689b 100644 --- a/packages/opencode/test/session/compaction.test.ts +++ b/packages/opencode/test/session/compaction.test.ts @@ -28,6 +28,7 @@ import { testEffect } from "../lib/effect" import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" import { TestConfig } from "../fixture/config" import { SyncEvent } from "@/sync" +import { LLMEvent, Usage } from "@opencode-ai/llm" void Log.init({ print: false }) @@ -45,6 +46,10 @@ const ref = { modelID: ModelID.make("test-model"), } +const usage = (input: ConstructorParameters[0]) => new Usage(input) + +const basicUsage = () => usage({ inputTokens: 1, outputTokens: 1, totalTokens: 2 }) + afterEach(() => { mock.restore() }) @@ -289,11 +294,11 @@ function readCompactionPart(sessionID: SessionID) { function llm() { const queue: Array< - Stream.Stream | ((input: LLM.StreamInput) => Stream.Stream) + Stream.Stream | ((input: LLM.StreamInput) => Stream.Stream) > = [] return { - push(stream: Stream.Stream | ((input: LLM.StreamInput) => Stream.Stream)) { + push(stream: Stream.Stream | ((input: LLM.StreamInput) => Stream.Stream)) { queue.push(stream) }, layer: Layer.succeed( @@ -312,54 +317,22 @@ function llm() { function reply( text: string, capture?: (input: LLM.StreamInput) => void, -): (input: LLM.StreamInput) => Stream.Stream { +): (input: LLM.StreamInput) => Stream.Stream { return (input) => { capture?.(input) return Stream.make( - { type: "start" } satisfies LLM.Event, - { type: "text-start", id: "txt-0" } satisfies LLM.Event, - { type: "text-delta", id: "txt-0", delta: text, text } as LLM.Event, - { type: "text-end", id: "txt-0" } satisfies LLM.Event, - { - type: "finish-step", - finishReason: "stop", - rawFinishReason: "stop", - response: { id: "res", modelId: "test-model", timestamp: new Date() }, - providerMetadata: undefined, - usage: { - inputTokens: 1, - outputTokens: 1, - totalTokens: 2, - inputTokenDetails: { - noCacheTokens: undefined, - cacheReadTokens: undefined, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, - }, - }, - } satisfies LLM.Event, - { - type: "finish", - finishReason: "stop", - rawFinishReason: "stop", - totalUsage: { - inputTokens: 1, - outputTokens: 1, - totalTokens: 2, - inputTokenDetails: { - noCacheTokens: undefined, - cacheReadTokens: undefined, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, - }, - }, - } satisfies LLM.Event, + LLMEvent.textStart({ id: "txt-0" }), + LLMEvent.textDelta({ id: "txt-0", text }), + LLMEvent.textEnd({ id: "txt-0" }), + LLMEvent.stepFinish({ + index: 0, + reason: "stop", + usage: basicUsage(), + }), + LLMEvent.requestFinish({ + reason: "stop", + usage: basicUsage(), + }), ) } } @@ -1198,7 +1171,7 @@ describe("session.compaction.process", () => { Stream.fromAsyncIterable( { async *[Symbol.asyncIterator]() { - yield { type: "start" } as LLM.Event + yield LLMEvent.stepStart({ index: 0 }) throw new APICallError({ message: "boom", url: "https://example.com/v1/chat/completions", @@ -1290,49 +1263,16 @@ describe("session.compaction.process", () => { const stub = llm() stub.push( Stream.make( - { type: "start" } satisfies LLM.Event, - { type: "tool-input-start", id: "call-1", toolName: "_noop" } satisfies LLM.Event, - { type: "tool-call", toolCallId: "call-1", toolName: "_noop", input: {} } satisfies LLM.Event, - { - type: "finish-step", - finishReason: "tool-calls", - rawFinishReason: "tool_calls", - response: { id: "res", modelId: "test-model", timestamp: new Date() }, - providerMetadata: undefined, - usage: { - inputTokens: 1, - outputTokens: 1, - totalTokens: 2, - inputTokenDetails: { - noCacheTokens: undefined, - cacheReadTokens: undefined, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, - }, - }, - } satisfies LLM.Event, - { - type: "finish", - finishReason: "tool-calls", - rawFinishReason: "tool_calls", - totalUsage: { - inputTokens: 1, - outputTokens: 1, - totalTokens: 2, - inputTokenDetails: { - noCacheTokens: undefined, - cacheReadTokens: undefined, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, - }, - }, - } satisfies LLM.Event, + LLMEvent.toolCall({ id: "call-1", name: "_noop", input: {} }), + LLMEvent.stepFinish({ + index: 0, + reason: "tool-calls", + usage: basicUsage(), + }), + LLMEvent.requestFinish({ + reason: "tool-calls", + usage: basicUsage(), + }), ), ) return Effect.gen(function* () { @@ -1543,20 +1483,7 @@ describe("SessionNs.getUsage", () => { const model = createModel({ context: 100_000, output: 32_000 }) const result = SessionNs.getUsage({ model, - usage: { - inputTokens: 1000, - outputTokens: 500, - totalTokens: 1500, - inputTokenDetails: { - noCacheTokens: undefined, - cacheReadTokens: undefined, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, - }, - }, + usage: usage({ inputTokens: 1000, outputTokens: 500, totalTokens: 1500 }), }) expect(result.tokens.input).toBe(1000) @@ -1570,20 +1497,7 @@ describe("SessionNs.getUsage", () => { const model = createModel({ context: 100_000, output: 32_000 }) const result = SessionNs.getUsage({ model, - usage: { - inputTokens: 1000, - outputTokens: 500, - totalTokens: 1500, - inputTokenDetails: { - noCacheTokens: 800, - cacheReadTokens: 200, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, - }, - }, + usage: usage({ inputTokens: 1000, outputTokens: 500, totalTokens: 1500, cacheReadInputTokens: 200 }), }) expect(result.tokens.input).toBe(800) @@ -1594,20 +1508,7 @@ describe("SessionNs.getUsage", () => { const model = createModel({ context: 100_000, output: 32_000 }) const result = SessionNs.getUsage({ model, - usage: { - inputTokens: 1000, - outputTokens: 500, - totalTokens: 1500, - inputTokenDetails: { - noCacheTokens: undefined, - cacheReadTokens: undefined, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, - }, - }, + usage: usage({ inputTokens: 1000, outputTokens: 500, totalTokens: 1500 }), metadata: { anthropic: { cacheCreationInputTokens: 300, @@ -1623,20 +1524,7 @@ describe("SessionNs.getUsage", () => { // AI SDK v6 normalizes inputTokens to include cached tokens for all providers const result = SessionNs.getUsage({ model, - usage: { - inputTokens: 1000, - outputTokens: 500, - totalTokens: 1500, - inputTokenDetails: { - noCacheTokens: 800, - cacheReadTokens: 200, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, - }, - }, + usage: usage({ inputTokens: 1000, outputTokens: 500, totalTokens: 1500, cacheReadInputTokens: 200 }), metadata: { anthropic: {}, }, @@ -1650,20 +1538,7 @@ describe("SessionNs.getUsage", () => { const model = createModel({ context: 100_000, output: 32_000 }) const result = SessionNs.getUsage({ model, - usage: { - inputTokens: 1000, - outputTokens: 500, - totalTokens: 1500, - inputTokenDetails: { - noCacheTokens: undefined, - cacheReadTokens: undefined, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: 400, - reasoningTokens: 100, - }, - }, + usage: usage({ inputTokens: 1000, outputTokens: 500, reasoningTokens: 100, totalTokens: 1500 }), }) expect(result.tokens.input).toBe(1000) @@ -1684,20 +1559,7 @@ describe("SessionNs.getUsage", () => { }) const result = SessionNs.getUsage({ model, - usage: { - inputTokens: 0, - outputTokens: 1_000_000, - totalTokens: 1_000_000, - inputTokenDetails: { - noCacheTokens: undefined, - cacheReadTokens: undefined, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: 750_000, - reasoningTokens: 250_000, - }, - }, + usage: usage({ inputTokens: 0, outputTokens: 1_000_000, reasoningTokens: 250_000, totalTokens: 1_000_000 }), }) expect(result.tokens.output).toBe(750_000) @@ -1709,20 +1571,7 @@ describe("SessionNs.getUsage", () => { const model = createModel({ context: 100_000, output: 32_000 }) const result = SessionNs.getUsage({ model, - usage: { - inputTokens: 0, - outputTokens: 0, - totalTokens: 0, - inputTokenDetails: { - noCacheTokens: undefined, - cacheReadTokens: undefined, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, - }, - }, + usage: usage({ inputTokens: 0, outputTokens: 0, totalTokens: 0 }), }) expect(result.tokens.input).toBe(0) @@ -1745,20 +1594,7 @@ describe("SessionNs.getUsage", () => { }) const result = SessionNs.getUsage({ model, - usage: { - inputTokens: 1_000_000, - outputTokens: 100_000, - totalTokens: 1_100_000, - inputTokenDetails: { - noCacheTokens: undefined, - cacheReadTokens: undefined, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, - }, - }, + usage: usage({ inputTokens: 1_000_000, outputTokens: 100_000, totalTokens: 1_100_000 }), }) expect(result.cost).toBe(3 + 1.5) @@ -1769,24 +1605,16 @@ describe("SessionNs.getUsage", () => { (npm) => { const model = createModel({ context: 100_000, output: 32_000, npm }) // AI SDK v6: inputTokens includes cached tokens for all providers - const usage = { + const item = usage({ inputTokens: 1000, outputTokens: 500, totalTokens: 1500, - inputTokenDetails: { - noCacheTokens: 800, - cacheReadTokens: 200, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, - }, - } + cacheReadInputTokens: 200, + }) if (npm === "@ai-sdk/amazon-bedrock") { const result = SessionNs.getUsage({ model, - usage, + usage: item, metadata: { bedrock: { usage: { @@ -1807,7 +1635,7 @@ describe("SessionNs.getUsage", () => { const result = SessionNs.getUsage({ model, - usage, + usage: item, metadata: { anthropic: { cacheCreationInputTokens: 300, @@ -1828,20 +1656,7 @@ describe("SessionNs.getUsage", () => { const model = createModel({ context: 100_000, output: 32_000, npm: "@ai-sdk/google-vertex/anthropic" }) const result = SessionNs.getUsage({ model, - usage: { - inputTokens: 1000, - outputTokens: 500, - totalTokens: 1500, - inputTokenDetails: { - noCacheTokens: 800, - cacheReadTokens: 200, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, - }, - }, + usage: usage({ inputTokens: 1000, outputTokens: 500, totalTokens: 1500, cacheReadInputTokens: 200 }), metadata: { vertex: { cacheCreationInputTokens: 300, From 73196dc2fa93849aec38e6715f030704d92d0156 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Sun, 10 May 2026 12:47:50 -0400 Subject: [PATCH 2/3] fix(session): align OpenAI response stream fixtures --- packages/llm/src/schema/events.ts | 9 ++++++-- packages/opencode/test/session/llm.test.ts | 24 ++++++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/packages/llm/src/schema/events.ts b/packages/llm/src/schema/events.ts index 6e6bb1541bfd..b2109f3693b7 100644 --- a/packages/llm/src/schema/events.ts +++ b/packages/llm/src/schema/events.ts @@ -222,6 +222,9 @@ const llmEventTagged = Schema.Union([ ]).pipe(Schema.toTaggedUnion("type")) type WithID = Omit & { readonly id: ID | string } +type WithUsage = Omit & { + readonly usage?: Usage | ConstructorParameters[0] +} const responseID = (value: ResponseID | string) => ResponseID.make(value) const contentBlockID = (value: ContentBlockID | string) => ContentBlockID.make(value) @@ -252,8 +255,10 @@ export const LLMEvent = Object.assign(llmEventTagged, { toolCall: (input: WithID) => ToolCall.make({ ...input, id: toolCallID(input.id) }), toolResult: (input: WithID) => ToolResult.make({ ...input, id: toolCallID(input.id) }), toolError: (input: WithID) => ToolError.make({ ...input, id: toolCallID(input.id) }), - stepFinish: StepFinish.make, - requestFinish: RequestFinish.make, + stepFinish: (input: WithUsage) => + StepFinish.make({ ...input, usage: input.usage instanceof Usage ? input.usage : new Usage(input.usage ?? {}) }), + requestFinish: (input: WithUsage) => + RequestFinish.make({ ...input, usage: input.usage instanceof Usage ? input.usage : new Usage(input.usage ?? {}) }), providerError: ProviderErrorEvent.make, is: { requestStart: llmEventTagged.guards["request-start"], diff --git a/packages/opencode/test/session/llm.test.ts b/packages/opencode/test/session/llm.test.ts index 2879d0481249..f078e352acc1 100644 --- a/packages/opencode/test/session/llm.test.ts +++ b/packages/opencode/test/session/llm.test.ts @@ -581,6 +581,18 @@ describe("session.llm.stream", () => { service_tier: null, }, }, + { + type: "response.output_item.added", + output_index: 0, + item: { type: "message", id: "item-1", status: "in_progress", role: "assistant", content: [] }, + }, + { + type: "response.content_part.added", + item_id: "item-1", + output_index: 0, + content_index: 0, + part: { type: "output_text", text: "", annotations: [] }, + }, { type: "response.output_text.delta", item_id: "item-1", @@ -694,6 +706,18 @@ describe("session.llm.stream", () => { service_tier: null, }, }, + { + type: "response.output_item.added", + output_index: 0, + item: { type: "message", id: "item-data-url", status: "in_progress", role: "assistant", content: [] }, + }, + { + type: "response.content_part.added", + item_id: "item-data-url", + output_index: 0, + content_index: 0, + part: { type: "output_text", text: "", annotations: [] }, + }, { type: "response.output_text.delta", item_id: "item-data-url", From 48bce2b6871234bee3cf3d72eb5e69d7e4c38363 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Sun, 10 May 2026 21:50:04 -0400 Subject: [PATCH 3/3] refactor(session): simplify LLM event adapter --- packages/llm/src/schema/events.ts | 10 ++++- packages/opencode/src/session/llm-ai-sdk.ts | 47 ++++++++++++--------- 2 files changed, 35 insertions(+), 22 deletions(-) diff --git a/packages/llm/src/schema/events.ts b/packages/llm/src/schema/events.ts index b2109f3693b7..212a01f6a320 100644 --- a/packages/llm/src/schema/events.ts +++ b/packages/llm/src/schema/events.ts @@ -256,9 +256,15 @@ export const LLMEvent = Object.assign(llmEventTagged, { toolResult: (input: WithID) => ToolResult.make({ ...input, id: toolCallID(input.id) }), toolError: (input: WithID) => ToolError.make({ ...input, id: toolCallID(input.id) }), stepFinish: (input: WithUsage) => - StepFinish.make({ ...input, usage: input.usage instanceof Usage ? input.usage : new Usage(input.usage ?? {}) }), + StepFinish.make({ + ...input, + usage: input.usage === undefined ? undefined : input.usage instanceof Usage ? input.usage : new Usage(input.usage), + }), requestFinish: (input: WithUsage) => - RequestFinish.make({ ...input, usage: input.usage instanceof Usage ? input.usage : new Usage(input.usage ?? {}) }), + RequestFinish.make({ + ...input, + usage: input.usage === undefined ? undefined : input.usage instanceof Usage ? input.usage : new Usage(input.usage), + }), providerError: ProviderErrorEvent.make, is: { requestStart: llmEventTagged.guards["request-start"], diff --git a/packages/opencode/src/session/llm-ai-sdk.ts b/packages/opencode/src/session/llm-ai-sdk.ts index 9d5407f975f9..bba31861e162 100644 --- a/packages/opencode/src/session/llm-ai-sdk.ts +++ b/packages/opencode/src/session/llm-ai-sdk.ts @@ -1,4 +1,4 @@ -import { ContentBlockID, FinishReason, LLMEvent, ProviderMetadata, ToolCallID, ToolResultValue, Usage } from "@opencode-ai/llm" +import { FinishReason, LLMEvent, ProviderMetadata, ToolResultValue } from "@opencode-ai/llm" import { Effect, Schema } from "effect" import { type streamText } from "ai" import { errorMessage } from "@/util/error" @@ -11,15 +11,12 @@ export function adapterState() { step: 0, text: 0, reasoning: 0, - currentTextID: undefined as ContentBlockID | undefined, - currentReasoningID: undefined as ContentBlockID | undefined, + currentTextID: undefined as string | undefined, + currentReasoningID: undefined as string | undefined, toolNames: {} as Record, } } -const contentBlockID = (value: string) => ContentBlockID.make(value) -const toolCallID = (value: string) => ToolCallID.make(value) - function finishReason(value: string | undefined): FinishReason { return Schema.is(FinishReason)(value) ? value : "unknown" } @@ -28,7 +25,7 @@ function providerMetadata(value: unknown): ProviderMetadata | undefined { return Schema.is(ProviderMetadata)(value) ? value : undefined } -function usage(value: unknown): Usage | undefined { +function usage(value: unknown) { if (!value || typeof value !== "object") return undefined const item = value as { inputTokens?: number @@ -49,7 +46,17 @@ function usage(value: unknown): Usage | undefined { cacheWriteInputTokens: item.inputTokenDetails?.cacheWriteTokens, }).filter((entry) => entry[1] !== undefined), ) - return new Usage(result) + return result +} + +function currentTextID(state: ReturnType, id: string | undefined) { + state.currentTextID = id ?? state.currentTextID ?? `text-${state.text++}` + return state.currentTextID +} + +function currentReasoningID(state: ReturnType, id: string | undefined) { + state.currentReasoningID = id ?? state.currentReasoningID ?? `reasoning-${state.reasoning++}` + return state.currentReasoningID } export function toLLMEvents( @@ -86,7 +93,7 @@ export function toLLMEvents( case "text-start": return Effect.sync(() => { - state.currentTextID = contentBlockID(event.id ?? `text-${state.text++}`) + state.currentTextID = currentTextID(state, event.id) return [ LLMEvent.textStart({ id: state.currentTextID, @@ -98,7 +105,7 @@ export function toLLMEvents( case "text-delta": return Effect.succeed([ LLMEvent.textDelta({ - id: event.id ? contentBlockID(event.id) : (state.currentTextID ?? contentBlockID(`text-${state.text++}`)), + id: currentTextID(state, event.id), text: event.text, }), ]) @@ -106,14 +113,14 @@ export function toLLMEvents( case "text-end": return Effect.succeed([ LLMEvent.textEnd({ - id: event.id ? contentBlockID(event.id) : (state.currentTextID ?? contentBlockID(`text-${state.text++}`)), + id: currentTextID(state, event.id), providerMetadata: providerMetadata(event.providerMetadata), }), ]) case "reasoning-start": return Effect.sync(() => { - state.currentReasoningID = contentBlockID(event.id) + state.currentReasoningID = currentReasoningID(state, event.id) return [ LLMEvent.reasoningStart({ id: state.currentReasoningID, @@ -125,14 +132,14 @@ export function toLLMEvents( case "reasoning-delta": return Effect.succeed([ LLMEvent.reasoningDelta({ - id: event.id ? contentBlockID(event.id) : (state.currentReasoningID ?? contentBlockID(`reasoning-${state.reasoning++}`)), + id: currentReasoningID(state, event.id), text: event.text, }), ]) case "reasoning-end": return Effect.sync(() => { - const id = contentBlockID(event.id) + const id = currentReasoningID(state, event.id) state.currentReasoningID = undefined return [ LLMEvent.reasoningEnd({ @@ -147,7 +154,7 @@ export function toLLMEvents( state.toolNames[event.id] = event.toolName return [ LLMEvent.toolInputStart({ - id: toolCallID(event.id), + id: event.id, name: event.toolName, providerMetadata: providerMetadata(event.providerMetadata), }), @@ -157,7 +164,7 @@ export function toLLMEvents( case "tool-input-delta": return Effect.succeed([ LLMEvent.toolInputDelta({ - id: toolCallID(event.id), + id: event.id, name: state.toolNames[event.id] ?? "unknown", text: event.delta ?? "", }), @@ -166,7 +173,7 @@ export function toLLMEvents( case "tool-input-end": return Effect.succeed([ LLMEvent.toolInputEnd({ - id: toolCallID(event.id), + id: event.id, name: state.toolNames[event.id] ?? "unknown", }), ]) @@ -176,7 +183,7 @@ export function toLLMEvents( state.toolNames[event.toolCallId] = event.toolName return [ LLMEvent.toolCall({ - id: toolCallID(event.toolCallId), + id: event.toolCallId, name: event.toolName, input: event.input, providerExecuted: "providerExecuted" in event ? event.providerExecuted : undefined, @@ -191,7 +198,7 @@ export function toLLMEvents( delete state.toolNames[event.toolCallId] return [ LLMEvent.toolResult({ - id: toolCallID(event.toolCallId), + id: event.toolCallId, name, result: ToolResultValue.make(event.output), providerExecuted: "providerExecuted" in event ? event.providerExecuted : undefined, @@ -205,7 +212,7 @@ export function toLLMEvents( delete state.toolNames[event.toolCallId] return [ LLMEvent.toolError({ - id: toolCallID(event.toolCallId), + id: event.toolCallId, name, message: errorMessage(event.error), }),