Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/opencode/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
"@octokit/rest": "catalog:",
"@openauthjs/openauth": "catalog:",
"@opencode-ai/plugin": "workspace:*",
"@opencode-ai/server": "workspace:*",
"@opencode-ai/script": "workspace:*",
"@opencode-ai/sdk": "workspace:*",
"@opencode-ai/util": "workspace:*",
Expand Down
5 changes: 3 additions & 2 deletions packages/opencode/src/bus/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import z from "zod"
import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect"
import { EffectLogger } from "@/effect/logger"
import { EffectBridge } from "@/effect/bridge"
import { Log } from "../util/log"
import { BusEvent } from "./bus-event"
import { GlobalBus } from "./global"
Expand Down Expand Up @@ -128,6 +128,7 @@ export namespace Bus {
function on<T>(pubsub: PubSub.PubSub<T>, type: string, callback: (event: T) => unknown) {
return Effect.gen(function* () {
log.info("subscribing", { type })
const bridge = yield* EffectBridge.make()
const scope = yield* Scope.make()
const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub))

Expand All @@ -147,7 +148,7 @@ export namespace Bus {

return () => {
log.info("unsubscribing", { type })
Effect.runFork(Scope.close(scope, Exit.void).pipe(Effect.provide(EffectLogger.layer)))
bridge.fork(Scope.close(scope, Exit.void))
}
})
}
Expand Down
6 changes: 3 additions & 3 deletions packages/opencode/src/command/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
import { EffectBridge } from "@/effect/bridge"
import type { InstanceContext } from "@/project/instance"
import { SessionID, MessageID } from "@/session/schema"
import { Effect, Layer, Context } from "effect"
import { EffectLogger } from "@/effect/logger"
import z from "zod"
import { Config } from "../config/config"
import { MCP } from "../mcp"
Expand Down Expand Up @@ -82,6 +82,7 @@ export namespace Command {

const init = Effect.fn("Command.state")(function* (ctx: InstanceContext) {
const cfg = yield* config.get()
const bridge = yield* EffectBridge.make()
const commands: Record<string, Info> = {}

commands[Default.INIT] = {
Expand Down Expand Up @@ -125,7 +126,7 @@ export namespace Command {
source: "mcp",
description: prompt.description,
get template() {
return Effect.runPromise(
return bridge.promise(
mcp
.getPrompt(
prompt.client,
Expand All @@ -141,7 +142,6 @@ export namespace Command {
.map((message) => (message.content.type === "text" ? message.content.text : ""))
.join("\n") || "",
),
Effect.provide(EffectLogger.layer),
),
)
},
Expand Down
4 changes: 4 additions & 0 deletions packages/opencode/src/control-plane/workspace-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ export const WorkspaceContext = {
return context.provide({ workspaceID: input.workspaceID as string }, () => input.fn())
},

restore<R>(workspaceID: string, fn: () => R): R {
return context.provide({ workspaceID }, fn)
},

get workspaceID() {
try {
return context.use().workspaceID
Expand Down
49 changes: 49 additions & 0 deletions packages/opencode/src/effect/bridge.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { Effect, Fiber } from "effect"
import { WorkspaceContext } from "@/control-plane/workspace-context"
import { Instance, type InstanceContext } from "@/project/instance"
import { LocalContext } from "@/util/local-context"
import { InstanceRef, WorkspaceRef } from "./instance-ref"
import { attachWith } from "./run-service"

export namespace EffectBridge {
export interface Shape {
readonly promise: <A, E, R>(effect: Effect.Effect<A, E, R>) => Promise<A>
readonly fork: <A, E, R>(effect: Effect.Effect<A, E, R>) => Fiber.Fiber<A, E>
}

function restore<R>(instance: InstanceContext | undefined, workspace: string | undefined, fn: () => R): R {
if (instance && workspace !== undefined) {
return WorkspaceContext.restore(workspace, () => Instance.restore(instance, fn))
}
if (instance) return Instance.restore(instance, fn)
if (workspace !== undefined) return WorkspaceContext.restore(workspace, fn)
return fn()
}

export function make(): Effect.Effect<Shape> {
return Effect.gen(function* () {
const ctx = yield* Effect.context()
const value = yield* InstanceRef
const instance =
value ??
(() => {
try {
return Instance.current
} catch (err) {
if (!(err instanceof LocalContext.NotFound)) throw err
}
})()
const workspace = (yield* WorkspaceRef) ?? WorkspaceContext.workspaceID
const attach = <A, E, R>(effect: Effect.Effect<A, E, R>) => attachWith(effect, { instance, workspace })
const wrap = <A, E, R>(effect: Effect.Effect<A, E, R>) =>
attach(effect).pipe(Effect.provide(ctx)) as Effect.Effect<A, E, never>

return {
promise: <A, E, R>(effect: Effect.Effect<A, E, R>) =>
restore(instance, workspace, () => Effect.runPromise(wrap(effect))),
fork: <A, E, R>(effect: Effect.Effect<A, E, R>) =>
restore(instance, workspace, () => Effect.runFork(wrap(effect))),
} satisfies Shape
})
}
}
23 changes: 20 additions & 3 deletions packages/opencode/src/effect/run-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,31 @@ import { LocalContext } from "@/util/local-context"
import { InstanceRef, WorkspaceRef } from "./instance-ref"
import { Observability } from "./oltp"
import { WorkspaceContext } from "@/control-plane/workspace-context"
import type { InstanceContext } from "@/project/instance"

export const memoMap = Layer.makeMemoMapUnsafe()

type Refs = {
instance?: InstanceContext
workspace?: string
}

export function attachWith<A, E, R>(effect: Effect.Effect<A, E, R>, refs: Refs): Effect.Effect<A, E, R> {
if (!refs.instance && !refs.workspace) return effect
if (!refs.instance) return effect.pipe(Effect.provideService(WorkspaceRef, refs.workspace))
if (!refs.workspace) return effect.pipe(Effect.provideService(InstanceRef, refs.instance))
return effect.pipe(
Effect.provideService(InstanceRef, refs.instance),
Effect.provideService(WorkspaceRef, refs.workspace),
)
}

export function attach<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
try {
const ctx = Instance.current
const workspaceID = WorkspaceContext.workspaceID
return effect.pipe(Effect.provideService(InstanceRef, ctx), Effect.provideService(WorkspaceRef, workspaceID))
return attachWith(effect, {
instance: Instance.current,
workspace: WorkspaceContext.workspaceID,
})
} catch (err) {
if (!(err instanceof LocalContext.NotFound)) throw err
}
Expand Down
16 changes: 8 additions & 8 deletions packages/opencode/src/mcp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { Bus } from "@/bus"
import { TuiEvent } from "@/cli/cmd/tui/event"
import open from "open"
import { Effect, Exit, Layer, Option, Context, Stream } from "effect"
import { EffectLogger } from "@/effect/logger"
import { EffectBridge } from "@/effect/bridge"
import { InstanceState } from "@/effect/instance-state"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
Expand Down Expand Up @@ -471,25 +471,24 @@ export namespace MCP {
Effect.catch(() => Effect.succeed([] as number[])),
)

function watch(s: State, name: string, client: MCPClient, timeout?: number) {
function watch(s: State, name: string, client: MCPClient, bridge: EffectBridge.Shape, timeout?: number) {
client.setNotificationHandler(ToolListChangedNotificationSchema, async () => {
log.info("tools list changed notification received", { server: name })
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return

const listed = await Effect.runPromise(defs(name, client, timeout).pipe(Effect.provide(EffectLogger.layer)))
const listed = await bridge.promise(defs(name, client, timeout))
if (!listed) return
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return

s.defs[name] = listed
await Effect.runPromise(
bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore, Effect.provide(EffectLogger.layer)),
)
await bridge.promise(bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore))
})
}

const state = yield* InstanceState.make<State>(
Effect.fn("MCP.state")(function* () {
const cfg = yield* cfgSvc.get()
const bridge = yield* EffectBridge.make()
const config = cfg.mcp ?? {}
const s: State = {
status: {},
Expand Down Expand Up @@ -518,7 +517,7 @@ export namespace MCP {
if (result.mcpClient) {
s.clients[key] = result.mcpClient
s.defs[key] = result.defs!
watch(s, key, result.mcpClient, mcp.timeout)
watch(s, key, result.mcpClient, bridge, mcp.timeout)
}
}),
{ concurrency: "unbounded" },
Expand Down Expand Up @@ -565,11 +564,12 @@ export namespace MCP {
listed: MCPToolDef[],
timeout?: number,
) {
const bridge = yield* EffectBridge.make()
yield* closeClient(s, name)
s.status[name] = { status: "connected" }
s.clients[name] = client
s.defs[name] = listed
watch(s, name, client, timeout)
watch(s, name, client, bridge, timeout)
return s.status[name]
})

Expand Down
23 changes: 10 additions & 13 deletions packages/opencode/src/plugin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth"
import { PoeAuthPlugin } from "opencode-poe-auth"
import { CloudflareAIGatewayAuthPlugin, CloudflareWorkersAuthPlugin } from "./cloudflare"
import { Effect, Layer, Context, Stream } from "effect"
import { EffectLogger } from "@/effect/logger"
import { EffectBridge } from "@/effect/bridge"
import { InstanceState } from "@/effect/instance-state"
import { errorMessage } from "@/util/error"
import { PluginLoader } from "./loader"
Expand Down Expand Up @@ -90,14 +90,6 @@ export namespace Plugin {
return result
}

function publishPluginError(bus: Bus.Interface, message: string) {
Effect.runFork(
bus
.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() })
.pipe(Effect.provide(EffectLogger.layer)),
)
}

async function applyPlugin(load: PluginLoader.Loaded, input: PluginInput, hooks: Hooks[]) {
const plugin = readV1Plugin(load.mod, load.spec, "server", "detect")
if (plugin) {
Expand All @@ -120,6 +112,11 @@ export namespace Plugin {
const state = yield* InstanceState.make<State>(
Effect.fn("Plugin.state")(function* (ctx) {
const hooks: Hooks[] = []
const bridge = yield* EffectBridge.make()

function publishPluginError(message: string) {
bridge.fork(bus.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() }))
}

const { Server } = yield* Effect.promise(() => import("../server/server"))

Expand Down Expand Up @@ -187,24 +184,24 @@ export namespace Plugin {
if (stage === "install") {
const parsed = parsePluginSpecifier(spec)
log.error("failed to install plugin", { pkg: parsed.pkg, version: parsed.version, error: message })
publishPluginError(bus, `Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`)
publishPluginError(`Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`)
return
}

if (stage === "compatibility") {
log.warn("plugin incompatible", { path: spec, error: message })
publishPluginError(bus, `Plugin ${spec} skipped: ${message}`)
publishPluginError(`Plugin ${spec} skipped: ${message}`)
return
}

if (stage === "entry") {
log.error("failed to resolve plugin server entry", { path: spec, error: message })
publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`)
publishPluginError(`Failed to load plugin ${spec}: ${message}`)
return
}

log.error("failed to load plugin", { path: spec, target: resolved?.entry, error: message })
publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`)
publishPluginError(`Failed to load plugin ${spec}: ${message}`)
},
},
}),
Expand Down
6 changes: 3 additions & 3 deletions packages/opencode/src/provider/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { iife } from "@/util/iife"
import { Global } from "../global"
import path from "path"
import { Effect, Layer, Context } from "effect"
import { EffectLogger } from "@/effect/logger"
import { EffectBridge } from "@/effect/bridge"
import { InstanceState } from "@/effect/instance-state"
import { AppFileSystem } from "@/filesystem"
import { isRecord } from "@/util/record"
Expand Down Expand Up @@ -1043,6 +1043,7 @@ export namespace Provider {
const state = yield* InstanceState.make<State>(() =>
Effect.gen(function* () {
using _ = log.time("state")
const bridge = yield* EffectBridge.make()
const cfg = yield* config.get()
const modelsDev = yield* Effect.promise(() => ModelsDev.get())
const database = mapValues(modelsDev, fromModelsDevProvider)
Expand Down Expand Up @@ -1223,8 +1224,7 @@ export namespace Provider {

const options = yield* Effect.promise(() =>
plugin.auth!.loader!(
() =>
Effect.runPromise(auth.get(providerID).pipe(Effect.orDie, Effect.provide(EffectLogger.layer))) as any,
() => bridge.promise(auth.get(providerID).pipe(Effect.orDie)) as any,
database[plugin.auth!.provider],
),
)
Expand Down
7 changes: 4 additions & 3 deletions packages/opencode/src/pty/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { Shell } from "@/shell/shell"
import { Plugin } from "@/plugin"
import { PtyID } from "./schema"
import { Effect, Layer, Context } from "effect"
import { EffectLogger } from "@/effect/logger"
import { EffectBridge } from "@/effect/bridge"

export namespace Pty {
const log = Log.create({ service: "pty" })
Expand Down Expand Up @@ -173,6 +173,7 @@ export namespace Pty {

const create = Effect.fn("Pty.create")(function* (input: CreateInput) {
const s = yield* InstanceState.get(state)
const bridge = yield* EffectBridge.make()
const id = PtyID.ascending()
const command = input.command || Shell.preferred()
const args = input.args || []
Expand Down Expand Up @@ -256,8 +257,8 @@ export namespace Pty {
if (session.info.status === "exited") return
log.info("session exited", { id, exitCode })
session.info.status = "exited"
Effect.runFork(bus.publish(Event.Exited, { id, exitCode }).pipe(Effect.provide(EffectLogger.layer)))
Effect.runFork(remove(id).pipe(Effect.provide(EffectLogger.layer)))
bridge.fork(bus.publish(Event.Exited, { id, exitCode }))
bridge.fork(remove(id))
}),
)
yield* bus.publish(Event.Created, { info })
Expand Down
Loading
Loading