From 7599f7eb1dc219d9f03000de2a9222be33556b7b Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Tue, 14 Apr 2026 19:30:26 -0400 Subject: [PATCH 1/4] fix(effect): add effect bridge for callback contexts --- packages/opencode/src/bus/index.ts | 5 +- packages/opencode/src/command/index.ts | 6 +-- .../src/control-plane/workspace-context.ts | 4 ++ packages/opencode/src/effect/bridge.ts | 49 +++++++++++++++++++ packages/opencode/src/effect/run-service.ts | 23 +++++++-- packages/opencode/src/mcp/index.ts | 9 ++-- packages/opencode/src/plugin/index.ts | 23 ++++----- packages/opencode/src/provider/provider.ts | 6 +-- packages/opencode/src/pty/index.ts | 7 +-- packages/opencode/src/session/prompt.ts | 7 +-- .../test/effect/app-runtime-logger.test.ts | 31 ++++++++++++ 11 files changed, 133 insertions(+), 37 deletions(-) create mode 100644 packages/opencode/src/effect/bridge.ts diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 0638777bd4dc..3a1eea5c73e0 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -1,6 +1,6 @@ import z from "zod" import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect" -import { EffectLogger } from "@/effect/logger" +import { EffectBridge } from "@/effect/bridge" import { Log } from "../util/log" import { BusEvent } from "./bus-event" import { GlobalBus } from "./global" @@ -128,6 +128,7 @@ export namespace Bus { function on(pubsub: PubSub.PubSub, type: string, callback: (event: T) => unknown) { return Effect.gen(function* () { log.info("subscribing", { type }) + const bridge = yield* EffectBridge.make() const scope = yield* Scope.make() const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub)) @@ -147,7 +148,7 @@ export namespace Bus { return () => { log.info("unsubscribing", { type }) - Effect.runFork(Scope.close(scope, Exit.void).pipe(Effect.provide(EffectLogger.layer))) + bridge.fork(Scope.close(scope, Exit.void)) } }) } diff --git a/packages/opencode/src/command/index.ts b/packages/opencode/src/command/index.ts index 42f53301b231..2e6dbac528ee 100644 --- a/packages/opencode/src/command/index.ts +++ b/packages/opencode/src/command/index.ts @@ -1,9 +1,9 @@ import { BusEvent } from "@/bus/bus-event" import { InstanceState } from "@/effect/instance-state" +import { EffectBridge } from "@/effect/bridge" import type { InstanceContext } from "@/project/instance" import { SessionID, MessageID } from "@/session/schema" import { Effect, Layer, Context } from "effect" -import { EffectLogger } from "@/effect/logger" import z from "zod" import { Config } from "../config/config" import { MCP } from "../mcp" @@ -79,6 +79,7 @@ export namespace Command { const config = yield* Config.Service const mcp = yield* MCP.Service const skill = yield* Skill.Service + const bridge = yield* EffectBridge.make() const init = Effect.fn("Command.state")(function* (ctx: InstanceContext) { const cfg = yield* config.get() @@ -125,7 +126,7 @@ export namespace Command { source: "mcp", description: prompt.description, get template() { - return Effect.runPromise( + return bridge.promise( mcp .getPrompt( prompt.client, @@ -141,7 +142,6 @@ export namespace Command { .map((message) => (message.content.type === "text" ? message.content.text : "")) .join("\n") || "", ), - Effect.provide(EffectLogger.layer), ), ) }, diff --git a/packages/opencode/src/control-plane/workspace-context.ts b/packages/opencode/src/control-plane/workspace-context.ts index 173ec6178a53..541657b88cce 100644 --- a/packages/opencode/src/control-plane/workspace-context.ts +++ b/packages/opencode/src/control-plane/workspace-context.ts @@ -12,6 +12,10 @@ export const WorkspaceContext = { return context.provide({ workspaceID: input.workspaceID as string }, () => input.fn()) }, + restore(workspaceID: string, fn: () => R): R { + return context.provide({ workspaceID }, fn) + }, + get workspaceID() { try { return context.use().workspaceID diff --git a/packages/opencode/src/effect/bridge.ts b/packages/opencode/src/effect/bridge.ts new file mode 100644 index 000000000000..bafa5a0ea6b0 --- /dev/null +++ b/packages/opencode/src/effect/bridge.ts @@ -0,0 +1,49 @@ +import { Effect, Fiber } from "effect" +import { WorkspaceContext } from "@/control-plane/workspace-context" +import { Instance, type InstanceContext } from "@/project/instance" +import { LocalContext } from "@/util/local-context" +import { InstanceRef, WorkspaceRef } from "./instance-ref" +import { attachWith } from "./run-service" + +export namespace EffectBridge { + export interface Shape { + readonly promise: (effect: Effect.Effect) => Promise + readonly fork: (effect: Effect.Effect) => Fiber.Fiber + } + + function restore(instance: InstanceContext | undefined, workspace: string | undefined, fn: () => R): R { + if (instance && workspace !== undefined) { + return WorkspaceContext.restore(workspace, () => Instance.restore(instance, fn)) + } + if (instance) return Instance.restore(instance, fn) + if (workspace !== undefined) return WorkspaceContext.restore(workspace, fn) + return fn() + } + + export function make(): Effect.Effect { + return Effect.gen(function* () { + const ctx = yield* Effect.context() + const value = yield* InstanceRef + const instance = + value ?? + (() => { + try { + return Instance.current + } catch (err) { + if (!(err instanceof LocalContext.NotFound)) throw err + } + })() + const workspace = (yield* WorkspaceRef) ?? WorkspaceContext.workspaceID + const attach = (effect: Effect.Effect) => attachWith(effect, { instance, workspace }) + const wrap = (effect: Effect.Effect) => + attach(effect).pipe(Effect.provide(ctx)) as Effect.Effect + + return { + promise: (effect: Effect.Effect) => + restore(instance, workspace, () => Effect.runPromise(wrap(effect))), + fork: (effect: Effect.Effect) => + restore(instance, workspace, () => Effect.runFork(wrap(effect))), + } satisfies Shape + }) + } +} diff --git a/packages/opencode/src/effect/run-service.ts b/packages/opencode/src/effect/run-service.ts index bb4307b57ce4..13104c88b341 100644 --- a/packages/opencode/src/effect/run-service.ts +++ b/packages/opencode/src/effect/run-service.ts @@ -5,14 +5,31 @@ import { LocalContext } from "@/util/local-context" import { InstanceRef, WorkspaceRef } from "./instance-ref" import { Observability } from "./observability" import { WorkspaceContext } from "@/control-plane/workspace-context" +import type { InstanceContext } from "@/project/instance" export const memoMap = Layer.makeMemoMapUnsafe() +type Refs = { + instance?: InstanceContext + workspace?: string +} + +export function attachWith(effect: Effect.Effect, refs: Refs): Effect.Effect { + if (!refs.instance && !refs.workspace) return effect + if (!refs.instance) return effect.pipe(Effect.provideService(WorkspaceRef, refs.workspace)) + if (!refs.workspace) return effect.pipe(Effect.provideService(InstanceRef, refs.instance)) + return effect.pipe( + Effect.provideService(InstanceRef, refs.instance), + Effect.provideService(WorkspaceRef, refs.workspace), + ) +} + export function attach(effect: Effect.Effect): Effect.Effect { try { - const ctx = Instance.current - const workspaceID = WorkspaceContext.workspaceID - return effect.pipe(Effect.provideService(InstanceRef, ctx), Effect.provideService(WorkspaceRef, workspaceID)) + return attachWith(effect, { + instance: Instance.current, + workspace: WorkspaceContext.workspaceID, + }) } catch (err) { if (!(err instanceof LocalContext.NotFound)) throw err } diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index 3b6690934062..8e050e4ba420 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -25,7 +25,7 @@ import { Bus } from "@/bus" import { TuiEvent } from "@/cli/cmd/tui/event" import open from "open" import { Effect, Exit, Layer, Option, Context, Stream } from "effect" -import { EffectLogger } from "@/effect/logger" +import { EffectBridge } from "@/effect/bridge" import { InstanceState } from "@/effect/instance-state" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner" @@ -444,6 +444,7 @@ export namespace MCP { return { mcpClient, status, defs: listed } satisfies CreateResult }) const cfgSvc = yield* Config.Service + const bridge = yield* EffectBridge.make() const descendants = Effect.fnUntraced( function* (pid: number) { @@ -476,14 +477,12 @@ export namespace MCP { log.info("tools list changed notification received", { server: name }) if (s.clients[name] !== client || s.status[name]?.status !== "connected") return - const listed = await Effect.runPromise(defs(name, client, timeout).pipe(Effect.provide(EffectLogger.layer))) + const listed = await bridge.promise(defs(name, client, timeout)) if (!listed) return if (s.clients[name] !== client || s.status[name]?.status !== "connected") return s.defs[name] = listed - await Effect.runPromise( - bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore, Effect.provide(EffectLogger.layer)), - ) + await bridge.promise(bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore)) }) } diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index c716ffdf8dc4..5296913d5407 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -18,7 +18,7 @@ import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth" import { PoeAuthPlugin } from "opencode-poe-auth" import { CloudflareAIGatewayAuthPlugin, CloudflareWorkersAuthPlugin } from "./cloudflare" import { Effect, Layer, Context, Stream } from "effect" -import { EffectLogger } from "@/effect/logger" +import { EffectBridge } from "@/effect/bridge" import { InstanceState } from "@/effect/instance-state" import { errorMessage } from "@/util/error" import { PluginLoader } from "./loader" @@ -90,14 +90,6 @@ export namespace Plugin { return result } - function publishPluginError(bus: Bus.Interface, message: string) { - Effect.runFork( - bus - .publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() }) - .pipe(Effect.provide(EffectLogger.layer)), - ) - } - async function applyPlugin(load: PluginLoader.Loaded, input: PluginInput, hooks: Hooks[]) { const plugin = readV1Plugin(load.mod, load.spec, "server", "detect") if (plugin) { @@ -116,6 +108,11 @@ export namespace Plugin { Effect.gen(function* () { const bus = yield* Bus.Service const config = yield* Config.Service + const bridge = yield* EffectBridge.make() + + function publishPluginError(message: string) { + bridge.fork(bus.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() })) + } const state = yield* InstanceState.make( Effect.fn("Plugin.state")(function* (ctx) { @@ -187,24 +184,24 @@ export namespace Plugin { if (stage === "install") { const parsed = parsePluginSpecifier(spec) log.error("failed to install plugin", { pkg: parsed.pkg, version: parsed.version, error: message }) - publishPluginError(bus, `Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`) + publishPluginError(`Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`) return } if (stage === "compatibility") { log.warn("plugin incompatible", { path: spec, error: message }) - publishPluginError(bus, `Plugin ${spec} skipped: ${message}`) + publishPluginError(`Plugin ${spec} skipped: ${message}`) return } if (stage === "entry") { log.error("failed to resolve plugin server entry", { path: spec, error: message }) - publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`) + publishPluginError(`Failed to load plugin ${spec}: ${message}`) return } log.error("failed to load plugin", { path: spec, target: resolved?.entry, error: message }) - publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`) + publishPluginError(`Failed to load plugin ${spec}: ${message}`) }, }, }), diff --git a/packages/opencode/src/provider/provider.ts b/packages/opencode/src/provider/provider.ts index d34721f1d82d..a4a98c55bc37 100644 --- a/packages/opencode/src/provider/provider.ts +++ b/packages/opencode/src/provider/provider.ts @@ -19,7 +19,7 @@ import { iife } from "@/util/iife" import { Global } from "../global" import path from "path" import { Effect, Layer, Context } from "effect" -import { EffectLogger } from "@/effect/logger" +import { EffectBridge } from "@/effect/bridge" import { InstanceState } from "@/effect/instance-state" import { AppFileSystem } from "@opencode-ai/shared/filesystem" import { isRecord } from "@/util/record" @@ -1039,6 +1039,7 @@ export namespace Provider { const auth = yield* Auth.Service const env = yield* Env.Service const plugin = yield* Plugin.Service + const bridge = yield* EffectBridge.make() const state = yield* InstanceState.make(() => Effect.gen(function* () { @@ -1223,8 +1224,7 @@ export namespace Provider { const options = yield* Effect.promise(() => plugin.auth!.loader!( - () => - Effect.runPromise(auth.get(providerID).pipe(Effect.orDie, Effect.provide(EffectLogger.layer))) as any, + () => bridge.promise(auth.get(providerID).pipe(Effect.orDie)) as any, database[plugin.auth!.provider], ), ) diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts index 9c79eb2d4c78..e912615b358e 100644 --- a/packages/opencode/src/pty/index.ts +++ b/packages/opencode/src/pty/index.ts @@ -10,7 +10,7 @@ import { Shell } from "@/shell/shell" import { Plugin } from "@/plugin" import { PtyID } from "./schema" import { Effect, Layer, Context } from "effect" -import { EffectLogger } from "@/effect/logger" +import { EffectBridge } from "@/effect/bridge" export namespace Pty { const log = Log.create({ service: "pty" }) @@ -119,6 +119,7 @@ export namespace Pty { Effect.gen(function* () { const bus = yield* Bus.Service const plugin = yield* Plugin.Service + const bridge = yield* EffectBridge.make() function teardown(session: Active) { try { session.process.kill() @@ -256,8 +257,8 @@ export namespace Pty { if (session.info.status === "exited") return log.info("session exited", { id, exitCode }) session.info.status = "exited" - Effect.runFork(bus.publish(Event.Exited, { id, exitCode }).pipe(Effect.provide(EffectLogger.layer))) - Effect.runFork(remove(id).pipe(Effect.provide(EffectLogger.layer))) + bridge.fork(bus.publish(Event.Exited, { id, exitCode })) + bridge.fork(remove(id)) }), ) yield* bus.publish(Event.Created, { info }) diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index f8c794505e03..ffd074d3f8b4 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -48,6 +48,7 @@ import { EffectLogger } from "@/effect/logger" import { InstanceState } from "@/effect/instance-state" import { TaskTool, type TaskPromptOps } from "@/tool/task" import { SessionRunState } from "./run-state" +import { EffectBridge } from "@/effect/bridge" // @ts-ignore globalThis.AI_SDK_LOG_WARNINGS = false @@ -105,11 +106,7 @@ export namespace SessionPrompt { const sys = yield* SystemPrompt.Service const llm = yield* LLM.Service const runner = Effect.fn("SessionPrompt.runner")(function* () { - const ctx = yield* Effect.context() - return { - promise: (effect: Effect.Effect) => Effect.runPromiseWith(ctx)(effect), - fork: (effect: Effect.Effect) => Effect.runForkWith(ctx)(effect), - } + return yield* EffectBridge.make() }) const ops = Effect.fn("SessionPrompt.ops")(function* () { const run = yield* runner() diff --git a/packages/opencode/test/effect/app-runtime-logger.test.ts b/packages/opencode/test/effect/app-runtime-logger.test.ts index 8a7aab6cf8b1..7388748f929a 100644 --- a/packages/opencode/test/effect/app-runtime-logger.test.ts +++ b/packages/opencode/test/effect/app-runtime-logger.test.ts @@ -1,6 +1,7 @@ import { expect, test } from "bun:test" import { Context, Effect, Layer, Logger } from "effect" import { AppRuntime } from "../../src/effect/app-runtime" +import { EffectBridge } from "../../src/effect/bridge" import { InstanceRef } from "../../src/effect/instance-ref" import { EffectLogger } from "../../src/effect/logger" import { makeRuntime } from "../../src/effect/run-service" @@ -59,3 +60,33 @@ test("AppRuntime attaches InstanceRef from ALS", async () => { expect(dir).toBe(tmp.path) }) + +test("EffectBridge preserves logger and instance context across async boundaries", async () => { + await using tmp = await tmpdir({ git: true }) + + const result = await Instance.provide({ + directory: tmp.path, + fn: () => + AppRuntime.runPromise( + Effect.gen(function* () { + const bridge = yield* EffectBridge.make() + return yield* Effect.promise(() => + Promise.resolve().then(() => + bridge.promise( + Effect.gen(function* () { + return { + directory: (yield* InstanceRef)?.directory, + ...check(yield* Effect.service(Logger.CurrentLoggers)), + } + }), + ), + ), + ) + }), + ), + }) + + expect(result.directory).toBe(tmp.path) + expect(result.effectLogger).toBe(true) + expect(result.defaultLogger).toBe(false) +}) From 6e7b99e7f62eb3ef7dfe7f24434444a263b160e7 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Tue, 14 Apr 2026 19:46:13 -0400 Subject: [PATCH 2/4] refactor(effect): scope bridges to callback setup --- packages/opencode/src/command/index.ts | 2 +- packages/opencode/src/mcp/index.ts | 9 +++++---- packages/opencode/src/plugin/index.ts | 10 +++++----- packages/opencode/src/provider/provider.ts | 2 +- packages/opencode/src/pty/index.ts | 2 +- 5 files changed, 13 insertions(+), 12 deletions(-) diff --git a/packages/opencode/src/command/index.ts b/packages/opencode/src/command/index.ts index 2e6dbac528ee..91a9e1b405bb 100644 --- a/packages/opencode/src/command/index.ts +++ b/packages/opencode/src/command/index.ts @@ -79,10 +79,10 @@ export namespace Command { const config = yield* Config.Service const mcp = yield* MCP.Service const skill = yield* Skill.Service - const bridge = yield* EffectBridge.make() const init = Effect.fn("Command.state")(function* (ctx: InstanceContext) { const cfg = yield* config.get() + const bridge = yield* EffectBridge.make() const commands: Record = {} commands[Default.INIT] = { diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index 8e050e4ba420..a68c6c1d8da1 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -444,7 +444,6 @@ export namespace MCP { return { mcpClient, status, defs: listed } satisfies CreateResult }) const cfgSvc = yield* Config.Service - const bridge = yield* EffectBridge.make() const descendants = Effect.fnUntraced( function* (pid: number) { @@ -472,7 +471,7 @@ export namespace MCP { Effect.catch(() => Effect.succeed([] as number[])), ) - function watch(s: State, name: string, client: MCPClient, timeout?: number) { + function watch(s: State, name: string, client: MCPClient, bridge: EffectBridge.Shape, timeout?: number) { client.setNotificationHandler(ToolListChangedNotificationSchema, async () => { log.info("tools list changed notification received", { server: name }) if (s.clients[name] !== client || s.status[name]?.status !== "connected") return @@ -489,6 +488,7 @@ export namespace MCP { const state = yield* InstanceState.make( Effect.fn("MCP.state")(function* () { const cfg = yield* cfgSvc.get() + const bridge = yield* EffectBridge.make() const config = cfg.mcp ?? {} const s: State = { status: {}, @@ -517,7 +517,7 @@ export namespace MCP { if (result.mcpClient) { s.clients[key] = result.mcpClient s.defs[key] = result.defs! - watch(s, key, result.mcpClient, mcp.timeout) + watch(s, key, result.mcpClient, bridge, mcp.timeout) } }), { concurrency: "unbounded" }, @@ -564,11 +564,12 @@ export namespace MCP { listed: MCPToolDef[], timeout?: number, ) { + const bridge = yield* EffectBridge.make() yield* closeClient(s, name) s.status[name] = { status: "connected" } s.clients[name] = client s.defs[name] = listed - watch(s, name, client, timeout) + watch(s, name, client, bridge, timeout) return s.status[name] }) diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index 5296913d5407..9f618eff8cad 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -108,15 +108,15 @@ export namespace Plugin { Effect.gen(function* () { const bus = yield* Bus.Service const config = yield* Config.Service - const bridge = yield* EffectBridge.make() - - function publishPluginError(message: string) { - bridge.fork(bus.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() })) - } const state = yield* InstanceState.make( Effect.fn("Plugin.state")(function* (ctx) { const hooks: Hooks[] = [] + const bridge = yield* EffectBridge.make() + + function publishPluginError(message: string) { + bridge.fork(bus.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() })) + } const { Server } = yield* Effect.promise(() => import("../server/server")) diff --git a/packages/opencode/src/provider/provider.ts b/packages/opencode/src/provider/provider.ts index a4a98c55bc37..8833cfd05fd7 100644 --- a/packages/opencode/src/provider/provider.ts +++ b/packages/opencode/src/provider/provider.ts @@ -1039,11 +1039,11 @@ export namespace Provider { const auth = yield* Auth.Service const env = yield* Env.Service const plugin = yield* Plugin.Service - const bridge = yield* EffectBridge.make() const state = yield* InstanceState.make(() => Effect.gen(function* () { using _ = log.time("state") + const bridge = yield* EffectBridge.make() const cfg = yield* config.get() const modelsDev = yield* Effect.promise(() => ModelsDev.get()) const database = mapValues(modelsDev, fromModelsDevProvider) diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts index e912615b358e..1c969b4b93f4 100644 --- a/packages/opencode/src/pty/index.ts +++ b/packages/opencode/src/pty/index.ts @@ -119,7 +119,6 @@ export namespace Pty { Effect.gen(function* () { const bus = yield* Bus.Service const plugin = yield* Plugin.Service - const bridge = yield* EffectBridge.make() function teardown(session: Active) { try { session.process.kill() @@ -174,6 +173,7 @@ export namespace Pty { const create = Effect.fn("Pty.create")(function* (input: CreateInput) { const s = yield* InstanceState.get(state) + const bridge = yield* EffectBridge.make() const id = PtyID.ascending() const command = input.command || Shell.preferred() const args = input.args || [] From efe38507643f8504101fe61ac5a38f7d4c38b65a Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Tue, 14 Apr 2026 19:51:31 -0400 Subject: [PATCH 3/4] fix(server): install effect types for typecheck --- bun.lock | 1 + packages/server/package.json | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/bun.lock b/bun.lock index 01966b826a03..fe5d42d7cc38 100644 --- a/bun.lock +++ b/bun.lock @@ -510,6 +510,7 @@ "effect": "catalog:", }, "devDependencies": { + "@typescript/native-preview": "catalog:", "typescript": "catalog:", }, }, diff --git a/packages/server/package.json b/packages/server/package.json index c397c40d9066..9b8b31299d6d 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -17,10 +17,11 @@ "dist" ], "scripts": { - "typecheck": "tsc --noEmit", + "typecheck": "tsgo --noEmit", "build": "tsc" }, "devDependencies": { + "@typescript/native-preview": "catalog:", "typescript": "catalog:" }, "dependencies": { From f39d37d2150346da48d8205c5af286c9606363c4 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 15 Apr 2026 11:08:35 -0400 Subject: [PATCH 4/4] fix(effect): bridge workflow approval callback context --- packages/opencode/src/session/llm.ts | 679 ++++++++++++++------------- 1 file changed, 340 insertions(+), 339 deletions(-) diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts index 5a4c041196db..05d788275789 100644 --- a/packages/opencode/src/session/llm.ts +++ b/packages/opencode/src/session/llm.ts @@ -20,13 +20,12 @@ import { Wildcard } from "@/util/wildcard" import { SessionID } from "@/session/schema" import { Auth } from "@/auth" import { Installation } from "@/installation" -import { makeRuntime } from "@/effect/run-service" +import { EffectBridge } from "@/effect/bridge" import * as Option from "effect/Option" import * as OtelTracer from "@effect/opentelemetry/Tracer" export namespace LLM { const log = Log.create({ service: "llm" }) - const perms = makeRuntime(Permission.Service, Permission.defaultLayer) export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX type Result = Awaited> @@ -57,369 +56,371 @@ export namespace LLM { export class Service extends Context.Service()("@opencode/LLM") {} - export const layer: Layer.Layer = - Layer.effect( - Service, - Effect.gen(function* () { - const auth = yield* Auth.Service - const config = yield* Config.Service - const provider = yield* Provider.Service - const plugin = yield* Plugin.Service - - const run = Effect.fn("LLM.run")(function* (input: StreamRequest) { - const l = log - .clone() - .tag("providerID", input.model.providerID) - .tag("modelID", input.model.id) - .tag("sessionID", input.sessionID) - .tag("small", (input.small ?? false).toString()) - .tag("agent", input.agent.name) - .tag("mode", input.agent.mode) - l.info("stream", { - modelID: input.model.id, - providerID: input.model.providerID, - }) - - const [language, cfg, item, info] = yield* Effect.all( - [ - provider.getLanguage(input.model), - config.get(), - provider.getProvider(input.model.providerID), - auth.get(input.model.providerID), - ], - { concurrency: "unbounded" }, - ) - - // TODO: move this to a proper hook - const isOpenaiOauth = item.id === "openai" && info?.type === "oauth" - - const system: string[] = [] - system.push( - [ - // use agent prompt otherwise provider prompt - ...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)), - // any custom prompt passed into this call - ...input.system, - // any custom prompt from last user message - ...(input.user.system ? [input.user.system] : []), - ] - .filter((x) => x) - .join("\n"), - ) - - const header = system[0] - yield* plugin.trigger( - "experimental.chat.system.transform", - { sessionID: input.sessionID, model: input.model }, - { system }, - ) - // rejoin to maintain 2-part structure for caching if header unchanged - if (system.length > 2 && system[0] === header) { - const rest = system.slice(1) - system.length = 0 - system.push(header, rest.join("\n")) - } - - const variant = - !input.small && input.model.variants && input.user.model.variant - ? input.model.variants[input.user.model.variant] - : {} - const base = input.small - ? ProviderTransform.smallOptions(input.model) - : ProviderTransform.options({ - model: input.model, - sessionID: input.sessionID, - providerOptions: item.options, - }) - const options: Record = pipe( - base, - mergeDeep(input.model.options), - mergeDeep(input.agent.options), - mergeDeep(variant), - ) - if (isOpenaiOauth) { - options.instructions = system.join("\n") - } + const live: Layer.Layer< + Service, + never, + Auth.Service | Config.Service | Provider.Service | Plugin.Service | Permission.Service + > = Layer.effect( + Service, + Effect.gen(function* () { + const auth = yield* Auth.Service + const config = yield* Config.Service + const provider = yield* Provider.Service + const plugin = yield* Plugin.Service + const perm = yield* Permission.Service + + const run = Effect.fn("LLM.run")(function* (input: StreamRequest) { + const l = log + .clone() + .tag("providerID", input.model.providerID) + .tag("modelID", input.model.id) + .tag("sessionID", input.sessionID) + .tag("small", (input.small ?? false).toString()) + .tag("agent", input.agent.name) + .tag("mode", input.agent.mode) + l.info("stream", { + modelID: input.model.id, + providerID: input.model.providerID, + }) - const isWorkflow = language instanceof GitLabWorkflowLanguageModel - const messages = isOpenaiOauth - ? input.messages - : isWorkflow - ? input.messages - : [ - ...system.map( - (x): ModelMessage => ({ - role: "system", - content: x, - }), - ), - ...input.messages, - ] - - const params = yield* plugin.trigger( - "chat.params", - { - sessionID: input.sessionID, - agent: input.agent.name, + const [language, cfg, item, info] = yield* Effect.all( + [ + provider.getLanguage(input.model), + config.get(), + provider.getProvider(input.model.providerID), + auth.get(input.model.providerID), + ], + { concurrency: "unbounded" }, + ) + + // TODO: move this to a proper hook + const isOpenaiOauth = item.id === "openai" && info?.type === "oauth" + + const system: string[] = [] + system.push( + [ + // use agent prompt otherwise provider prompt + ...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)), + // any custom prompt passed into this call + ...input.system, + // any custom prompt from last user message + ...(input.user.system ? [input.user.system] : []), + ] + .filter((x) => x) + .join("\n"), + ) + + const header = system[0] + yield* plugin.trigger( + "experimental.chat.system.transform", + { sessionID: input.sessionID, model: input.model }, + { system }, + ) + // rejoin to maintain 2-part structure for caching if header unchanged + if (system.length > 2 && system[0] === header) { + const rest = system.slice(1) + system.length = 0 + system.push(header, rest.join("\n")) + } + + const variant = + !input.small && input.model.variants && input.user.model.variant + ? input.model.variants[input.user.model.variant] + : {} + const base = input.small + ? ProviderTransform.smallOptions(input.model) + : ProviderTransform.options({ model: input.model, - provider: item, - message: input.user, - }, - { - temperature: input.model.capabilities.temperature - ? (input.agent.temperature ?? ProviderTransform.temperature(input.model)) - : undefined, - topP: input.agent.topP ?? ProviderTransform.topP(input.model), - topK: ProviderTransform.topK(input.model), - maxOutputTokens: ProviderTransform.maxOutputTokens(input.model), - options, - }, - ) - - const { headers } = yield* plugin.trigger( - "chat.headers", - { sessionID: input.sessionID, - agent: input.agent.name, - model: input.model, - provider: item, - message: input.user, - }, - { - headers: {}, - }, - ) - - const tools = resolveTools(input) - - // LiteLLM and some Anthropic proxies require the tools parameter to be present - // when message history contains tool calls, even if no tools are being used. - // Add a dummy tool that is never called to satisfy this validation. - // This is enabled for: - // 1. Providers with "litellm" in their ID or API ID (auto-detected) - // 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways) - const isLiteLLMProxy = - item.options?.["litellmProxy"] === true || - input.model.providerID.toLowerCase().includes("litellm") || - input.model.api.id.toLowerCase().includes("litellm") - - // LiteLLM/Bedrock rejects requests where the message history contains tool - // calls but no tools param is present. When there are no active tools (e.g. - // during compaction), inject a stub tool to satisfy the validation requirement. - // The stub description explicitly tells the model not to call it. - if ( - (isLiteLLMProxy || input.model.providerID.includes("github-copilot")) && - Object.keys(tools).length === 0 && - hasToolCalls(input.messages) - ) { - tools["_noop"] = tool({ - description: "Do not call this tool. It exists only for API compatibility and must never be invoked.", - inputSchema: jsonSchema({ - type: "object", - properties: { - reason: { type: "string", description: "Unused" }, - }, - }), - execute: async () => ({ output: "", title: "", metadata: {} }), + providerOptions: item.options, }) + const options: Record = pipe( + base, + mergeDeep(input.model.options), + mergeDeep(input.agent.options), + mergeDeep(variant), + ) + if (isOpenaiOauth) { + options.instructions = system.join("\n") + } + + const isWorkflow = language instanceof GitLabWorkflowLanguageModel + const messages = isOpenaiOauth + ? input.messages + : isWorkflow + ? input.messages + : [ + ...system.map( + (x): ModelMessage => ({ + role: "system", + content: x, + }), + ), + ...input.messages, + ] + + const params = yield* plugin.trigger( + "chat.params", + { + sessionID: input.sessionID, + agent: input.agent.name, + model: input.model, + provider: item, + message: input.user, + }, + { + temperature: input.model.capabilities.temperature + ? (input.agent.temperature ?? ProviderTransform.temperature(input.model)) + : undefined, + topP: input.agent.topP ?? ProviderTransform.topP(input.model), + topK: ProviderTransform.topK(input.model), + maxOutputTokens: ProviderTransform.maxOutputTokens(input.model), + options, + }, + ) + + const { headers } = yield* plugin.trigger( + "chat.headers", + { + sessionID: input.sessionID, + agent: input.agent.name, + model: input.model, + provider: item, + message: input.user, + }, + { + headers: {}, + }, + ) + + const tools = resolveTools(input) + + // LiteLLM and some Anthropic proxies require the tools parameter to be present + // when message history contains tool calls, even if no tools are being used. + // Add a dummy tool that is never called to satisfy this validation. + // This is enabled for: + // 1. Providers with "litellm" in their ID or API ID (auto-detected) + // 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways) + const isLiteLLMProxy = + item.options?.["litellmProxy"] === true || + input.model.providerID.toLowerCase().includes("litellm") || + input.model.api.id.toLowerCase().includes("litellm") + + // LiteLLM/Bedrock rejects requests where the message history contains tool + // calls but no tools param is present. When there are no active tools (e.g. + // during compaction), inject a stub tool to satisfy the validation requirement. + // The stub description explicitly tells the model not to call it. + if ( + (isLiteLLMProxy || input.model.providerID.includes("github-copilot")) && + Object.keys(tools).length === 0 && + hasToolCalls(input.messages) + ) { + tools["_noop"] = tool({ + description: "Do not call this tool. It exists only for API compatibility and must never be invoked.", + inputSchema: jsonSchema({ + type: "object", + properties: { + reason: { type: "string", description: "Unused" }, + }, + }), + execute: async () => ({ output: "", title: "", metadata: {} }), + }) + } + + // Wire up toolExecutor for DWS workflow models so that tool calls + // from the workflow service are executed via opencode's tool system + // and results sent back over the WebSocket. + if (language instanceof GitLabWorkflowLanguageModel) { + const workflowModel = language as GitLabWorkflowLanguageModel & { + sessionID?: string + sessionPreapprovedTools?: string[] + approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }> } - - // Wire up toolExecutor for DWS workflow models so that tool calls - // from the workflow service are executed via opencode's tool system - // and results sent back over the WebSocket. - if (language instanceof GitLabWorkflowLanguageModel) { - const workflowModel = language as GitLabWorkflowLanguageModel & { - sessionID?: string - sessionPreapprovedTools?: string[] - approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }> + workflowModel.sessionID = input.sessionID + workflowModel.systemPrompt = system.join("\n") + workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => { + const t = tools[toolName] + if (!t || !t.execute) { + return { result: "", error: `Unknown tool: ${toolName}` } } - workflowModel.sessionID = input.sessionID - workflowModel.systemPrompt = system.join("\n") - workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => { - const t = tools[toolName] - if (!t || !t.execute) { - return { result: "", error: `Unknown tool: ${toolName}` } - } - try { - const result = await t.execute!(JSON.parse(argsJson), { - toolCallId: _requestID, - messages: input.messages, - abortSignal: input.abort, - }) - const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result)) - return { - result: output, - metadata: typeof result === "object" ? result?.metadata : undefined, - title: typeof result === "object" ? result?.title : undefined, - } - } catch (e: any) { - return { result: "", error: e.message ?? String(e) } + try { + const result = await t.execute!(JSON.parse(argsJson), { + toolCallId: _requestID, + messages: input.messages, + abortSignal: input.abort, + }) + const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result)) + return { + result: output, + metadata: typeof result === "object" ? result?.metadata : undefined, + title: typeof result === "object" ? result?.title : undefined, } + } catch (e: any) { + return { result: "", error: e.message ?? String(e) } } + } - const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? []) - workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => { - const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission)) - return !match || match.action !== "ask" - }) + const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? []) + workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => { + const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission)) + return !match || match.action !== "ask" + }) - const approvedToolsForSession = new Set() - workflowModel.approvalHandler = Instance.bind(async (approvalTools) => { - const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[] - // Auto-approve tools that were already approved in this session - // (prevents infinite approval loops for server-side MCP tools) - if (uniqueNames.every((name) => approvedToolsForSession.has(name))) { - return { approved: true } - } + const bridge = yield* EffectBridge.make() + const approvedToolsForSession = new Set() + workflowModel.approvalHandler = Instance.bind(async (approvalTools) => { + const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[] + // Auto-approve tools that were already approved in this session + // (prevents infinite approval loops for server-side MCP tools) + if (uniqueNames.every((name) => approvedToolsForSession.has(name))) { + return { approved: true } + } - const id = PermissionID.ascending() - let reply: Permission.Reply | undefined - let unsub: (() => void) | undefined - try { - unsub = Bus.subscribe(Permission.Event.Replied, (evt) => { - if (evt.properties.requestID === id) reply = evt.properties.reply - }) - const toolPatterns = approvalTools.map((t: { name: string; args: string }) => { - try { - const parsed = JSON.parse(t.args) as Record - const title = (parsed?.title ?? parsed?.name ?? "") as string - return title ? `${t.name}: ${title}` : t.name - } catch { - return t.name - } - }) - const uniquePatterns = [...new Set(toolPatterns)] as string[] - await perms.runPromise((svc) => - svc.ask({ - id, - sessionID: SessionID.make(input.sessionID), - permission: "workflow_tool_approval", - patterns: uniquePatterns, - metadata: { tools: approvalTools }, - always: uniquePatterns, - ruleset: [], - }), - ) - for (const name of uniqueNames) approvedToolsForSession.add(name) - workflowModel.sessionPreapprovedTools = [ - ...(workflowModel.sessionPreapprovedTools ?? []), - ...uniqueNames, - ] - return { approved: true } - } catch { - return { approved: false } - } finally { - unsub?.() - } - }) - } + const id = PermissionID.ascending() + let reply: Permission.Reply | undefined + let unsub: (() => void) | undefined + try { + unsub = Bus.subscribe(Permission.Event.Replied, (evt) => { + if (evt.properties.requestID === id) reply = evt.properties.reply + }) + const toolPatterns = approvalTools.map((t: { name: string; args: string }) => { + try { + const parsed = JSON.parse(t.args) as Record + const title = (parsed?.title ?? parsed?.name ?? "") as string + return title ? `${t.name}: ${title}` : t.name + } catch { + return t.name + } + }) + const uniquePatterns = [...new Set(toolPatterns)] as string[] + await bridge.promise( + perm.ask({ + id, + sessionID: SessionID.make(input.sessionID), + permission: "workflow_tool_approval", + patterns: uniquePatterns, + metadata: { tools: approvalTools }, + always: uniquePatterns, + ruleset: [], + }), + ) + for (const name of uniqueNames) approvedToolsForSession.add(name) + workflowModel.sessionPreapprovedTools = [...(workflowModel.sessionPreapprovedTools ?? []), ...uniqueNames] + return { approved: true } + } catch { + return { approved: false } + } finally { + unsub?.() + } + }) + } - const tracer = cfg.experimental?.openTelemetry - ? Option.getOrUndefined(yield* Effect.serviceOption(OtelTracer.OtelTracer)) - : undefined + const tracer = cfg.experimental?.openTelemetry + ? Option.getOrUndefined(yield* Effect.serviceOption(OtelTracer.OtelTracer)) + : undefined - return streamText({ - onError(error) { - l.error("stream error", { - error, + return streamText({ + onError(error) { + l.error("stream error", { + error, + }) + }, + async experimental_repairToolCall(failed) { + const lower = failed.toolCall.toolName.toLowerCase() + if (lower !== failed.toolCall.toolName && tools[lower]) { + l.info("repairing tool call", { + tool: failed.toolCall.toolName, + repaired: lower, }) - }, - async experimental_repairToolCall(failed) { - const lower = failed.toolCall.toolName.toLowerCase() - if (lower !== failed.toolCall.toolName && tools[lower]) { - l.info("repairing tool call", { - tool: failed.toolCall.toolName, - repaired: lower, - }) - return { - ...failed.toolCall, - toolName: lower, - } - } return { ...failed.toolCall, - input: JSON.stringify({ - tool: failed.toolCall.toolName, - error: failed.error.message, - }), - toolName: "invalid", + toolName: lower, } - }, - temperature: params.temperature, - topP: params.topP, - topK: params.topK, - providerOptions: ProviderTransform.providerOptions(input.model, params.options), - activeTools: Object.keys(tools).filter((x) => x !== "invalid"), - tools, - toolChoice: input.toolChoice, - maxOutputTokens: params.maxOutputTokens, - abortSignal: input.abort, - headers: { - ...(input.model.providerID.startsWith("opencode") - ? { - "x-opencode-project": Instance.project.id, - "x-opencode-session": input.sessionID, - "x-opencode-request": input.user.id, - "x-opencode-client": Flag.OPENCODE_CLIENT, + } + return { + ...failed.toolCall, + input: JSON.stringify({ + tool: failed.toolCall.toolName, + error: failed.error.message, + }), + toolName: "invalid", + } + }, + temperature: params.temperature, + topP: params.topP, + topK: params.topK, + providerOptions: ProviderTransform.providerOptions(input.model, params.options), + activeTools: Object.keys(tools).filter((x) => x !== "invalid"), + tools, + toolChoice: input.toolChoice, + maxOutputTokens: params.maxOutputTokens, + abortSignal: input.abort, + headers: { + ...(input.model.providerID.startsWith("opencode") + ? { + "x-opencode-project": Instance.project.id, + "x-opencode-session": input.sessionID, + "x-opencode-request": input.user.id, + "x-opencode-client": Flag.OPENCODE_CLIENT, + } + : { + "x-session-affinity": input.sessionID, + ...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}), + "User-Agent": `opencode/${Installation.VERSION}`, + }), + ...input.model.headers, + ...headers, + }, + maxRetries: input.retries ?? 0, + messages, + model: wrapLanguageModel({ + model: language, + middleware: [ + { + specificationVersion: "v3" as const, + async transformParams(args) { + if (args.type === "stream") { + // @ts-expect-error + args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options) } - : { - "x-session-affinity": input.sessionID, - ...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}), - "User-Agent": `opencode/${Installation.VERSION}`, - }), - ...input.model.headers, - ...headers, - }, - maxRetries: input.retries ?? 0, - messages, - model: wrapLanguageModel({ - model: language, - middleware: [ - { - specificationVersion: "v3" as const, - async transformParams(args) { - if (args.type === "stream") { - // @ts-expect-error - args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options) - } - return args.params - }, + return args.params }, - ], - }), - experimental_telemetry: { - isEnabled: cfg.experimental?.openTelemetry, - functionId: "session.llm", - tracer, - metadata: { - userId: cfg.username ?? "unknown", - sessionId: input.sessionID, }, + ], + }), + experimental_telemetry: { + isEnabled: cfg.experimental?.openTelemetry, + functionId: "session.llm", + tracer, + metadata: { + userId: cfg.username ?? "unknown", + sessionId: input.sessionID, }, - }) + }, }) + }) - const stream: Interface["stream"] = (input) => - Stream.scoped( - Stream.unwrap( - Effect.gen(function* () { - const ctrl = yield* Effect.acquireRelease( - Effect.sync(() => new AbortController()), - (ctrl) => Effect.sync(() => ctrl.abort()), - ) + const stream: Interface["stream"] = (input) => + Stream.scoped( + Stream.unwrap( + Effect.gen(function* () { + const ctrl = yield* Effect.acquireRelease( + Effect.sync(() => new AbortController()), + (ctrl) => Effect.sync(() => ctrl.abort()), + ) - const result = yield* run({ ...input, abort: ctrl.signal }) + const result = yield* run({ ...input, abort: ctrl.signal }) - return Stream.fromAsyncIterable(result.fullStream, (e) => - e instanceof Error ? e : new Error(String(e)), - ) - }), - ), - ) + return Stream.fromAsyncIterable(result.fullStream, (e) => (e instanceof Error ? e : new Error(String(e)))) + }), + ), + ) - return Service.of({ stream }) - }), - ) + return Service.of({ stream }) + }), + ) + + export const layer = live.pipe(Layer.provide(Permission.defaultLayer)) export const defaultLayer = Layer.suspend(() => layer.pipe(