From 23f5ca38c05a25d291f07eb4810ffd5696c811c3 Mon Sep 17 00:00:00 2001 From: Dan Shapiro Date: Wed, 6 May 2026 21:33:30 -0700 Subject: [PATCH 1/4] Implement OpenCode turn completion control plane --- .../coding-cli/opencode-activity-tracker.ts | 194 ++++++--- server/coding-cli/opencode-activity-wiring.ts | 33 +- .../coding-cli/opencode-ownership-reducer.ts | 411 ++++++++++++++++++ .../coding-cli/opencode-session-controller.ts | 65 +-- server/index.ts | 64 ++- server/session-association-coordinator.ts | 2 +- server/terminal-registry.ts | 2 + server/ws-handler.ts | 16 + shared/ws-protocol.ts | 11 + src/App.tsx | 17 + src/store/selectors/paneTerminalSelectors.ts | 23 + .../server/opencode-session-flow.test.ts | 25 +- test/server/session-association.test.ts | 7 +- test/server/ws-opencode-activity.test.ts | 34 ++ .../components/App.ws-bootstrap.test.tsx | 89 ++++ .../opencode-activity-tracker.test.ts | 78 ++++ .../opencode-ownership-reducer.test.ts | 228 ++++++++++ .../opencode-session-controller.test.ts | 95 ++++ test/unit/server/terminal-registry.test.ts | 13 + 19 files changed, 1278 insertions(+), 129 deletions(-) create mode 100644 server/coding-cli/opencode-ownership-reducer.ts create mode 100644 test/unit/server/coding-cli/opencode-ownership-reducer.test.ts create mode 100644 test/unit/server/coding-cli/opencode-session-controller.test.ts diff --git a/server/coding-cli/opencode-activity-tracker.ts b/server/coding-cli/opencode-activity-tracker.ts index a5136429..8ca021c1 100644 --- a/server/coding-cli/opencode-activity-tracker.ts +++ b/server/coding-cli/opencode-activity-tracker.ts @@ -2,6 +2,15 @@ import { EventEmitter } from 'events' import { z } from 'zod' import type { OpencodeServerEndpoint } from '../local-port.js' import { logger } from '../logger.js' +import { + confirmOpencodeAssociation, + createOpencodeOwnershipState, + reduceOpencodeOwnership, + rejectOpencodeAssociation, + type OpencodeObservation, + type OpencodeOwnershipAction, + type OpencodeOwnershipState, +} from './opencode-ownership-reducer.js' export const OPENCODE_HEALTH_POLL_MS = 200 // Applies per health-wait cycle; connection failures restart the cycle after backoff. @@ -23,6 +32,17 @@ export type OpencodeActivityChange = { remove: string[] } +export type OpencodeAssociationRequestedEvent = { + terminalId: string + sessionId: string +} + +export type OpencodeTurnCompleteEvent = { + terminalId: string + sessionId: string + at: number +} + const SessionIdleStatusSchema = z.object({ type: z.literal('idle'), }).passthrough() @@ -92,6 +112,7 @@ type MonitorState = { reconnectDelayMs: number reconnectTimer?: ReturnType reconnectResolve?: () => void + ownership: OpencodeOwnershipState } function createAbortError(): Error { @@ -148,21 +169,6 @@ function parseOpencodeEvent(data: string): z.infer | return parsedEvent.data } -function extractBusySessionId( - snapshot: Record>, - currentSessionId?: string, -): string | undefined { - const busySessionIds = Object.entries(snapshot) - .filter(([, status]) => status.type !== 'idle') - .map(([sessionId]) => sessionId) - .sort() - if (busySessionIds.length === 0) return undefined - if (currentSessionId && busySessionIds.includes(currentSessionId)) { - return currentSessionId - } - return busySessionIds[0] -} - export class OpencodeActivityTracker extends EventEmitter { private readonly records = new Map() private readonly monitors = new Map() @@ -172,6 +178,8 @@ export class OpencodeActivityTracker extends EventEmitter { private readonly setTimeoutFn: typeof setTimeout private readonly clearTimeoutFn: typeof clearTimeout private readonly random: () => number + private nextCycleId = 0 + private nextStreamId = 0 constructor(input: { fetchImpl?: FetchLike @@ -198,7 +206,7 @@ export class OpencodeActivityTracker extends EventEmitter { return this.records.get(terminalId) } - trackTerminal(input: { terminalId: string; endpoint: OpencodeServerEndpoint }): void { + trackTerminal(input: { terminalId: string; endpoint: OpencodeServerEndpoint; sessionId?: string }): void { const existing = this.monitors.get(input.terminalId) if ( existing @@ -206,6 +214,7 @@ export class OpencodeActivityTracker extends EventEmitter { && existing.endpoint.port === input.endpoint.port && !existing.disposed ) { + existing.ownership = createOpencodeOwnershipState(input.sessionId) return } @@ -216,6 +225,7 @@ export class OpencodeActivityTracker extends EventEmitter { endpoint: input.endpoint, disposed: false, reconnectDelayMs: OPENCODE_RECONNECT_BASE_MS, + ownership: createOpencodeOwnershipState(input.sessionId), } this.monitors.set(input.terminalId, monitor) void this.runMonitor(monitor) @@ -247,11 +257,11 @@ export class OpencodeActivityTracker extends EventEmitter { while (!monitor.disposed) { const controller = new AbortController() monitor.controller = controller + const cycleId = ++this.nextCycleId try { await this.waitForHealth(monitor, controller.signal) - await this.refreshSnapshot(monitor, controller.signal) monitor.reconnectDelayMs = OPENCODE_RECONNECT_BASE_MS - await this.consumeEvents(monitor, controller.signal) + await this.consumeEvents(monitor, cycleId, controller.signal) } catch (error) { if (monitor.disposed || isAbortError(error)) { return @@ -294,7 +304,12 @@ export class OpencodeActivityTracker extends EventEmitter { } } - private async refreshSnapshot(monitor: MonitorState, signal: AbortSignal): Promise { + private async refreshSnapshot( + monitor: MonitorState, + cycleId: number, + streamId: number, + signal: AbortSignal, + ): Promise { const response = await this.fetchImpl(this.buildUrl(monitor.endpoint, '/session/status'), { signal, }) @@ -307,22 +322,16 @@ export class OpencodeActivityTracker extends EventEmitter { throw new Error('OpenCode session status response did not match the expected schema.') } - const current = this.records.get(monitor.terminalId) - const busySessionId = extractBusySessionId(parsed.data, current?.sessionId) - if (!busySessionId) { - this.removeRecord(monitor.terminalId) - return - } - - this.upsertRecord({ - terminalId: monitor.terminalId, - sessionId: busySessionId, - phase: 'busy', - updatedAt: this.now(), + this.observe(monitor, { + kind: 'snapshot', + cycleId, + streamId, + statuses: parsed.data, + at: this.now(), }) } - private async consumeEvents(monitor: MonitorState, signal: AbortSignal): Promise { + private async consumeEvents(monitor: MonitorState, cycleId: number, signal: AbortSignal): Promise { const response = await this.fetchImpl(this.buildUrl(monitor.endpoint, '/event'), { signal, headers: { accept: 'text/event-stream' }, @@ -335,6 +344,8 @@ export class OpencodeActivityTracker extends EventEmitter { const decoder = new TextDecoder() const abortPromise = createAbortPromise(signal) let buffer = '' + let connected = false + const streamId = ++this.nextStreamId try { while (true) { @@ -355,7 +366,13 @@ export class OpencodeActivityTracker extends EventEmitter { while (separatorIndex >= 0) { const block = buffer.slice(0, separatorIndex) buffer = buffer.slice(separatorIndex + 2) - this.handleSseBlock(monitor.terminalId, block) + const event = this.parseSseBlock(monitor.terminalId, block) + if (event?.type === 'server.connected' && !connected) { + connected = true + await this.refreshSnapshot(monitor, cycleId, streamId, signal) + } else if (event && event.type !== 'server.connected') { + this.handleOpencodeEvent(monitor, cycleId, streamId, event) + } separatorIndex = buffer.indexOf('\n\n') } } @@ -368,9 +385,12 @@ export class OpencodeActivityTracker extends EventEmitter { } } - private handleSseBlock(terminalId: string, block: string): void { + private parseSseBlock( + terminalId: string, + block: string, + ): z.infer | undefined { const data = parseSseData(block) - if (!data) return + if (!data) return undefined let event: z.infer | undefined try { @@ -382,28 +402,101 @@ export class OpencodeActivityTracker extends EventEmitter { endpoint, err: error, }, 'OpenCode event payload was invalid; skipping payload.') - return + return undefined } - if (!event) return - if (event.type === 'server.connected') return + return event + } + + private handleOpencodeEvent( + monitor: MonitorState, + cycleId: number, + streamId: number, + event: Exclude, { type: 'server.connected' }>, + ): void { if (event.type === 'session.idle') { - this.removeRecordForSession(terminalId, event.properties.sessionID) - return - } - if (event.properties.status.type === 'idle') { - this.removeRecordForSession(terminalId, event.properties.sessionID) + this.observe(monitor, { + kind: 'sse', + cycleId, + streamId, + sessionId: event.properties.sessionID, + status: 'idle', + at: this.now(), + }) return } - this.upsertRecord({ - terminalId, + this.observe(monitor, { + kind: 'sse', + cycleId, + streamId, sessionId: event.properties.sessionID, - phase: 'busy', - updatedAt: this.now(), + status: event.properties.status.type, + at: this.now(), }) } + confirmSessionAssociation(input: { terminalId: string; sessionId: string }): void { + const monitor = this.monitors.get(input.terminalId) + if (!monitor || monitor.disposed) return + const result = confirmOpencodeAssociation(monitor.ownership, { sessionId: input.sessionId }) + monitor.ownership = result.state + this.applyActions(monitor.terminalId, result.actions) + } + + rejectSessionAssociation(input: { terminalId: string; sessionId: string }): void { + const monitor = this.monitors.get(input.terminalId) + if (!monitor || monitor.disposed) return + const result = rejectOpencodeAssociation(monitor.ownership, { sessionId: input.sessionId }) + monitor.ownership = result.state + this.applyActions(monitor.terminalId, result.actions) + } + + private observe(monitor: MonitorState, observation: OpencodeObservation): void { + const result = reduceOpencodeOwnership(monitor.ownership, observation) + monitor.ownership = result.state + this.applyActions(monitor.terminalId, result.actions) + } + + private applyActions(terminalId: string, actions: OpencodeOwnershipAction[]): void { + for (const action of actions) { + if (action.kind === 'activityUpsert') { + this.upsertRecord({ + terminalId, + sessionId: action.sessionId, + phase: 'busy', + updatedAt: action.at, + }) + continue + } + if (action.kind === 'activityRemove') { + this.removeRecord(terminalId) + continue + } + if (action.kind === 'requestAssociation') { + this.emit('association.requested', { + terminalId, + sessionId: action.sessionId, + } satisfies OpencodeAssociationRequestedEvent) + continue + } + if (action.kind === 'turnComplete') { + this.emit('turn.complete', { + terminalId, + sessionId: action.sessionId, + at: action.at, + } satisfies OpencodeTurnCompleteEvent) + continue + } + if (action.kind === 'warnAmbiguous') { + this.log.warn({ + terminalId, + sessionIds: action.sessionIds, + }, 'OpenCode endpoint reported ambiguous session ownership; suppressing durable adoption.') + } + } + } + private async sleepWithBackoff(monitor: MonitorState): Promise { const baseDelay = monitor.reconnectDelayMs const jitter = Math.floor(baseDelay * 0.1 * this.random()) @@ -454,13 +547,6 @@ export class OpencodeActivityTracker extends EventEmitter { return `http://${endpoint.hostname}:${endpoint.port}${pathname}` } - private removeRecordForSession(terminalId: string, sessionId: string): void { - const existing = this.records.get(terminalId) - if (!existing) return - if (existing.sessionId && existing.sessionId !== sessionId) return - this.removeRecord(terminalId) - } - private upsertRecord(record: OpencodeActivityRecord): void { const previous = this.records.get(record.terminalId) if ( diff --git a/server/coding-cli/opencode-activity-wiring.ts b/server/coding-cli/opencode-activity-wiring.ts index aa475235..d623c43b 100644 --- a/server/coding-cli/opencode-activity-wiring.ts +++ b/server/coding-cli/opencode-activity-wiring.ts @@ -1,8 +1,13 @@ import { OpencodeActivityTracker } from './opencode-activity-tracker.js' +import type { + OpencodeActivityChange, + OpencodeTurnCompleteEvent, +} from './opencode-activity-tracker.js' import { OpencodeSessionController } from './opencode-session-controller.js' import type { OpencodeServerEndpoint } from '../local-port.js' import type { BindSessionResult, TerminalRecord } from '../terminal-registry.js' import type { SessionBindingReason } from '../terminal-stream/registry-events.js' +import type { OpencodeSessionAssociatedEvent } from './opencode-session-controller.js' type OpencodeActivityRegistry = { list: () => Array<{ terminalId: string }> @@ -13,12 +18,6 @@ type OpencodeActivityRegistry = { sessionId: string, reason?: SessionBindingReason, ) => BindSessionResult - rebindSession: ( - terminalId: string, - provider: 'opencode', - sessionId: string, - reason?: SessionBindingReason, - ) => BindSessionResult on: (event: string, handler: (...args: any[]) => void) => void off: (event: string, handler: (...args: any[]) => void) => void } @@ -34,6 +33,9 @@ export function wireOpencodeActivityTracker(input: { setTimeoutFn?: typeof setTimeout clearTimeoutFn?: typeof clearTimeout random?: () => number + onActivityChanged?: (payload: OpencodeActivityChange) => void + onAssociated?: (payload: OpencodeSessionAssociatedEvent) => void + onTurnComplete?: (payload: OpencodeTurnCompleteEvent) => void }) { const tracker = new OpencodeActivityTracker({ fetchImpl: input.fetchImpl, @@ -42,10 +44,19 @@ export function wireOpencodeActivityTracker(input: { clearTimeoutFn: input.clearTimeoutFn, random: input.random, }) + if (input.onActivityChanged) { + tracker.on('changed', input.onActivityChanged) + } + if (input.onTurnComplete) { + tracker.on('turn.complete', input.onTurnComplete) + } const controller = new OpencodeSessionController({ tracker, registry: input.registry, }) + if (input.onAssociated) { + controller.on('associated', input.onAssociated) + } const startTracking = (record: TerminalRecord) => { const endpoint = getEndpoint(record) @@ -53,6 +64,7 @@ export function wireOpencodeActivityTracker(input: { tracker.trackTerminal({ terminalId: record.terminalId, endpoint, + sessionId: record.resumeSessionId, }) } @@ -80,6 +92,15 @@ export function wireOpencodeActivityTracker(input: { dispose(): void { input.registry.off('terminal.created', onCreated) input.registry.off('terminal.exit', onExit) + if (input.onActivityChanged) { + tracker.off('changed', input.onActivityChanged) + } + if (input.onTurnComplete) { + tracker.off('turn.complete', input.onTurnComplete) + } + if (input.onAssociated) { + controller.off('associated', input.onAssociated) + } controller.dispose() tracker.dispose() }, diff --git a/server/coding-cli/opencode-ownership-reducer.ts b/server/coding-cli/opencode-ownership-reducer.ts new file mode 100644 index 00000000..db15e7a6 --- /dev/null +++ b/server/coding-cli/opencode-ownership-reducer.ts @@ -0,0 +1,411 @@ +export type OpencodeSessionStatusType = 'idle' | 'busy' | 'retry' + +export type OpencodeSessionStatus = { + type: OpencodeSessionStatusType +} + +export type OpencodeObservation = + | { + kind: 'snapshot' + cycleId: number + streamId: number + statuses: Record + at: number + } + | { + kind: 'sse' + cycleId: number + streamId: number + sessionId: string + status: OpencodeSessionStatusType + at: number + } + +export type OpencodeOwnershipState = + | { + kind: 'quiet' + knownSessionId?: string + } + | { + kind: 'candidate' + sessionId: string + previousKnownSessionId?: string + startedBy: 'snapshot' | 'sse' + cycleId: number + streamId: number + } + | { + kind: 'knownBusy' + sessionId: string + startedBy: 'snapshot' | 'sse' + cycleId: number + streamId: number + } + | { + kind: 'awaitingAssociation' + sessionId: string + previousKnownSessionId?: string + cycleId: number + streamId: number + completedAt: number + } + | { + kind: 'ambiguous' + knownSessionId?: string + blockedSessionIds: string[] + since: number + } + +export type OpencodeOwnershipAction = + | { + kind: 'activityUpsert' + sessionId?: string + at: number + } + | { + kind: 'activityRemove' + at: number + } + | { + kind: 'requestAssociation' + sessionId: string + } + | { + kind: 'turnComplete' + sessionId: string + at: number + } + | { + kind: 'warnAmbiguous' + sessionIds: string[] + } + +export type OpencodeOwnershipResult = { + state: OpencodeOwnershipState + actions: OpencodeOwnershipAction[] +} + +export function createOpencodeOwnershipState(knownSessionId?: string): OpencodeOwnershipState { + return { kind: 'quiet', knownSessionId } +} + +function sortedBusySessionIds(statuses: Record): string[] { + return Object.entries(statuses) + .filter(([, status]) => status.type !== 'idle') + .map(([sessionId]) => sessionId) + .sort() +} + +function uniqueSorted(values: string[]): string[] { + return Array.from(new Set(values)).sort() +} + +function sameSessionStream( + state: Extract, + observation: Extract, +): boolean { + return state.sessionId === observation.sessionId + && state.cycleId === observation.cycleId + && state.streamId === observation.streamId +} + +function enterAmbiguous(input: { + knownSessionId?: string + blockedSessionIds: string[] + at: number +}): OpencodeOwnershipResult { + const blockedSessionIds = uniqueSorted(input.blockedSessionIds) + return { + state: { + kind: 'ambiguous', + knownSessionId: input.knownSessionId, + blockedSessionIds, + since: input.at, + }, + actions: [ + { kind: 'activityUpsert', at: input.at }, + { kind: 'warnAmbiguous', sessionIds: blockedSessionIds }, + ], + } +} + +function reduceBusy( + state: OpencodeOwnershipState, + observation: Extract, +): OpencodeOwnershipResult { + const nextBusyState = { + sessionId: observation.sessionId, + startedBy: 'sse' as const, + cycleId: observation.cycleId, + streamId: observation.streamId, + } + + if (state.kind === 'quiet') { + if (state.knownSessionId === observation.sessionId) { + return { + state: { kind: 'knownBusy', ...nextBusyState }, + actions: [{ kind: 'activityUpsert', sessionId: observation.sessionId, at: observation.at }], + } + } + return { + state: { + kind: 'candidate', + previousKnownSessionId: state.knownSessionId, + ...nextBusyState, + }, + actions: [{ kind: 'activityUpsert', sessionId: observation.sessionId, at: observation.at }], + } + } + + if (state.kind === 'candidate') { + if (state.sessionId === observation.sessionId) { + return { + state: { ...state, cycleId: observation.cycleId, streamId: observation.streamId, startedBy: 'sse' }, + actions: [{ kind: 'activityUpsert', sessionId: observation.sessionId, at: observation.at }], + } + } + return enterAmbiguous({ + knownSessionId: state.previousKnownSessionId, + blockedSessionIds: [state.sessionId, observation.sessionId], + at: observation.at, + }) + } + + if (state.kind === 'knownBusy') { + if (state.sessionId === observation.sessionId) { + return { + state: { ...state, cycleId: observation.cycleId, streamId: observation.streamId, startedBy: 'sse' }, + actions: [{ kind: 'activityUpsert', sessionId: observation.sessionId, at: observation.at }], + } + } + return enterAmbiguous({ + knownSessionId: state.sessionId, + blockedSessionIds: [state.sessionId, observation.sessionId], + at: observation.at, + }) + } + + if (state.kind === 'ambiguous') { + if (state.blockedSessionIds.includes(observation.sessionId)) { + return { + state, + actions: [{ kind: 'activityUpsert', at: observation.at }], + } + } + const blockedSessionIds = uniqueSorted([...state.blockedSessionIds, observation.sessionId]) + return { + state: { ...state, blockedSessionIds }, + actions: [ + { kind: 'activityUpsert', at: observation.at }, + { kind: 'warnAmbiguous', sessionIds: blockedSessionIds }, + ], + } + } + + return { state, actions: [] } +} + +function reduceIdle( + state: OpencodeOwnershipState, + observation: Extract, +): OpencodeOwnershipResult { + if (state.kind === 'candidate') { + if (state.startedBy !== 'sse' || !sameSessionStream(state, observation)) return { state, actions: [] } + return { + state: { + kind: 'awaitingAssociation', + sessionId: state.sessionId, + previousKnownSessionId: state.previousKnownSessionId, + cycleId: state.cycleId, + streamId: state.streamId, + completedAt: observation.at, + }, + actions: [ + { kind: 'activityRemove', at: observation.at }, + { kind: 'requestAssociation', sessionId: state.sessionId }, + ], + } + } + + if (state.kind === 'knownBusy') { + if (state.startedBy !== 'sse' || !sameSessionStream(state, observation)) return { state, actions: [] } + return { + state: { + kind: 'quiet', + knownSessionId: state.sessionId, + }, + actions: [ + { kind: 'activityRemove', at: observation.at }, + { kind: 'turnComplete', sessionId: state.sessionId, at: observation.at }, + ], + } + } + + return { state, actions: [] } +} + +function reduceSnapshot( + state: OpencodeOwnershipState, + observation: Extract, +): OpencodeOwnershipResult { + const busySessionIds = sortedBusySessionIds(observation.statuses) + + if (state.kind === 'ambiguous') { + if (busySessionIds.length === 0) { + return { + state: { kind: 'quiet', knownSessionId: state.knownSessionId }, + actions: [{ kind: 'activityRemove', at: observation.at }], + } + } + const blockedSessionIds = uniqueSorted([...state.blockedSessionIds, ...busySessionIds]) + return { + state: { ...state, blockedSessionIds }, + actions: blockedSessionIds.length === state.blockedSessionIds.length + ? [{ kind: 'activityUpsert', at: observation.at }] + : [ + { kind: 'activityUpsert', at: observation.at }, + { kind: 'warnAmbiguous', sessionIds: blockedSessionIds }, + ], + } + } + + if (state.kind === 'knownBusy') { + if (busySessionIds.length === 0) { + return { + state: { kind: 'quiet', knownSessionId: state.sessionId }, + actions: [{ kind: 'activityRemove', at: observation.at }], + } + } + if (busySessionIds.length === 1 && busySessionIds[0] === state.sessionId) { + return { + state: { ...state, startedBy: 'snapshot', cycleId: observation.cycleId, streamId: observation.streamId }, + actions: [{ kind: 'activityUpsert', sessionId: state.sessionId, at: observation.at }], + } + } + return enterAmbiguous({ + knownSessionId: state.sessionId, + blockedSessionIds: uniqueSorted([state.sessionId, ...busySessionIds]), + at: observation.at, + }) + } + + if (state.kind === 'candidate') { + if (busySessionIds.length === 0) { + return { + state: { kind: 'quiet', knownSessionId: state.previousKnownSessionId }, + actions: [{ kind: 'activityRemove', at: observation.at }], + } + } + if (busySessionIds.length === 1 && busySessionIds[0] === state.sessionId) { + return { + state: { ...state, startedBy: 'snapshot', cycleId: observation.cycleId, streamId: observation.streamId }, + actions: [{ kind: 'activityUpsert', sessionId: state.sessionId, at: observation.at }], + } + } + return enterAmbiguous({ + knownSessionId: state.previousKnownSessionId, + blockedSessionIds: uniqueSorted([state.sessionId, ...busySessionIds]), + at: observation.at, + }) + } + + if (state.kind === 'awaitingAssociation') { + return { state, actions: [] } + } + + if (busySessionIds.length === 0) { + return { + state, + actions: [{ kind: 'activityRemove', at: observation.at }], + } + } + + if (state.knownSessionId && busySessionIds.includes(state.knownSessionId)) { + if (busySessionIds.length === 1) { + return { + state: { + kind: 'knownBusy', + sessionId: state.knownSessionId, + startedBy: 'snapshot', + cycleId: observation.cycleId, + streamId: observation.streamId, + }, + actions: [{ kind: 'activityUpsert', sessionId: state.knownSessionId, at: observation.at }], + } + } + return enterAmbiguous({ + knownSessionId: state.knownSessionId, + blockedSessionIds: busySessionIds, + at: observation.at, + }) + } + + if (busySessionIds.length === 1) { + return { + state: { + kind: 'candidate', + previousKnownSessionId: state.knownSessionId, + sessionId: busySessionIds[0], + startedBy: 'snapshot', + cycleId: observation.cycleId, + streamId: observation.streamId, + }, + actions: [{ kind: 'activityUpsert', sessionId: busySessionIds[0], at: observation.at }], + } + } + + return enterAmbiguous({ + knownSessionId: state.knownSessionId, + blockedSessionIds: busySessionIds, + at: observation.at, + }) +} + +export function reduceOpencodeOwnership( + state: OpencodeOwnershipState, + observation: OpencodeObservation, +): OpencodeOwnershipResult { + if (observation.kind === 'snapshot') { + return reduceSnapshot(state, observation) + } + if (observation.status === 'idle') { + return reduceIdle(state, observation) + } + return reduceBusy(state, observation) +} + +export function confirmOpencodeAssociation( + state: OpencodeOwnershipState, + input: { sessionId: string }, +): OpencodeOwnershipResult { + if (state.kind !== 'awaitingAssociation' || state.sessionId !== input.sessionId) { + return { state, actions: [] } + } + return { + state: { + kind: 'quiet', + knownSessionId: state.sessionId, + }, + actions: [{ + kind: 'turnComplete', + sessionId: state.sessionId, + at: state.completedAt, + }], + } +} + +export function rejectOpencodeAssociation( + state: OpencodeOwnershipState, + input: { sessionId: string }, +): OpencodeOwnershipResult { + if (state.kind !== 'awaitingAssociation' || state.sessionId !== input.sessionId) { + return { state, actions: [] } + } + return { + state: { + kind: 'quiet', + knownSessionId: state.previousKnownSessionId, + }, + actions: [], + } +} diff --git a/server/coding-cli/opencode-session-controller.ts b/server/coding-cli/opencode-session-controller.ts index e9afe145..81ae61c1 100644 --- a/server/coding-cli/opencode-session-controller.ts +++ b/server/coding-cli/opencode-session-controller.ts @@ -3,14 +3,14 @@ import type { SessionBindingReason } from '../terminal-stream/registry-events.js import type { BindSessionResult, TerminalRecord } from '../terminal-registry.js' import { logger } from '../logger.js' import type { - OpencodeActivityChange, - OpencodeActivityRecord, + OpencodeAssociationRequestedEvent, } from './opencode-activity-tracker.js' type OpencodeActivityTrackerLike = { - list: () => OpencodeActivityRecord[] - on: (event: 'changed', handler: (payload: OpencodeActivityChange) => void) => void - off: (event: 'changed', handler: (payload: OpencodeActivityChange) => void) => void + confirmSessionAssociation: (input: { terminalId: string; sessionId: string }) => void + rejectSessionAssociation: (input: { terminalId: string; sessionId: string }) => void + on: (event: 'association.requested', handler: (payload: OpencodeAssociationRequestedEvent) => void) => void + off: (event: 'association.requested', handler: (payload: OpencodeAssociationRequestedEvent) => void) => void } type OpencodeSessionRegistry = { @@ -21,12 +21,6 @@ type OpencodeSessionRegistry = { sessionId: string, reason?: SessionBindingReason, ) => BindSessionResult - rebindSession?: ( - terminalId: string, - provider: 'opencode', - sessionId: string, - reason?: SessionBindingReason, - ) => BindSessionResult on: (event: 'terminal.exit', handler: (payload: { terminalId?: string }) => void) => void off: (event: 'terminal.exit', handler: (payload: { terminalId?: string }) => void) => void } @@ -46,10 +40,8 @@ export class OpencodeSessionController extends EventEmitter { private readonly log: ControllerLogger private readonly associatedSessionIds = new Map() - private readonly handleTrackerChanged = (payload: OpencodeActivityChange) => { - for (const record of payload.upsert) { - this.promoteRecord(record) - } + private readonly handleAssociationRequested = (payload: OpencodeAssociationRequestedEvent) => { + this.promoteAssociation(payload) } private readonly handleTerminalExit = (payload: { terminalId?: string }) => { @@ -67,56 +59,47 @@ export class OpencodeSessionController extends EventEmitter { this.registry = input.registry this.log = input.log ?? logger.child({ component: 'opencode-session-controller' }) - this.tracker.on('changed', this.handleTrackerChanged) + this.tracker.on('association.requested', this.handleAssociationRequested) this.registry.on('terminal.exit', this.handleTerminalExit) - - const existing = this.tracker.list() - if (existing.length > 0) { - this.handleTrackerChanged({ - upsert: existing, - remove: [], - }) - } } dispose(): void { - this.tracker.off('changed', this.handleTrackerChanged) + this.tracker.off('association.requested', this.handleAssociationRequested) this.registry.off('terminal.exit', this.handleTerminalExit) this.associatedSessionIds.clear() } - private promoteRecord(record: OpencodeActivityRecord): void { - if (!record.sessionId) return - - const terminal = this.registry.get(record.terminalId) + private promoteAssociation(request: OpencodeAssociationRequestedEvent): void { + const terminal = this.registry.get(request.terminalId) if (!terminal || terminal.mode !== 'opencode' || terminal.status !== 'running') { + this.tracker.rejectSessionAssociation(request) return } - const previousSessionId = this.associatedSessionIds.get(record.terminalId) ?? terminal.resumeSessionId - if (previousSessionId === record.sessionId) { - this.associatedSessionIds.set(record.terminalId, record.sessionId) + const previousSessionId = this.associatedSessionIds.get(request.terminalId) ?? terminal.resumeSessionId + if (previousSessionId === request.sessionId) { + this.associatedSessionIds.set(request.terminalId, request.sessionId) + this.tracker.confirmSessionAssociation(request) return } - const bind = previousSessionId && this.registry.rebindSession - ? this.registry.rebindSession.bind(this.registry) - : this.registry.bindSession.bind(this.registry) - const result = bind(record.terminalId, 'opencode', record.sessionId, 'association') + const result = this.registry.bindSession(request.terminalId, 'opencode', request.sessionId, 'association') if (!result.ok) { this.log.warn({ - terminalId: record.terminalId, - sessionId: record.sessionId, + terminalId: request.terminalId, + sessionId: request.sessionId, reason: result.reason, }, 'Failed to promote OpenCode durable session from authoritative control data') + this.tracker.rejectSessionAssociation(request) return } - this.associatedSessionIds.set(record.terminalId, record.sessionId) + this.associatedSessionIds.set(request.terminalId, request.sessionId) this.emit('associated', { - terminalId: record.terminalId, - sessionId: record.sessionId, + terminalId: request.terminalId, + sessionId: request.sessionId, } satisfies OpencodeSessionAssociatedEvent) + this.tracker.confirmSessionAssociation(request) } } diff --git a/server/index.ts b/server/index.ts index 0e10e6c0..008edc3c 100644 --- a/server/index.ts +++ b/server/index.ts @@ -197,7 +197,7 @@ async function main() { const terminalMetadata = new TerminalMetadataService() const layoutStore = new LayoutStore() const codexActivity = wireCodexActivityTracker({ registry, codingCliIndexer }) - const opencodeActivity = wireOpencodeActivityTracker({ registry }) + let opencodeActivity: ReturnType | undefined const sessionRepairService = getSessionRepairService({ skipDiscovery: true }) const serverInstanceId = await loadOrCreateServerInstanceId() @@ -335,7 +335,7 @@ async function main() { extensionManager, codexActivityListProvider: () => codexActivity.tracker.list(), agentHistorySource, - opencodeActivityListProvider: () => opencodeActivity.tracker.list(), + opencodeActivityListProvider: () => opencodeActivity?.tracker.list() ?? [], }, ) attachProxyUpgradeHandler(server) @@ -383,24 +383,6 @@ async function main() { codexActivity.tracker.on('changed', (payload) => { wsHandler.broadcastCodexActivityUpdated(payload) }) - opencodeActivity.tracker.on('changed', (payload) => { - wsHandler.broadcastOpencodeActivityUpdated(payload) - }) - opencodeActivity.controller.on('associated', ({ terminalId, sessionId }) => { - try { - broadcastTerminalSessionAssociation({ - wsHandler, - terminalMetadata, - broadcastTerminalMetaUpserts, - provider: 'opencode', - terminalId, - sessionId, - source: 'opencode_controller', - }) - } catch (err) { - log.warn({ err, terminalId, sessionId }, 'Failed to broadcast OpenCode session association') - } - }) const broadcastTerminalMetaUpserts = (upsert: ReturnType) => { if (upsert.length === 0) return @@ -447,6 +429,46 @@ async function main() { } }) + opencodeActivity = wireOpencodeActivityTracker({ + registry, + onActivityChanged: (payload) => { + wsHandler.broadcastOpencodeActivityUpdated(payload) + }, + onAssociated: ({ terminalId, sessionId }) => { + try { + broadcastTerminalSessionAssociation({ + wsHandler, + terminalMetadata, + broadcastTerminalMetaUpserts, + provider: 'opencode', + terminalId, + sessionId, + source: 'opencode_controller', + }) + } catch (err) { + log.warn({ err, terminalId, sessionId }, 'Failed to broadcast OpenCode session association') + } + }, + onTurnComplete: ({ terminalId, sessionId, at }) => { + const terminal = registry.get(terminalId) + if ( + !terminal + || terminal.mode !== 'opencode' + || terminal.status !== 'running' + || terminal.resumeSessionId !== sessionId + ) { + log.warn({ terminalId, sessionId }, 'Suppressed OpenCode turn completion for terminal without current ownership') + return + } + wsHandler.broadcastTerminalTurnComplete({ + terminalId, + provider: 'opencode', + sessionId, + at, + }) + }, + }) + const applyDebugLogging = (enabled: boolean, source: string) => { const nextEnabled = !!enabled setLogLevel(resolveRuntimeLogLevel(nextEnabled)) @@ -846,7 +868,7 @@ async function main() { // 9b. Stop Codex activity tracker listeners and sweep timer codexActivity.dispose() - opencodeActivity.dispose() + opencodeActivity?.dispose() // 10. Stop session repair service await sessionRepairService.stop() diff --git a/server/session-association-coordinator.ts b/server/session-association-coordinator.ts index 0f2b92f2..a31bcd4f 100644 --- a/server/session-association-coordinator.ts +++ b/server/session-association-coordinator.ts @@ -78,7 +78,7 @@ export class SessionAssociationCoordinator { private associationCandidateReason( session: CodingCliSession, ): 'ok' | NonNullable { - if (session.provider === 'codex') return 'provider_managed' + if (session.provider === 'codex' || session.provider === 'opencode') return 'provider_managed' if (!modeSupportsResume(session.provider)) return 'provider_not_supported' if (!session.cwd) return 'missing_cwd' if (session.isSubagent) return 'subagent' diff --git a/server/terminal-registry.ts b/server/terminal-registry.ts index 8a3699a2..b4c33a83 100644 --- a/server/terminal-registry.ts +++ b/server/terminal-registry.ts @@ -873,6 +873,8 @@ export function buildSpawnSpec( ALLOWED_ORIGINS: _allowedOrigins, NODE_ENV: _nodeEnv, npm_lifecycle_script: _npmLifecycleScript, + OPENCODE_SERVER_USERNAME: _opencodeServerUsername, + OPENCODE_SERVER_PASSWORD: _opencodeServerPassword, ...parentEnv } = process.env const env = { diff --git a/server/ws-handler.ts b/server/ws-handler.ts index 1d474186..a0190022 100644 --- a/server/ws-handler.ts +++ b/server/ws-handler.ts @@ -23,6 +23,7 @@ import type { OpencodeActivityRecord, SdkServerMessage, SdkSessionStatus, + TerminalTurnCompleteMessage, TerminalStatusMessage, } from '../shared/ws-protocol.js' import type { ExtensionManager } from './extension-manager.js' @@ -53,6 +54,7 @@ import { OpencodeActivityListResponseSchema, OpencodeActivityListSchema, OpencodeActivityUpdatedSchema, + TerminalTurnCompleteSchema, HelloSchema, PingSchema, ClientDiagnosticSchema, @@ -3360,6 +3362,20 @@ export class WsHandler { this.broadcastAuthenticated(parsed.data) } + broadcastTerminalTurnComplete(msg: Omit): void { + const parsed = TerminalTurnCompleteSchema.safeParse({ + type: 'terminal.turn.complete', + ...msg, + }) + + if (!parsed.success) { + log.warn({ issues: parsed.error.issues }, 'Invalid terminal.turn.complete payload') + return + } + + this.broadcastAuthenticated(parsed.data) + } + /** * Prepare for hot rebind: close all client connections and set the closed * flag so the patched server.close() → this.close() is a no-op. diff --git a/shared/ws-protocol.ts b/shared/ws-protocol.ts index bff6630f..9755ecb2 100644 --- a/shared/ws-protocol.ts +++ b/shared/ws-protocol.ts @@ -127,6 +127,14 @@ export const OpencodeActivityUpdatedSchema = z.object({ remove: z.array(z.string().min(1)), }) +export const TerminalTurnCompleteSchema = z.object({ + type: z.literal('terminal.turn.complete'), + terminalId: z.string().min(1), + provider: z.literal('opencode'), + sessionId: z.string().min(1), + at: z.number().int().nonnegative(), +}) + // ────────────────────────────────────────────────────────────── // SDK content block schemas (from Claude Code NDJSON) // ────────────────────────────────────────────────────────────── @@ -550,6 +558,8 @@ export type OpencodeActivityListResponseMessage = z.infer +export type TerminalTurnCompleteMessage = z.infer + // -- Sessions -- export type SessionsChangedMessage = { @@ -778,6 +788,7 @@ export type ServerMessage = | CodexActivityUpdatedMessage | OpencodeActivityListResponseMessage | OpencodeActivityUpdatedMessage + | TerminalTurnCompleteMessage | SessionsChangedMessage | SettingsUpdatedMessage | UiCommandMessage diff --git a/src/App.tsx b/src/App.tsx index 379406fc..6c3f02b9 100644 --- a/src/App.tsx +++ b/src/App.tsx @@ -59,6 +59,8 @@ import { clearDeadTerminals } from '@/store/panesSlice' import { addTerminalRestoreRequestId } from '@/lib/terminal-restore' import { setCodexActivitySnapshot, upsertCodexActivity, removeCodexActivity, resetCodexActivity } from '@/store/codexActivitySlice' import { setOpencodeActivitySnapshot, upsertOpencodeActivity, removeOpencodeActivity, resetOpencodeActivity } from '@/store/opencodeActivitySlice' +import { recordTurnComplete } from '@/store/turnCompletionSlice' +import { selectTabPaneByTerminalId } from '@/store/selectors/paneTerminalSelectors' import { setRegistry, updateServerStatus } from '@/store/extensionsSlice' import { handleSdkMessage } from '@/lib/sdk-message-handler' import { createLogger } from '@/lib/client-logger' @@ -878,6 +880,21 @@ export default function App() { })) } } + if (msg.type === 'terminal.turn.complete') { + const terminalId = typeof msg.terminalId === 'string' ? msg.terminalId : '' + const at = typeof msg.at === 'number' ? msg.at : Date.now() + if (terminalId) { + const location = selectTabPaneByTerminalId(appStore.getState(), terminalId) + if (location) { + dispatch(recordTurnComplete({ + tabId: location.tabId, + paneId: location.paneId, + terminalId, + at, + })) + } + } + } if (msg.type === 'terminal.exit') { const terminalId = msg.terminalId const code = msg.exitCode diff --git a/src/store/selectors/paneTerminalSelectors.ts b/src/store/selectors/paneTerminalSelectors.ts index 4e76d5ca..180848e9 100644 --- a/src/store/selectors/paneTerminalSelectors.ts +++ b/src/store/selectors/paneTerminalSelectors.ts @@ -49,6 +49,19 @@ export function selectTabIdByTerminalId(state: RootState, terminalId: string): s return undefined } +export function selectTabPaneByTerminalId( + state: RootState, + terminalId: string, +): { tabId: string; paneId: string } | undefined { + for (const [tabId, layout] of Object.entries(state.panes.layouts)) { + const paneId = findPaneIdByTerminalId(layout, terminalId) + if (paneId) { + return { tabId, paneId } + } + } + return undefined +} + function findFirstTerminalId(node: PaneNode): string | undefined { if (node.type === 'leaf') { return node.content.kind === 'terminal' ? node.content.terminalId : undefined @@ -63,3 +76,13 @@ function nodeContainsTerminalId(node: PaneNode, terminalId: string): boolean { return nodeContainsTerminalId(node.children[0], terminalId) || nodeContainsTerminalId(node.children[1], terminalId) } + +function findPaneIdByTerminalId(node: PaneNode, terminalId: string): string | undefined { + if (node.type === 'leaf') { + return node.content.kind === 'terminal' && node.content.terminalId === terminalId + ? node.id + : undefined + } + return findPaneIdByTerminalId(node.children[0], terminalId) + ?? findPaneIdByTerminalId(node.children[1], terminalId) +} diff --git a/test/integration/server/opencode-session-flow.test.ts b/test/integration/server/opencode-session-flow.test.ts index b070d540..62cd3078 100644 --- a/test/integration/server/opencode-session-flow.test.ts +++ b/test/integration/server/opencode-session-flow.test.ts @@ -453,8 +453,9 @@ describe('opencode session flow (integration)', () => { } }) - it('promotes an OpenCode terminal only after authoritative control data exposes a canonical session id', async () => { + it('promotes and completes an OpenCode terminal only after live same-stream idle', async () => { vi.useFakeTimers() + const turnCompletions: Array<{ terminalId: string; sessionId: string; at: number }> = [] const fetchImpl = vi.fn(async (input: RequestInfo | URL) => { const url = String(input) if (url.endsWith('/global/health')) { @@ -466,7 +467,17 @@ describe('opencode session flow (integration)', () => { }) } if (url.endsWith('/event')) { - return createSseResponse([{ type: 'server.connected', properties: {} }]) + return createSseResponse([ + { type: 'server.connected', properties: {} }, + { + type: 'session.status', + properties: { + sessionID: OPENCODE_SESSION_ID, + status: { type: 'busy' }, + }, + }, + { type: 'session.idle', properties: { sessionID: OPENCODE_SESSION_ID } }, + ]) } throw new Error(`Unexpected URL: ${url}`) }) @@ -475,6 +486,9 @@ describe('opencode session flow (integration)', () => { registry: registry as any, fetchImpl: fetchImpl as typeof fetch, random: () => 0, + onTurnComplete: (payload) => { + turnCompletions.push(payload) + }, }) try { @@ -497,6 +511,13 @@ describe('opencode session flow (integration)', () => { reason: 'association', }), ]) + expect(turnCompletions).toEqual([ + expect.objectContaining({ + terminalId: record.terminalId, + sessionId: OPENCODE_SESSION_ID, + at: expect.any(Number), + }), + ]) } finally { wiring.dispose() vi.useRealTimers() diff --git a/test/server/session-association.test.ts b/test/server/session-association.test.ts index 26bf8b89..c358b4cd 100644 --- a/test/server/session-association.test.ts +++ b/test/server/session-association.test.ts @@ -775,7 +775,7 @@ describe('Session-Terminal Association via onUpdate', () => { registry.shutdown() }) - it('associates opencode sessions when resume is supported', () => { + it('skips opencode sessions in onUpdate ownership pass', () => { const registry = new TerminalRegistry() const broadcasts: any[] = [] @@ -796,9 +796,8 @@ describe('Session-Terminal Association via onUpdate', () => { }], }], broadcasts) - expect(broadcasts).toHaveLength(1) - expect(broadcasts[0].terminalId).toBe(term.terminalId) - expect(registry.get(term.terminalId)?.resumeSessionId).toBe('opencode-session-123') + expect(broadcasts).toHaveLength(0) + expect(registry.get(term.terminalId)?.resumeSessionId).toBeUndefined() registry.shutdown() }) diff --git a/test/server/ws-opencode-activity.test.ts b/test/server/ws-opencode-activity.test.ts index 171d4fed..dc56f372 100644 --- a/test/server/ws-opencode-activity.test.ts +++ b/test/server/ws-opencode-activity.test.ts @@ -207,4 +207,38 @@ describe('ws opencode activity protocol', () => { authenticated.close() unauthenticated.close() }) + + it('broadcasts terminal.turn.complete only to authenticated sockets', async () => { + const authenticated = new WebSocket(`ws://127.0.0.1:${port}/ws`) + const unauthenticated = new WebSocket(`ws://127.0.0.1:${port}/ws`) + + await Promise.all([ + new Promise((resolve) => authenticated.on('open', () => resolve())), + new Promise((resolve) => unauthenticated.on('open', () => resolve())), + ]) + + authenticated.send(JSON.stringify({ type: 'hello', token: 'opencode-activity-token', protocolVersion: WS_PROTOCOL_VERSION })) + await waitForMessage(authenticated, (msg) => msg.type === 'ready') + + wsHandler.broadcastTerminalTurnComplete({ + terminalId: 'term-opencode-1', + provider: 'opencode', + sessionId: 'session-opencode-1', + at: 1234, + }) + + const completed = await waitForMessage(authenticated, (msg) => msg.type === 'terminal.turn.complete') + expect(completed).toEqual({ + type: 'terminal.turn.complete', + terminalId: 'term-opencode-1', + provider: 'opencode', + sessionId: 'session-opencode-1', + at: 1234, + }) + + await expect(expectNoMatchingMessage(unauthenticated, (msg) => msg.type === 'terminal.turn.complete')).resolves.toBeUndefined() + + authenticated.close() + unauthenticated.close() + }) }) diff --git a/test/unit/client/components/App.ws-bootstrap.test.tsx b/test/unit/client/components/App.ws-bootstrap.test.tsx index fc0dc0e7..4ad9dae2 100644 --- a/test/unit/client/components/App.ws-bootstrap.test.tsx +++ b/test/unit/client/components/App.ws-bootstrap.test.tsx @@ -11,6 +11,7 @@ import panesReducer from '@/store/panesSlice' import tabRegistryReducer from '@/store/tabRegistrySlice' import terminalMetaReducer from '@/store/terminalMetaSlice' import extensionsReducer from '@/store/extensionsSlice' +import turnCompletionReducer from '@/store/turnCompletionSlice' import { networkReducer } from '@/store/networkSlice' import codexActivityReducer, { type CodexActivityState } from '@/store/codexActivitySlice' import opencodeActivityReducer, { type OpencodeActivityState } from '@/store/opencodeActivitySlice' @@ -53,6 +54,17 @@ const defaultServerSettings = createDefaultServerSettings({ loggingDebug: defaultSettings.logging.debug, }) +function stubAudio(): void { + vi.stubGlobal('Audio', vi.fn(() => ({ + preload: '', + volume: 1, + pause: vi.fn(), + play: vi.fn().mockResolvedValue(undefined), + currentTime: 0, + src: '', + }) as unknown as HTMLAudioElement)) +} + function createSettingsState(options: { server?: ServerSettingsPatch local?: LocalSettingsPatch @@ -168,6 +180,7 @@ function createStore(options?: { tabRegistry: tabRegistryReducer, terminalMeta: terminalMetaReducer, extensions: extensionsReducer, + turnCompletion: turnCompletionReducer, }, middleware: (getDefault) => getDefault({ @@ -214,6 +227,13 @@ function createStore(options?: { }, terminalMeta: { byTerminalId: {} }, extensions: { entries: [] }, + turnCompletion: { + seq: 0, + lastEvent: null, + pendingEvents: [], + attentionByTab: {}, + attentionByPane: {}, + }, }, }) } @@ -222,6 +242,7 @@ describe('App WS bootstrap recovery', () => { beforeEach(() => { cleanup() vi.resetAllMocks() + stubAudio() wsMocks.onReconnect.mockReturnValue(() => {}) wsMocks.onDisconnect.mockImplementation((cb: () => void) => { disconnectHandler = cb @@ -257,6 +278,7 @@ describe('App WS bootstrap recovery', () => { afterEach(() => { cleanup() + vi.unstubAllGlobals() }) it('marks connection as auth-required and skips websocket connect when the bootstrap request returns 401', async () => { @@ -908,6 +930,73 @@ describe('App WS bootstrap recovery', () => { }) }) + it('records OpenCode turn completion from the production WebSocket message path', async () => { + const store = createStore({ + tabs: [{ + id: 'tab-opencode', + createRequestId: 'req-opencode', + title: 'OpenCode', + status: 'running', + mode: 'opencode', + shell: 'system', + terminalId: 'term-opencode', + createdAt: 1, + }], + panes: { + layouts: { + 'tab-opencode': { + type: 'leaf', + id: 'pane-opencode', + content: { + kind: 'terminal', + createRequestId: 'req-opencode', + status: 'running', + mode: 'opencode', + shell: 'system', + terminalId: 'term-opencode', + initialCwd: '/workspace', + }, + }, + }, + activePane: { + 'tab-opencode': 'pane-opencode', + }, + }, + }) + wsMocks.isReady = true + wsMocks.serverInstanceId = 'srv-preconnected-opencode-turn-complete' + + render( + + + + ) + + await waitFor(() => { + expect(store.getState().connection.status).toBe('ready') + }) + + act(() => { + messageHandler?.({ + type: 'terminal.turn.complete', + terminalId: 'term-opencode', + provider: 'opencode', + sessionId: 'session-opencode', + at: 1234, + }) + }) + + await waitFor(() => { + expect(store.getState().turnCompletion.lastEvent).toMatchObject({ + tabId: 'tab-opencode', + paneId: 'pane-opencode', + terminalId: 'term-opencode', + at: 1234, + }) + }) + expect(store.getState().turnCompletion.seq).toBe(1) + }) + it('keeps the WS message handler registered after an initial connect failure, so a later ready can recover state', async () => { const store = createStore() diff --git a/test/unit/server/coding-cli/opencode-activity-tracker.test.ts b/test/unit/server/coding-cli/opencode-activity-tracker.test.ts index 79ff176f..54e4db3a 100644 --- a/test/unit/server/coding-cli/opencode-activity-tracker.test.ts +++ b/test/unit/server/coding-cli/opencode-activity-tracker.test.ts @@ -93,6 +93,62 @@ describe('OpencodeActivityTracker', () => { tracker.dispose() }) + it('opens SSE before snapshot and emits completion only after association is confirmed', async () => { + vi.useFakeTimers() + const requestOrder: string[] = [] + const fetchImpl = vi.fn(async (input: RequestInfo | URL) => { + const url = String(input) + if (url.endsWith('/global/health')) { + requestOrder.push('/global/health') + return createJsonResponse({ ok: true }) + } + if (url.endsWith('/event')) { + requestOrder.push('/event') + return createSseResponse([ + { type: 'server.connected', properties: {} }, + { + type: 'session.status', + properties: { + sessionID: 'session-oc', + status: { type: 'busy' }, + }, + }, + { + type: 'session.idle', + properties: { + sessionID: 'session-oc', + }, + }, + ]) + } + if (url.endsWith('/session/status')) { + requestOrder.push('/session/status') + return createJsonResponse({}) + } + throw new Error(`Unexpected URL: ${url}`) + }) + + const tracker = new OpencodeActivityTracker({ fetchImpl: fetchImpl as typeof fetch, random: () => 0 }) + const completions: unknown[] = [] + tracker.on('association.requested', (payload) => { + expect(completions).toEqual([]) + tracker.confirmSessionAssociation(payload) + }) + tracker.on('turn.complete', (payload) => completions.push(payload)) + + tracker.trackTerminal({ terminalId: 'term-oc', endpoint: TEST_ENDPOINT }) + await vi.advanceTimersByTimeAsync(0) + + expect(requestOrder.slice(0, 3)).toEqual(['/global/health', '/event', '/session/status']) + expect(completions).toEqual([{ + terminalId: 'term-oc', + sessionId: 'session-oc', + at: expect.any(Number), + }]) + + tracker.dispose() + }) + it('keeps health polling on connection errors until the endpoint comes up', async () => { vi.useFakeTimers() let healthCalls = 0 @@ -148,6 +204,13 @@ describe('OpencodeActivityTracker', () => { if (url.endsWith('/event')) { return createSseResponse([ { type: 'server.connected', properties: {} }, + { + type: 'session.status', + properties: { + sessionID: 'session-oc', + status: { type: 'busy' }, + }, + }, { type: 'session.status', properties: { @@ -284,7 +347,15 @@ describe('OpencodeActivityTracker', () => { } if (url.endsWith('/event')) { return createRawSseResponse([ + `data: ${JSON.stringify({ type: 'server.connected', properties: {} })}\n\n`, 'data: {not valid json}\n\n', + `data: ${JSON.stringify({ + type: 'session.status', + properties: { + sessionID: 'session-oc', + status: { type: 'busy' }, + }, + })}\n\n`, `data: ${JSON.stringify({ type: 'session.idle', properties: { sessionID: 'session-oc' } })}\n\n`, ]) } @@ -340,6 +411,13 @@ describe('OpencodeActivityTracker', () => { return createRawSseResponse([ `data: ${JSON.stringify({ type: 'server.connected', properties: {} })}\n\n`, `data: ${JSON.stringify({ type: 'session.progress', properties: { percent: 50 } })}\n\n`, + `data: ${JSON.stringify({ + type: 'session.status', + properties: { + sessionID: 'session-oc', + status: { type: 'busy' }, + }, + })}\n\n`, `data: ${JSON.stringify({ type: 'session.idle', properties: { sessionID: 'session-oc' } })}\n\n`, ]) } diff --git a/test/unit/server/coding-cli/opencode-ownership-reducer.test.ts b/test/unit/server/coding-cli/opencode-ownership-reducer.test.ts new file mode 100644 index 00000000..8ff16283 --- /dev/null +++ b/test/unit/server/coding-cli/opencode-ownership-reducer.test.ts @@ -0,0 +1,228 @@ +import { describe, expect, it } from 'vitest' +import { + confirmOpencodeAssociation, + createOpencodeOwnershipState, + reduceOpencodeOwnership, +} from '../../../../server/coding-cli/opencode-ownership-reducer' + +describe('opencode ownership reducer', () => { + it('requests association before completing a fresh live candidate', () => { + let state = createOpencodeOwnershipState() + + let result = reduceOpencodeOwnership(state, { + kind: 'sse', + cycleId: 1, + streamId: 1, + sessionId: 'session-a', + status: 'busy', + at: 10, + }) + state = result.state + expect(result.actions).toContainEqual({ + kind: 'activityUpsert', + sessionId: 'session-a', + at: 10, + }) + + result = reduceOpencodeOwnership(state, { + kind: 'sse', + cycleId: 1, + streamId: 1, + sessionId: 'session-a', + status: 'idle', + at: 20, + }) + state = result.state + + expect(result.actions).toEqual([ + { kind: 'activityRemove', at: 20 }, + { kind: 'requestAssociation', sessionId: 'session-a' }, + ]) + + result = confirmOpencodeAssociation(state, { sessionId: 'session-a' }) + + expect(result.state).toEqual({ + kind: 'quiet', + knownSessionId: 'session-a', + }) + expect(result.actions).toEqual([ + { + kind: 'turnComplete', + sessionId: 'session-a', + at: 20, + }, + ]) + }) + + it('completes a known busy interval only from the same live stream', () => { + let state = createOpencodeOwnershipState('session-a') + + let result = reduceOpencodeOwnership(state, { + kind: 'sse', + cycleId: 1, + streamId: 1, + sessionId: 'session-a', + status: 'busy', + at: 10, + }) + state = result.state + + result = reduceOpencodeOwnership(state, { + kind: 'sse', + cycleId: 1, + streamId: 2, + sessionId: 'session-a', + status: 'idle', + at: 20, + }) + + expect(result.state).toEqual(state) + expect(result.actions).toEqual([]) + + result = reduceOpencodeOwnership(state, { + kind: 'sse', + cycleId: 1, + streamId: 1, + sessionId: 'session-a', + status: 'idle', + at: 30, + }) + + expect(result.state).toEqual({ + kind: 'quiet', + knownSessionId: 'session-a', + }) + expect(result.actions).toEqual([ + { kind: 'activityRemove', at: 30 }, + { kind: 'turnComplete', sessionId: 'session-a', at: 30 }, + ]) + }) + + it('treats competing candidate sessions as durable ambiguity and blocks third-session adoption until quiet', () => { + let state = createOpencodeOwnershipState() + + let result = reduceOpencodeOwnership(state, { + kind: 'sse', + cycleId: 1, + streamId: 1, + sessionId: 'session-a', + status: 'busy', + at: 10, + }) + state = result.state + + result = reduceOpencodeOwnership(state, { + kind: 'sse', + cycleId: 1, + streamId: 1, + sessionId: 'session-b', + status: 'busy', + at: 11, + }) + state = result.state + + expect(state).toEqual({ + kind: 'ambiguous', + knownSessionId: undefined, + blockedSessionIds: ['session-a', 'session-b'], + since: 11, + }) + expect(result.actions).toContainEqual({ + kind: 'activityUpsert', + at: 11, + }) + + result = reduceOpencodeOwnership(state, { + kind: 'sse', + cycleId: 1, + streamId: 1, + sessionId: 'session-c', + status: 'busy', + at: 12, + }) + state = result.state + + expect(state).toEqual({ + kind: 'ambiguous', + knownSessionId: undefined, + blockedSessionIds: ['session-a', 'session-b', 'session-c'], + since: 11, + }) + expect(result.actions).not.toContainEqual(expect.objectContaining({ + kind: 'requestAssociation', + })) + + result = reduceOpencodeOwnership(state, { + kind: 'snapshot', + cycleId: 1, + streamId: 1, + statuses: {}, + at: 30, + }) + + expect(result.state).toEqual({ + kind: 'quiet', + knownSessionId: undefined, + }) + expect(result.actions).toEqual([{ kind: 'activityRemove', at: 30 }]) + }) + + it('never emits turn completion from snapshots', () => { + let state = createOpencodeOwnershipState('session-a') + + let result = reduceOpencodeOwnership(state, { + kind: 'snapshot', + cycleId: 1, + streamId: 1, + statuses: { + 'session-a': { type: 'busy' }, + }, + at: 10, + }) + state = result.state + + result = reduceOpencodeOwnership(state, { + kind: 'snapshot', + cycleId: 2, + streamId: 2, + statuses: {}, + at: 20, + }) + + expect(result.state).toEqual({ + kind: 'quiet', + knownSessionId: 'session-a', + }) + expect(result.actions).toEqual([{ kind: 'activityRemove', at: 20 }]) + expect(result.actions).not.toContainEqual(expect.objectContaining({ + kind: 'turnComplete', + })) + }) + + it('does not associate or complete a snapshot-only busy interval from live idle', () => { + let state = createOpencodeOwnershipState() + + let result = reduceOpencodeOwnership(state, { + kind: 'snapshot', + cycleId: 1, + streamId: 1, + statuses: { + 'session-a': { type: 'busy' }, + }, + at: 10, + }) + state = result.state + + result = reduceOpencodeOwnership(state, { + kind: 'sse', + cycleId: 1, + streamId: 1, + sessionId: 'session-a', + status: 'idle', + at: 20, + }) + + expect(result.state).toEqual(state) + expect(result.actions).toEqual([]) + }) +}) diff --git a/test/unit/server/coding-cli/opencode-session-controller.test.ts b/test/unit/server/coding-cli/opencode-session-controller.test.ts new file mode 100644 index 00000000..b21d8bcd --- /dev/null +++ b/test/unit/server/coding-cli/opencode-session-controller.test.ts @@ -0,0 +1,95 @@ +import { EventEmitter } from 'events' +import { describe, expect, it, vi } from 'vitest' +import { OpencodeSessionController } from '../../../../server/coding-cli/opencode-session-controller' + +class FakeTracker extends EventEmitter { + confirmSessionAssociation = vi.fn() + rejectSessionAssociation = vi.fn() +} + +describe('OpencodeSessionController', () => { + it('uses non-stealing bindSession and confirms successful association requests', () => { + const tracker = new FakeTracker() + const registry = { + get: vi.fn(() => ({ + terminalId: 'term-1', + mode: 'opencode', + status: 'running', + })), + bindSession: vi.fn(() => ({ + ok: true as const, + terminalId: 'term-1', + sessionId: 'session-1', + })), + rebindSession: vi.fn(() => { + throw new Error('rebindSession must not be used for OpenCode control-plane adoption') + }), + on: vi.fn(), + off: vi.fn(), + } + const associated = vi.fn() + const controller = new OpencodeSessionController({ + tracker: tracker as any, + registry: registry as any, + }) + controller.on('associated', associated) + + tracker.emit('association.requested', { + terminalId: 'term-1', + sessionId: 'session-1', + }) + + expect(registry.bindSession).toHaveBeenCalledWith('term-1', 'opencode', 'session-1', 'association') + expect(registry.rebindSession).not.toHaveBeenCalled() + expect(tracker.confirmSessionAssociation).toHaveBeenCalledWith({ + terminalId: 'term-1', + sessionId: 'session-1', + }) + expect(tracker.rejectSessionAssociation).not.toHaveBeenCalled() + expect(associated).toHaveBeenCalledWith({ + terminalId: 'term-1', + sessionId: 'session-1', + }) + + controller.dispose() + }) + + it('rejects association requests when bindSession detects an ownership conflict', () => { + const tracker = new FakeTracker() + const registry = { + get: vi.fn(() => ({ + terminalId: 'term-1', + mode: 'opencode', + status: 'running', + })), + bindSession: vi.fn(() => ({ + ok: false as const, + reason: 'session_already_owned', + owner: 'other-terminal', + })), + on: vi.fn(), + off: vi.fn(), + } + const associated = vi.fn() + const controller = new OpencodeSessionController({ + tracker: tracker as any, + registry: registry as any, + log: { warn: vi.fn() }, + }) + controller.on('associated', associated) + + tracker.emit('association.requested', { + terminalId: 'term-1', + sessionId: 'session-1', + }) + + expect(tracker.confirmSessionAssociation).not.toHaveBeenCalled() + expect(tracker.rejectSessionAssociation).toHaveBeenCalledWith({ + terminalId: 'term-1', + sessionId: 'session-1', + }) + expect(associated).not.toHaveBeenCalled() + + controller.dispose() + }) +}) diff --git a/test/unit/server/terminal-registry.test.ts b/test/unit/server/terminal-registry.test.ts index a4b75fa4..498c2a94 100644 --- a/test/unit/server/terminal-registry.test.ts +++ b/test/unit/server/terminal-registry.test.ts @@ -1027,6 +1027,19 @@ describe('buildSpawnSpec Unix paths', () => { expect(spec.env.OPENCODE_PERMISSION).toBe('{"edit":"allow","bash":"ask"}') }) + + it('scrubs inherited OpenCode server auth env for managed TUI endpoints', () => { + delete process.env.OPENCODE_CMD + process.env.OPENCODE_SERVER_USERNAME = 'user' + process.env.OPENCODE_SERVER_PASSWORD = 'secret' + + const spec = buildSpawnSpec('opencode', '/Users/john/project', 'system', undefined, { + opencodeServer: TEST_OPENCODE_SERVER, + }) + + expect(spec.env).not.toHaveProperty('OPENCODE_SERVER_USERNAME') + expect(spec.env).not.toHaveProperty('OPENCODE_SERVER_PASSWORD') + }) }) describe('environment variables in spawn spec', () => { From 4b4bf3f23af77e0020a809827b829fe94cbad684 Mon Sep 17 00:00:00 2001 From: Dan Shapiro Date: Wed, 6 May 2026 21:36:24 -0700 Subject: [PATCH 2/4] Update OpenCode association coordinator test --- test/unit/server/session-association-coordinator.test.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/unit/server/session-association-coordinator.test.ts b/test/unit/server/session-association-coordinator.test.ts index f2805c64..0bd9da47 100644 --- a/test/unit/server/session-association-coordinator.test.ts +++ b/test/unit/server/session-association-coordinator.test.ts @@ -91,7 +91,7 @@ describe('SessionAssociationCoordinator', () => { expect(registry.bindSession).toHaveBeenCalledWith('term-1', 'claude', 'session-main', 'association') }) - it('associates opencode sessions with matching unassociated terminals', () => { + it('does not attempt heuristic association for opencode sessions', () => { const registry = { findUnassociatedTerminals: vi.fn(() => [{ terminalId: 'term-2', createdAt: 1_000 }]), bindSession: vi.fn(() => ({ ok: true, terminalId: 'term-2', sessionId: 'session-main' })), @@ -101,9 +101,9 @@ describe('SessionAssociationCoordinator', () => { const result = coordinator.associateSingleSession(createSession({ provider: 'opencode' })) - expect(result).toEqual({ associated: true, terminalId: 'term-2' }) - expect(registry.findUnassociatedTerminals).toHaveBeenCalledWith('opencode', '/repo/project') - expect(registry.bindSession).toHaveBeenCalledWith('term-2', 'opencode', 'session-main', 'association') + expect(result).toEqual({ associated: false, reason: 'provider_managed' }) + expect(registry.findUnassociatedTerminals).not.toHaveBeenCalled() + expect(registry.bindSession).not.toHaveBeenCalled() }) it('skips association when session is already bound to another terminal', () => { From d23f8dea9c1d3751d99c617f1ca11de4627903b4 Mon Sep 17 00:00:00 2001 From: Dan Shapiro Date: Wed, 6 May 2026 21:40:28 -0700 Subject: [PATCH 3/4] Refresh OpenCode real provider version note --- docs/lab-notes/2026-04-20-coding-cli-session-contract.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/lab-notes/2026-04-20-coding-cli-session-contract.md b/docs/lab-notes/2026-04-20-coding-cli-session-contract.md index a1b1fb84..c1975ee2 100644 --- a/docs/lab-notes/2026-04-20-coding-cli-session-contract.md +++ b/docs/lab-notes/2026-04-20-coding-cli-session-contract.md @@ -95,7 +95,7 @@ The implementation plan file is dated `2026-04-19` because the design work was w "opencode": { "executable": "opencode", "resolvedPath": "/home/user/.opencode/bin/opencode", - "version": "1.14.39", + "version": "1.14.40", "runCommandTemplate": "opencode run --format json --dangerously-skip-permissions", "serveCommandTemplate": "opencode serve --hostname 127.0.0.1 --port ", "globalHealthPath": "/global/health", @@ -290,7 +290,7 @@ command -v opencode # /home/user/.opencode/bin/opencode opencode --version -# 1.14.39 +# 1.14.40 ``` Fresh isolated runs were probed with: @@ -315,7 +315,7 @@ curl http://127.0.0.1:/session/status Observed control behavior: -- `/global/health` returned a healthy payload with version `1.14.39`. +- `/global/health` returned a healthy payload with version `1.14.40`. - `/session/status` returned `{}` while idle. - During an attached `opencode run ... --attach http://127.0.0.1:`, `/session/status` returned the same authoritative `sessionID` with `{ "type": "busy" }`. From 940104a7cac4a50ad9604b09f47770d43d725798 Mon Sep 17 00:00:00 2001 From: Dan Shapiro Date: Thu, 7 May 2026 00:35:52 -0700 Subject: [PATCH 4/4] Fix OpenCode turn completion edge cases --- .../coding-cli/opencode-ownership-reducer.ts | 25 +++- src/store/selectors/paneTerminalSelectors.ts | 12 ++ .../components/App.ws-bootstrap.test.tsx | 97 +++++++++++++- .../selectors/paneTerminalSelectors.test.ts | 40 ++++++ .../opencode-activity-tracker.test.ts | 123 ++++++++++++++++++ .../opencode-ownership-reducer.test.ts | 106 ++++++++++++++- 6 files changed, 399 insertions(+), 4 deletions(-) diff --git a/server/coding-cli/opencode-ownership-reducer.ts b/server/coding-cli/opencode-ownership-reducer.ts index db15e7a6..713db1c5 100644 --- a/server/coding-cli/opencode-ownership-reducer.ts +++ b/server/coding-cli/opencode-ownership-reducer.ts @@ -210,7 +210,7 @@ function reduceIdle( observation: Extract, ): OpencodeOwnershipResult { if (state.kind === 'candidate') { - if (state.startedBy !== 'sse' || !sameSessionStream(state, observation)) return { state, actions: [] } + if (!sameSessionStream(state, observation)) return { state, actions: [] } return { state: { kind: 'awaitingAssociation', @@ -228,7 +228,7 @@ function reduceIdle( } if (state.kind === 'knownBusy') { - if (state.startedBy !== 'sse' || !sameSessionStream(state, observation)) return { state, actions: [] } + if (!sameSessionStream(state, observation)) return { state, actions: [] } return { state: { kind: 'quiet', @@ -241,6 +241,27 @@ function reduceIdle( } } + if (state.kind === 'ambiguous') { + if (!state.blockedSessionIds.includes(observation.sessionId)) { + return { state, actions: [] } + } + + const blockedSessionIds = state.blockedSessionIds.filter( + (sessionId) => sessionId !== observation.sessionId, + ) + if (blockedSessionIds.length === 0) { + return { + state: { kind: 'quiet', knownSessionId: state.knownSessionId }, + actions: [{ kind: 'activityRemove', at: observation.at }], + } + } + + return { + state: { ...state, blockedSessionIds }, + actions: [{ kind: 'activityUpsert', at: observation.at }], + } + } + return { state, actions: [] } } diff --git a/src/store/selectors/paneTerminalSelectors.ts b/src/store/selectors/paneTerminalSelectors.ts index 180848e9..41ea779b 100644 --- a/src/store/selectors/paneTerminalSelectors.ts +++ b/src/store/selectors/paneTerminalSelectors.ts @@ -53,7 +53,19 @@ export function selectTabPaneByTerminalId( state: RootState, terminalId: string, ): { tabId: string; paneId: string } | undefined { + const activeTabId = state.tabs.activeTabId + if (activeTabId) { + const activeLayout = state.panes.layouts[activeTabId] + if (activeLayout) { + const activePaneId = findPaneIdByTerminalId(activeLayout, terminalId) + if (activePaneId) { + return { tabId: activeTabId, paneId: activePaneId } + } + } + } + for (const [tabId, layout] of Object.entries(state.panes.layouts)) { + if (tabId === activeTabId) continue const paneId = findPaneIdByTerminalId(layout, terminalId) if (paneId) { return { tabId, paneId } diff --git a/test/unit/client/components/App.ws-bootstrap.test.tsx b/test/unit/client/components/App.ws-bootstrap.test.tsx index 4ad9dae2..6a98f78f 100644 --- a/test/unit/client/components/App.ws-bootstrap.test.tsx +++ b/test/unit/client/components/App.ws-bootstrap.test.tsx @@ -132,6 +132,7 @@ function createStore(options?: { loaded?: boolean } tabs?: Array> + activeTabId?: string | null panes?: { layouts: Record activePane: Record @@ -188,7 +189,7 @@ function createStore(options?: { }), preloadedState: { settings: createSettingsState(options?.settings), - tabs: { tabs, activeTabId: (tabs[0]?.id as string | undefined) ?? null }, + tabs: { tabs, activeTabId: options?.activeTabId ?? ((tabs[0]?.id as string | undefined) ?? null) }, connection: { status: 'disconnected' as const, lastError: undefined, @@ -997,6 +998,100 @@ describe('App WS bootstrap recovery', () => { expect(store.getState().turnCompletion.seq).toBe(1) }) + it('records OpenCode turn completion against the active tab when a terminal is duplicated', async () => { + const store = createStore({ + activeTabId: 'tab-active', + tabs: [ + { + id: 'tab-background', + createRequestId: 'req-background', + title: 'OpenCode background', + status: 'running', + mode: 'opencode', + shell: 'system', + terminalId: 'term-opencode', + createdAt: 1, + }, + { + id: 'tab-active', + createRequestId: 'req-active', + title: 'OpenCode active', + status: 'running', + mode: 'opencode', + shell: 'system', + terminalId: 'term-opencode', + createdAt: 2, + }, + ], + panes: { + layouts: { + 'tab-background': { + type: 'leaf', + id: 'pane-background', + content: { + kind: 'terminal', + createRequestId: 'req-background', + status: 'running', + mode: 'opencode', + shell: 'system', + terminalId: 'term-opencode', + initialCwd: '/workspace', + }, + }, + 'tab-active': { + type: 'leaf', + id: 'pane-active', + content: { + kind: 'terminal', + createRequestId: 'req-active', + status: 'running', + mode: 'opencode', + shell: 'system', + terminalId: 'term-opencode', + initialCwd: '/workspace', + }, + }, + }, + activePane: { + 'tab-background': 'pane-background', + 'tab-active': 'pane-active', + }, + }, + }) + wsMocks.isReady = true + wsMocks.serverInstanceId = 'srv-preconnected-opencode-turn-complete-duplicate' + + render( + + + + ) + + await waitFor(() => { + expect(store.getState().connection.status).toBe('ready') + }) + + act(() => { + messageHandler?.({ + type: 'terminal.turn.complete', + terminalId: 'term-opencode', + provider: 'opencode', + sessionId: 'session-opencode', + at: 5678, + }) + }) + + await waitFor(() => { + expect(store.getState().turnCompletion.lastEvent).toMatchObject({ + tabId: 'tab-active', + paneId: 'pane-active', + terminalId: 'term-opencode', + at: 5678, + }) + }) + expect(store.getState().turnCompletion.seq).toBe(1) + }) + it('keeps the WS message handler registered after an initial connect failure, so a later ready can recover state', async () => { const store = createStore() diff --git a/test/unit/client/store/selectors/paneTerminalSelectors.test.ts b/test/unit/client/store/selectors/paneTerminalSelectors.test.ts index f5cc2450..c7bdbb5f 100644 --- a/test/unit/client/store/selectors/paneTerminalSelectors.test.ts +++ b/test/unit/client/store/selectors/paneTerminalSelectors.test.ts @@ -3,6 +3,7 @@ import { describe, it, expect } from 'vitest' import type { PaneNode } from '@/store/paneTypes' import { + selectTabPaneByTerminalId, selectTerminalIdsForTab, selectPrimaryTerminalIdForTab, selectTabIdByTerminalId, @@ -48,8 +49,12 @@ function makeSplit(left: PaneNode, right: PaneNode): PaneNode { function makeState(overrides: { layouts?: Record activePane?: Record + activeTabId?: string | null }) { return { + tabs: { + activeTabId: overrides.activeTabId ?? null, + }, panes: { layouts: overrides.layouts ?? {}, activePane: overrides.activePane ?? {}, @@ -108,6 +113,41 @@ describe('selectTerminalIdsForTab', () => { }) }) +describe('selectTabPaneByTerminalId', () => { + it('prefers the active tab when the same running terminal is present in multiple tabs', () => { + const state = makeState({ + activeTabId: 'tab-active', + layouts: { + 'tab-background': makeLeaf('pane-background', 'term-shared'), + 'tab-active': makeLeaf('pane-active', 'term-shared'), + }, + activePane: { + 'tab-active': 'pane-active', + }, + }) + + expect(selectTabPaneByTerminalId(state, 'term-shared')).toEqual({ + tabId: 'tab-active', + paneId: 'pane-active', + }) + }) + + it('falls back to the first matching pane when the active tab does not contain the terminal', () => { + const state = makeState({ + activeTabId: 'tab-other', + layouts: { + 'tab-background': makeLeaf('pane-background', 'term-shared'), + 'tab-other': makeLeaf('pane-other', 'term-other'), + }, + }) + + expect(selectTabPaneByTerminalId(state, 'term-shared')).toEqual({ + tabId: 'tab-background', + paneId: 'pane-background', + }) + }) +}) + describe('selectPrimaryTerminalIdForTab', () => { it('returns undefined when no layout exists', () => { const state = makeState({}) diff --git a/test/unit/server/coding-cli/opencode-activity-tracker.test.ts b/test/unit/server/coding-cli/opencode-activity-tracker.test.ts index 54e4db3a..6de6a4fc 100644 --- a/test/unit/server/coding-cli/opencode-activity-tracker.test.ts +++ b/test/unit/server/coding-cli/opencode-activity-tracker.test.ts @@ -149,6 +149,129 @@ describe('OpencodeActivityTracker', () => { tracker.dispose() }) + it('emits completion when the initial snapshot observes busy before a same-stream idle event', async () => { + vi.useFakeTimers() + const fetchImpl = vi.fn(async (input: RequestInfo | URL) => { + const url = String(input) + if (url.endsWith('/global/health')) { + return createJsonResponse({ ok: true }) + } + if (url.endsWith('/event')) { + return createSseResponse([ + { type: 'server.connected', properties: {} }, + { + type: 'session.idle', + properties: { + sessionID: 'session-oc', + }, + }, + ]) + } + if (url.endsWith('/session/status')) { + return createJsonResponse({ + 'session-oc': { type: 'busy' }, + }) + } + throw new Error(`Unexpected URL: ${url}`) + }) + + const tracker = new OpencodeActivityTracker({ fetchImpl: fetchImpl as typeof fetch, random: () => 0 }) + const completions: unknown[] = [] + tracker.on('association.requested', (payload) => { + expect(completions).toEqual([]) + tracker.confirmSessionAssociation(payload) + }) + tracker.on('turn.complete', (payload) => completions.push(payload)) + + tracker.trackTerminal({ terminalId: 'term-oc', endpoint: TEST_ENDPOINT }) + await vi.advanceTimersByTimeAsync(0) + + expect(completions).toEqual([{ + terminalId: 'term-oc', + sessionId: 'session-oc', + at: expect.any(Number), + }]) + expect(tracker.list()).toEqual([]) + + tracker.dispose() + }) + + it('clears ambiguous busy state when every ambiguous session idles on the same SSE stream', async () => { + vi.useFakeTimers() + const fetchImpl = vi.fn(async (input: RequestInfo | URL) => { + const url = String(input) + if (url.endsWith('/global/health')) { + return createJsonResponse({ ok: true }) + } + if (url.endsWith('/event')) { + return createSseResponse([ + { type: 'server.connected', properties: {} }, + { + type: 'session.status', + properties: { + sessionID: 'session-a', + status: { type: 'busy' }, + }, + }, + { + type: 'session.status', + properties: { + sessionID: 'session-b', + status: { type: 'busy' }, + }, + }, + { + type: 'session.idle', + properties: { + sessionID: 'session-a', + }, + }, + { + type: 'session.idle', + properties: { + sessionID: 'session-b', + }, + }, + ]) + } + if (url.endsWith('/session/status')) { + return createJsonResponse({}) + } + throw new Error(`Unexpected URL: ${url}`) + }) + + const log = { warn: vi.fn() } + const tracker = new OpencodeActivityTracker({ + fetchImpl: fetchImpl as typeof fetch, + log, + random: () => 0, + }) + const changes: Array<{ upsert: unknown[]; remove: string[] }> = [] + const completions: unknown[] = [] + tracker.on('changed', (payload) => changes.push(payload)) + tracker.on('association.requested', (payload) => tracker.confirmSessionAssociation(payload)) + tracker.on('turn.complete', (payload) => completions.push(payload)) + + tracker.trackTerminal({ terminalId: 'term-oc', endpoint: TEST_ENDPOINT }) + await vi.advanceTimersByTimeAsync(0) + + expect(log.warn).toHaveBeenCalledWith( + { + terminalId: 'term-oc', + sessionIds: ['session-a', 'session-b'], + }, + 'OpenCode endpoint reported ambiguous session ownership; suppressing durable adoption.', + ) + expect(changes).toContainEqual({ + upsert: [], + remove: ['term-oc'], + }) + expect(completions).toEqual([]) + expect(tracker.list()).toEqual([]) + + tracker.dispose() + }) + it('keeps health polling on connection errors until the endpoint comes up', async () => { vi.useFakeTimers() let healthCalls = 0 diff --git a/test/unit/server/coding-cli/opencode-ownership-reducer.test.ts b/test/unit/server/coding-cli/opencode-ownership-reducer.test.ts index 8ff16283..0628d6ea 100644 --- a/test/unit/server/coding-cli/opencode-ownership-reducer.test.ts +++ b/test/unit/server/coding-cli/opencode-ownership-reducer.test.ts @@ -167,6 +167,70 @@ describe('opencode ownership reducer', () => { expect(result.actions).toEqual([{ kind: 'activityRemove', at: 30 }]) }) + it('clears ambiguous ownership after every blocked session idles on the live stream', () => { + let state = createOpencodeOwnershipState() + + let result = reduceOpencodeOwnership(state, { + kind: 'sse', + cycleId: 1, + streamId: 1, + sessionId: 'session-a', + status: 'busy', + at: 10, + }) + state = result.state + + result = reduceOpencodeOwnership(state, { + kind: 'sse', + cycleId: 1, + streamId: 1, + sessionId: 'session-b', + status: 'busy', + at: 11, + }) + state = result.state + + expect(state).toEqual({ + kind: 'ambiguous', + knownSessionId: undefined, + blockedSessionIds: ['session-a', 'session-b'], + since: 11, + }) + + result = reduceOpencodeOwnership(state, { + kind: 'sse', + cycleId: 1, + streamId: 1, + sessionId: 'session-a', + status: 'idle', + at: 20, + }) + state = result.state + + expect(state).toEqual({ + kind: 'ambiguous', + knownSessionId: undefined, + blockedSessionIds: ['session-b'], + since: 11, + }) + expect(result.actions).toEqual([{ kind: 'activityUpsert', at: 20 }]) + + result = reduceOpencodeOwnership(state, { + kind: 'sse', + cycleId: 1, + streamId: 1, + sessionId: 'session-b', + status: 'idle', + at: 21, + }) + + expect(result.state).toEqual({ + kind: 'quiet', + knownSessionId: undefined, + }) + expect(result.actions).toEqual([{ kind: 'activityRemove', at: 21 }]) + }) + it('never emits turn completion from snapshots', () => { let state = createOpencodeOwnershipState('session-a') @@ -199,7 +263,7 @@ describe('opencode ownership reducer', () => { })) }) - it('does not associate or complete a snapshot-only busy interval from live idle', () => { + it('requests association for a snapshot-seeded candidate that idles on the same live stream', () => { let state = createOpencodeOwnershipState() let result = reduceOpencodeOwnership(state, { @@ -221,6 +285,46 @@ describe('opencode ownership reducer', () => { status: 'idle', at: 20, }) + state = result.state + + expect(result.actions).toEqual([ + { kind: 'activityRemove', at: 20 }, + { kind: 'requestAssociation', sessionId: 'session-a' }, + ]) + + result = confirmOpencodeAssociation(state, { sessionId: 'session-a' }) + + expect(result.state).toEqual({ + kind: 'quiet', + knownSessionId: 'session-a', + }) + expect(result.actions).toEqual([ + { kind: 'turnComplete', sessionId: 'session-a', at: 20 }, + ]) + }) + + it('ignores a stale-stream idle for a snapshot-seeded busy interval', () => { + let state = createOpencodeOwnershipState() + + let result = reduceOpencodeOwnership(state, { + kind: 'snapshot', + cycleId: 1, + streamId: 1, + statuses: { + 'session-a': { type: 'busy' }, + }, + at: 10, + }) + state = result.state + + result = reduceOpencodeOwnership(state, { + kind: 'sse', + cycleId: 1, + streamId: 2, + sessionId: 'session-a', + status: 'idle', + at: 20, + }) expect(result.state).toEqual(state) expect(result.actions).toEqual([])