diff --git a/src/main/broker.ts b/src/main/broker.ts index 8ccbc43..f61fd90 100644 --- a/src/main/broker.ts +++ b/src/main/broker.ts @@ -111,6 +111,9 @@ const COMMIT_DRAFT_MAX_DIFF_CHARS = 80_000 const COMMIT_DRAFT_TIMEOUT_MS = 180_000 const MAX_BROKER_EVENT_HISTORY = 3_000 const BROKER_EVENT_HISTORY_TTL_MS = 12 * 60 * 60 * 1_000 +// After this many consecutive failures to open a PTY input stream, give up on +// the WS fast path for that agent and send over HTTP until it re-attaches. +const MAX_INPUT_STREAM_OPEN_FAILURES = 3 function delay(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)) @@ -485,6 +488,13 @@ export class BrokerManager { private agentProjects = new Map>() private inputStreams = new Map() private inputStreamFallbacks = new Set() + // Keys whose WS input stream has completed the broker's pty_input_ready + // handshake — only these are safe to send on without blocking. Everything + // else routes over HTTP until the stream is confirmed open. + private inputStreamReady = new Set() + // Consecutive background open failures per key; after MAX we stop retrying the + // WS for this agent (HTTP-only) until the terminal is re-attached. + private inputStreamOpenFailures = new Map() private eventObservers = new Set() private eventHistory: BrokerEventRecord[] = [] private eventSerial = 0 @@ -983,42 +993,90 @@ export class BrokerManager { return `${projectId}:${name}` } - private getOrOpenInputStream(session: BrokerSession, name: string): PtyInputStream { + // Returns the input stream for an agent plus whether it is *ready* to send on + // (the broker has acked pty_input_ready). The WS handshake runs in the + // background and is never awaited here, so a keystroke is never blocked on the + // up-to-10s open timeout — callers send over HTTP until `ready` flips true. + private ensureInputStream( + session: BrokerSession, + name: string + ): { stream: PtyInputStream; ready: boolean } { const key = this.getInputStreamKey(session.projectId, name) const existing = this.inputStreams.get(key) if (existing && !existing.closed) { - return existing + return { stream: existing, ready: this.inputStreamReady.has(key) } } const stream = session.client.openInputStream(name) this.inputStreams.set(key, stream) - stream.waitUntilOpen().catch(() => { - if (this.inputStreams.get(key) === stream) { + this.inputStreamReady.delete(key) + stream.waitUntilOpen().then( + () => { + if (this.inputStreams.get(key) === stream) { + this.inputStreamReady.add(key) + this.inputStreamOpenFailures.delete(key) + } + }, + () => { + if (this.inputStreams.get(key) !== stream) return this.inputStreams.delete(key) + this.inputStreamReady.delete(key) + const failures = (this.inputStreamOpenFailures.get(key) ?? 0) + 1 + this.inputStreamOpenFailures.set(key, failures) + // A stream that never opens (e.g. broker never sends pty_input_ready) + // would otherwise be re-opened on every keystroke. Stop trying the WS + // after a few failures and ride HTTP until the terminal re-attaches. + // This is the one case worth surfacing: transient not-ready is normal, + // but a *persistently* unopenable stream means the low-latency fast path + // is off for this agent — log it once rather than hiding it. + if (failures >= MAX_INPUT_STREAM_OPEN_FAILURES && !this.inputStreamFallbacks.has(key)) { + console.warn( + `[broker] PTY input stream for ${name} failed to open ${failures}x; ` + + `routing input over HTTP for this agent until the terminal re-attaches` + ) + } + if (failures >= MAX_INPUT_STREAM_OPEN_FAILURES) { + this.inputStreamFallbacks.add(key) + } } - }) - return stream + ) + return { stream, ready: false } } private closeInputStream(key: string, code = 1000, reason = 'closed'): void { const stream = this.inputStreams.get(key) this.inputStreams.delete(key) + this.inputStreamReady.delete(key) this.inputStreamFallbacks.delete(key) + this.inputStreamOpenFailures.delete(key) if (stream) { stream.close(code, reason) } } + // Drop any cached HTTP-only fallback + failure count for an agent so a fresh + // terminal attach gets another chance at the low-latency WS stream. Does not + // disturb a healthy open stream. + private resetInputStreamFallback(key: string): void { + this.inputStreamFallbacks.delete(key) + this.inputStreamOpenFailures.delete(key) + } + private closeInputStreamsForProject(projectId: string): void { + const prefix = `${projectId}:` for (const key of Array.from(this.inputStreams.keys())) { - if (key.startsWith(`${projectId}:`)) { + if (key.startsWith(prefix)) { this.closeInputStream(key, 1000, 'project closed') } } - for (const key of Array.from(this.inputStreamFallbacks.keys())) { - if (key.startsWith(`${projectId}:`)) { - this.inputStreamFallbacks.delete(key) - } + for (const key of Array.from(this.inputStreamFallbacks)) { + if (key.startsWith(prefix)) this.inputStreamFallbacks.delete(key) + } + for (const key of Array.from(this.inputStreamOpenFailures.keys())) { + if (key.startsWith(prefix)) this.inputStreamOpenFailures.delete(key) + } + for (const key of Array.from(this.inputStreamReady)) { + if (key.startsWith(prefix)) this.inputStreamReady.delete(key) } } @@ -1156,6 +1214,10 @@ export class BrokerManager { const session = this.getSessionForAgent(name, projectId) const client = session.client + // A re-attach (window reload, restart, tab re-open) is a fresh start for + // this terminal — clear any stale HTTP-only fallback so the WS fast path + // gets retried instead of being stuck on HTTP for the agent's lifetime. + this.resetInputStreamFallback(this.getInputStreamKey(session.projectId, name)) const mode = toInboundDeliveryMode(input.mode) let previousMode: InboundDeliveryMode | undefined @@ -1227,17 +1289,23 @@ export class BrokerManager { const session = this.getSessionForAgent(trimmedName, projectId) const key = this.getInputStreamKey(session.projectId, trimmedName) if (!this.inputStreamFallbacks.has(key)) { - const stream = this.getOrOpenInputStream(session, trimmedName) - try { - return await stream.send(data) - } catch (err) { - if (this.inputStreams.get(key) === stream) { - this.closeInputStream(key, 1011, 'stream send failed') - } - if (isUnsupportedInputStreamError(err)) { - this.inputStreamFallbacks.add(key) + // Kick off (or reuse) the WS stream, but only *send* on it once the broker + // has acked the handshake. Before that, fall through to HTTP so a keystroke + // never stalls on the open timeout — the symptom that made typing look dead + // after a re-attach. Subsequent keystrokes take the fast path once ready. + const { stream, ready } = this.ensureInputStream(session, trimmedName) + if (ready && !stream.closed) { + try { + return await stream.send(data) + } catch (err) { + if (this.inputStreams.get(key) === stream) { + this.closeInputStream(key, 1011, 'stream send failed') + } + if (isUnsupportedInputStreamError(err)) { + this.inputStreamFallbacks.add(key) + } + console.warn(`[broker] PTY input stream failed for ${trimmedName}; falling back to HTTP input:`, err) } - console.warn(`[broker] PTY input stream failed for ${trimmedName}; falling back to HTTP input:`, err) } } return session.client.sendInput(trimmedName, data)