diff --git a/.trajectories/active/traj_v87cyrs8dke9.json b/.trajectories/active/traj_v87cyrs8dke9.json index 199ef0397..47c2220c5 100644 --- a/.trajectories/active/traj_v87cyrs8dke9.json +++ b/.trajectories/active/traj_v87cyrs8dke9.json @@ -6,8 +6,47 @@ }, "status": "active", "startedAt": "2026-05-14T14:28:34.155Z", - "agents": [], - "chapters": [], + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-05-18T04:01:59.334Z" + } + ], + "chapters": [ + { + "id": "chap_alw03jfffmtf", + "title": "Work", + "agentName": "default", + "startedAt": "2026-05-18T04:01:59.334Z", + "events": [ + { + "ts": 1779076919335, + "type": "decision", + "content": "Factored broker-connection discovery into src/cli/lib/broker-connection.ts", + "raw": { + "question": "Factored broker-connection discovery into src/cli/lib/broker-connection.ts", + "chosen": "Factored broker-connection discovery into src/cli/lib/broker-connection.ts", + "alternatives": [], + "reasoning": "Drive and view both need the same flag/env/connection.json fallback chain; pulling it out keeps both verbs aligned and unblocks future relay/new/run verbs from sub-PR 4." + }, + "significance": "high" + }, + { + "ts": 1779076919835, + "type": "decision", + "content": "Hand-rolled keybind state machine in drive.ts instead of pulling in readline.emitKeypressEvents", + "raw": { + "question": "Hand-rolled keybind state machine in drive.ts instead of pulling in readline.emitKeypressEvents", + "chosen": "Hand-rolled keybind state machine in drive.ts instead of pulling in readline.emitKeypressEvents", + "alternatives": [], + "reasoning": "All bindings are ASCII control chars (Ctrl+G/Ctrl+C/Ctrl+B prefix). Tiny stateful parser handles cross-chunk Ctrl+B prefix cleanly and is trivially testable." + }, + "significance": "high" + } + ] + } + ], "commits": [], "filesChanged": [], "projectId": "/Users/will/Projects/AgentWorkforce/relay", @@ -16,4 +55,4 @@ "startRef": "83ecfbca9cd87540629ae0a9b2f155cd2c3070cf", "endRef": "83ecfbca9cd87540629ae0a9b2f155cd2c3070cf" } -} \ No newline at end of file +} diff --git a/.trajectories/completed/2026-05/traj_ryf5sstno6p3.json b/.trajectories/completed/2026-05/traj_ryf5sstno6p3.json index d80f08e03..bed95a718 100644 --- a/.trajectories/completed/2026-05/traj_ryf5sstno6p3.json +++ b/.trajectories/completed/2026-05/traj_ryf5sstno6p3.json @@ -48,7 +48,7 @@ }, "commits": [], "filesChanged": [], - "projectId": "/Users/will/Projects/AgentWorkforce/relay/.claude/worktrees/agent-aff3649a00a0f36ab", + "projectId": "", "tags": [], "_trace": { "startRef": "a30e5c89ae5b80bf733a407590643d51f4c998b4", diff --git a/.trajectories/index.json b/.trajectories/index.json index 3fbf2c278..29604fd39 100644 --- a/.trajectories/index.json +++ b/.trajectories/index.json @@ -1,6 +1,6 @@ { "version": 1, - "lastUpdated": "2026-05-18T03:02:35.347Z", + "lastUpdated": "2026-05-18T04:01:59.836Z", "trajectories": { "traj_9gq96irkj00s": { "title": "Update relay to use published relaycast Rust reclaim fix", @@ -414,7 +414,7 @@ "title": "Upgrade .agentworkforce personas to latest 3.x shape", "status": "active", "startedAt": "2026-05-14T14:28:34.155Z", - "path": "/Users/will/Projects/AgentWorkforce/relay/.trajectories/active/traj_v87cyrs8dke9.json" + "path": ".trajectories/active/traj_v87cyrs8dke9.json" }, "traj_v1wexlfur5zr": { "title": "Fix broker headless reliability doc", diff --git a/src/cli/bootstrap.test.ts b/src/cli/bootstrap.test.ts index b9a3bc0de..3308380b9 100644 --- a/src/cli/bootstrap.test.ts +++ b/src/cli/bootstrap.test.ts @@ -38,6 +38,7 @@ const expectedLeafCommands = [ 'run', 'connect', 'view', + 'drive', 'dlq list', 'dlq inspect', 'dlq replay', diff --git a/src/cli/bootstrap.ts b/src/cli/bootstrap.ts index bcc3d1609..cf8edaf3d 100644 --- a/src/cli/bootstrap.ts +++ b/src/cli/bootstrap.ts @@ -27,6 +27,7 @@ import { registerConnectCommands } from './commands/connect.js'; import { registerOnCommands } from './commands/on.js'; import { registerDlqCommands } from './commands/dlq.js'; import { registerViewCommands } from './commands/view.js'; +import { registerDriveCommands } from './commands/drive.js'; dotenvConfig({ quiet: true }); @@ -283,6 +284,7 @@ export function createProgram(options: { name?: string } = {}): Command { registerConnectCommands(program); registerDlqCommands(program); registerViewCommands(program); + registerDriveCommands(program); return program; } diff --git a/src/cli/commands/drive.test.ts b/src/cli/commands/drive.test.ts new file mode 100644 index 000000000..187ad4d7b --- /dev/null +++ b/src/cli/commands/drive.test.ts @@ -0,0 +1,748 @@ +import { Buffer } from 'node:buffer'; + +import { Command } from 'commander'; +import { afterEach, describe, expect, it, vi } from 'vitest'; + +import { + KeybindParser, + classifyWsEvent, + registerDriveCommands, + renderStatusLine, + runDriveSession, + type DriveDependencies, + type DriveStdin, + type DriveTerminal, + type DriveWebSocket, +} from './drive.js'; + +class ExitSignal extends Error { + constructor(public readonly code: number) { + super(`exit:${code}`); + } +} + +type WsListener = (...args: unknown[]) => void; + +class FakeWebSocket implements DriveWebSocket { + readonly url: string; + readonly headers: Record; + readonly listeners = new Map(); + closed = false; + closeCode?: number; + closeReason?: string; + + constructor(url: string, headers: Record) { + this.url = url; + this.headers = headers; + } + + on(event: string, listener: (...args: unknown[]) => void): unknown { + const bucket = this.listeners.get(event) ?? []; + bucket.push(listener); + this.listeners.set(event, bucket); + return this; + } + + emit(event: string, ...args: unknown[]): void { + for (const listener of this.listeners.get(event) ?? []) { + listener(...args); + } + } + + close(code?: number, reason?: string): void { + this.closed = true; + this.closeCode = code; + this.closeReason = reason; + } +} + +class FakeStdin implements DriveStdin { + isTTY = true; + setRawMode = vi.fn<(mode: boolean) => unknown>(() => undefined); + resume = vi.fn(() => undefined); + pause = vi.fn(() => undefined); + private listener: ((chunk: Buffer) => void) | null = null; + rawModeCalls: boolean[] = []; + + constructor() { + this.setRawMode = vi.fn((mode: boolean) => { + this.rawModeCalls.push(mode); + return undefined; + }); + } + + on(event: 'data', listener: (chunk: Buffer) => void): unknown { + if (event === 'data') this.listener = listener; + return this; + } + + off(event: 'data', listener: (chunk: Buffer) => void): unknown { + if (event === 'data' && this.listener === listener) this.listener = null; + return this; + } + + removeListener(event: 'data', listener: (chunk: Buffer) => void): unknown { + return this.off(event, listener); + } + + /** Tests use this to simulate the user typing. */ + type(chunk: Buffer): void { + this.listener?.(chunk); + } +} + +/** + * Fake terminal size source. Tests control the current `(rows, cols)` + * via `setSize` and synthesize a resize event via `triggerResize`. + * `null` size simulates "not a TTY" so the resize-forwarding path can + * be exercised in both modes. + */ +class FakeTerminal implements DriveTerminal { + private currentSize: { rows: number; cols: number } | null; + private handlers: Array<() => void> = []; + /** Records every `(rows, cols)` reported via `getSize` *after* it + * was called by the system under test. Useful for assertions. */ + readonly sizeReadCount = { value: 0 }; + + constructor(initial: { rows: number; cols: number } | null = { rows: 30, cols: 100 }) { + this.currentSize = initial; + } + + getSize(): { rows: number; cols: number } | null { + this.sizeReadCount.value += 1; + return this.currentSize; + } + + onResize(handler: () => void): () => void { + this.handlers.push(handler); + return () => { + this.handlers = this.handlers.filter((h) => h !== handler); + }; + } + + /** Update the reported size *and* fire a resize event. */ + setSize(size: { rows: number; cols: number } | null): void { + this.currentSize = size; + for (const h of this.handlers) h(); + } + + /** Returns the number of currently-subscribed resize listeners. */ + listenerCount(): number { + return this.handlers.length; + } +} + +/** Routed fetch — keyed on `${method} ${pathSuffix}`. */ +type FetchRoute = (init?: RequestInit) => Promise; + +interface FetchScript { + /** Map of route key → handler. Default behaviour returns 200 + sensible body. */ + routes?: Record; + /** Default mode reported by `GET …/mode`. */ + initialMode?: 'human' | 'relay'; + /** Default pending count reported by `GET …/pending`. */ + initialPending?: number; + /** Make `PUT …/mode` to `human` fail with this status / body. */ + modeFlipFailure?: { status: number; error?: string }; + /** Make `captureAndRenderSnapshot` return this status. */ + snapshotResult?: Awaited>; + /** Initial local terminal size. Defaults to `{ rows: 30, cols: 100 }`; + * pass `null` to simulate "not a TTY" so the resize-forwarding path + * short-circuits. */ + terminalSize?: { rows: number; cols: number } | null; +} + +function createHarness(opts: FetchScript = {}): { + deps: DriveDependencies; + stdin: FakeStdin; + terminal: FakeTerminal; + sockets: FakeWebSocket[]; + writes: string[]; + errors: unknown[][]; + logs: unknown[][]; + signals: Map void | Promise>; + fetchLog: Array<{ url: string; method: string; body?: unknown }>; +} { + const writes: string[] = []; + const errors: unknown[][] = []; + const logs: unknown[][] = []; + const signals = new Map void | Promise>(); + const sockets: FakeWebSocket[] = []; + const fetchLog: Array<{ url: string; method: string; body?: unknown }> = []; + const stdin = new FakeStdin(); + const terminal = new FakeTerminal( + opts.terminalSize === undefined ? { rows: 30, cols: 100 } : opts.terminalSize + ); + + const initialMode = opts.initialMode ?? 'relay'; + const initialPending = opts.initialPending ?? 0; + + const defaultRoutes: Record = { + 'POST /resize': async () => + new Response(JSON.stringify({ ok: true }), { + status: 200, + headers: { 'Content-Type': 'application/json' }, + }), + 'GET /mode': async () => + new Response(JSON.stringify({ mode: initialMode }), { + status: 200, + headers: { 'Content-Type': 'application/json' }, + }), + 'PUT /mode': async (init) => { + if (opts.modeFlipFailure) { + return new Response(JSON.stringify({ error: opts.modeFlipFailure.error ?? 'fail' }), { + status: opts.modeFlipFailure.status, + headers: { 'Content-Type': 'application/json' }, + }); + } + const body = init?.body ? (JSON.parse(String(init.body)) as { mode: string }) : { mode: '' }; + return new Response(JSON.stringify({ mode: body.mode, flushed: 0 }), { + status: 200, + headers: { 'Content-Type': 'application/json' }, + }); + }, + 'GET /pending': async () => { + const pending = Array.from({ length: initialPending }, (_, i) => ({ event_id: `e${i}` })); + return new Response(JSON.stringify({ pending }), { + status: 200, + headers: { 'Content-Type': 'application/json' }, + }); + }, + 'POST /flush': async () => + new Response(JSON.stringify({ flushed: 0 }), { + status: 200, + headers: { 'Content-Type': 'application/json' }, + }), + 'POST /input': async () => + new Response(JSON.stringify({ ok: true }), { + status: 200, + headers: { 'Content-Type': 'application/json' }, + }), + }; + const routes = { ...defaultRoutes, ...(opts.routes ?? {}) }; + + const fetchFn = vi.fn(async (input: RequestInfo | URL, init?: RequestInit): Promise => { + const url = typeof input === 'string' ? input : input.toString(); + const method = (init?.method ?? 'GET').toUpperCase(); + let bodyJson: unknown; + if (init?.body) { + try { + bodyJson = JSON.parse(String(init.body)); + } catch { + bodyJson = String(init.body); + } + } + fetchLog.push({ url, method, body: bodyJson }); + + // Match by the trailing path segment (`/mode`, `/pending`, `/flush`) + // or the `/api/input/...` prefix. + let key: string | null = null; + if (/\/api\/spawned\/[^/]+\/mode$/.test(url)) { + key = `${method} /mode`; + } else if (/\/api\/spawned\/[^/]+\/pending$/.test(url)) { + key = `${method} /pending`; + } else if (/\/api\/spawned\/[^/]+\/flush$/.test(url)) { + key = `${method} /flush`; + } else if (/\/api\/input\/[^/]+$/.test(url)) { + key = `${method} /input`; + } else if (/\/api\/resize\/[^/]+$/.test(url)) { + key = `${method} /resize`; + } + if (key && routes[key]) { + return routes[key](init); + } + return new Response('not mocked', { status: 500 }); + }) as unknown as typeof globalThis.fetch; + + const deps: DriveDependencies = { + readConnectionFile: vi.fn(() => ({ url: 'http://localhost:3889', api_key: 'k' })), + getDefaultStateDir: vi.fn(() => '/tmp/fake/.agent-relay'), + env: {}, + createWebSocket: vi.fn((url: string, headers: Record) => { + const socket = new FakeWebSocket(url, headers); + sockets.push(socket); + return socket; + }), + writeChunk: (chunk: string) => { + writes.push(chunk); + }, + onSignal: (signal, handler) => { + signals.set(signal, handler); + }, + log: (...args: unknown[]) => { + logs.push(args); + }, + error: (...args: unknown[]) => { + errors.push(args); + }, + exit: vi.fn((code: number) => { + throw new ExitSignal(code); + }) as unknown as DriveDependencies['exit'], + fetch: fetchFn, + captureAndRenderSnapshot: vi.fn(async (_conn, _name, snapshotDeps) => { + // The default behaviour writes nothing — most tests assert on the + // status line + WS chunks, not on the snapshot. + void snapshotDeps; + return opts.snapshotResult ?? { status: 'ok' }; + }) as DriveDependencies['captureAndRenderSnapshot'], + stdin, + terminal, + }; + + return { deps, stdin, terminal, sockets, writes, errors, logs, signals, fetchLog }; +} + +afterEach(() => { + vi.restoreAllMocks(); +}); + +/** Helpers ----- */ + +async function openSocket(sockets: FakeWebSocket[]): Promise { + // Allow the awaited mode-flip + snapshot HTTP calls to settle before + // the WS factory is invoked. + for (let i = 0; i < 10 && sockets.length === 0; i++) { + await new Promise((resolve) => setImmediate(resolve)); + } + expect(sockets).toHaveLength(1); + const socket = sockets[0]; + socket.emit('open'); + // Let the stdin-takeover microtasks run. + await new Promise((resolve) => setImmediate(resolve)); + return socket; +} + +function jsonMessage(payload: Record): Buffer { + return Buffer.from(JSON.stringify(payload)); +} + +/** Tests ----- */ + +describe('classifyWsEvent', () => { + it('matches worker_stream for the targeted agent', () => { + expect( + classifyWsEvent(JSON.stringify({ kind: 'worker_stream', name: 'Alice', chunk: 'hi' }), 'Alice') + ).toEqual({ kind: 'worker_stream', chunk: 'hi' }); + }); + + it('filters worker_stream for other agents', () => { + expect( + classifyWsEvent(JSON.stringify({ kind: 'worker_stream', name: 'Bob', chunk: 'hi' }), 'Alice') + ).toEqual({ kind: 'other' }); + }); + + it('classifies delivery_queued for the targeted agent', () => { + expect( + classifyWsEvent(JSON.stringify({ kind: 'delivery_queued', name: 'Alice', event_id: 'e1' }), 'Alice') + ).toEqual({ kind: 'delivery_queued' }); + }); + + it('classifies agent_pending_drained with optional count', () => { + expect( + classifyWsEvent(JSON.stringify({ kind: 'agent_pending_drained', name: 'Alice', count: 3 }), 'Alice') + ).toEqual({ kind: 'agent_pending_drained', count: 3 }); + }); + + it('returns other for unrelated kinds', () => { + expect(classifyWsEvent(JSON.stringify({ kind: 'agent_spawned', name: 'Alice' }), 'Alice')).toEqual({ + kind: 'other', + }); + }); + + it('returns other for non-JSON payloads', () => { + expect(classifyWsEvent('not-json', 'Alice')).toEqual({ kind: 'other' }); + }); +}); + +describe('KeybindParser', () => { + it('forwards ordinary keystrokes unchanged', () => { + const p = new KeybindParser(); + const out = p.feed(Buffer.from('hello')); + expect(out.forward.toString()).toBe('hello'); + expect(out.actions).toEqual([]); + }); + + it('intercepts Ctrl+G as flush', () => { + const p = new KeybindParser(); + const out = p.feed(Buffer.from([0x68, 0x07, 0x69])); // h, Ctrl+G, i + expect(out.forward.toString()).toBe('hi'); + expect(out.actions).toEqual(['flush']); + }); + + it('intercepts Ctrl+C as detach', () => { + const p = new KeybindParser(); + const out = p.feed(Buffer.from([0x03])); + expect(out.forward.length).toBe(0); + expect(out.actions).toEqual(['detach']); + }); + + it('recognises Ctrl+B D (capital) as detach across chunks', () => { + const p = new KeybindParser(); + const first = p.feed(Buffer.from([0x02])); + expect(first.forward.length).toBe(0); + expect(first.actions).toEqual([]); + const second = p.feed(Buffer.from([0x44])); // 'D' + expect(second.forward.length).toBe(0); + expect(second.actions).toEqual(['detach']); + }); + + it('recognises Ctrl+B d (lowercase) and Ctrl+B Ctrl+D as detach', () => { + const p1 = new KeybindParser(); + expect(p1.feed(Buffer.from([0x02, 0x64])).actions).toEqual(['detach']); + const p2 = new KeybindParser(); + expect(p2.feed(Buffer.from([0x02, 0x04])).actions).toEqual(['detach']); + }); + + it('recognises Ctrl+B ? as toggle_help', () => { + const p = new KeybindParser(); + expect(p.feed(Buffer.from([0x02, 0x3f])).actions).toEqual(['toggle_help']); + }); + + it('forwards Ctrl+B + unknown byte verbatim so the agent is not deprived', () => { + const p = new KeybindParser(); + const out = p.feed(Buffer.from([0x02, 0x78])); // Ctrl+B, 'x' + expect(Array.from(out.forward)).toEqual([0x02, 0x78]); + expect(out.actions).toEqual([]); + }); + + it('handles multiple keybinds in one chunk in order', () => { + const p = new KeybindParser(); + const out = p.feed(Buffer.from([0x61, 0x07, 0x62, 0x02, 0x64])); // 'a', Ctrl+G, 'b', Ctrl+B, 'd' + expect(out.forward.toString()).toBe('ab'); + expect(out.actions).toEqual(['flush', 'detach']); + }); +}); + +describe('renderStatusLine', () => { + it('includes agent name, mode, pending count, and detach hint', () => { + const out = renderStatusLine({ agentName: 'Alice', mode: 'human', pending: 3, showHelp: false }); + expect(out).toContain('drive Alice'); + expect(out).toContain('mode=human'); + expect(out).toContain('pending=3'); + expect(out).toContain('Ctrl+B D detach'); + }); + + it('uses save/restore cursor + reverse video so the agent screen is preserved', () => { + const out = renderStatusLine({ agentName: 'Alice', mode: 'human', pending: 0, showHelp: false }); + expect(out.startsWith('\x1b7')).toBe(true); // save cursor + expect(out.endsWith('\x1b8')).toBe(true); // restore cursor + expect(out).toContain('\x1b[7m'); // reverse video + expect(out).toContain('\x1b[0m'); // reset + }); + + it('positions at the given row', () => { + const out = renderStatusLine({ + agentName: 'A', + mode: 'human', + pending: 0, + showHelp: false, + rows: 50, + }); + expect(out).toContain('\x1b[50;1H'); + }); + + it('shows extra hint when help is toggled on', () => { + const out = renderStatusLine({ agentName: 'A', mode: 'human', pending: 0, showHelp: true }); + expect(out).toContain('hide help'); + }); +}); + +describe('runDriveSession', () => { + it('flips to human mode, renders snapshot, opens WS, then restores prior mode on detach', async () => { + const { deps, sockets, fetchLog, stdin } = createHarness({ initialMode: 'relay' }); + const sessionPromise = runDriveSession('Alice', {}, deps); + const socket = await openSocket(sockets); + expect(socket.url).toBe('ws://localhost:3889/ws'); + expect(socket.headers['X-API-Key']).toBe('k'); + + // PUT /mode body should be { mode: 'human' }. + const flipCall = fetchLog.find((c) => c.method === 'PUT' && c.url.endsWith('/mode')); + expect(flipCall?.body).toEqual({ mode: 'human' }); + + // Raw mode should be on after open. + expect(stdin.rawModeCalls.includes(true)).toBe(true); + + // Detach via Ctrl+B D. + stdin.type(Buffer.from([0x02, 0x44])); + const code = await sessionPromise; + expect(code).toBe(0); + + // Raw mode restored. + expect(stdin.rawModeCalls).toEqual([true, false]); + + // Last PUT /mode call should restore to 'relay' (the prior mode). + const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/mode')); + expect(modeCalls).toHaveLength(2); + expect(modeCalls[1].body).toEqual({ mode: 'relay' }); + }); + + it('aborts before opening the WS when the broker rejects the mode flip', async () => { + const { deps, sockets, errors } = createHarness({ + modeFlipFailure: { status: 404, error: "no agent named 'Ghost'" }, + }); + const code = await runDriveSession('Ghost', {}, deps); + expect(code).toBe(1); + expect(sockets).toHaveLength(0); + expect(errors.some((args) => String(args[0]).includes("no agent named 'Ghost'"))).toBe(true); + }); + + it('aborts before opening the WS when the snapshot is not_found', async () => { + const { deps, sockets, errors, fetchLog } = createHarness({ + snapshotResult: { status: 'not_found', message: "no agent named 'Ghost'" }, + }); + const code = await runDriveSession('Ghost', {}, deps); + expect(code).toBe(1); + expect(sockets).toHaveLength(0); + expect(errors[0]?.[0]).toMatch(/no agent named/); + // Best-effort restore PUT should still have fired. + const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/mode')); + expect(modeCalls.map((c) => c.body)).toEqual([{ mode: 'human' }, { mode: 'relay' }]); + }); + + it('aborts before opening the WS when the worker has no PTY', async () => { + const { deps, sockets, errors } = createHarness({ + snapshotResult: { status: 'no_pty', message: "agent 'Headless' has no PTY" }, + }); + const code = await runDriveSession('Headless', {}, deps); + expect(code).toBe(1); + expect(sockets).toHaveLength(0); + expect(errors[0]?.[0]).toMatch(/no PTY/); + }); + + it('continues with a warning when the snapshot is transiently unavailable', async () => { + const { deps, sockets, logs } = createHarness({ + snapshotResult: { status: 'unavailable', message: 'HTTP 504' }, + }); + const sessionPromise = runDriveSession('Alice', {}, deps); + const socket = await openSocket(sockets); + expect(logs.some((args) => String(args[0]).includes('could not capture initial screen'))).toBe(true); + // Detach to let the test finish. + socket.emit('close', 1000, Buffer.from('')); + await sessionPromise; + }); + + it('increments pending on delivery_queued and resets on agent_pending_drained', async () => { + const { deps, sockets, writes, stdin } = createHarness(); + const sessionPromise = runDriveSession('Alice', {}, deps); + const socket = await openSocket(sockets); + // Initial paint should have happened. + const initialPaints = writes.filter((w) => w.includes('drive Alice')).length; + expect(initialPaints).toBeGreaterThan(0); + + socket.emit('message', jsonMessage({ kind: 'delivery_queued', name: 'Alice', event_id: 'e1' })); + socket.emit('message', jsonMessage({ kind: 'delivery_queued', name: 'Alice', event_id: 'e2' })); + expect(writes.some((w) => w.includes('pending=1'))).toBe(true); + expect(writes.some((w) => w.includes('pending=2'))).toBe(true); + + socket.emit('message', jsonMessage({ kind: 'agent_pending_drained', name: 'Alice', count: 2 })); + // After the drained event we should see a pending=0 paint. + expect(writes.filter((w) => w.includes('pending=0')).length).toBeGreaterThan(0); + + stdin.type(Buffer.from([0x03])); // Ctrl+C → detach + await sessionPromise; + }); + + it('writes worker_stream chunks to stdout and repaints the status line', async () => { + const { deps, sockets, writes, stdin } = createHarness(); + const sessionPromise = runDriveSession('Alice', {}, deps); + const socket = await openSocket(sockets); + socket.emit('message', jsonMessage({ kind: 'worker_stream', name: 'Alice', chunk: 'live output' })); + expect(writes.includes('live output')).toBe(true); + // Some paint should follow the worker chunk. + const liveIdx = writes.indexOf('live output'); + const repaintAfter = writes.slice(liveIdx + 1).some((w) => w.includes('drive Alice')); + expect(repaintAfter).toBe(true); + + stdin.type(Buffer.from([0x03])); + await sessionPromise; + }); + + it('forwards stdin keystrokes via POST /api/input/{name}', async () => { + const { deps, sockets, stdin, fetchLog } = createHarness(); + const sessionPromise = runDriveSession('Alice', {}, deps); + await openSocket(sockets); + + stdin.type(Buffer.from('hello')); + // Let the fire-and-forget POST settle. + await new Promise((resolve) => setImmediate(resolve)); + const input = fetchLog.find((c) => c.method === 'POST' && c.url.includes('/api/input/')); + expect(input?.body).toEqual({ data: 'hello' }); + + stdin.type(Buffer.from([0x03])); + await sessionPromise; + }); + + it('Ctrl+G triggers POST /api/spawned/{name}/flush', async () => { + const { deps, sockets, stdin, fetchLog } = createHarness(); + const sessionPromise = runDriveSession('Alice', {}, deps); + await openSocket(sockets); + + stdin.type(Buffer.from([0x07])); // Ctrl+G + await new Promise((resolve) => setImmediate(resolve)); + const flush = fetchLog.find((c) => c.method === 'POST' && c.url.endsWith('/flush')); + expect(flush).toBeDefined(); + + stdin.type(Buffer.from([0x03])); + await sessionPromise; + }); + + it('restores the prior mode even on abnormal WebSocket close', async () => { + const { deps, sockets, fetchLog, errors } = createHarness({ initialMode: 'relay' }); + const sessionPromise = runDriveSession('Alice', {}, deps); + const socket = await openSocket(sockets); + + socket.emit('close', 1006, Buffer.from('abnormal')); + const code = await sessionPromise; + expect(code).toBe(1); + expect(errors.some((args) => String(args[0]).includes('connection closed'))).toBe(true); + + const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/mode')); + expect(modeCalls.map((c) => c.body)).toEqual([{ mode: 'human' }, { mode: 'relay' }]); + }); + + it('proceeds when the worker is already in human mode (re-attach scenario)', async () => { + const { deps, sockets, stdin, fetchLog } = createHarness({ initialMode: 'human' }); + const sessionPromise = runDriveSession('Alice', {}, deps); + await openSocket(sockets); + + stdin.type(Buffer.from([0x03])); + await sessionPromise; + + const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/mode')); + // Restore to 'human' since that was the prior mode. + expect(modeCalls.map((c) => c.body)).toEqual([{ mode: 'human' }, { mode: 'human' }]); + }); + + it('exits cleanly on SIGINT', async () => { + const { deps, sockets, signals, stdin } = createHarness(); + const sessionPromise = runDriveSession('Alice', {}, deps); + await openSocket(sockets); + + const sigint = signals.get('SIGINT'); + expect(sigint).toBeDefined(); + await sigint?.(); + + const code = await sessionPromise; + expect(code).toBe(0); + // Raw mode must be restored. + expect(stdin.rawModeCalls).toEqual([true, false]); + }); + + it('returns 1 when no broker connection can be resolved', async () => { + const { deps, errors } = createHarness(); + deps.readConnectionFile = vi.fn(() => null); + const code = await runDriveSession('Alice', {}, deps); + expect(code).toBe(1); + expect(errors[0]?.[0]).toMatch(/could not locate broker connection/); + }); + + // ---- resize forwarding (table-stakes for a take-over UX) ---- + + it('forwards the local terminal size to the broker on attach', async () => { + const { deps, sockets, signals, fetchLog } = createHarness({ + terminalSize: { rows: 60, cols: 200 }, + }); + const sessionPromise = runDriveSession('Alice', {}, deps); + await openSocket(sockets); + + const resizeCalls = fetchLog.filter((call) => call.method === 'POST' && call.url.includes('/resize/')); + expect(resizeCalls).toHaveLength(1); + expect(resizeCalls[0].body).toEqual({ rows: 60, cols: 200 }); + + await signals.get('SIGINT')?.(); + await sessionPromise; + }); + + it('forwards subsequent SIGWINCH resize events to the broker', async () => { + const { deps, sockets, signals, terminal, fetchLog } = createHarness({ + terminalSize: { rows: 30, cols: 100 }, + }); + const sessionPromise = runDriveSession('Alice', {}, deps); + await openSocket(sockets); + + // Simulate the user dragging their terminal larger, then smaller. + terminal.setSize({ rows: 50, cols: 150 }); + await new Promise((resolve) => setImmediate(resolve)); + terminal.setSize({ rows: 24, cols: 80 }); + await new Promise((resolve) => setImmediate(resolve)); + + const resizeBodies = fetchLog + .filter((call) => call.method === 'POST' && call.url.includes('/resize/')) + .map((call) => call.body); + // First the on-attach sync, then each user-driven resize. + expect(resizeBodies).toEqual([ + { rows: 30, cols: 100 }, + { rows: 50, cols: 150 }, + { rows: 24, cols: 80 }, + ]); + + await signals.get('SIGINT')?.(); + await sessionPromise; + }); + + it('unsubscribes the resize listener on detach', async () => { + const { deps, sockets, signals, terminal } = createHarness(); + const sessionPromise = runDriveSession('Alice', {}, deps); + await openSocket(sockets); + + expect(terminal.listenerCount()).toBe(1); + await signals.get('SIGINT')?.(); + await sessionPromise; + expect(terminal.listenerCount()).toBe(0); + }); + + it('skips resize forwarding when stdout is not a TTY', async () => { + const { deps, sockets, signals, fetchLog } = createHarness({ terminalSize: null }); + const sessionPromise = runDriveSession('Alice', {}, deps); + await openSocket(sockets); + + const resizeCalls = fetchLog.filter((call) => call.method === 'POST' && call.url.includes('/resize/')); + expect(resizeCalls).toHaveLength(0); + + await signals.get('SIGINT')?.(); + await sessionPromise; + }); + + it('logs but continues when the initial resize sync fails', async () => { + const { deps, sockets, signals, logs } = createHarness({ + terminalSize: { rows: 30, cols: 100 }, + routes: { + 'POST /resize': async () => + new Response('boom', { status: 500, headers: { 'Content-Type': 'text/plain' } }), + }, + }); + + const sessionPromise = runDriveSession('Alice', {}, deps); + // Should still open the WS even though resize failed — UX-annoying + // not fatal; the human can still type into an unsync'd-size agent. + const socket = await openSocket(sockets); + + expect(logs.some((args) => String(args[0]).includes('could not sync agent PTY size'))).toBe(true); + expect(socket).toBeDefined(); + + await signals.get('SIGINT')?.(); + await sessionPromise; + }); +}); + +describe('registerDriveCommands', () => { + it('registers a `drive` command on the program', () => { + const { deps } = createHarness(); + const program = new Command(); + program.exitOverride(); + registerDriveCommands(program, deps); + const cmd = program.commands.find((c) => c.name() === 'drive'); + expect(cmd).toBeDefined(); + expect(cmd?.description()).toMatch(/interactive control/i); + }); + + it('wires --broker-url, --api-key, and --state-dir options', () => { + const { deps } = createHarness(); + const program = new Command(); + program.exitOverride(); + registerDriveCommands(program, deps); + const cmd = program.commands.find((c) => c.name() === 'drive'); + const flags = cmd?.options.map((opt) => opt.long).filter(Boolean); + expect(flags).toEqual(expect.arrayContaining(['--broker-url', '--api-key', '--state-dir'])); + }); +}); diff --git a/src/cli/commands/drive.ts b/src/cli/commands/drive.ts new file mode 100644 index 000000000..1b80d1e62 --- /dev/null +++ b/src/cli/commands/drive.ts @@ -0,0 +1,821 @@ +/** + * `agent-relay drive ` — interactive read-write take-over client. + * + * Attaches to a running agent, flips it into `human` session mode so the + * broker parks new relay messages in a per-worker queue, and forwards your + * keystrokes to the worker's PTY. You can drain the queue on demand with + * `Ctrl+G` and detach with `Ctrl+B D` (or `Ctrl+C` as a safety alias). + * Detaching restores the worker's previous session mode and leaves the + * agent running under the broker — `drive` never kills the worker. + * + * Sequence of operations on attach: + * + * 1. Discover broker connection (CLI flag → env → connection.json). + * 2. `GET /api/spawned/{name}/mode` → remember the previous mode. + * 3. `PUT /api/spawned/{name}/mode` → switch to `human`. + * 4. `captureAndRenderSnapshot` → repaint the agent's current screen. + * 5. `GET /api/spawned/{name}/pending` → seed the status-line counter. + * 6. Open `/ws`, subscribe to events for this worker. + * 7. Switch local stdin to raw mode; forward bytes to `POST /api/input/{name}`. + * + * On detach (clean or abnormal), best-effort `PUT .../mode` restores the + * previous mode so the queue doesn't fill up indefinitely. + * + * See GitHub issue #864 for the full verb taxonomy. This module ships only + * the `drive` verb — `relay` and `new`/`run` come in sub-PR 4. + */ + +import { Buffer } from 'node:buffer'; + +import { Command } from 'commander'; +import WebSocket from 'ws'; + +import { + captureAndRenderSnapshot, + type AttachSnapshotConnection, + type AttachSnapshotDeps, +} from '../lib/attach.js'; +import { + defaultStateDir, + readConnectionFileFromDisk, + resolveBrokerConnection, + toWsUrl, + type BrokerConnection, +} from '../lib/broker-connection.js'; +import { defaultExit, runSignalHandler } from '../lib/exit.js'; + +type ExitFn = (code: number) => never; + +/** Wire string for the broker's `SessionMode` enum. */ +export type SessionMode = 'human' | 'relay'; + +/** Minimal WebSocket surface we depend on — same shape as `view`'s. */ +export interface DriveWebSocket { + on(event: 'open', listener: () => void): unknown; + on(event: 'message', listener: (data: WebSocket.RawData) => void): unknown; + on(event: 'close', listener: (code: number, reason: Buffer) => void): unknown; + on(event: 'error', listener: (err: Error) => void): unknown; + close(code?: number, reason?: string): void; +} + +export type DriveWebSocketFactory = (url: string, headers: Record) => DriveWebSocket; + +export interface DriveSignalRegistrar { + (signal: NodeJS.Signals, handler: () => void | Promise): void; +} + +/** Stdin surface — tests provide a fake that never touches the real TTY. */ +export interface DriveStdin { + setRawMode?: (mode: boolean) => unknown; + isTTY?: boolean; + resume(): unknown; + pause(): unknown; + on(event: 'data', listener: (chunk: Buffer) => void): unknown; + off?(event: 'data', listener: (chunk: Buffer) => void): unknown; + removeListener?(event: 'data', listener: (chunk: Buffer) => void): unknown; +} + +/** + * Local terminal-size source. Wraps `process.stdout` in production so + * the resize wiring reads the user's actual terminal dimensions and + * gets a SIGWINCH-equivalent `'resize'` event for free. Tests inject a + * controllable fake. + */ +export interface DriveTerminal { + /** Current `(rows, cols)`. Returns `null` when stdout is not a TTY, + * in which case resize forwarding is skipped entirely. */ + getSize(): { rows: number; cols: number } | null; + /** Subscribe to local-terminal resize events. Returns an unsubscribe + * function the client calls during teardown. */ + onResize(handler: () => void): () => void; +} + +export interface DriveDependencies { + /** Reads `/connection.json` and returns parsed JSON, or null. */ + readConnectionFile: (stateDir: string) => unknown; + /** Project paths helper — used to pick the default state dir. */ + getDefaultStateDir: () => string; + /** Environment variables (so tests can inject). */ + env: NodeJS.ProcessEnv; + /** Factory for the WebSocket — overridden in tests with a mock. */ + createWebSocket: DriveWebSocketFactory; + /** Where the PTY chunks get written. Defaults to `process.stdout.write`. */ + writeChunk: (chunk: string) => void; + /** Signal registration (so tests can drive SIGINT without killing the test). */ + onSignal: DriveSignalRegistrar; + log: (...args: unknown[]) => void; + error: (...args: unknown[]) => void; + exit: ExitFn; + /** HTTP client used for mode/pending/flush/input calls. Defaults to global `fetch`. */ + fetch: typeof globalThis.fetch; + /** Override for the snapshot-on-attach helper (tests substitute a stub). */ + captureAndRenderSnapshot: ( + connection: AttachSnapshotConnection, + agentName: string, + deps: AttachSnapshotDeps + ) => ReturnType; + /** Stdin handle — defaults to `process.stdin`. */ + stdin: DriveStdin; + /** Local terminal size source — defaults to `process.stdout`. */ + terminal: DriveTerminal; +} + +function withDefaults(overrides: Partial = {}): DriveDependencies { + return { + readConnectionFile: readConnectionFileFromDisk, + getDefaultStateDir: defaultStateDir, + env: process.env, + createWebSocket: (url, headers) => new WebSocket(url, { headers }) as DriveWebSocket, + writeChunk: (chunk) => { + process.stdout.write(chunk); + }, + onSignal: (signal, handler) => { + process.on(signal, () => runSignalHandler(handler)); + }, + log: (...args: unknown[]) => console.error(...args), + error: (...args: unknown[]) => console.error(...args), + exit: defaultExit, + fetch: (input, init) => fetch(input, init), + captureAndRenderSnapshot, + stdin: process.stdin as DriveStdin, + terminal: { + getSize: () => { + // process.stdout.isTTY is `true | undefined`; reading + // rows/columns on a non-TTY returns `undefined`. + const stdout = process.stdout; + if (!stdout.isTTY) return null; + const rows = stdout.rows; + const cols = stdout.columns; + if (typeof rows !== 'number' || typeof cols !== 'number') return null; + return { rows, cols }; + }, + onResize: (handler) => { + // Node automatically translates SIGWINCH into a `'resize'` + // event on `process.stdout` when stdout is a TTY. + process.stdout.on('resize', handler); + return () => process.stdout.off('resize', handler); + }, + }, + ...overrides, + }; +} + +/** Build the `X-API-Key` header set, or an empty object when no key. */ +function authHeaders(connection: BrokerConnection): Record { + return connection.apiKey ? { 'X-API-Key': connection.apiKey } : {}; +} + +/** ----- HTTP helpers ----- */ + +/** `GET /api/spawned/{name}/mode` → `'human' | 'relay'` or `null` on failure. */ +export async function getSessionMode( + connection: BrokerConnection, + agentName: string, + fetchFn: typeof globalThis.fetch +): Promise { + const url = `${connection.url}/api/spawned/${encodeURIComponent(agentName)}/mode`; + try { + const res = await fetchFn(url, { headers: authHeaders(connection) }); + if (!res.ok) return null; + const body = (await res.json()) as { mode?: unknown }; + if (body.mode === 'human' || body.mode === 'relay') return body.mode; + return null; + } catch { + return null; + } +} + +/** Outcome of a `PUT /api/spawned/{name}/mode` call. */ +export interface SetSessionModeResult { + ok: boolean; + status: number; + /** Server-reported number of pending messages drained on a `human→relay` flip. */ + flushed?: number; + /** Human-readable error message when `ok` is false. */ + message?: string; +} + +export async function setSessionMode( + connection: BrokerConnection, + agentName: string, + mode: SessionMode, + fetchFn: typeof globalThis.fetch +): Promise { + const url = `${connection.url}/api/spawned/${encodeURIComponent(agentName)}/mode`; + try { + const res = await fetchFn(url, { + method: 'PUT', + headers: { ...authHeaders(connection), 'Content-Type': 'application/json' }, + body: JSON.stringify({ mode }), + }); + if (!res.ok) { + let message = `HTTP ${res.status}`; + try { + const body = (await res.json()) as { error?: unknown }; + if (typeof body.error === 'string') message = body.error; + } catch { + // body wasn't JSON; stick with HTTP status + } + return { ok: false, status: res.status, message }; + } + let flushed: number | undefined; + try { + const body = (await res.json()) as { flushed?: unknown }; + if (typeof body.flushed === 'number') flushed = body.flushed; + } catch { + // missing body is fine — mode flip still succeeded + } + return { ok: true, status: res.status, flushed }; + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + return { ok: false, status: 0, message }; + } +} + +/** `GET /api/spawned/{name}/pending` → count, or `0` on failure (best-effort). */ +export async function getPendingCount( + connection: BrokerConnection, + agentName: string, + fetchFn: typeof globalThis.fetch +): Promise { + const url = `${connection.url}/api/spawned/${encodeURIComponent(agentName)}/pending`; + try { + const res = await fetchFn(url, { headers: authHeaders(connection) }); + if (!res.ok) return 0; + const body = (await res.json()) as { pending?: unknown }; + return Array.isArray(body.pending) ? body.pending.length : 0; + } catch { + return 0; + } +} + +/** `POST /api/spawned/{name}/flush` → server returns `{ flushed: N }`. */ +export async function flushPending( + connection: BrokerConnection, + agentName: string, + fetchFn: typeof globalThis.fetch +): Promise<{ ok: boolean; flushed?: number; message?: string }> { + const url = `${connection.url}/api/spawned/${encodeURIComponent(agentName)}/flush`; + try { + const res = await fetchFn(url, { method: 'POST', headers: authHeaders(connection) }); + if (!res.ok) { + return { ok: false, message: `HTTP ${res.status}` }; + } + try { + const body = (await res.json()) as { flushed?: unknown }; + const flushed = typeof body.flushed === 'number' ? body.flushed : undefined; + return { ok: true, flushed }; + } catch { + return { ok: true }; + } + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + return { ok: false, message }; + } +} + +/** `POST /api/input/{name}` body `{ data: "" }`. */ +export async function sendInput( + connection: BrokerConnection, + agentName: string, + data: string, + fetchFn: typeof globalThis.fetch +): Promise<{ ok: boolean; message?: string }> { + const url = `${connection.url}/api/input/${encodeURIComponent(agentName)}`; + try { + const res = await fetchFn(url, { + method: 'POST', + headers: { ...authHeaders(connection), 'Content-Type': 'application/json' }, + body: JSON.stringify({ data }), + }); + if (!res.ok) { + return { ok: false, message: `HTTP ${res.status}` }; + } + return { ok: true }; + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + return { ok: false, message }; + } +} + +/** + * `POST /api/resize/{name}` body `{ rows, cols }`. Forwards the + * driver's local terminal dimensions so the agent's PTY (and any TUI + * running in it) sees the size the human is actually looking at. + * Called once on attach and again on every local-terminal resize. + */ +export async function resizeWorker( + connection: BrokerConnection, + agentName: string, + rows: number, + cols: number, + fetchFn: typeof globalThis.fetch +): Promise<{ ok: boolean; message?: string }> { + const url = `${connection.url}/api/resize/${encodeURIComponent(agentName)}`; + try { + const res = await fetchFn(url, { + method: 'POST', + headers: { ...authHeaders(connection), 'Content-Type': 'application/json' }, + body: JSON.stringify({ rows, cols }), + }); + if (!res.ok) { + return { ok: false, message: `HTTP ${res.status}` }; + } + return { ok: true }; + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + return { ok: false, message }; + } +} + +/** ----- WS message classification ----- */ + +function isStringObject(value: unknown): value is Record { + return typeof value === 'object' && value !== null && !Array.isArray(value); +} + +/** Discriminated union of the broker events `drive` cares about. */ +export type DriveWsEvent = + | { kind: 'worker_stream'; chunk: string } + | { kind: 'delivery_queued' } + | { kind: 'agent_pending_drained'; count?: number } + | { kind: 'other' }; + +/** + * Inspect a single WebSocket frame and classify it relative to the agent + * we're driving. Non-matching / malformed frames return `{ kind: 'other' }` + * so the caller can ignore them cheaply. + * + * Exported for unit testing the filter in isolation. + */ +export function classifyWsEvent(rawMessage: string, agentName: string): DriveWsEvent { + let parsed: unknown; + try { + parsed = JSON.parse(rawMessage); + } catch { + return { kind: 'other' }; + } + if (!isStringObject(parsed)) return { kind: 'other' }; + // All three events we care about are scoped by the worker `name` field. + if (parsed.name !== agentName) return { kind: 'other' }; + + if (parsed.kind === 'worker_stream') { + const chunk = parsed.chunk; + if (typeof chunk !== 'string') return { kind: 'other' }; + return { kind: 'worker_stream', chunk }; + } + if (parsed.kind === 'delivery_queued') { + return { kind: 'delivery_queued' }; + } + if (parsed.kind === 'agent_pending_drained') { + const count = typeof parsed.count === 'number' ? parsed.count : undefined; + return { kind: 'agent_pending_drained', count }; + } + return { kind: 'other' }; +} + +/** ----- Keybind state machine ----- */ + +/** Outcome of feeding one chunk to the keybind parser. */ +export interface KeybindOutcome { + /** Bytes that should be forwarded to the agent (may be empty). */ + forward: Buffer; + /** Local actions the client should perform, in order. */ + actions: KeybindAction[]; +} + +export type KeybindAction = 'flush' | 'detach' | 'toggle_help'; + +/** + * Stateful parser that recognises the `Ctrl+B ` two-byte prefix + * sequence, plus the single-byte safety keybinds (`Ctrl+G` flush, + * `Ctrl+C` detach). + * + * The parser is intentionally tiny — no readline, no keypress — because + * the keybinds are all ASCII control characters, and pulling in a + * keypress parser would just add a dependency for no real benefit. + * + * Semantics: + * - `Ctrl+G` (0x07) → emit `flush`, never forwarded. + * - `Ctrl+C` (0x03) → emit `detach`, never forwarded. + * - `Ctrl+B` (0x02) → swallow, arm the prefix state. + * Next byte (within the same chunk OR a subsequent chunk): + * - 'd' / 'D' / 0x04 (Ctrl+D) → emit `detach`. + * - '?' → emit `toggle_help`. + * - anything else → forward the original `Ctrl+B` byte + * followed by the new byte, so the + * agent isn't deprived if the user + * hit Ctrl+B by accident. + * + * Multiple keybinds in one chunk are handled in order; bytes between + * them are forwarded normally. + */ +export class KeybindParser { + private pendingPrefix = false; + + /** Process one chunk; returns bytes to forward + actions to take. */ + feed(chunk: Buffer): KeybindOutcome { + const forward: number[] = []; + const actions: KeybindAction[] = []; + + for (const byte of chunk) { + if (this.pendingPrefix) { + this.pendingPrefix = false; + if (byte === 0x44 /* 'D' */ || byte === 0x64 /* 'd' */ || byte === 0x04 /* Ctrl+D */) { + actions.push('detach'); + continue; + } + if (byte === 0x3f /* '?' */) { + actions.push('toggle_help'); + continue; + } + // Not a recognised prefix command — forward Ctrl+B + the byte + // so the agent isn't deprived (some TUI apps use Ctrl+B for + // their own bindings). + forward.push(0x02); + forward.push(byte); + continue; + } + if (byte === 0x07 /* Ctrl+G */) { + actions.push('flush'); + continue; + } + if (byte === 0x03 /* Ctrl+C */) { + actions.push('detach'); + continue; + } + if (byte === 0x02 /* Ctrl+B */) { + this.pendingPrefix = true; + continue; + } + forward.push(byte); + } + + return { + forward: Buffer.from(forward), + actions, + }; + } + + /** Reset the parser (e.g. before tearing down). */ + reset(): void { + this.pendingPrefix = false; + } +} + +/** ----- Status line rendering ----- */ + +/** + * Render the bottom-of-terminal status line for `drive`. Uses ANSI + * save-cursor / restore-cursor so the agent's output isn't disturbed. + * + * Exported for unit testing — `runDriveSession` calls it on every + * pending-count change. + */ +export function renderStatusLine(opts: { + agentName: string; + mode: SessionMode; + pending: number; + showHelp: boolean; + /** Terminal rows — defaults to 24 if unknown. The status line lands on row N. */ + rows?: number; +}): string { + const row = Math.max(opts.rows ?? 24, 1); + const help = opts.showHelp + ? ' | Ctrl+G flush | Ctrl+B D detach | Ctrl+B ? hide help' + : ' | Ctrl+G flush | Ctrl+B D detach'; + const text = `[drive ${opts.agentName} | mode=${opts.mode} | pending=${opts.pending}${help}]`; + // ESC 7 = save cursor; ESC[;1H = move to bottom row; ESC[2K = clear line; + // ESC[7m = reverse video; ESC[0m = reset; ESC 8 = restore cursor. + return `\x1b7\x1b[${row};1H\x1b[2K\x1b[7m${text}\x1b[0m\x1b8`; +} + +/** ----- Main session runner ----- */ + +/** + * Open a `drive` session. Resolves with the exit code the CLI should + * propagate. Cleans up its own stdin raw-mode and best-effort restores + * the worker's previous session mode on any exit path. + */ +export async function runDriveSession( + agentName: string, + options: { brokerUrl?: string; apiKey?: string; stateDir?: string }, + deps: DriveDependencies +): Promise { + if (!agentName.trim()) { + deps.error('Error: agent name is required'); + return 1; + } + + const connection = resolveBrokerConnection(options, deps); + if (!connection) { + deps.error( + 'Error: could not locate broker connection. Pass --broker-url, set RELAY_BROKER_URL, ' + + 'or run from a directory containing .agent-relay/connection.json.' + ); + return 1; + } + + // Remember the worker's prior mode so we can restore it on detach. + // `null` means we couldn't read it (broker hiccup or worker missing); + // we default the restore target to `relay` in that case so the queue + // doesn't keep growing. + const previousMode = await getSessionMode(connection, agentName, deps.fetch); + + // Flip the worker into human mode. If this fails outright, abort + // before doing anything else — we don't want to redraw the screen + // and then silently keep auto-injecting into the agent. + const flip = await setSessionMode(connection, agentName, 'human', deps.fetch); + if (!flip.ok) { + if (flip.status === 404) { + deps.error(`Error: no agent named '${agentName}'`); + } else { + deps.error(`Error: could not switch '${agentName}' to human mode: ${flip.message ?? 'unknown error'}`); + } + return 1; + } + + // Render the agent's current visible screen before the live stream + // begins. Same error semantics as `view`: hard errors abort, transient + // errors warn and proceed. + const snapshot = await deps.captureAndRenderSnapshot( + { url: connection.url, apiKey: connection.apiKey }, + agentName, + { fetch: deps.fetch, writeChunk: deps.writeChunk } + ); + switch (snapshot.status) { + case 'ok': + break; + case 'not_found': + // Best-effort restore — we did flip the mode above. + await setSessionMode(connection, agentName, previousMode ?? 'relay', deps.fetch); + deps.error(`Error: ${snapshot.message ?? `no agent named '${agentName}'`}`); + return 1; + case 'no_pty': + await setSessionMode(connection, agentName, previousMode ?? 'relay', deps.fetch); + deps.error(`Error: ${snapshot.message ?? `agent '${agentName}' has no PTY to drive`}`); + return 1; + case 'unavailable': + case 'transport_error': + deps.log( + `[drive] could not capture initial screen (${snapshot.message ?? snapshot.status}); streaming live output only` + ); + break; + } + + // Seed the pending counter so the status line is correct from the + // first paint. + let pending = await getPendingCount(connection, agentName, deps.fetch); + let showHelp = false; + + // Status-line row tracks the LOCAL terminal's bottom row, not the + // agent's PTY rows from the snapshot — those can differ before we + // forward our size to the broker, and the status line needs to land + // where the human is looking. Falls back to the snapshot rows, then + // the renderer's own 24-row default. + const initialLocalSize = deps.terminal.getSize(); + let terminalRows: number | undefined = + initialLocalSize?.rows ?? + (typeof snapshot.rows === 'number' && snapshot.rows > 0 ? snapshot.rows : undefined); + + const paintStatus = (): void => { + deps.writeChunk( + renderStatusLine({ + agentName, + mode: 'human', + pending, + showHelp, + rows: terminalRows, + }) + ); + }; + paintStatus(); + + // Sync the agent's PTY to the driver's local terminal size. tmux / + // screen / ssh all do this — without it, a TUI in the agent renders + // into whatever 24×80 box the PTY was spawned with, ignoring the + // human's actual viewport. Best-effort: a failure here is annoying + // but not fatal (the human can still type, output just renders into + // the old size). Skipped entirely when stdout isn't a TTY. + if (initialLocalSize) { + const initialResize = await resizeWorker( + connection, + agentName, + initialLocalSize.rows, + initialLocalSize.cols, + deps.fetch + ); + if (!initialResize.ok) { + deps.log( + `[drive] could not sync agent PTY size to local terminal (${initialResize.message ?? 'unknown'}); continuing` + ); + } + } + + const wsUrl = toWsUrl(connection.url); + const headers: Record = {}; + if (connection.apiKey) { + headers['X-API-Key'] = connection.apiKey; + } + + return new Promise((resolve) => { + let settled = false; + let rawModeWasSet = false; + let unsubscribeResize: (() => void) | null = null; + const parser = new KeybindParser(); + + // Local-terminal resize handler. Forwards to the broker and + // repaints the status line at the new bottom-row index. Registered + // on `socket.on('open')` (same point we take over stdin) so a + // failed connection doesn't leave a dangling listener; unregistered + // in `teardownStdin` so detach is clean. + const resizeHandler = (): void => { + const size = deps.terminal.getSize(); + if (!size) return; + terminalRows = size.rows; + void resizeWorker(connection, agentName, size.rows, size.cols, deps.fetch).then((res) => { + if (!res.ok) { + deps.log(`[drive] resize forward failed: ${res.message ?? 'unknown error'}`); + } + }); + // Repaint regardless of fetch outcome — the local terminal has + // already moved, so the status line position needs to move with + // it whether or not the broker accepted the resize. + paintStatus(); + }; + + // ---- stdin handling ---- + const stdinDataHandler = (chunk: Buffer): void => { + const outcome = parser.feed(chunk); + if (outcome.forward.length > 0) { + // Fire-and-forget; surface errors via log but don't block the + // event loop on every keystroke. + // UTF-8, not latin1 — the broker deserializes /api/input's + // `data` as a Rust `String` and forwards the bytes verbatim. + // 'binary' would map bytes ≥ 0x80 to Latin-1 code points, + // which then get UTF-8 re-encoded on the wire, doubling + // multi-byte characters (e.g. `é` → `é` on the agent's side). + void sendInput(connection, agentName, outcome.forward.toString('utf-8'), deps.fetch).then((res) => { + if (!res.ok) { + deps.log(`[drive] input send failed: ${res.message ?? 'unknown error'}`); + } + }); + } + for (const action of outcome.actions) { + switch (action) { + case 'flush': + void flushPending(connection, agentName, deps.fetch).then((res) => { + if (!res.ok) { + deps.log(`[drive] flush failed: ${res.message ?? 'unknown error'}`); + } + }); + break; + case 'detach': + finish(0); + return; + case 'toggle_help': + showHelp = !showHelp; + paintStatus(); + break; + } + } + }; + + const teardownStdin = (): void => { + try { + if (deps.stdin.off) { + deps.stdin.off('data', stdinDataHandler); + } else if (deps.stdin.removeListener) { + deps.stdin.removeListener('data', stdinDataHandler); + } + } catch { + // best effort + } + try { + if (rawModeWasSet && typeof deps.stdin.setRawMode === 'function') { + deps.stdin.setRawMode(false); + } + } catch { + // best effort + } + try { + deps.stdin.pause(); + } catch { + // best effort + } + try { + if (unsubscribeResize) { + unsubscribeResize(); + unsubscribeResize = null; + } + } catch { + // best effort + } + rawModeWasSet = false; + }; + + const finish = (code: number): void => { + if (settled) return; + settled = true; + teardownStdin(); + try { + socket.close(1000, 'drive client exiting'); + } catch { + // best effort + } + // Best-effort: restore the worker's previous mode so we don't + // leave it stuck in human and silently piling up queued messages. + void setSessionMode(connection, agentName, previousMode ?? 'relay', deps.fetch).finally(() => { + resolve(code); + }); + }; + + const socket = deps.createWebSocket(wsUrl, headers); + + deps.onSignal('SIGINT', () => finish(0)); + deps.onSignal('SIGTERM', () => finish(0)); + + socket.on('open', () => { + deps.log(`[drive] driving ${agentName} via ${connection.url} (Ctrl+B D to detach)`); + // Now that the WS is up, take over stdin. We do this on `open` + // rather than synchronously so a failed connection doesn't leave + // the user's terminal in raw mode with nothing to type into. + try { + if (typeof deps.stdin.setRawMode === 'function' && deps.stdin.isTTY !== false) { + deps.stdin.setRawMode(true); + rawModeWasSet = true; + } + deps.stdin.resume(); + deps.stdin.on('data', stdinDataHandler); + // Subscribe to local-terminal resize events at the same point + // we take over stdin so the lifecycles match — both go away in + // `teardownStdin` on any exit path. + unsubscribeResize = deps.terminal.onResize(resizeHandler); + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + deps.error(`[drive] could not enable raw input mode: ${message}`); + finish(1); + } + }); + + socket.on('message', (data) => { + const text = + typeof data === 'string' ? data : Buffer.isBuffer(data) ? data.toString('utf-8') : String(data); + const event = classifyWsEvent(text, agentName); + switch (event.kind) { + case 'worker_stream': + deps.writeChunk(event.chunk); + // Repaint the status line so the worker's writes don't + // obscure it. Cheap — it's just an ANSI escape sequence. + paintStatus(); + break; + case 'delivery_queued': + pending += 1; + paintStatus(); + break; + case 'agent_pending_drained': + pending = 0; + paintStatus(); + break; + case 'other': + break; + } + }); + + socket.on('error', (err: Error) => { + deps.error(`[drive] WebSocket error: ${err.message}`); + }); + + socket.on('close', (code: number, reason: Buffer) => { + if (settled) return; + const reasonText = reason && reason.length > 0 ? reason.toString('utf-8') : ''; + if (code === 1000 || code === 1005) { + finish(0); + } else { + deps.error(`[drive] connection closed (code: ${code}${reasonText ? `, reason: ${reasonText}` : ''})`); + finish(1); + } + }); + }); +} + +/** Register `agent-relay drive ` on the supplied commander program. */ +export function registerDriveCommands(program: Command, overrides: Partial = {}): void { + const deps = withDefaults(overrides); + + program + .command('drive') + .description( + 'Take interactive control of a running agent: queue inbound relay messages, type into the worker, flush on demand, detach when done' + ) + .argument('', 'Agent name to drive') + .option('--broker-url ', 'Broker base URL (overrides RELAY_BROKER_URL and connection.json)') + .option('--api-key ', 'Broker API key (overrides RELAY_BROKER_API_KEY and connection.json)') + .option('--state-dir ', 'Directory containing connection.json (default: .agent-relay/)') + .action(async (name: string, options: { brokerUrl?: string; apiKey?: string; stateDir?: string }) => { + const code = await runDriveSession(name, options, deps); + if (code !== 0) { + deps.exit(code); + } + }); +} diff --git a/src/cli/lib/broker-connection.test.ts b/src/cli/lib/broker-connection.test.ts new file mode 100644 index 000000000..41fece78c --- /dev/null +++ b/src/cli/lib/broker-connection.test.ts @@ -0,0 +1,93 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { resolveBrokerConnection, toWsUrl, type BrokerConnectionDeps } from './broker-connection.js'; + +function makeDeps(overrides: Partial = {}): BrokerConnectionDeps { + return { + readConnectionFile: vi.fn(() => null), + getDefaultStateDir: vi.fn(() => '/tmp/fake/.agent-relay'), + env: {}, + ...overrides, + }; +} + +describe('resolveBrokerConnection', () => { + it('prefers --broker-url over env and connection.json', () => { + const deps = makeDeps({ + env: { RELAY_BROKER_URL: 'http://env-host:1234' }, + readConnectionFile: vi.fn(() => ({ url: 'http://file-host:5678', api_key: 'file-key' })), + }); + const conn = resolveBrokerConnection({ brokerUrl: 'http://flag-host:9999' }, deps); + expect(conn).toEqual({ url: 'http://flag-host:9999', apiKey: 'file-key' }); + }); + + it('uses RELAY_BROKER_URL when no flag is provided', () => { + const deps = makeDeps({ + env: { RELAY_BROKER_URL: 'http://env-host:1234', RELAY_BROKER_API_KEY: 'env-key' }, + readConnectionFile: vi.fn(() => ({ url: 'http://file-host:5678', api_key: 'file-key' })), + }); + const conn = resolveBrokerConnection({}, deps); + expect(conn).toEqual({ url: 'http://env-host:1234', apiKey: 'env-key' }); + }); + + it('falls back to connection.json for both url and api_key', () => { + const deps = makeDeps({ + readConnectionFile: vi.fn(() => ({ url: 'http://file-host:5678/', api_key: 'file-key' })), + }); + const conn = resolveBrokerConnection({}, deps); + expect(conn).toEqual({ url: 'http://file-host:5678', apiKey: 'file-key' }); + }); + + it('returns null when no source provides a URL', () => { + expect(resolveBrokerConnection({}, makeDeps())).toBeNull(); + }); + + // ---- Regression: empty-trim falls through (cubic P2 finding) ---- + + it('falls through to env URL when --broker-url is blank/whitespace', () => { + const deps = makeDeps({ + env: { RELAY_BROKER_URL: 'http://env-host:1234' }, + }); + // `' '.trim()` is `''`, which is not nullish — `??` would have + // kept it as the URL. The trim-empty filter must fall through. + const conn = resolveBrokerConnection({ brokerUrl: ' ' }, deps); + expect(conn?.url).toBe('http://env-host:1234'); + }); + + it('falls through to env API key when --api-key is blank/whitespace', () => { + const deps = makeDeps({ + env: { RELAY_BROKER_API_KEY: 'env-key' }, + readConnectionFile: vi.fn(() => ({ url: 'http://localhost:3889' })), + }); + const conn = resolveBrokerConnection({ apiKey: ' ' }, deps); + expect(conn?.apiKey).toBe('env-key'); + }); + + it('falls through to file URL when env URL is blank/whitespace', () => { + const deps = makeDeps({ + env: { RELAY_BROKER_URL: ' ' }, + readConnectionFile: vi.fn(() => ({ url: 'http://file-host:5678' })), + }); + const conn = resolveBrokerConnection({}, deps); + expect(conn?.url).toBe('http://file-host:5678'); + }); + + it('falls through to file API key when env API key is blank/whitespace', () => { + const deps = makeDeps({ + env: { RELAY_BROKER_API_KEY: ' ' }, + readConnectionFile: vi.fn(() => ({ url: 'http://localhost:3889', api_key: 'file-key' })), + }); + const conn = resolveBrokerConnection({}, deps); + expect(conn?.apiKey).toBe('file-key'); + }); +}); + +describe('toWsUrl', () => { + it('rewrites http://host:port to ws://host:port/ws', () => { + expect(toWsUrl('http://localhost:3889')).toBe('ws://localhost:3889/ws'); + }); + + it('rewrites https://… to wss://…/ws', () => { + expect(toWsUrl('https://broker.example.com')).toBe('wss://broker.example.com/ws'); + }); +}); diff --git a/src/cli/lib/broker-connection.ts b/src/cli/lib/broker-connection.ts new file mode 100644 index 000000000..a9f071ef7 --- /dev/null +++ b/src/cli/lib/broker-connection.ts @@ -0,0 +1,111 @@ +/** + * Shared broker-connection discovery for the attach-style CLI verbs + * (`view`, `drive`, and the upcoming `relay` from issue #864). + * + * Resolution order matches `agent-relay-broker dump-pty` so users don't + * have to learn two patterns: + * + * 1. `--broker-url` / `--api-key` CLI flags + * 2. `RELAY_BROKER_URL` / `RELAY_BROKER_API_KEY` environment variables + * 3. `/connection.json` (default `.agent-relay/connection.json`) + */ + +import fs from 'node:fs'; +import path from 'node:path'; + +import { getProjectPaths } from '@agent-relay/config'; + +/** Connection metadata discovered from `connection.json` or CLI/env overrides. */ +export interface BrokerConnection { + url: string; + apiKey?: string; +} + +/** Options the caller may have parsed from CLI flags. */ +export interface BrokerConnectionOptions { + brokerUrl?: string; + apiKey?: string; + stateDir?: string; +} + +/** Injectable bits — tests stub these out instead of touching disk / env. */ +export interface BrokerConnectionDeps { + readConnectionFile: (stateDir: string) => unknown; + getDefaultStateDir: () => string; + env: NodeJS.ProcessEnv; +} + +/** Read `/connection.json` from disk, returning the parsed JSON or `null`. */ +export function readConnectionFileFromDisk(stateDir: string): unknown { + const connPath = path.join(stateDir, 'connection.json'); + try { + const raw = fs.readFileSync(connPath, 'utf-8'); + return JSON.parse(raw) as unknown; + } catch { + return null; + } +} + +/** Default state-directory: `.agent-relay/` under the resolved project root. */ +export function defaultStateDir(): string { + const projectRoot = getProjectPaths().projectRoot; + return path.join(projectRoot, '.agent-relay'); +} + +function isStringObject(value: unknown): value is Record { + return typeof value === 'object' && value !== null && !Array.isArray(value); +} + +function readString(obj: unknown, key: string): string | undefined { + if (!isStringObject(obj)) return undefined; + const value = obj[key]; + if (typeof value !== 'string') return undefined; + const trimmed = value.trim(); + return trimmed === '' ? undefined : trimmed; +} + +/** + * Trim a possibly-undefined string and treat empty results as + * `undefined` so `??` chains correctly fall through to lower-priority + * sources. Plain `value?.trim()` would yield `""` for blank inputs, + * which is not nullish — that would let an empty `--broker-url` flag + * silently override a real `RELAY_BROKER_URL` env var, etc. + */ +function trimOrUndefined(value: string | undefined): string | undefined { + if (typeof value !== 'string') return undefined; + const trimmed = value.trim(); + return trimmed === '' ? undefined : trimmed; +} + +/** + * Resolve the broker connection in priority order. Returns `null` when no + * source provides a URL — the caller decides how to surface that. + */ +export function resolveBrokerConnection( + options: BrokerConnectionOptions, + deps: BrokerConnectionDeps +): BrokerConnection | null { + const explicitUrl = trimOrUndefined(options.brokerUrl); + const envUrl = trimOrUndefined(deps.env.RELAY_BROKER_URL); + const stateDir = options.stateDir ? path.resolve(options.stateDir) : deps.getDefaultStateDir(); + const connectionFile = deps.readConnectionFile(stateDir); + const fileUrl = readString(connectionFile, 'url'); + + const url = explicitUrl ?? envUrl ?? fileUrl; + if (!url) return null; + + const explicitKey = trimOrUndefined(options.apiKey); + const envKey = trimOrUndefined(deps.env.RELAY_BROKER_API_KEY); + const fileKey = readString(connectionFile, 'api_key'); + const apiKey = explicitKey ?? envKey ?? fileKey; + + return { + url: url.replace(/\/+$/, ''), + apiKey, + }; +} + +/** Convert an `http(s)://host:port` base URL to the matching `ws(s)://…/ws`. */ +export function toWsUrl(baseUrl: string): string { + return `${baseUrl.replace(/^http/, 'ws')}/ws`; +} diff --git a/web/content/docs/reference-cli.mdx b/web/content/docs/reference-cli.mdx index 600b32415..da1503cbe 100644 --- a/web/content/docs/reference-cli.mdx +++ b/web/content/docs/reference-cli.mdx @@ -71,3 +71,61 @@ Flags: If the initial snapshot can't be served (broker hiccup, transient timeout) `view` prints a warning to stderr and falls through to the live stream — you may briefly see a blank screen until the agent next produces output. If the agent doesn't exist or has no PTY (headless worker), `view` aborts with an explanatory error. `view` is part of the broker-owned agent model described in [issue #864](https://github.com/AgentWorkforce/relay/issues/864). It shares the snapshot-on-attach helper with the upcoming `drive` and `relay` verbs so all three open with a faithful redraw of the agent's current screen. + +## `drive` + +Take interactive control of a running agent. `drive` flips the worker's session mode to `human` so the broker parks new inbound relay messages in a per-worker FIFO queue, forwards your keystrokes to the worker's PTY, lets you drain the queue on demand, and detaches cleanly without killing the agent. + +```bash +agent-relay drive reviewer +``` + +The client auto-discovers the running broker the same way `view` does (`--broker-url` → `RELAY_BROKER_URL` → `.agent-relay/connection.json`). On attach it: + +1. Records the worker's current session mode so it can be restored on detach. +2. Calls `PUT /api/spawned/{name}/mode` with `{ "mode": "human" }` to start queueing relay messages. +3. Captures and renders the agent's current visible screen via `GET /api/spawned/{name}/snapshot?format=ansi`. +4. Calls `GET /api/spawned/{name}/pending` once to seed the status-line counter. +5. Opens a WebSocket to `/ws` and subscribes to `worker_stream`, `delivery_queued`, and `agent_pending_drained` events for this agent. +6. Switches local stdin to raw mode and forwards keystrokes to `POST /api/input/{name}`. +7. Syncs the agent's PTY dimensions to your local terminal via `POST /api/resize/{name}` and forwards every subsequent `SIGWINCH`. Without this, a TUI in the agent would render into whatever 24×80 box the PTY was spawned with regardless of how big your terminal actually is. Skipped entirely when stdout isn't a TTY. + +On detach (clean or abnormal), `drive` best-effort restores the worker's previous session mode so the queue doesn't fill up indefinitely, and leaves the agent running under the broker. + +Flags: + +| Flag | Required | Description | +| ---- | -------- | ----------- | +| `--broker-url ` | no | Broker base URL. Falls back to `RELAY_BROKER_URL`, then `.agent-relay/connection.json`. | +| `--api-key ` | no | Broker API key. Falls back to `RELAY_BROKER_API_KEY`, then the `api_key` field in `connection.json`. | +| `--state-dir ` | no | Directory containing `connection.json` to use for auto-discovery. Defaults to `.agent-relay/` under the project root. | + +Keybinds: + +| Keys | Action | +| ---- | ------ | +| `Ctrl+G` | Flush the pending queue (`POST /api/spawned/{name}/flush`). The broker injects each queued message in FIFO order and emits `agent_pending_drained`. | +| `Ctrl+B` then `D` / `d` / `Ctrl+D` | Detach cleanly. Restores the worker's prior session mode, closes the WebSocket, exits 0. The parser accepts uppercase, lowercase, or `Ctrl+D` after the `Ctrl+B` prefix. | +| `Ctrl+B` then `?` | Toggle a small help hint inside the status line. | +| `Ctrl+C` | Safety alias for detach. `drive` never kills the agent — use `agent-relay agents:kill` for that. | +| `Ctrl+B` then any other key | Forwarded to the agent unchanged so TUI apps that use `Ctrl+B` themselves aren't deprived. | + +Status line: `drive` paints a one-line summary on the bottom row of your terminal using ANSI save/restore-cursor escapes so the agent's output isn't disturbed. It shows the agent name, current session mode, and the number of relay messages currently queued. The line repaints whenever the queue size changes (`delivery_queued` / `agent_pending_drained`) or the agent emits new output. + +``` +[drive reviewer | mode=human | pending=3 | Ctrl+G flush | Ctrl+B D detach] +``` + +Example session: + +```bash +# Take over the `reviewer` agent +agent-relay drive reviewer +# … type instructions interactively, watch the queue grow as other agents +# send messages, flush them when you're ready with Ctrl+G, detach with +# Ctrl+B D when you're done. +``` + +`drive` is single-driver-assumed: if two `drive` clients attach to the same worker, last writer wins on keystrokes and they both see the same WebSocket stream. Multi-driver coordination is tracked in a follow-up issue. + +`drive` is part of the broker-owned agent model described in [issue #864](https://github.com/AgentWorkforce/relay/issues/864). The auto-inject sibling `relay` verb is coming in a follow-up PR.