fix(broker): self-heal wedged brokers and quiet shutdown-race noise#67
Conversation
Two unrelated issues surfaced together in the logs: 1. `broker:list-agents` timed out every 30s forever. The in-progress revive logic only matched ECONNREFUSED/ECONNRESET, but a *wedged* broker (alive, accepting TCP, never answering HTTP) produces a TimeoutError DOMException, which slipped past it — so nothing ever recovered it and every poll re-spammed "Error occurred in handler". listAgents now distinguishes the two failure modes: connection refused = dead, respawn immediately; timeout = possibly slow, count consecutive timeouts per project and respawn after MAX (2). Below the threshold it rethrows so the renderer keeps its stale agent list instead of flickering to empty. The counter resets on any successful poll, on revive, and on shutdown. 2. "PTY input stream failed for X; falling back to HTTP input" was logged on every app quit. An `input_stream_closed` rejection only happens when we deliberately tear the stream down (shutdown, project /agent close, re-attach) while a keystroke is in flight — an expected close, not a transport failure. Fall through to HTTP silently for that code; real failures still warn. Also refactored the cause-chain walk into a shared someInCauseChain() helper used by both the unreachable and timeout detectors. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Free Run ID: 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughAdds broker error classification and per-project revival (dedupe + timeout counting), safe process termination and restart (reviveSession), integrates revive behavior into listAgents with retry/degrade logic, silences an expected PTY WS race in sendInput, and clears timeout state on session drop. ChangesBroker Session Revival with Error Classification
Sequence DiagramsequenceDiagram
participant Client
participant ListAgents
participant BrokerManager
participant ErrorClassifier
participant BrokerSession
participant Broker
Client->>ListAgents: listAgents(projectId)
ListAgents->>BrokerSession: collectSessionAgents()
BrokerSession->>Broker: request agents
Broker--xBrokerSession: connection error / timeout
BrokerSession--xListAgents: bubbled error
ListAgents->>ErrorClassifier: classify error
ErrorClassifier-->>ListAgents: unreachable | timeout | other
alt Unreachable
ListAgents->>BrokerManager: reviveSession(projectId)
BrokerManager->>BrokerSession: dropSession + terminateOwnedBrokerProcess()
BrokerManager->>BrokerSession: start(new port)
BrokerManager-->>ListAgents: revived?
ListAgents->>BrokerSession: retry collectSessionAgents()
else Timeout (below threshold)
ListAgents->>BrokerManager: increment brokerTimeoutCounts
ListAgents-->>Client: return other projects' agents
else Timeout (at/above threshold)
ListAgents->>BrokerManager: reviveSession(projectId)
end
alt Revive succeeds
BrokerManager->>ListAgents: clear brokerTimeoutCounts
ListAgents-->>Client: return agents
else Revive fails
ListAgents-->>Client: return empty agents for project
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
Note 🎁 Summarized by CodeRabbit FreeYour organization is on the Free plan. CodeRabbit will generate a high-level summary and a walkthrough for each pull request. For a comprehensive line-by-line review, please upgrade your subscription to CodeRabbit Pro by visiting https://app.coderabbit.ai/login. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces self-healing capabilities for local broker processes by detecting unreachable or wedged brokers and automatically restarting them on fresh ports. The review feedback highlights several critical issues to address: refining isBrokerTimeoutError to avoid false positives from standard manual aborts, introducing an asynchronous helper to await active start or revive promises to prevent transient errors during concurrent listAgents calls, and ensuring active revive promises are cleaned up during shutdown to avoid race conditions with manual shutdowns.
| function isBrokerTimeoutError(err: unknown): boolean { | ||
| return someInCauseChain(err, (node) => { | ||
| if (node.code === 'ETIMEDOUT') return true | ||
| if (node.name === 'TimeoutError' || node.name === 'AbortError') return true | ||
| const message = node.message | ||
| return typeof message === 'string' && /aborted due to timeout|operation was aborted|ETIMEDOUT/i.test(message) | ||
| }) | ||
| } |
There was a problem hiding this comment.
The current implementation of isBrokerTimeoutError incorrectly matches standard manual aborts (such as those triggered by component unmounts or user navigation) because it checks for node.name === 'AbortError' and matches the generic message "operation was aborted". This can lead to false positive timeout counts and spurious broker restarts when navigating the app. It should be refined to only match actual timeout errors (TimeoutError, ETIMEDOUT, or messages explicitly mentioning timeout).
| function isBrokerTimeoutError(err: unknown): boolean { | |
| return someInCauseChain(err, (node) => { | |
| if (node.code === 'ETIMEDOUT') return true | |
| if (node.name === 'TimeoutError' || node.name === 'AbortError') return true | |
| const message = node.message | |
| return typeof message === 'string' && /aborted due to timeout|operation was aborted|ETIMEDOUT/i.test(message) | |
| }) | |
| } | |
| function isBrokerTimeoutError(err: unknown): boolean { | |
| return someInCauseChain(err, (node) => { | |
| if (node.code === 'ETIMEDOUT') return true | |
| if (node.name === 'TimeoutError') return true | |
| const message = node.message | |
| return typeof message === 'string' && /aborted due to timeout|ETIMEDOUT|timed?out/i.test(message) | |
| }) | |
| } |
| private async reviveSession(projectId: string): Promise<boolean> { | ||
| const existing = this.revivePromises.get(projectId) | ||
| if (existing) return existing | ||
|
|
||
| const session = this.sessions.get(projectId) | ||
| if (!session) return false | ||
| // Cloud sessions can't be re-spawned locally — they live in a remote | ||
| // sandbox and are owned by CloudAgentManager. | ||
| if (session.cloudSandboxId) return false | ||
| const win = session.window | ||
| if (!win || win.isDestroyed()) return false | ||
|
|
||
| const { cwd, name, channels } = session | ||
| const promise = (async () => { | ||
| console.warn(`[broker] Broker for project ${projectId} is unreachable; restarting on a fresh port`) | ||
| await this.shutdown(projectId) | ||
| await this.start(projectId, cwd, name, win, channels) | ||
| return this.sessions.has(projectId) | ||
| })() | ||
| this.revivePromises.set(projectId, promise) | ||
| try { | ||
| return await promise | ||
| } catch (err) { | ||
| console.error(`[broker] Failed to revive broker for project ${projectId}:`, toErrorMessage(err)) | ||
| return false | ||
| } finally { | ||
| if (this.revivePromises.get(projectId) === promise) { | ||
| this.revivePromises.delete(projectId) | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
When a broker is being revived, this.sessions.delete(projectId) is called during shutdown. Any concurrent scoped listAgents(projectId) call will immediately throw "Relay workspace not started — select the project first" because getSessionForProject is synchronous and throws if the session is missing. To prevent these transient errors during background self-healing, we should introduce an asynchronous getOrAwaitSession helper that awaits any active revivePromises or startPromises before returning the session.
Additionally, we should guard against a resurrection race condition where a manual shutdown is overridden by an active background revive by checking if the revive promise is still active before calling this.start.
private async reviveSession(projectId: string): Promise<boolean> {
const existing = this.revivePromises.get(projectId)
if (existing) return existing
const session = this.sessions.get(projectId)
if (!session) return false
// Cloud sessions can't be re-spawned locally — they live in a remote
// sandbox and are owned by CloudAgentManager.
if (session.cloudSandboxId) return false
const win = session.window
if (!win || win.isDestroyed()) return false
const { cwd, name, channels } = session
const promise = (async () => {
console.warn(`[broker] Broker for project ${projectId} is unreachable; restarting on a fresh port`)
await this.shutdown(projectId)
if (this.revivePromises.get(projectId) !== promise) return false
await this.start(projectId, cwd, name, win, channels)
return this.sessions.has(projectId)
})()
this.revivePromises.set(projectId, promise)
try {
return await promise
} catch (err) {
console.error(`[broker] Failed to revive broker for project ${projectId}:`, toErrorMessage(err))
return false
} finally {
if (this.revivePromises.get(projectId) === promise) {
this.revivePromises.delete(projectId)
}
}
}
private async getOrAwaitSession(projectId: string): Promise<BrokerSession> {
const normalizedProjectId = projectId.trim()
const revivePromise = this.revivePromises.get(normalizedProjectId)
if (revivePromise) {
await revivePromise.catch(() => undefined)
}
const startPromise = this.startPromises.get(normalizedProjectId)
if (startPromise) {
await startPromise.catch(() => undefined)
}
const session = this.sessions.get(normalizedProjectId)
if (!session) {
throw new Error('Relay workspace not started — select the project first')
}
return session
}| const sessions = projectId ? [this.getSessionForProject(projectId)] : Array.from(this.sessions.values()) | ||
| const results = await Promise.all( | ||
| sessions.map(async (session) => { | ||
| const agents = await session.client.listAgents() | ||
| for (const agent of agents) { | ||
| this.rememberAgentProject(agent.name, session.projectId) | ||
| try { | ||
| return await this.collectSessionAgents(session) | ||
| } catch (err) { | ||
| // A dead broker (connection refused) is a definitive signal — respawn | ||
| // immediately. A wedged broker (request timeout) might just be a slow | ||
| // response, so only respawn after MAX consecutive timeouts; below the | ||
| // threshold we rethrow so the renderer keeps its stale agent list | ||
| // rather than flickering to empty on a transient blip. | ||
| const unreachable = isBrokerUnreachableError(err) | ||
| if (!unreachable) { | ||
| if (!isBrokerTimeoutError(err)) throw err | ||
| const timeouts = (this.brokerTimeoutCounts.get(session.projectId) ?? 0) + 1 | ||
| this.brokerTimeoutCounts.set(session.projectId, timeouts) | ||
| if (timeouts < MAX_BROKER_TIMEOUTS_BEFORE_REVIVE) throw err | ||
| console.warn( | ||
| `[broker] listAgents: broker for project ${session.projectId} timed out ${timeouts}x; ` + | ||
| `restarting it on a fresh port` | ||
| ) | ||
| } | ||
| // Restart on a fresh port and retry once against the new session; if | ||
| // recovery fails, degrade to an empty list for this project rather | ||
| // than failing the whole call (other projects may still be healthy). | ||
| this.brokerTimeoutCounts.delete(session.projectId) | ||
| const revived = await this.reviveSession(session.projectId) | ||
| const next = revived ? this.sessions.get(session.projectId) : undefined | ||
| if (!next) { | ||
| console.warn(`[broker] listAgents: broker for project ${session.projectId} is unreachable; returning no agents`) | ||
| return [] | ||
| } | ||
| return this.collectSessionAgents(next) | ||
| } | ||
| return Promise.all( | ||
| agents.map(async (agent) => { | ||
| const inboundDeliveryMode = await session.client.getInboundDeliveryMode(agent.name).catch(() => undefined) | ||
| return { ...agent, projectId: session.projectId, inboundDeliveryMode } | ||
| }) | ||
| ) | ||
| }) | ||
| ) | ||
| return results.flat() | ||
| } |
There was a problem hiding this comment.
Update listAgents to use the new getOrAwaitSession helper. This ensures that any scoped or unscoped listAgents calls made while a background revive or start is in progress will gracefully await the operation's completion instead of throwing transient errors or silently omitting the project.
let sessions: BrokerSession[]
if (projectId) {
try {
const session = await this.getOrAwaitSession(projectId)
sessions = [session]
} catch (err) {
throw err
}
} else {
const activeProjectIds = Array.from(new Set([
...this.sessions.keys(),
...this.revivePromises.keys(),
...this.startPromises.keys()
]))
sessions = (await Promise.all(
activeProjectIds.map((id) => this.getOrAwaitSession(id).catch(() => undefined))
)).filter((s): s is BrokerSession => !!s)
}
const results = await Promise.all(
sessions.map(async (session) => {
try {
return await this.collectSessionAgents(session)
} catch (err) {
// A dead broker (connection refused) is a definitive signal — respawn
// immediately. A wedged broker (request timeout) might just be a slow
// response, so only respawn after MAX consecutive timeouts; below the
// threshold we rethrow so the renderer keeps its stale agent list
// rather than flickering to empty on a transient blip.
const unreachable = isBrokerUnreachableError(err)
if (!unreachable) {
if (!isBrokerTimeoutError(err)) throw err
const timeouts = (this.brokerTimeoutCounts.get(session.projectId) ?? 0) + 1
this.brokerTimeoutCounts.set(session.projectId, timeouts)
if (timeouts < MAX_BROKER_TIMEOUTS_BEFORE_REVIVE) throw err
console.warn(
`[broker] listAgents: broker for project ${session.projectId} timed out ${timeouts}x; ` +
`restarting it on a fresh port`
)
}
// Restart on a fresh port and retry once against the new session; if
// recovery fails, degrade to an empty list for this project rather
// than failing the whole call (other projects may still be healthy).
this.brokerTimeoutCounts.delete(session.projectId)
const revived = await this.reviveSession(session.projectId)
const next = revived ? this.sessions.get(session.projectId) : undefined
if (!next) {
console.warn(`[broker] listAgents: broker for project ${session.projectId} is unreachable; returning no agents`)
return []
}
return this.collectSessionAgents(next)
}
})
)
return results.flat()
}| const targetProjectIds = projectId ? [projectId] : Array.from(this.sessions.keys()) | ||
| for (const targetProjectId of targetProjectIds) { | ||
| this.closeInputStreamsForProject(targetProjectId) | ||
| this.brokerTimeoutCounts.delete(targetProjectId) | ||
|
|
||
| const session = this.sessions.get(targetProjectId) | ||
| if (!session) continue |
There was a problem hiding this comment.
Delete the active revive promise from this.revivePromises during shutdown to ensure that any active background revive is cancelled and does not resurrect the broker session after a manual shutdown has been explicitly requested.
| const targetProjectIds = projectId ? [projectId] : Array.from(this.sessions.keys()) | |
| for (const targetProjectId of targetProjectIds) { | |
| this.closeInputStreamsForProject(targetProjectId) | |
| this.brokerTimeoutCounts.delete(targetProjectId) | |
| const session = this.sessions.get(targetProjectId) | |
| if (!session) continue | |
| const targetProjectIds = projectId ? [projectId] : Array.from(this.sessions.keys()) | |
| for (const targetProjectId of targetProjectIds) { | |
| this.closeInputStreamsForProject(targetProjectId) | |
| this.brokerTimeoutCounts.delete(targetProjectId) | |
| this.revivePromises.delete(targetProjectId) | |
| const session = this.sessions.get(targetProjectId) | |
| if (!session) continue |
| console.warn(`[broker] listAgents: broker for project ${session.projectId} is unreachable; returning no agents`) | ||
| return [] | ||
| } | ||
| return this.collectSessionAgents(next) |
There was a problem hiding this comment.
🟡 Unhandled throw from post-revive collectSessionAgents breaks Promise.all for all projects
The comment at line 1905-1907 states the intent: "if recovery fails, degrade to an empty list for this project rather than failing the whole call (other projects may still be healthy)." However, this.collectSessionAgents(next) at line 1915 is not wrapped in a try/catch. If the newly spawned broker is also immediately unresponsive (e.g. port conflict, binary crash), this call throws from inside the catch block, propagating through Promise.all at line 1884 and rejecting the entire listAgents call for ALL sessions. In a multi-project setup, one persistently-failing project prevents agent list updates for all healthy projects on that poll cycle — contradicting the stated graceful-degradation intent.
| return this.collectSessionAgents(next) | |
| try { | |
| return await this.collectSessionAgents(next) | |
| } catch { | |
| return [] | |
| } |
Was this helpful? React with 👍 or 👎 to provide feedback.
|
✅ pr-reviewer applied fixes — committed and pushed Reviewed PR #67 and fixed the broker recovery path in src/main/broker.ts. Changes made:
Checks run:
Note: direct |
There was a problem hiding this comment.
✅ pr-reviewer applied fixes — committed and pushed c7837eb to this PR. The notes below describe what changed.
Reviewed PR #67 and fixed the broker recovery path in src/main/broker.ts.
Changes made:
- Added best-effort owned broker process termination before revive, so a wedged local broker cannot keep the runtime lock and block replacement startup.
- Changed revive teardown to disconnect/drop the stale session instead of waiting on a broker HTTP shutdown that may itself be wedged.
- Kept cloud sandbox sessions out of local respawn recovery so remote broker failures are surfaced instead of silently becoming an empty agent list.
- Factored session cleanup through
dropSession()so shutdown and revive clear the same local maps consistently.
Checks run:
node --experimental-strip-types --check src/main/broker.tsnpm testnpm run build
Note: direct npx tsc --noEmit -p tsconfig.node.json still reports existing unrelated repo type errors outside this PR path; the actual build completed successfully.
|
✅ pr-reviewer applied fixes — committed and pushed Fixed the PR review findings in
Checks run:
|
There was a problem hiding this comment.
✅ pr-reviewer applied fixes — committed and pushed ba16852 to this PR. The notes below describe what changed.
Fixed the PR review findings in src/main/broker.ts:
- Narrowed broker timeout detection so ordinary
AbortError/ manual aborts do not count toward broker revive. - Added session awaiting during
listAgentsso activestart/reviveoperations do not cause transient missing-session errors. - Prevented manual shutdown from being undone by an in-flight revive.
- Wrapped the post-revive
collectSessionAgentsretry so one still-bad project returns[]instead of rejecting all projects. - Avoided iterator spreads that break this repo’s current Node TS target.
Checks run:
node --experimental-strip-types --check src/main/broker.tspassednpm testpassednpm run buildpassednpx tsc --noEmit -p tsconfig.node.jsonstill fails on existing unrelated repo errors, but nobroker.tserrors remain
* fix(broker): generalize wedge recovery to all polled reads PR #67 added self-healing to listAgents, but the broker can wedge a single HTTP endpoint while others stay live — `/api/pending` hangs while `/api/spawned` keeps answering (the broker's relaycast event loop runs fine the whole time). So listAgents never notices, never respawns, and getPending times out on every 2.5s poll forever, flooding the main log with "Error occurred in handler for 'broker:get-pending'". - Extract the listAgents recovery into a reusable withWedgeRecovery() helper (refused → respawn now; repeated timeouts → respawn after MAX; unrecoverable → degrade to a fallback instead of rejecting). listAgents now delegates to it. - Route getPendingMessages through it with degradeOnTimeout: true (an empty held-message list is harmless, so a timeout returns [] rather than rejecting and logging) and await getOrAwaitSession so a poll racing a respawn doesn't throw "workspace not started". - Renderer: guard the 2.5s getPending poll with an in-flight ref. A wedged broker makes each call hang for its full 30s timeout; without the guard the interval stacks ~12 concurrent calls that all time out at once — which is why the log filled with hundreds of identical lines. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * chore: apply pr-reviewer fixes for #68 --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Co-authored-by: agent-relay-bot[bot] <agent-relay-bot[bot]@users.noreply.github.com>
Why
Two unrelated issues showed up together in the logs:
1. Wedged broker never recovered (the repeating
list-agentstimeouts)listAgents→request('/api/spawned')uses the SDK transport'sAbortSignal.timeout(30_000). A full 30s timeout (vs. an instantECONNREFUSED) means the broker process is alive and accepting the TCP connection but never answering — i.e. wedged, not dead.The in-progress revive logic (
isBrokerUnreachableError) only matchedECONNREFUSED/ECONNRESET, so aTimeoutErrorDOMExceptionslipped past it — nothing ever restarted the wedged broker, and every poll (syncBrokerSnapshotfires on broker events, on connect, and on mount) re-spammedError occurred in handler for 'broker:list-agents'on the main side (Electron logs the rejected handler even though the renderer swallows it).2. Misleading shutdown-race warning
The
PTY input stream failed … falling back to HTTP inputline bottoms out inshutdownBrokerOnce → shutdown → closeInputStreamsForProject → closeInputStream → PtyInputStream.close. On app quit, closing the stream rejects any in-flightsend()withinput_stream_closed, which lands insendInput's catch and logs as a failure — but the whole session is being destroyed, so the "fallback" is meaningless noise.What changed (
src/main/broker.ts)someInCauseChain()helper.isBrokerTimeoutError()(matchesTimeoutError/AbortError,ETIMEDOUT, and the "aborted due to timeout" message through the cause chain).listAgentsnow distinguishes failure modes:MAX_BROKER_TIMEOUTS_BEFORE_REVIVE = 2. Below the threshold it rethrows so the renderer keeps its stale agent list instead of flickering to empty.collectSessionAgents), when a revive is triggered, and onshutdown.isInputStreamClosedError()and guarded thesendInputwarning so an expected close (shutdown / project-or-agent close / terminal re-attach) falls through to HTTP silently. Real transport failures still warn.Net effect: a wedged broker self-restarts on a fresh port within ~2 poll cycles and the handler-error spam stops on its own, instead of repeating every 30s forever; app-quit logs no longer carry the spurious PTY warning.
Notes
requestTimeoutMsalone — shortening it would speed wedge detection but is transport-wide and would clip legitimately slow ops (spawn, etc.).electron-vite buildpasses. No broker unit tests exist insrc/main/__tests__/.🤖 Generated with Claude Code