Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 89 additions & 21 deletions src/main/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ const COMMIT_DRAFT_MAX_DIFF_CHARS = 80_000
const COMMIT_DRAFT_TIMEOUT_MS = 180_000
const MAX_BROKER_EVENT_HISTORY = 3_000
const BROKER_EVENT_HISTORY_TTL_MS = 12 * 60 * 60 * 1_000
// After this many consecutive failures to open a PTY input stream, give up on
// the WS fast path for that agent and send over HTTP until it re-attaches.
const MAX_INPUT_STREAM_OPEN_FAILURES = 3

function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms))
Expand Down Expand Up @@ -485,6 +488,13 @@ export class BrokerManager {
private agentProjects = new Map<string, Set<string>>()
private inputStreams = new Map<string, PtyInputStream>()
private inputStreamFallbacks = new Set<string>()
// Keys whose WS input stream has completed the broker's pty_input_ready
// handshake — only these are safe to send on without blocking. Everything
// else routes over HTTP until the stream is confirmed open.
private inputStreamReady = new Set<string>()
// Consecutive background open failures per key; after MAX we stop retrying the
// WS for this agent (HTTP-only) until the terminal is re-attached.
private inputStreamOpenFailures = new Map<string, number>()
private eventObservers = new Set<BrokerEventObserver>()
private eventHistory: BrokerEventRecord[] = []
private eventSerial = 0
Expand Down Expand Up @@ -983,42 +993,90 @@ export class BrokerManager {
return `${projectId}:${name}`
}

private getOrOpenInputStream(session: BrokerSession, name: string): PtyInputStream {
// Returns the input stream for an agent plus whether it is *ready* to send on
// (the broker has acked pty_input_ready). The WS handshake runs in the
// background and is never awaited here, so a keystroke is never blocked on the
// up-to-10s open timeout — callers send over HTTP until `ready` flips true.
private ensureInputStream(
session: BrokerSession,
name: string
): { stream: PtyInputStream; ready: boolean } {
const key = this.getInputStreamKey(session.projectId, name)
const existing = this.inputStreams.get(key)
if (existing && !existing.closed) {
return existing
return { stream: existing, ready: this.inputStreamReady.has(key) }
}

const stream = session.client.openInputStream(name)
this.inputStreams.set(key, stream)
stream.waitUntilOpen().catch(() => {
if (this.inputStreams.get(key) === stream) {
this.inputStreamReady.delete(key)
stream.waitUntilOpen().then(
() => {
if (this.inputStreams.get(key) === stream) {
this.inputStreamReady.add(key)
this.inputStreamOpenFailures.delete(key)
}
},
() => {
if (this.inputStreams.get(key) !== stream) return
this.inputStreams.delete(key)
this.inputStreamReady.delete(key)
const failures = (this.inputStreamOpenFailures.get(key) ?? 0) + 1
this.inputStreamOpenFailures.set(key, failures)
// A stream that never opens (e.g. broker never sends pty_input_ready)
// would otherwise be re-opened on every keystroke. Stop trying the WS
// after a few failures and ride HTTP until the terminal re-attaches.
// This is the one case worth surfacing: transient not-ready is normal,
// but a *persistently* unopenable stream means the low-latency fast path
// is off for this agent — log it once rather than hiding it.
if (failures >= MAX_INPUT_STREAM_OPEN_FAILURES && !this.inputStreamFallbacks.has(key)) {
console.warn(
`[broker] PTY input stream for ${name} failed to open ${failures}x; ` +
`routing input over HTTP for this agent until the terminal re-attaches`
)
}
if (failures >= MAX_INPUT_STREAM_OPEN_FAILURES) {
this.inputStreamFallbacks.add(key)
}
}
})
return stream
)
return { stream, ready: false }
}

private closeInputStream(key: string, code = 1000, reason = 'closed'): void {
const stream = this.inputStreams.get(key)
this.inputStreams.delete(key)
this.inputStreamReady.delete(key)
this.inputStreamFallbacks.delete(key)
this.inputStreamOpenFailures.delete(key)
if (stream) {
stream.close(code, reason)
}
}

// Drop any cached HTTP-only fallback + failure count for an agent so a fresh
// terminal attach gets another chance at the low-latency WS stream. Does not
// disturb a healthy open stream.
private resetInputStreamFallback(key: string): void {
this.inputStreamFallbacks.delete(key)
this.inputStreamOpenFailures.delete(key)
}

private closeInputStreamsForProject(projectId: string): void {
const prefix = `${projectId}:`
for (const key of Array.from(this.inputStreams.keys())) {
if (key.startsWith(`${projectId}:`)) {
if (key.startsWith(prefix)) {
this.closeInputStream(key, 1000, 'project closed')
}
}
for (const key of Array.from(this.inputStreamFallbacks.keys())) {
if (key.startsWith(`${projectId}:`)) {
this.inputStreamFallbacks.delete(key)
}
for (const key of Array.from(this.inputStreamFallbacks)) {
if (key.startsWith(prefix)) this.inputStreamFallbacks.delete(key)
}
for (const key of Array.from(this.inputStreamOpenFailures.keys())) {
if (key.startsWith(prefix)) this.inputStreamOpenFailures.delete(key)
}
for (const key of Array.from(this.inputStreamReady)) {
if (key.startsWith(prefix)) this.inputStreamReady.delete(key)
}
}

Expand Down Expand Up @@ -1156,6 +1214,10 @@ export class BrokerManager {

const session = this.getSessionForAgent(name, projectId)
const client = session.client
// A re-attach (window reload, restart, tab re-open) is a fresh start for
// this terminal — clear any stale HTTP-only fallback so the WS fast path
// gets retried instead of being stuck on HTTP for the agent's lifetime.
this.resetInputStreamFallback(this.getInputStreamKey(session.projectId, name))
const mode = toInboundDeliveryMode(input.mode)
let previousMode: InboundDeliveryMode | undefined

Expand Down Expand Up @@ -1227,17 +1289,23 @@ export class BrokerManager {
const session = this.getSessionForAgent(trimmedName, projectId)
const key = this.getInputStreamKey(session.projectId, trimmedName)
if (!this.inputStreamFallbacks.has(key)) {
const stream = this.getOrOpenInputStream(session, trimmedName)
try {
return await stream.send(data)
} catch (err) {
if (this.inputStreams.get(key) === stream) {
this.closeInputStream(key, 1011, 'stream send failed')
}
if (isUnsupportedInputStreamError(err)) {
this.inputStreamFallbacks.add(key)
// Kick off (or reuse) the WS stream, but only *send* on it once the broker
// has acked the handshake. Before that, fall through to HTTP so a keystroke
// never stalls on the open timeout — the symptom that made typing look dead
// after a re-attach. Subsequent keystrokes take the fast path once ready.
const { stream, ready } = this.ensureInputStream(session, trimmedName)
if (ready && !stream.closed) {
try {
return await stream.send(data)
} catch (err) {
if (this.inputStreams.get(key) === stream) {
this.closeInputStream(key, 1011, 'stream send failed')
}
if (isUnsupportedInputStreamError(err)) {
this.inputStreamFallbacks.add(key)
}
console.warn(`[broker] PTY input stream failed for ${trimmedName}; falling back to HTTP input:`, err)
}
console.warn(`[broker] PTY input stream failed for ${trimmedName}; falling back to HTTP input:`, err)
}
}
return session.client.sendInput(trimmedName, data)
Expand Down