Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/core/src/flag/flag.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ export const Flag = {
OPENCODE_EXPERIMENTAL_HTTPAPI:
truthy("OPENCODE_EXPERIMENTAL_HTTPAPI") ||
(!falsy("OPENCODE_EXPERIMENTAL_HTTPAPI") && HTTPAPI_DEFAULT_ON_CHANNELS.has(InstallationChannel)),
// Kill-switch that forces the effect-httpapi backend back through the legacy
// hono runtime adapter (Hono.fetch + createBunWebSocket) instead of the
// native Bun.serve listener. Defaults to false; set to "true"/"1" to revert
// if the native listener regresses for a user. Has no effect when the hono
// backend is selected.
OPENCODE_HTTPAPI_LEGACY_LISTENER: truthy("OPENCODE_HTTPAPI_LEGACY_LISTENER"),
OPENCODE_EXPERIMENTAL_WORKSPACES: OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES"),
OPENCODE_EXPERIMENTAL_EVENT_SYSTEM: OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_EVENT_SYSTEM"),

Expand Down
209 changes: 197 additions & 12 deletions packages/opencode/src/server/httpapi-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,22 @@

import type { ServerWebSocket } from "bun"
import { Effect, Schema } from "effect"
import { Flag } from "@opencode-ai/core/flag/flag"
import { AppRuntime } from "@/effect/app-runtime"
import { WithInstance } from "@/project/with-instance"
import { Pty } from "@/pty"
import { handlePtyInput } from "@/pty/input"
import { PtyID } from "@/pty/schema"
import { PtyPaths } from "@/server/routes/instance/httpapi/groups/pty"
import { ExperimentalHttpApiServer } from "@/server/routes/instance/httpapi/server"
import { getAdapter } from "@/control-plane/adapters"
import { WorkspaceID } from "@/control-plane/schema"
import { Workspace } from "@/control-plane/workspace"
import { Session } from "@/session/session"
import * as Log from "@opencode-ai/core/util/log"
import type { CorsOptions } from "./cors"
import { ProxyUtil } from "./proxy-util"
import { getWorkspaceRouteSessionID, isLocalWorkspaceRoute, workspaceProxyURL } from "./shared/workspace-routing"

const log = Log.create({ service: "httpapi-listener" })
const decodePtyID = Schema.decodeUnknownSync(PtyID)
Expand All @@ -33,18 +40,24 @@ export type ListenOptions = CorsOptions & {
hostname: string
}

type WsKind = { kind: "pty"; ptyID: string; cursor: number | undefined; directory: string }
type WsKind =
| { kind: "pty"; ptyID: string; cursor: number | undefined; directory: string }
| { kind: "proxy"; remoteURL: string; subprotocols: string[] }

type PtyHandler = {
onMessage: (message: string | ArrayBuffer) => void
onClose: () => void
}

type WsState = WsKind & {
// pty fields
handler?: PtyHandler
pending: Array<string | Uint8Array>
ready: boolean
closed: boolean
// proxy fields
remote?: WebSocket
proxyQueue?: Array<string | Uint8Array | ArrayBuffer>
}

// Derive from the OpenAPI path so this stays in sync if the route literal moves.
Expand All @@ -57,6 +70,71 @@ function parseCursor(value: string | null): number | undefined {
return parsed
}

function openProxy(ws: ServerWebSocket<WsState>) {
const data = ws.data
if (data.kind !== "proxy") return
let remote: WebSocket
try {
remote = new WebSocket(data.remoteURL, data.subprotocols.length ? data.subprotocols : undefined)
} catch (err) {
log.error("proxy remote WebSocket construct failed", { error: err })
ws.close(1011, "proxy connect failed")
return
}
remote.binaryType = "arraybuffer"
data.remote = remote

remote.onopen = () => {
const queue = data.proxyQueue
if (queue) {
for (const item of queue) {
try {
remote.send(item as never)
} catch {
// ignore — close handlers will clean up
}
}
queue.length = 0
}
}
remote.onmessage = (event: MessageEvent) => {
try {
const payload = event.data
if (typeof payload === "string") {
ws.send(payload)
} else if (payload instanceof ArrayBuffer) {
ws.send(new Uint8Array(payload))
} else if (payload instanceof Uint8Array) {
ws.send(payload)
} else if (payload instanceof Blob) {
void payload.arrayBuffer().then((buf) => {
try {
ws.send(new Uint8Array(buf))
} catch {
// ignore
}
})
}
} catch {
// ignore — socket likely closed
}
}
remote.onerror = () => {
try {
ws.close(1011, "proxy error")
} catch {
// ignore
}
}
remote.onclose = (event: CloseEvent) => {
try {
ws.close(event.code, event.reason)
} catch {
// ignore
}
}
}

function asAdapter(ws: ServerWebSocket<WsState>) {
return {
get readyState() {
Expand All @@ -80,6 +158,55 @@ function asAdapter(ws: ServerWebSocket<WsState>) {
}
}

async function resolveWorkspaceProxy(
request: Request,
url: URL,
): Promise<{ remoteURL: URL; subprotocols: string[] } | undefined> {
// Skip proxy resolution entirely when this process is pinned to a single
// workspace (the Hono path's WorkspaceRouterMiddleware uses the same guard).
if (Flag.OPENCODE_WORKSPACE_ID) return undefined

// Local-only routes (e.g. /experimental/workspace, GET /session) never
// forward — match the Hono behavior even though those routes don't currently
// upgrade to WS.
if (isLocalWorkspaceRoute(request.method, url.pathname)) return undefined

// /console paths are served locally and never proxied.
if (url.pathname.startsWith("/console")) return undefined

let workspaceID: string | null = null

// Prefer session-derived workspace lookup when a session ID is present in
// the path; fall back to the explicit ?workspace=... query parameter.
const sessionID = getWorkspaceRouteSessionID(url)
if (sessionID) {
const session = await AppRuntime.runPromise(
Session.Service.use((svc) => svc.get(sessionID)).pipe(Effect.withSpan("HttpApiListener.proxy.session")),
).catch(() => undefined)
if (session?.workspaceID) workspaceID = session.workspaceID
}
if (!workspaceID) workspaceID = url.searchParams.get("workspace")
if (!workspaceID) return undefined

const workspace = await AppRuntime.runPromise(
Workspace.Service.use((svc) => svc.get(WorkspaceID.make(workspaceID))).pipe(
Effect.withSpan("HttpApiListener.proxy.workspace"),
),
).catch(() => undefined)
if (!workspace) return undefined

const adapter = getAdapter(workspace.projectID, workspace.type)
const target = await adapter.target(workspace)
if (target.type !== "remote") return undefined

const proxyURL = workspaceProxyURL(target.url, url)
const remoteURL = new URL(ProxyUtil.websocketTargetURL(proxyURL))
return {
remoteURL,
subprotocols: ProxyUtil.websocketProtocols(request),
}
}

/**
* Spin up a native Bun.serve that:
* 1. Routes all HTTP traffic through the HttpApi web handler.
Expand All @@ -98,10 +225,11 @@ export async function listen(opts: ListenOptions): Promise<Listener> {
hostname: opts.hostname,
port,
idleTimeout: 0,
fetch(request, server) {
async fetch(request, server) {
const url = new URL(request.url)
const isUpgrade = request.headers.get("upgrade")?.toLowerCase() === "websocket"
const ptyMatch = url.pathname.match(ptyConnectPattern)
if (ptyMatch && request.headers.get("upgrade")?.toLowerCase() === "websocket") {
if (ptyMatch && isUpgrade) {
const ptyID = ptyMatch[1]!
const cursor = parseCursor(url.searchParams.get("cursor"))
// Resolve the instance directory the same way the HttpApi
Expand All @@ -124,20 +252,50 @@ export async function listen(opts: ListenOptions): Promise<Listener> {
return new Response("upgrade failed", { status: 400 })
}

// TODO: workspace-proxy WS upgrade detection. The Hono path forwards via a
// remote `new WebSocket(url, ...)` (see ServerProxy.websocket). To support
// that here we'd need to (a) resolve the workspace target the same way
// `WorkspaceRouterMiddleware` does today, then (b) `server.upgrade(request,
// { data: { kind: "proxy", target, headers, protocols } })` and bridge the
// ServerWebSocket to a remote WebSocket inside the `websocket` handlers.
// Deferred to a follow-up — the proxy story needs more design (auth header
// forwarding, fence sync, reconnection semantics) than fits this PR.
// Workspace-proxy WS forwarding. Mirrors the Hono path's
// `WorkspaceRouterMiddleware` → `ServerProxy.websocket` flow but inline.
// Bridging to the remote `new WebSocket(...)` happens inside the
// `websocket.open` handler below.
//
// TODO: Node adapter (no Bun.serve) needs an equivalent path using
// `node:http` + `ws`.
if (isUpgrade) {
try {
const proxy = await resolveWorkspaceProxy(request, url)
if (proxy) {
log.info("workspace-proxy websocket", {
request: url.toString(),
remote: proxy.remoteURL.toString(),
})
const upgraded = server.upgrade(request, {
data: {
kind: "proxy",
remoteURL: proxy.remoteURL.toString(),
subprotocols: proxy.subprotocols,
pending: [],
ready: false,
closed: false,
proxyQueue: [],
} satisfies WsState,
})
if (upgraded) return undefined
return new Response("upgrade failed", { status: 400 })
}
} catch (err) {
log.error("workspace-proxy ws resolve failed", { error: err })
return new Response("workspace lookup failed", { status: 500 })
}
}

return handler(request as Request, context as never)
},
websocket: {
open(ws) {
const data = ws.data
if (data.kind === "proxy") {
openProxy(ws)
return
}
if (data.kind !== "pty") {
ws.close(1011, "unknown ws kind")
return
Expand Down Expand Up @@ -187,6 +345,25 @@ export async function listen(opts: ListenOptions): Promise<Listener> {
},
message(ws, message) {
const data = ws.data
if (data.kind === "proxy") {
const payload =
typeof message === "string"
? message
: message instanceof Buffer
? new Uint8Array(message.buffer, message.byteOffset, message.byteLength)
: (message as Uint8Array)
const remote = data.remote
if (remote && remote.readyState === WebSocket.OPEN) {
try {
remote.send(payload)
} catch {
// ignore send errors; lifecycle handlers will tear things down
}
return
}
data.proxyQueue?.push(payload)
return
}
if (data.kind !== "pty") return
const payload =
typeof message === "string"
Expand All @@ -200,9 +377,17 @@ export async function listen(opts: ListenOptions): Promise<Listener> {
}
AppRuntime.runPromise(handlePtyInput(data.handler, payload)).catch(() => undefined)
},
close(ws) {
close(ws, code, reason) {
const data = ws.data
data.closed = true
if (data.kind === "proxy") {
try {
data.remote?.close(code, reason)
} catch {
// ignore
}
return
}
data.handler?.onClose()
},
},
Expand Down
39 changes: 29 additions & 10 deletions packages/opencode/src/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { InstanceMiddleware } from "./routes/instance/middleware"
import { WorkspaceRoutes } from "./routes/control/workspace"
import { ExperimentalHttpApiServer } from "./routes/instance/httpapi/server"
import { PublicApi } from "./routes/instance/httpapi/public"
import { HttpApiListener } from "./httpapi-listener"
import * as ServerBackend from "./backend"
import type { CorsOptions } from "./cors"

Expand Down Expand Up @@ -182,35 +183,53 @@ export async function openapiHono() {
export let url: URL

export async function listen(opts: ListenOptions): Promise<Listener> {
const built = create(opts)
const server = await built.runtime.listen(opts)
const selected = select()
const native = selected.backend === "effect-httpapi" && !Flag.OPENCODE_HTTPAPI_LEGACY_LISTENER
let inner: Listener
if (native) {
log.info("server backend selected", {
...ServerBackend.attributes(selected),
"opencode.server.listener": "bun-native",
})
inner = await HttpApiListener.listen(opts)
} else {
const built = create(opts)
const server = await built.runtime.listen(opts)
const innerUrl = new URL("http://localhost")
innerUrl.hostname = opts.hostname
innerUrl.port = String(server.port)
inner = {
hostname: opts.hostname,
port: server.port,
url: innerUrl,
stop: (close?: boolean) => server.stop(close),
}
}

const next = new URL("http://localhost")
next.hostname = opts.hostname
next.port = String(server.port)
const next = new URL(inner.url)
url = next

const mdns =
opts.mdns &&
server.port &&
inner.port &&
opts.hostname !== "127.0.0.1" &&
opts.hostname !== "localhost" &&
opts.hostname !== "::1"
if (mdns) {
MDNS.publish(server.port, opts.mdnsDomain)
MDNS.publish(inner.port, opts.mdnsDomain)
} else if (opts.mdns) {
log.warn("mDNS enabled but hostname is loopback; skipping mDNS publish")
}

let closing: Promise<void> | undefined
return {
hostname: opts.hostname,
port: server.port,
hostname: inner.hostname,
port: inner.port,
url: next,
stop(close?: boolean) {
closing ??= (async () => {
if (mdns) MDNS.unpublish()
await server.stop(close)
await inner.stop(close)
})()
return closing
},
Expand Down
Loading
Loading