From 39288a6953361c9d9af6b6522a218a66ef34725b Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Sun, 3 May 2026 09:18:28 -0400 Subject: [PATCH 1/2] feat(httpapi-listener): workspace-proxy WS forwarding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bridge workspace-proxy WebSocket upgrades inside the Bun.serve listener so the Hono path's WorkspaceRouterMiddleware → ServerProxy.websocket flow has a native equivalent. The HttpApi handler still owns HTTP; the listener now also resolves the workspace target inline (?workspace=… or session lookup), upgrades the client connection, and bridges it to a remote WebSocket with queueing, subprotocol forwarding, and close-code propagation. This unblocks flipping Server.listen() over to the new listener but does not flip it — the Hono path remains canonical. Bun-only; node:http + ws adapter is a follow-up (TODO inline). --- .../opencode/src/server/httpapi-listener.ts | 209 +++++++++++++++++- .../test/server/httpapi-listener.test.ts | 102 +++++++++ 2 files changed, 299 insertions(+), 12 deletions(-) diff --git a/packages/opencode/src/server/httpapi-listener.ts b/packages/opencode/src/server/httpapi-listener.ts index fd65b0ae67e5..42bc3876fe41 100644 --- a/packages/opencode/src/server/httpapi-listener.ts +++ b/packages/opencode/src/server/httpapi-listener.ts @@ -8,6 +8,7 @@ import type { ServerWebSocket } from "bun" import { Effect, Schema } from "effect" +import { Flag } from "@opencode-ai/core/flag/flag" import { AppRuntime } from "@/effect/app-runtime" import { WithInstance } from "@/project/with-instance" import { Pty } from "@/pty" @@ -15,8 +16,14 @@ import { handlePtyInput } from "@/pty/input" import { PtyID } from "@/pty/schema" import { PtyPaths } from "@/server/routes/instance/httpapi/groups/pty" import { ExperimentalHttpApiServer } from "@/server/routes/instance/httpapi/server" +import { getAdapter } from "@/control-plane/adapters" +import { WorkspaceID } from "@/control-plane/schema" +import { Workspace } from "@/control-plane/workspace" +import { Session } from "@/session/session" import * as Log from "@opencode-ai/core/util/log" import type { CorsOptions } from "./cors" +import { ProxyUtil } from "./proxy-util" +import { getWorkspaceRouteSessionID, isLocalWorkspaceRoute, workspaceProxyURL } from "./shared/workspace-routing" const log = Log.create({ service: "httpapi-listener" }) const decodePtyID = Schema.decodeUnknownSync(PtyID) @@ -33,7 +40,9 @@ export type ListenOptions = CorsOptions & { hostname: string } -type WsKind = { kind: "pty"; ptyID: string; cursor: number | undefined; directory: string } +type WsKind = + | { kind: "pty"; ptyID: string; cursor: number | undefined; directory: string } + | { kind: "proxy"; remoteURL: string; subprotocols: string[] } type PtyHandler = { onMessage: (message: string | ArrayBuffer) => void @@ -41,10 +50,14 @@ type PtyHandler = { } type WsState = WsKind & { + // pty fields handler?: PtyHandler pending: Array ready: boolean closed: boolean + // proxy fields + remote?: WebSocket + proxyQueue?: Array } // Derive from the OpenAPI path so this stays in sync if the route literal moves. @@ -57,6 +70,71 @@ function parseCursor(value: string | null): number | undefined { return parsed } +function openProxy(ws: ServerWebSocket) { + const data = ws.data + if (data.kind !== "proxy") return + let remote: WebSocket + try { + remote = new WebSocket(data.remoteURL, data.subprotocols.length ? data.subprotocols : undefined) + } catch (err) { + log.error("proxy remote WebSocket construct failed", { error: err }) + ws.close(1011, "proxy connect failed") + return + } + remote.binaryType = "arraybuffer" + data.remote = remote + + remote.onopen = () => { + const queue = data.proxyQueue + if (queue) { + for (const item of queue) { + try { + remote.send(item as never) + } catch { + // ignore — close handlers will clean up + } + } + queue.length = 0 + } + } + remote.onmessage = (event: MessageEvent) => { + try { + const payload = event.data + if (typeof payload === "string") { + ws.send(payload) + } else if (payload instanceof ArrayBuffer) { + ws.send(new Uint8Array(payload)) + } else if (payload instanceof Uint8Array) { + ws.send(payload) + } else if (payload instanceof Blob) { + void payload.arrayBuffer().then((buf) => { + try { + ws.send(new Uint8Array(buf)) + } catch { + // ignore + } + }) + } + } catch { + // ignore — socket likely closed + } + } + remote.onerror = () => { + try { + ws.close(1011, "proxy error") + } catch { + // ignore + } + } + remote.onclose = (event: CloseEvent) => { + try { + ws.close(event.code, event.reason) + } catch { + // ignore + } + } +} + function asAdapter(ws: ServerWebSocket) { return { get readyState() { @@ -80,6 +158,55 @@ function asAdapter(ws: ServerWebSocket) { } } +async function resolveWorkspaceProxy( + request: Request, + url: URL, +): Promise<{ remoteURL: URL; subprotocols: string[] } | undefined> { + // Skip proxy resolution entirely when this process is pinned to a single + // workspace (the Hono path's WorkspaceRouterMiddleware uses the same guard). + if (Flag.OPENCODE_WORKSPACE_ID) return undefined + + // Local-only routes (e.g. /experimental/workspace, GET /session) never + // forward — match the Hono behavior even though those routes don't currently + // upgrade to WS. + if (isLocalWorkspaceRoute(request.method, url.pathname)) return undefined + + // /console paths are served locally and never proxied. + if (url.pathname.startsWith("/console")) return undefined + + let workspaceID: string | null = null + + // Prefer session-derived workspace lookup when a session ID is present in + // the path; fall back to the explicit ?workspace=... query parameter. + const sessionID = getWorkspaceRouteSessionID(url) + if (sessionID) { + const session = await AppRuntime.runPromise( + Session.Service.use((svc) => svc.get(sessionID)).pipe(Effect.withSpan("HttpApiListener.proxy.session")), + ).catch(() => undefined) + if (session?.workspaceID) workspaceID = session.workspaceID + } + if (!workspaceID) workspaceID = url.searchParams.get("workspace") + if (!workspaceID) return undefined + + const workspace = await AppRuntime.runPromise( + Workspace.Service.use((svc) => svc.get(WorkspaceID.make(workspaceID))).pipe( + Effect.withSpan("HttpApiListener.proxy.workspace"), + ), + ).catch(() => undefined) + if (!workspace) return undefined + + const adapter = getAdapter(workspace.projectID, workspace.type) + const target = await adapter.target(workspace) + if (target.type !== "remote") return undefined + + const proxyURL = workspaceProxyURL(target.url, url) + const remoteURL = new URL(ProxyUtil.websocketTargetURL(proxyURL)) + return { + remoteURL, + subprotocols: ProxyUtil.websocketProtocols(request), + } +} + /** * Spin up a native Bun.serve that: * 1. Routes all HTTP traffic through the HttpApi web handler. @@ -98,10 +225,11 @@ export async function listen(opts: ListenOptions): Promise { hostname: opts.hostname, port, idleTimeout: 0, - fetch(request, server) { + async fetch(request, server) { const url = new URL(request.url) + const isUpgrade = request.headers.get("upgrade")?.toLowerCase() === "websocket" const ptyMatch = url.pathname.match(ptyConnectPattern) - if (ptyMatch && request.headers.get("upgrade")?.toLowerCase() === "websocket") { + if (ptyMatch && isUpgrade) { const ptyID = ptyMatch[1]! const cursor = parseCursor(url.searchParams.get("cursor")) // Resolve the instance directory the same way the HttpApi @@ -124,20 +252,50 @@ export async function listen(opts: ListenOptions): Promise { return new Response("upgrade failed", { status: 400 }) } - // TODO: workspace-proxy WS upgrade detection. The Hono path forwards via a - // remote `new WebSocket(url, ...)` (see ServerProxy.websocket). To support - // that here we'd need to (a) resolve the workspace target the same way - // `WorkspaceRouterMiddleware` does today, then (b) `server.upgrade(request, - // { data: { kind: "proxy", target, headers, protocols } })` and bridge the - // ServerWebSocket to a remote WebSocket inside the `websocket` handlers. - // Deferred to a follow-up — the proxy story needs more design (auth header - // forwarding, fence sync, reconnection semantics) than fits this PR. + // Workspace-proxy WS forwarding. Mirrors the Hono path's + // `WorkspaceRouterMiddleware` → `ServerProxy.websocket` flow but inline. + // Bridging to the remote `new WebSocket(...)` happens inside the + // `websocket.open` handler below. + // + // TODO: Node adapter (no Bun.serve) needs an equivalent path using + // `node:http` + `ws`. + if (isUpgrade) { + try { + const proxy = await resolveWorkspaceProxy(request, url) + if (proxy) { + log.info("workspace-proxy websocket", { + request: url.toString(), + remote: proxy.remoteURL.toString(), + }) + const upgraded = server.upgrade(request, { + data: { + kind: "proxy", + remoteURL: proxy.remoteURL.toString(), + subprotocols: proxy.subprotocols, + pending: [], + ready: false, + closed: false, + proxyQueue: [], + } satisfies WsState, + }) + if (upgraded) return undefined + return new Response("upgrade failed", { status: 400 }) + } + } catch (err) { + log.error("workspace-proxy ws resolve failed", { error: err }) + return new Response("workspace lookup failed", { status: 500 }) + } + } return handler(request as Request, context as never) }, websocket: { open(ws) { const data = ws.data + if (data.kind === "proxy") { + openProxy(ws) + return + } if (data.kind !== "pty") { ws.close(1011, "unknown ws kind") return @@ -187,6 +345,25 @@ export async function listen(opts: ListenOptions): Promise { }, message(ws, message) { const data = ws.data + if (data.kind === "proxy") { + const payload = + typeof message === "string" + ? message + : message instanceof Buffer + ? new Uint8Array(message.buffer, message.byteOffset, message.byteLength) + : (message as Uint8Array) + const remote = data.remote + if (remote && remote.readyState === WebSocket.OPEN) { + try { + remote.send(payload) + } catch { + // ignore send errors; lifecycle handlers will tear things down + } + return + } + data.proxyQueue?.push(payload) + return + } if (data.kind !== "pty") return const payload = typeof message === "string" @@ -200,9 +377,17 @@ export async function listen(opts: ListenOptions): Promise { } AppRuntime.runPromise(handlePtyInput(data.handler, payload)).catch(() => undefined) }, - close(ws) { + close(ws, code, reason) { const data = ws.data data.closed = true + if (data.kind === "proxy") { + try { + data.remote?.close(code, reason) + } catch { + // ignore + } + return + } data.handler?.onClose() }, }, diff --git a/packages/opencode/test/server/httpapi-listener.test.ts b/packages/opencode/test/server/httpapi-listener.test.ts index de7b5987ec34..4c5c8c1be8eb 100644 --- a/packages/opencode/test/server/httpapi-listener.test.ts +++ b/packages/opencode/test/server/httpapi-listener.test.ts @@ -1,10 +1,19 @@ import { afterEach, describe, expect, test } from "bun:test" +import type { ServerWebSocket } from "bun" +import { mkdir } from "node:fs/promises" +import path from "node:path" import { Flag } from "@opencode-ai/core/flag/flag" import * as Log from "@opencode-ai/core/util/log" import { resetDatabase } from "../fixture/db" import { disposeAllInstances, tmpdir } from "../fixture/fixture" +import { registerAdapter } from "../../src/control-plane/adapters" +import type { WorkspaceAdapter } from "../../src/control-plane/types" +import { Workspace } from "../../src/control-plane/workspace" +import { AppRuntime } from "../../src/effect/app-runtime" +import { Project } from "../../src/project/project" import { HttpApiListener } from "../../src/server/httpapi-listener" import { PtyPaths } from "../../src/server/routes/instance/httpapi/groups/pty" +import { Effect } from "effect" void Log.init({ print: false }) @@ -43,6 +52,99 @@ describe("native HttpApi listener", () => { } }) + test("workspace-proxy WS forwarding round-trips through a fake remote", async () => { + await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) + + // Tiny Bun.serve fake remote that echoes every WS frame it receives. + type EchoState = { closed: boolean } + const remote = Bun.serve({ + hostname: "127.0.0.1", + port: 0, + fetch(request, server) { + if (request.headers.get("upgrade")?.toLowerCase() === "websocket") { + if (server.upgrade(request, { data: { closed: false } })) return undefined + return new Response("upgrade failed", { status: 400 }) + } + return new Response("ok") + }, + websocket: { + open(_ws: ServerWebSocket) {}, + message(ws: ServerWebSocket, msg: string | Buffer) { + ws.send(typeof msg === "string" ? `echo:${msg}` : msg) + }, + close(_ws: ServerWebSocket) {}, + }, + }) + + // The path "/probe" is not a known local-only or PTY route, so the listener + // should treat it as a candidate for workspace-proxy WS forwarding. + const remoteBase = `http://${remote.hostname}:${remote.port}` + + // Register a remote workspace whose target points at the echo server. + const adapter: WorkspaceAdapter = { + name: "Remote Listener Test", + description: "Remote workspace target for HttpApiListener proxy WS test", + configure: (info) => ({ ...info, name: "remote-listener-test", directory: path.join(tmp.path, ".remote") }), + create: async () => { + await mkdir(path.join(tmp.path, ".remote"), { recursive: true }) + }, + async remove() {}, + target: () => ({ type: "remote" as const, url: remoteBase }), + } + + const workspaceID = await AppRuntime.runPromise( + Effect.gen(function* () { + const project = yield* Project.Service.use((svc) => svc.fromDirectory(tmp.path)) + registerAdapter(project.project.id, "httpapi-listener-proxy-ws", adapter) + const created = yield* Workspace.Service.use((svc) => + svc.create({ + type: "httpapi-listener-proxy-ws", + branch: null, + extra: null, + projectID: project.project.id, + }), + ) + return created.id + }), + ) + + const listener = await startListener() + try { + const wsURL = new URL("/probe", listener.url) + wsURL.protocol = "ws:" + wsURL.searchParams.set("workspace", workspaceID) + + const messages: string[] = [] + const ws = new WebSocket(wsURL) + ws.binaryType = "arraybuffer" + + const opened = new Promise((resolve, reject) => { + ws.addEventListener("open", () => resolve(), { once: true }) + ws.addEventListener("error", () => reject(new Error("ws error before open")), { once: true }) + }) + + ws.addEventListener("message", (event) => { + const data = event.data + messages.push(typeof data === "string" ? data : new TextDecoder().decode(data as ArrayBuffer)) + }) + + await opened + ws.send("hello-proxy") + + const start = Date.now() + while (!messages.some((m) => m === "echo:hello-proxy") && Date.now() - start < 5_000) { + await new Promise((r) => setTimeout(r, 25)) + } + + expect(messages).toContain("echo:hello-proxy") + + ws.close(1000, "done") + } finally { + await listener.stop(true) + remote.stop(true) + } + }) + testPty("PTY websocket connect echoes input back to the client", async () => { await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) const listener = await startListener() From adc5f528afd13a7e1dc754e02c9cdecc7d25a46d Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Sun, 3 May 2026 09:24:18 -0400 Subject: [PATCH 2/2] feat(server): wire Server.listen() through native HttpApi listener (kill-switch) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the effect-httpapi backend is selected, Server.listen() now delegates to HttpApiListener.listen() — a native Bun.serve listener with inline WebSocket upgrade handling — instead of routing through the Hono runtime adapter (Hono.fetch + createBunWebSocket). The Hono backend path is unchanged, and a kill-switch env var (OPENCODE_HTTPAPI_LEGACY_LISTENER) forces the effect-httpapi backend back through the Hono adapter as an escape hatch if the native listener regresses for a user. This unblocks the Hono deletion arc by giving the native listener real production traffic on dev/beta/local channels (where OPENCODE_EXPERIMENTAL_HTTPAPI defaults on) while leaving prod/latest channels on the Hono path. --- packages/core/src/flag/flag.ts | 6 ++++ packages/opencode/src/server/server.ts | 39 +++++++++++++++++++------- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/packages/core/src/flag/flag.ts b/packages/core/src/flag/flag.ts index 0daae55800c1..466ed4e12e8b 100644 --- a/packages/core/src/flag/flag.ts +++ b/packages/core/src/flag/flag.ts @@ -94,6 +94,12 @@ export const Flag = { OPENCODE_EXPERIMENTAL_HTTPAPI: truthy("OPENCODE_EXPERIMENTAL_HTTPAPI") || (!falsy("OPENCODE_EXPERIMENTAL_HTTPAPI") && HTTPAPI_DEFAULT_ON_CHANNELS.has(InstallationChannel)), + // Kill-switch that forces the effect-httpapi backend back through the legacy + // hono runtime adapter (Hono.fetch + createBunWebSocket) instead of the + // native Bun.serve listener. Defaults to false; set to "true"/"1" to revert + // if the native listener regresses for a user. Has no effect when the hono + // backend is selected. + OPENCODE_HTTPAPI_LEGACY_LISTENER: truthy("OPENCODE_HTTPAPI_LEGACY_LISTENER"), OPENCODE_EXPERIMENTAL_WORKSPACES: OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES"), OPENCODE_EXPERIMENTAL_EVENT_SYSTEM: OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_EVENT_SYSTEM"), diff --git a/packages/opencode/src/server/server.ts b/packages/opencode/src/server/server.ts index 13ec7061639a..44875965df29 100644 --- a/packages/opencode/src/server/server.ts +++ b/packages/opencode/src/server/server.ts @@ -19,6 +19,7 @@ import { InstanceMiddleware } from "./routes/instance/middleware" import { WorkspaceRoutes } from "./routes/control/workspace" import { ExperimentalHttpApiServer } from "./routes/instance/httpapi/server" import { PublicApi } from "./routes/instance/httpapi/public" +import { HttpApiListener } from "./httpapi-listener" import * as ServerBackend from "./backend" import type { CorsOptions } from "./cors" @@ -182,35 +183,53 @@ export async function openapiHono() { export let url: URL export async function listen(opts: ListenOptions): Promise { - const built = create(opts) - const server = await built.runtime.listen(opts) + const selected = select() + const native = selected.backend === "effect-httpapi" && !Flag.OPENCODE_HTTPAPI_LEGACY_LISTENER + let inner: Listener + if (native) { + log.info("server backend selected", { + ...ServerBackend.attributes(selected), + "opencode.server.listener": "bun-native", + }) + inner = await HttpApiListener.listen(opts) + } else { + const built = create(opts) + const server = await built.runtime.listen(opts) + const innerUrl = new URL("http://localhost") + innerUrl.hostname = opts.hostname + innerUrl.port = String(server.port) + inner = { + hostname: opts.hostname, + port: server.port, + url: innerUrl, + stop: (close?: boolean) => server.stop(close), + } + } - const next = new URL("http://localhost") - next.hostname = opts.hostname - next.port = String(server.port) + const next = new URL(inner.url) url = next const mdns = opts.mdns && - server.port && + inner.port && opts.hostname !== "127.0.0.1" && opts.hostname !== "localhost" && opts.hostname !== "::1" if (mdns) { - MDNS.publish(server.port, opts.mdnsDomain) + MDNS.publish(inner.port, opts.mdnsDomain) } else if (opts.mdns) { log.warn("mDNS enabled but hostname is loopback; skipping mDNS publish") } let closing: Promise | undefined return { - hostname: opts.hostname, - port: server.port, + hostname: inner.hostname, + port: inner.port, url: next, stop(close?: boolean) { closing ??= (async () => { if (mdns) MDNS.unpublish() - await server.stop(close) + await inner.stop(close) })() return closing },