Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ FROM base AS runner

ARG UV_VERSION=latest
ARG OPENCODE_VERSION=latest
# Bump TOOLS_CACHEBUST (e.g. via --build-arg) to force a fresh uv/opencode
# install without invalidating the rest of the build cache.
ARG TOOLS_CACHEBUST=0

RUN echo "Installing uv=${UV_VERSION} opencode=${OPENCODE_VERSION}" && \
RUN echo "Installing uv=${UV_VERSION} opencode=${OPENCODE_VERSION} (cachebust=${TOOLS_CACHEBUST})" && \
curl -LsSf https://astral.sh/uv/install.sh | UV_NO_MODIFY_PATH=1 sh && \
mv /root/.local/bin/uv /usr/local/bin/uv && \
mv /root/.local/bin/uvx /usr/local/bin/uvx && \
Expand Down
49 changes: 49 additions & 0 deletions backend/src/routes/sse-writer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
export interface QueuedSSEWriterInput {
write: (chunk: Uint8Array) => Promise<unknown> | void
onError: (error: unknown) => void
}

export interface QueuedSSEWriter {
writeSSE: (event: string, data: string) => void
writeFrame: (frame: Uint8Array) => void
close: () => void
}

const sharedEncoder = new TextEncoder()

export function encodeSSEFrame(event: string, data: string): Uint8Array {
const head = event ? `event: ${event}\n` : ''
return sharedEncoder.encode(`${head}data: ${data}\n\n`)
}

export function createQueuedSSEWriter(input: QueuedSSEWriterInput): QueuedSSEWriter {
let chain = Promise.resolve()
let closed = false

const writeFrame = (frame: Uint8Array) => {
if (closed) return

chain = chain
.then(async () => {
if (closed) return
await input.write(frame)
})
.catch((error) => {
if (!closed) {
closed = true
input.onError(error)
}
})
}

const writeSSE = (event: string, data: string) => {
if (closed) return
writeFrame(encodeSSEFrame(event, data))
}

const close = () => {
closed = true
}

return { writeSSE, writeFrame, close }
}
48 changes: 23 additions & 25 deletions backend/src/routes/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { sseAggregator } from '../services/sse-aggregator'
import { SSESubscribeSchema, SSEVisibilitySchema } from '@opencode-manager/shared/schemas'
import { logger } from '../utils/logger'
import { DEFAULTS } from '@opencode-manager/shared/config'
import { createQueuedSSEWriter } from './sse-writer'

const { HEARTBEAT_INTERVAL_MS } = DEFAULTS.SSE

