Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions packages/opencode/src/bus/global.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { EventEmitter } from "events"

export type GlobalEvent = {
directory?: string
project?: string
workspace?: string
payload: any
}

export const GlobalBus = new EventEmitter<{
event: [
{
directory?: string
project?: string
workspace?: string
payload: any
},
]
event: [GlobalEvent]
}>()
37 changes: 37 additions & 0 deletions packages/opencode/src/control-plane/util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { GlobalBus, type GlobalEvent } from "@/bus/global"

export function waitEvent(input: { timeout: number; signal?: AbortSignal; fn: (event: GlobalEvent) => boolean }) {
if (input.signal?.aborted) return Promise.reject(input.signal.reason ?? new Error("Request aborted"))

return new Promise<void>((resolve, reject) => {
const abort = () => {
cleanup()
reject(input.signal?.reason ?? new Error("Request aborted"))
}

const handler = (event: GlobalEvent) => {
try {
if (!input.fn(event)) return
cleanup()
resolve()
} catch (error) {
cleanup()
reject(error)
}
}

const cleanup = () => {
clearTimeout(timeout)
GlobalBus.off("event", handler)
input.signal?.removeEventListener("abort", abort)
}

const timeout = setTimeout(() => {
cleanup()
reject(new Error("Timed out waiting for global event"))
}, input.timeout)

GlobalBus.on("event", handler)
input.signal?.addEventListener("abort", abort, { once: true })
})
}
101 changes: 90 additions & 11 deletions packages/opencode/src/control-plane/workspace.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import z from "zod"
import { setTimeout as sleep } from "node:timers/promises"
import { fn } from "@/util/fn"
import { Database, asc, eq } from "@/storage/db"
import { Database, asc, eq, inArray } from "@/storage/db"
import { Project } from "@/project/project"
import { BusEvent } from "@/bus/bus-event"
import { GlobalBus } from "@/bus/global"
Expand All @@ -22,6 +22,8 @@ import { SessionTable } from "@/session/session.sql"
import { SessionID } from "@/session/schema"
import { errorData } from "@/util/error"
import { AppRuntime } from "@/effect/app-runtime"
import { EventSequenceTable } from "@/sync/event.sql"
import { waitEvent } from "./util"

