Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/lab-notes/2026-04-20-coding-cli-session-contract.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ The implementation plan file is dated `2026-04-19` because the design work was w
"opencode": {
"executable": "opencode",
"resolvedPath": "/home/user/.opencode/bin/opencode",
"version": "1.14.39",
"version": "1.14.40",
"runCommandTemplate": "opencode run <prompt> --format json --dangerously-skip-permissions",
"serveCommandTemplate": "opencode serve --hostname 127.0.0.1 --port <port>",
"globalHealthPath": "/global/health",
Expand Down Expand Up @@ -290,7 +290,7 @@ command -v opencode
# /home/user/.opencode/bin/opencode

opencode --version
# 1.14.39
# 1.14.40
```

Fresh isolated runs were probed with:
Expand All @@ -315,7 +315,7 @@ curl http://127.0.0.1:<port>/session/status

Observed control behavior:

- `/global/health` returned a healthy payload with version `1.14.39`.
- `/global/health` returned a healthy payload with version `1.14.40`.
- `/session/status` returned `{}` while idle.
- During an attached `opencode run ... --attach http://127.0.0.1:<port>`, `/session/status` returned the same authoritative `sessionID` with `{ "type": "busy" }`.

Expand Down
194 changes: 140 additions & 54 deletions server/coding-cli/opencode-activity-tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ import { EventEmitter } from 'events'
import { z } from 'zod'
import type { OpencodeServerEndpoint } from '../local-port.js'
import { logger } from '../logger.js'
import {
confirmOpencodeAssociation,
createOpencodeOwnershipState,
reduceOpencodeOwnership,
rejectOpencodeAssociation,
type OpencodeObservation,
type OpencodeOwnershipAction,
type OpencodeOwnershipState,
} from './opencode-ownership-reducer.js'

export const OPENCODE_HEALTH_POLL_MS = 200
// Applies per health-wait cycle; connection failures restart the cycle after backoff.
Expand All @@ -23,6 +32,17 @@ export type OpencodeActivityChange = {
remove: string[]
}

export type OpencodeAssociationRequestedEvent = {
terminalId: string
sessionId: string
}

export type OpencodeTurnCompleteEvent = {
terminalId: string
sessionId: string
at: number
}

const SessionIdleStatusSchema = z.object({
type: z.literal('idle'),
}).passthrough()
Expand Down Expand Up @@ -92,6 +112,7 @@ type MonitorState = {
reconnectDelayMs: number
reconnectTimer?: ReturnType<typeof setTimeout>
reconnectResolve?: () => void
ownership: OpencodeOwnershipState
}

function createAbortError(): Error {
Expand Down Expand Up @@ -148,21 +169,6 @@ function parseOpencodeEvent(data: string): z.infer<typeof OpencodeEventSchema> |
return parsedEvent.data
}

function extractBusySessionId(
snapshot: Record<string, z.infer<typeof SessionStatusSchema>>,
currentSessionId?: string,
): string | undefined {
const busySessionIds = Object.entries(snapshot)
.filter(([, status]) => status.type !== 'idle')
.map(([sessionId]) => sessionId)
.sort()
if (busySessionIds.length === 0) return undefined
if (currentSessionId && busySessionIds.includes(currentSessionId)) {
return currentSessionId
}
return busySessionIds[0]
}

export class OpencodeActivityTracker extends EventEmitter {
private readonly records = new Map<string, OpencodeActivityRecord>()
private readonly monitors = new Map<string, MonitorState>()
Expand All @@ -172,6 +178,8 @@ export class OpencodeActivityTracker extends EventEmitter {
private readonly setTimeoutFn: typeof setTimeout
private readonly clearTimeoutFn: typeof clearTimeout
private readonly random: () => number
private nextCycleId = 0
private nextStreamId = 0

constructor(input: {
fetchImpl?: FetchLike
Expand All @@ -198,14 +206,15 @@ export class OpencodeActivityTracker extends EventEmitter {
return this.records.get(terminalId)
}

trackTerminal(input: { terminalId: string; endpoint: OpencodeServerEndpoint }): void {
trackTerminal(input: { terminalId: string; endpoint: OpencodeServerEndpoint; sessionId?: string }): void {
const existing = this.monitors.get(input.terminalId)
if (
existing
&& existing.endpoint.hostname === input.endpoint.hostname
&& existing.endpoint.port === input.endpoint.port
&& !existing.disposed
) {
existing.ownership = createOpencodeOwnershipState(input.sessionId)
return
}

Expand All @@ -216,6 +225,7 @@ export class OpencodeActivityTracker extends EventEmitter {
endpoint: input.endpoint,
disposed: false,
reconnectDelayMs: OPENCODE_RECONNECT_BASE_MS,
ownership: createOpencodeOwnershipState(input.sessionId),
}
this.monitors.set(input.terminalId, monitor)
void this.runMonitor(monitor)
Expand Down Expand Up @@ -247,11 +257,11 @@ export class OpencodeActivityTracker extends EventEmitter {
while (!monitor.disposed) {
const controller = new AbortController()
monitor.controller = controller
const cycleId = ++this.nextCycleId
try {
await this.waitForHealth(monitor, controller.signal)
await this.refreshSnapshot(monitor, controller.signal)
monitor.reconnectDelayMs = OPENCODE_RECONNECT_BASE_MS
await this.consumeEvents(monitor, controller.signal)
await this.consumeEvents(monitor, cycleId, controller.signal)
} catch (error) {
if (monitor.disposed || isAbortError(error)) {
return
Expand Down Expand Up @@ -294,7 +304,12 @@ export class OpencodeActivityTracker extends EventEmitter {
}
}

private async refreshSnapshot(monitor: MonitorState, signal: AbortSignal): Promise<void> {
private async refreshSnapshot(
monitor: MonitorState,
cycleId: number,
streamId: number,
signal: AbortSignal,
): Promise<void> {
const response = await this.fetchImpl(this.buildUrl(monitor.endpoint, '/session/status'), {
signal,
})
Expand All @@ -307,22 +322,16 @@ export class OpencodeActivityTracker extends EventEmitter {
throw new Error('OpenCode session status response did not match the expected schema.')
}

const current = this.records.get(monitor.terminalId)
const busySessionId = extractBusySessionId(parsed.data, current?.sessionId)
if (!busySessionId) {
this.removeRecord(monitor.terminalId)
return
}

this.upsertRecord({
terminalId: monitor.terminalId,
sessionId: busySessionId,
phase: 'busy',
updatedAt: this.now(),
this.observe(monitor, {
kind: 'snapshot',
cycleId,
streamId,
statuses: parsed.data,
at: this.now(),
})
}

private async consumeEvents(monitor: MonitorState, signal: AbortSignal): Promise<void> {
private async consumeEvents(monitor: MonitorState, cycleId: number, signal: AbortSignal): Promise<void> {
const response = await this.fetchImpl(this.buildUrl(monitor.endpoint, '/event'), {
signal,
headers: { accept: 'text/event-stream' },
Expand All @@ -335,6 +344,8 @@ export class OpencodeActivityTracker extends EventEmitter {
const decoder = new TextDecoder()
const abortPromise = createAbortPromise(signal)
let buffer = ''
let connected = false
const streamId = ++this.nextStreamId

try {
while (true) {
Expand All @@ -355,7 +366,13 @@ export class OpencodeActivityTracker extends EventEmitter {
while (separatorIndex >= 0) {
const block = buffer.slice(0, separatorIndex)
buffer = buffer.slice(separatorIndex + 2)
this.handleSseBlock(monitor.terminalId, block)
const event = this.parseSseBlock(monitor.terminalId, block)
if (event?.type === 'server.connected' && !connected) {
connected = true
await this.refreshSnapshot(monitor, cycleId, streamId, signal)
} else if (event && event.type !== 'server.connected') {
this.handleOpencodeEvent(monitor, cycleId, streamId, event)
}
separatorIndex = buffer.indexOf('\n\n')
}
}
Expand All @@ -368,9 +385,12 @@ export class OpencodeActivityTracker extends EventEmitter {
}
}

private handleSseBlock(terminalId: string, block: string): void {
private parseSseBlock(
terminalId: string,
block: string,
): z.infer<typeof OpencodeEventSchema> | undefined {
const data = parseSseData(block)
if (!data) return
if (!data) return undefined

let event: z.infer<typeof OpencodeEventSchema> | undefined
try {
Expand All @@ -382,28 +402,101 @@ export class OpencodeActivityTracker extends EventEmitter {
endpoint,
err: error,
}, 'OpenCode event payload was invalid; skipping payload.')
return
return undefined
}

if (!event) return
if (event.type === 'server.connected') return
return event
}

private handleOpencodeEvent(
monitor: MonitorState,
cycleId: number,
streamId: number,
event: Exclude<z.infer<typeof OpencodeEventSchema>, { type: 'server.connected' }>,
): void {
if (event.type === 'session.idle') {
this.removeRecordForSession(terminalId, event.properties.sessionID)
return
}
if (event.properties.status.type === 'idle') {
this.removeRecordForSession(terminalId, event.properties.sessionID)
this.observe(monitor, {
kind: 'sse',
cycleId,
streamId,
sessionId: event.properties.sessionID,
status: 'idle',
at: this.now(),
})
return
}

this.upsertRecord({
terminalId,
this.observe(monitor, {
kind: 'sse',
cycleId,
streamId,
sessionId: event.properties.sessionID,
phase: 'busy',
updatedAt: this.now(),
status: event.properties.status.type,
at: this.now(),
})
}

confirmSessionAssociation(input: { terminalId: string; sessionId: string }): void {
const monitor = this.monitors.get(input.terminalId)
if (!monitor || monitor.disposed) return
const result = confirmOpencodeAssociation(monitor.ownership, { sessionId: input.sessionId })
monitor.ownership = result.state
this.applyActions(monitor.terminalId, result.actions)
}

rejectSessionAssociation(input: { terminalId: string; sessionId: string }): void {
const monitor = this.monitors.get(input.terminalId)
if (!monitor || monitor.disposed) return
const result = rejectOpencodeAssociation(monitor.ownership, { sessionId: input.sessionId })
monitor.ownership = result.state
this.applyActions(monitor.terminalId, result.actions)
}

private observe(monitor: MonitorState, observation: OpencodeObservation): void {
const result = reduceOpencodeOwnership(monitor.ownership, observation)
monitor.ownership = result.state
this.applyActions(monitor.terminalId, result.actions)
}

private applyActions(terminalId: string, actions: OpencodeOwnershipAction[]): void {
for (const action of actions) {
if (action.kind === 'activityUpsert') {
this.upsertRecord({
terminalId,
sessionId: action.sessionId,
phase: 'busy',
updatedAt: action.at,
})
continue
}
if (action.kind === 'activityRemove') {
this.removeRecord(terminalId)
continue
}
if (action.kind === 'requestAssociation') {
this.emit('association.requested', {
terminalId,
sessionId: action.sessionId,
} satisfies OpencodeAssociationRequestedEvent)
continue
}
if (action.kind === 'turnComplete') {
this.emit('turn.complete', {
terminalId,
sessionId: action.sessionId,
at: action.at,
} satisfies OpencodeTurnCompleteEvent)
continue
}
if (action.kind === 'warnAmbiguous') {
this.log.warn({
terminalId,
sessionIds: action.sessionIds,
}, 'OpenCode endpoint reported ambiguous session ownership; suppressing durable adoption.')
}
}
}

private async sleepWithBackoff(monitor: MonitorState): Promise<void> {
const baseDelay = monitor.reconnectDelayMs
const jitter = Math.floor(baseDelay * 0.1 * this.random())
Expand Down Expand Up @@ -454,13 +547,6 @@ export class OpencodeActivityTracker extends EventEmitter {
return `http://${endpoint.hostname}:${endpoint.port}${pathname}`
}

private removeRecordForSession(terminalId: string, sessionId: string): void {
const existing = this.records.get(terminalId)
if (!existing) return
if (existing.sessionId && existing.sessionId !== sessionId) return
this.removeRecord(terminalId)
}

private upsertRecord(record: OpencodeActivityRecord): void {
const previous = this.records.get(record.terminalId)
if (
Expand Down
Loading
Loading