Expand All @@ -21,42 +22,39 @@ export function createSSERoutes() {
c.header('X-Accel-Buffering', 'no')

return stream(c, async (writer) => {
const encoder = new TextEncoder()
const writeSSE = (event: string, data: string) => {
const lines = []
if (event) lines.push(`event: ${event}`)
lines.push(`data: ${data}`)
lines.push('')
lines.push('')
writer.write(encoder.encode(lines.join('\n')))
}

const cleanup = sseAggregator.addClient(
let cleanup: () => void = () => {}

const queuedWriter = createQueuedSSEWriter({
write: (chunk) => writer.write(chunk),
onError: (error) => {
logger.error(`SSE write failed for ${clientId}:`, error)
clearInterval(heartbeatInterval)
cleanup()
},
})

const heartbeatInterval = setInterval(() => {
queuedWriter.writeSSE('heartbeat', JSON.stringify({ timestamp: Date.now() }))
}, HEARTBEAT_INTERVAL_MS)

cleanup = sseAggregator.addClient(
clientId,
(event, data) => {
writeSSE(event, data)
queuedWriter.writeSSE(event, data)
},
(frame) => {
queuedWriter.writeFrame(frame)
},
directories
)

const heartbeatInterval = setInterval(() => {
try {
writeSSE('heartbeat', JSON.stringify({ timestamp: Date.now() }))
} catch {
clearInterval(heartbeatInterval)
}
}, HEARTBEAT_INTERVAL_MS)

writer.onAbort(() => {
queuedWriter.close()
clearInterval(heartbeatInterval)
cleanup()
})

try {
writeSSE('connected', JSON.stringify({ clientId, directories, ...sseAggregator.getConnectionStatus() }))
} catch (err) {
logger.error(`Failed to send SSE connected event for ${clientId}:`, err)
}
queuedWriter.writeSSE('connected', JSON.stringify({ clientId, directories, ...sseAggregator.getConnectionStatus() }))

await new Promise(() => {})
})
Expand Down
2 changes: 1 addition & 1 deletion backend/src/services/assistant-mode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,7 @@ export async function warmAssistantWorkspace(deps: {
db: deps.db,
apiBaseUrl: deps.apiBaseUrl,
})
await deps.openCodeClient.getJson('/session', {
await deps.openCodeClient.getJson('/api/session?limit=1&order=desc', {
directory: status.directory,
signal: AbortSignal.timeout(ASSISTANT_WARMUP_OPENCODE_TIMEOUT_MS),
})
Expand Down
10 changes: 8 additions & 2 deletions backend/src/services/opencode-single-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { writeFileContent } from './file-operations'
const MIN_OPENCODE_VERSION = '1.0.137'
const MAX_STDERR_SIZE = 10240
const HEALTH_CHECK_TIMEOUT_MS = 3000
const PLUGIN_INSTALL_TIMEOUT_MS = 120000
const DEPRECATED_PLUGIN_PACKAGES = ['opencode-openai-codex-auth', 'opencode-copilot-auth']

type StartupValidationIssue = {
Expand Down Expand Up @@ -585,19 +586,24 @@ class OpenCodeServerManager {

await fs.mkdir(installDir, { recursive: true })
if (!await fs.access(path.join(installDir, 'package.json')).then(() => true).catch(() => false)) {
const init = spawnSync('bun', ['init', '-y'], { cwd: installDir, encoding: 'utf8' })
const init = spawnSync('bun', ['init', '-y'], { cwd: installDir, encoding: 'utf8', timeout: 30000 })
if (init.status !== 0) {
logger.warn(`Failed to initialize OpenCode plugin cache for ${plugin}: ${init.stderr || init.stdout}`)
continue
}
}

const result = spawnSync('bun', ['add', '--ignore-scripts', installSpec], { cwd: installDir, encoding: 'utf8' })
const result = spawnSync('bun', ['add', '--ignore-scripts', installSpec], { cwd: installDir, encoding: 'utf8', timeout: PLUGIN_INSTALL_TIMEOUT_MS })
if (result.status === 0) {
logger.info(`Installed OpenCode plugin: ${plugin}`)
continue
}

if (result.error) {
logger.warn(`Failed to install OpenCode plugin ${plugin}: ${result.error.message}`)
continue
}

logger.warn(`Failed to install OpenCode plugin ${plugin}: ${result.stderr || result.stdout}`)
}
}
Expand Down
20 changes: 14 additions & 6 deletions backend/src/services/sse-aggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ import { logger } from '../utils/logger'
import { ENV } from '@opencode-manager/shared/config/env'
import { DEFAULTS } from '@opencode-manager/shared/config'
import { getOpenCodeBasicAuthHeader, type OpenCodePasswordResolver } from './opencode/auth'
import { encodeSSEFrame } from '../routes/sse-writer'

type SSEClientCallback = (event: string, data: string) => void
type SSEClientFrameWriter = (frame: Uint8Array) => void
type SSEEventListener = (directory: string, event: SSEEvent) => void

interface SSEClient {
id: string
callback: SSEClientCallback
writeFrame: SSEClientFrameWriter
directories: Set<string>
visible: boolean
activeSessionId: string | null
Expand Down Expand Up @@ -95,10 +98,11 @@ class SSEAggregator {
void this.connectUpstream()
}

addClient(id: string, callback: SSEClientCallback, directories: string[]): () => void {
addClient(id: string, callback: SSEClientCallback, writeFrame: SSEClientFrameWriter, directories: string[]): () => void {
const client: SSEClient = {
id,
callback,
writeFrame,
directories: new Set(directories),
visible: false,
activeSessionId: null
Expand Down Expand Up @@ -211,7 +215,7 @@ class SSEAggregator {
if (!client || !client.directories.has(directory)) return

for (const item of items) {
const payload = JSON.stringify({ type, properties: item, directory })
const payload = JSON.stringify({ directory, payload: { type, properties: item } })
try {
client.callback('message', payload)
} catch (error) {
Expand Down Expand Up @@ -316,11 +320,13 @@ class SSEAggregator {
try { listener(directory, parsed) } catch { /* ignore listener errors */ }
})

const clientData = JSON.stringify({ ...parsed, directory })
let frame: Uint8Array | undefined
const getFrame = (): Uint8Array => (frame ??= encodeSSEFrame('message', data))

this.clients.forEach((client) => {
if (client.directories.has(directory)) {
try {
client.callback('message', clientData)
client.writeFrame(getFrame())
} catch (error) {
logger.error(`Failed to send to client ${client.id}:`, error)
}
Expand Down Expand Up @@ -478,8 +484,10 @@ export const sseAggregator = SSEAggregator.getInstance()

export function broadcastSSHHostKeyRequest(data: Record<string, unknown>): void {
const event = JSON.stringify({
type: 'ssh.host-key-request',
properties: data,
payload: {
type: 'ssh.host-key-request',
properties: data,
},
})
sseAggregator.broadcastToAll('message', event)
}
88 changes: 88 additions & 0 deletions backend/test/routes/sse-writer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { describe, it, expect, vi } from 'vitest'
import { createQueuedSSEWriter, encodeSSEFrame } from '../../src/routes/sse-writer'

describe('encodeSSEFrame', () => {
const decoder = new TextDecoder()

it('encodes an event frame', () => {
expect(decoder.decode(encodeSSEFrame('message', '{"n":1}'))).toBe('event: message\ndata: {"n":1}\n\n')
})

it('omits the event line when event is empty', () => {
expect(decoder.decode(encodeSSEFrame('', '{"n":1}'))).toBe('data: {"n":1}\n\n')
})
})

describe('createQueuedSSEWriter', () => {
describe('writeFrame', () => {
it('writes a pre-encoded frame through the serialized chain', async () => {
const writes: Uint8Array[] = []
const write = vi.fn((chunk: Uint8Array) => { writes.push(chunk) })
const onError = vi.fn()

const writer = createQueuedSSEWriter({ write, onError })
const frame = encodeSSEFrame('message', '{"shared":true}')
writer.writeFrame(frame)

await vi.waitFor(() => expect(write).toHaveBeenCalledTimes(1))
expect(writes[0]).toBe(frame)
expect(onError).not.toHaveBeenCalled()
})
})

describe('serializes frames in enqueue order', () => {
it('should not execute second write until first resolves', async () => {
let firstWriteResolve!: () => void
const writes: Uint8Array[] = []
const write = vi.fn((chunk: Uint8Array) => {
writes.push(chunk)
if (writes.length === 1) {
return new Promise<void>((resolve) => {
firstWriteResolve = resolve
})
}
})
const onError = vi.fn()

const writer = createQueuedSSEWriter({ write, onError })

writer.writeSSE('message', '{"n":1}')
writer.writeSSE('message', '{"n":2}')

await new Promise((resolve) => setTimeout(resolve, 0))
expect(write).toHaveBeenCalledTimes(1)
expect(onError).not.toHaveBeenCalled()

firstWriteResolve()
await new Promise((resolve) => setTimeout(resolve, 0))
expect(write).toHaveBeenCalledTimes(2)

const decoder = new TextDecoder()
expect(decoder.decode(writes[0])).toBe('event: message\ndata: {"n":1}\n\n')
expect(decoder.decode(writes[1])).toBe('event: message\ndata: {"n":2}\n\n')
})
})

describe('stops writing after a write failure', () => {
it('should call onError and skip subsequent writes', async () => {
const write = vi.fn().mockRejectedValueOnce(new Error('write failed'))
const onError = vi.fn()

const writer = createQueuedSSEWriter({ write, onError })

writer.writeSSE('message', '{"n":1}')

await vi.waitFor(() => {
expect(onError).toHaveBeenCalledTimes(1)
})
expect(onError).toHaveBeenCalledWith(new Error('write failed'))
expect(write).toHaveBeenCalledTimes(1)

writer.writeSSE('message', '{"n":2}')

await vi.waitFor(() => {
expect(write).toHaveBeenCalledTimes(1)
})
})
})
})
4 changes: 2 additions & 2 deletions backend/test/services/assistant-mode.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ describe('warmAssistantWorkspace', () => {
})
afterEach(async () => { await ws.cleanup() })

it('provisions the workspace and triggers a directory-scoped session request', async () => {
it('provisions the workspace and triggers a bounded directory-scoped session request', async () => {
const getJsonCalls: Array<{ path: string; directory?: string }> = []
const client = {
getJson: async (requestPath: string, opts?: { directory?: string }) => {
Expand All @@ -634,7 +634,7 @@ describe('warmAssistantWorkspace', () => {
const opencodeJson = await readFile(path.join(ws.assistantDir, 'opencode.json'), 'utf8')
expect(JSON.parse(opencodeJson).default_agent).toBe('assistant')
expect(getJsonCalls).toHaveLength(1)
expect(getJsonCalls[0]?.path).toBe('/session')
expect(getJsonCalls[0]?.path).toBe('/api/session?limit=1&order=desc')
expect(getJsonCalls[0]?.directory).toBe(ws.assistantDir)
})

Expand Down
Loading