export namespace Workspace {
export const Info = WorkspaceInfo.meta({
Expand Down Expand Up @@ -114,6 +116,17 @@ export namespace Workspace {

startSync(info)

await waitEvent({
timeout: TIMEOUT,
fn(event) {
if (event.workspace === info.id && event.payload.type === Event.Status.type) {
const { status } = event.payload.properties
return status === "error" || status === "connected"
}
return false
},
})

return info
})

Expand Down Expand Up @@ -285,10 +298,15 @@ export namespace Workspace {
return spaces
}

export const get = fn(WorkspaceID.zod, async (id) => {
function lookup(id: WorkspaceID) {
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
if (!row) return
const space = fromRow(row)
return fromRow(row)
}

export const get = fn(WorkspaceID.zod, async (id) => {
const space = lookup(id)
if (!space) return
startSync(space)
return space
})
Expand Down Expand Up @@ -320,12 +338,18 @@ export namespace Workspace {

const connections = new Map<WorkspaceID, ConnectionStatus>()
const aborts = new Map<WorkspaceID, AbortController>()
const TIMEOUT = 5000

function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) {
const prev = connections.get(id)
if (prev?.status === status && prev?.error === error) return
const next = { workspaceID: id, status, error }
connections.set(id, next)

if (status === "error") {
aborts.delete(id)
}

GlobalBus.emit("event", {
directory: "global",
workspace: id,
Expand All @@ -340,6 +364,52 @@ export namespace Workspace {
return [...connections.values()]
}

function synced(state: Record<string, number>) {
const ids = Object.keys(state)
if (ids.length === 0) return true

const done = Object.fromEntries(
Database.use((db) =>
db
.select({
id: EventSequenceTable.aggregate_id,
seq: EventSequenceTable.seq,
})
.from(EventSequenceTable)
.where(inArray(EventSequenceTable.aggregate_id, ids))
.all(),
).map((row) => [row.id, row.seq]),
) as Record<string, number>

return ids.every((id) => {
return (done[id] ?? -1) >= state[id]
})
}

export async function isSyncing(workspaceID: WorkspaceID) {
return aborts.has(workspaceID)
}

export async function waitForSync(workspaceID: WorkspaceID, state: Record<string, number>, signal?: AbortSignal) {
if (synced(state)) return

try {
await waitEvent({
timeout: TIMEOUT,
signal,
fn(event) {
if (event.workspace !== workspaceID && event.payload.type !== "sync") {
return false
}
return synced(state)
},
})
} catch (error) {
if (signal?.aborted) throw signal.reason ?? new Error("Request aborted")
throw new Error(`Timed out waiting for sync fence: ${JSON.stringify(state)}`)
}
}

const log = Log.create({ service: "workspace-sync" })

function route(url: string | URL, path: string) {
Expand All @@ -353,6 +423,7 @@ export namespace Workspace {
async function syncWorkspace(space: Info, signal: AbortSignal) {
while (!signal.aborted) {
log.info("connecting to global sync", { workspace: space.name })
setStatus(space.id, "connecting")

const adaptor = await getAdaptor(space.projectID, space.type)
const target = await adaptor.target(space)
Expand All @@ -364,7 +435,7 @@ export namespace Workspace {
headers: target.headers,
signal,
}).catch((err: unknown) => {
setStatus(space.id, "error")
setStatus(space.id, "error", err instanceof Error ? err.message : String(err))

log.info("failed to connect to global sync", {
workspace: space.name,
Expand All @@ -374,8 +445,9 @@ export namespace Workspace {
})

if (!res || !res.ok || !res.body) {
log.info("failed to connect to global sync", { workspace: space.name })
setStatus(space.id, "error")
const error = !res ? "No response from global sync" : `Global sync HTTP ${res.status}`
log.info("failed to connect to global sync", { workspace: space.name, error })
setStatus(space.id, "error", error)
await sleep(1000)
continue
}
Expand Down Expand Up @@ -414,22 +486,29 @@ export namespace Workspace {
}
}

function startSync(space: Info) {
async function startSync(space: Info) {
if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return

if (space.type === "worktree") {
void Filesystem.exists(space.directory!).then((exists) => {
const adaptor = await getAdaptor(space.projectID, space.type)
const target = await adaptor.target(space)

if (target.type === "local") {
void Filesystem.exists(target.directory).then((exists) => {
setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
})
return
}

if (aborts.has(space.id)) return
if (aborts.has(space.id)) return true

setStatus(space.id, "disconnected")

const abort = new AbortController()
aborts.set(space.id, abort)
setStatus(space.id, "disconnected")

void syncWorkspace(space, abort.signal).catch((error) => {
aborts.delete(space.id)

setStatus(space.id, "error", String(error))
log.warn("workspace listener failed", {
workspaceID: space.id,
Expand Down
4 changes: 3 additions & 1 deletion packages/opencode/src/flag/flag.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ export namespace Flag {
Config.withDefault(false),
)
export const OPENCODE_EXPERIMENTAL_PLAN_MODE = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_PLAN_MODE")
export const OPENCODE_EXPERIMENTAL_WORKSPACES = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES")
export const OPENCODE_EXPERIMENTAL_MARKDOWN = !falsy("OPENCODE_EXPERIMENTAL_MARKDOWN")
export const OPENCODE_MODELS_URL = process.env["OPENCODE_MODELS_URL"]
export const OPENCODE_MODELS_PATH = process.env["OPENCODE_MODELS_PATH"]
Expand All @@ -84,6 +83,9 @@ export namespace Flag {
export const OPENCODE_SKIP_MIGRATIONS = truthy("OPENCODE_SKIP_MIGRATIONS")
export const OPENCODE_STRICT_CONFIG_DEPS = truthy("OPENCODE_STRICT_CONFIG_DEPS")

export const OPENCODE_WORKSPACE_ID = process.env["OPENCODE_WORKSPACE_ID"]
export const OPENCODE_EXPERIMENTAL_WORKSPACES = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES")

function number(key: string) {
const value = process.env[key]
if (!value) return undefined
Expand Down
81 changes: 81 additions & 0 deletions packages/opencode/src/server/fence.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import type { MiddlewareHandler } from "hono"
import { Database, inArray } from "@/storage/db"
import { EventSequenceTable } from "@/sync/event.sql"
import { Workspace } from "@/control-plane/workspace"
import type { WorkspaceID } from "@/control-plane/schema"
import { Log } from "@/util/log"

const HEADER = "x-opencode-sync"
type State = Record<string, number>
const log = Log.create({ service: "fence" })

export function load(ids?: string[]) {
const rows = Database.use((db) => {
if (!ids?.length) {
return db.select().from(EventSequenceTable).all()
}

return db.select().from(EventSequenceTable).where(inArray(EventSequenceTable.aggregate_id, ids)).all()
})

return Object.fromEntries(rows.map((row) => [row.aggregate_id, row.seq])) as State
}

export function diff(prev: State, next: State) {
const ids = new Set([...Object.keys(prev), ...Object.keys(next)])
return Object.fromEntries(
[...ids]
.map((id) => [id, next[id] ?? -1] as const)
.filter(([id, seq]) => {
return (prev[id] ?? -1) !== seq
}),
) as State
}

export function parse(headers: Headers) {
const raw = headers.get(HEADER)
if (!raw) return

let data

try {
data = JSON.parse(raw)
} catch (err) {
return
}

if (!data || typeof data !== "object") return

return Object.fromEntries(
Object.entries(data).filter(([id, seq]) => {
return typeof id === "string" && Number.isInteger(seq)
}),
) as State
}

export async function wait(workspaceID: WorkspaceID, state: State, signal?: AbortSignal) {
log.info("waiting for state", {
workspaceID,
state,
})
await Workspace.waitForSync(workspaceID, state, signal)
log.info("state fully synced", {
workspaceID,
state,
})
}

export const FenceMiddleware: MiddlewareHandler = async (c, next) => {
if (c.req.method === "GET" || c.req.method === "HEAD" || c.req.method === "OPTIONS") return next()

const prev = load()
await next()
const current = diff(prev, load())

if (Object.keys(current).length > 0) {
log.info("header", {
diff: current,
})
c.res.headers.set(HEADER, JSON.stringify(current))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this end up being too large?
It depends on the runtime, but Bun has 16k limit on sum of all headers.

Can be changed thou: https://bun.com/docs/runtime#param-max-http-header-size

}
}
9 changes: 5 additions & 4 deletions packages/opencode/src/server/instance/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { ServerProxy } from "../proxy"
import { Filesystem } from "@/util/filesystem"
import { Instance } from "@/project/instance"
import { InstanceBootstrap } from "@/project/bootstrap"
import { Flag } from "@/flag/flag"
import { Session } from "@/session"
import { SessionID } from "@/session/schema"
import { WorkspaceContext } from "@/control-plane/workspace-context"
Expand Down Expand Up @@ -68,10 +69,10 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware
const sessionWorkspaceID = await getSessionWorkspace(url)
const workspaceID = sessionWorkspaceID || url.searchParams.get("workspace")

if (!workspaceID || url.pathname.startsWith("/console") || OPENCODE_WORKSPACE) {
if (OPENCODE_WORKSPACE) {
if (!workspaceID || url.pathname.startsWith("/console") || Flag.OPENCODE_WORKSPACE_ID) {
if (Flag.OPENCODE_WORKSPACE_ID) {
return WorkspaceContext.provide({
workspaceID: WorkspaceID.make(OPENCODE_WORKSPACE),
workspaceID: WorkspaceID.make(Flag.OPENCODE_WORKSPACE_ID),
async fn() {
return Instance.provide({
directory,
Expand Down Expand Up @@ -148,6 +149,6 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware
headers.delete("x-opencode-workspace")

const req = new Request(c.req.raw, { headers })
return ServerProxy.http(proxyURL, target.headers, req)
return ServerProxy.http(proxyURL, target.headers, req, workspace.id)
}
}
Loading
Loading