diff --git a/packages/acp-bridge/src/acp-agent.ts b/packages/acp-bridge/src/acp-agent.ts index 64c423c09..f2588a2ee 100644 --- a/packages/acp-bridge/src/acp-agent.ts +++ b/packages/acp-bridge/src/acp-agent.ts @@ -59,6 +59,8 @@ class CircularDedupeCache { * ACP Agent that bridges to Agent Relay */ export class RelayACPAgent implements acp.Agent { + private static readonly RECONNECT_COOLDOWN_MS = 10_000; + private readonly config: ACPBridgeConfig; private relayClient: RelayClient | null = null; private connection: acp.AgentSideConnection | null = null; @@ -66,6 +68,8 @@ export class RelayACPAgent implements acp.Agent { private messageBuffer = new Map(); private dedupeCache = new CircularDedupeCache(2000); private closedSessionIds = new Set(); + private reconnectPromise: Promise | null = null; + private lastReconnectAttempt = 0; constructor(config: ACPBridgeConfig) { this.config = config; @@ -88,8 +92,60 @@ export class RelayACPAgent implements acp.Agent { } this.relayClient = new RelayClient(relayConfig); + this.setupRelayHandlers(); + + try { + await this.relayClient.connect(); + this.debug('Connected to relay daemon via SDK'); + + // Subscribe to #general channel to receive broadcast messages + this.relayClient.subscribe('#general'); + this.debug('Subscribed to #general channel'); + } catch (err) { + this.debug('Failed to connect to relay daemon via SDK:', err); + // Continue anyway - we can still function without relay + } + + // Create ACP connection over stdio using ndJsonStream + const readable = this.nodeToWebReadable(process.stdin); + const writable = this.nodeToWebWritable(process.stdout); + const stream = acp.ndJsonStream(writable, readable); + + // Create connection with agent factory + this.connection = new acp.AgentSideConnection((conn) => { + // Store connection reference for later use + this.connection = conn; + return this; + }, stream); + + this.debug('ACP agent started'); + + // Keep alive by waiting for connection to close + await this.connection.closed; + } + + /** + * Stop the agent + */ + async stop(): Promise { + // Clean up all sessions to prevent memory leaks + this.sessions.clear(); + this.messageBuffer.clear(); + this.dedupeCache.clear(); + this.closedSessionIds.clear(); + + this.relayClient?.destroy(); + this.relayClient = null; + this.connection = null; + this.debug('ACP agent stopped'); + } + + /** + * Wire up message, channel, state change, and error handlers on the relay client. + */ + private setupRelayHandlers(): void { + if (!this.relayClient) return; - // Set up message handlers this.relayClient.onMessage = (from, payload, messageId) => { if (typeof payload.body !== 'string') { return; @@ -105,11 +161,9 @@ export class RelayACPAgent implements acp.Agent { }); }; - // Handle channel messages (e.g., #general) this.relayClient.onChannelMessage = (from, channel, body) => { this.debug('Received channel message:', from, channel, body.substring(0, 50)); - // Route channel messages to all sessions this.handleRelayMessage({ id: `channel-${randomUUID()}`, from: `${from} [${channel}]`, @@ -120,56 +174,101 @@ export class RelayACPAgent implements acp.Agent { this.relayClient.onStateChange = (state) => { this.debug('Relay client state:', state); + + // Re-subscribe to #general on successful reconnect + if (state === 'READY' && this.relayClient) { + this.relayClient.subscribe('#general'); + this.debug('Re-subscribed to #general after reconnect'); + } }; this.relayClient.onError = (error) => { this.debug('Relay client error:', error); }; + } - try { - await this.relayClient.connect(); - this.debug('Connected to relay daemon via SDK'); + /** + * Attempt to reconnect to the relay daemon. + * Creates a fresh RelayClient to reset reconnect attempt counters. + */ + private async reconnectToRelay(): Promise { + this.debug('Attempting to reconnect to relay daemon...'); - // Subscribe to #general channel to receive broadcast messages - this.relayClient.subscribe('#general'); - this.debug('Subscribed to #general channel'); - } catch (err) { - this.debug('Failed to connect to relay daemon via SDK:', err); - // Continue anyway - we can still function without relay + if (this.relayClient) { + this.relayClient.destroy(); + this.relayClient = null; } - // Create ACP connection over stdio using ndJsonStream - const readable = this.nodeToWebReadable(process.stdin); - const writable = this.nodeToWebWritable(process.stdout); - const stream = acp.ndJsonStream(writable, readable); + const relayConfig: Partial = { + agentName: this.config.agentName, + program: '@agent-relay/acp-bridge', + cli: 'acp-bridge', + quiet: true, + }; - // Create connection with agent factory - this.connection = new acp.AgentSideConnection((conn) => { - // Store connection reference for later use - this.connection = conn; - return this; - }, stream); + if (this.config.socketPath) { + relayConfig.socketPath = this.config.socketPath; + } - this.debug('ACP agent started'); + this.relayClient = new RelayClient(relayConfig); + this.setupRelayHandlers(); - // Keep alive by waiting for connection to close - await this.connection.closed; + try { + await this.relayClient.connect(); + this.debug('Reconnected to relay daemon'); + this.relayClient.subscribe('#general'); + this.debug('Re-subscribed to #general channel'); + return true; + } catch (err) { + this.debug('Failed to reconnect to relay daemon:', err); + return false; + } } /** - * Stop the agent + * Ensure the relay client is connected and ready. + * If disconnected (SDK exhausted reconnect attempts), creates a fresh client. + * If in a transient state, waits briefly for READY. */ - async stop(): Promise { - // Clean up all sessions to prevent memory leaks - this.sessions.clear(); - this.messageBuffer.clear(); - this.dedupeCache.clear(); - this.closedSessionIds.clear(); + private async ensureRelayReady(): Promise { + if (this.relayClient?.state === 'READY') { + return true; + } - this.relayClient?.destroy(); - this.relayClient = null; - this.connection = null; - this.debug('ACP agent stopped'); + if (!this.relayClient || this.relayClient.state === 'DISCONNECTED') { + // Deduplicate concurrent reconnect attempts + if (this.reconnectPromise) { + return this.reconnectPromise; + } + + // Enforce cooldown to avoid rapid reconnect attempts + const elapsed = Date.now() - this.lastReconnectAttempt; + if (elapsed < RelayACPAgent.RECONNECT_COOLDOWN_MS) { + return false; + } + + this.lastReconnectAttempt = Date.now(); + this.reconnectPromise = this.reconnectToRelay().finally(() => { + this.reconnectPromise = null; + }); + return this.reconnectPromise; + } + + // In a transient state (CONNECTING, HANDSHAKING, BACKOFF) — wait briefly. + // State changes asynchronously during await, so re-check the full type. + const client = this.relayClient; + const waitMs = 5000; + const startWait = Date.now(); + while (Date.now() - startWait < waitMs) { + const currentState = client.state as string; + if (currentState === 'READY') return true; + if (currentState === 'DISCONNECTED') { + return this.ensureRelayReady(); + } + await this.sleep(100); + } + + return false; } /** @@ -364,8 +463,7 @@ export class RelayACPAgent implements acp.Agent { }; } - if (!this.relayClient || this.relayClient.state !== 'READY') { - // If not connected to relay, return a helpful message + if (!await this.ensureRelayReady()) { await this.connection.sessionUpdate({ sessionId, update: { @@ -426,16 +524,18 @@ export class RelayACPAgent implements acp.Agent { }); // Send to specific agents or broadcast + // relayClient is guaranteed non-null after ensureRelayReady() succeeds + const relay = this.relayClient!; let sent = false; if (hasTargets) { // Send to each mentioned agent for (const target of targets) { - const result = this.relayClient.sendMessage(target, cleanMessage, 'message', undefined, session.id); + const result = relay.sendMessage(target, cleanMessage, 'message', undefined, session.id); if (result) sent = true; } } else { // Broadcast to all agents - sent = this.relayClient.sendMessage('*', userMessage, 'message', undefined, session.id); + sent = relay.sendMessage('*', userMessage, 'message', undefined, session.id); } if (!sent) { @@ -670,7 +770,7 @@ export class RelayACPAgent implements acp.Agent { return true; } - if (!this.relayClient || this.relayClient.state !== 'READY') { + if (!await this.ensureRelayReady()) { await this.sendTextUpdate(sessionId, 'Relay daemon is not connected (cannot spawn).'); return true; } @@ -679,7 +779,7 @@ export class RelayACPAgent implements acp.Agent { await this.sendTextUpdate(sessionId, `Spawning ${name} (${cli})${task ? `: ${task}` : ''}`); try { - const result = await this.relayClient.spawn({ + const result = await this.relayClient!.spawn({ name, cli, task, @@ -706,7 +806,7 @@ export class RelayACPAgent implements acp.Agent { return true; } - if (!this.relayClient || this.relayClient.state !== 'READY') { + if (!await this.ensureRelayReady()) { await this.sendTextUpdate(sessionId, 'Relay daemon is not connected (cannot release).'); return true; } @@ -714,7 +814,7 @@ export class RelayACPAgent implements acp.Agent { await this.sendTextUpdate(sessionId, `Releasing ${name}...`); try { - const result = await this.relayClient.release(name); + const result = await this.relayClient!.release(name); if (result.success) { await this.sendTextUpdate(sessionId, `Released ${name}.`); } else { @@ -728,13 +828,13 @@ export class RelayACPAgent implements acp.Agent { } private async handleListAgentsCommand(sessionId: string): Promise { - if (!this.relayClient || this.relayClient.state !== 'READY') { + if (!await this.ensureRelayReady()) { await this.sendTextUpdate(sessionId, 'Relay daemon is not connected (cannot list agents).'); return true; } try { - const agents = await this.relayClient.listConnectedAgents(); + const agents = await this.relayClient!.listConnectedAgents(); if (!agents.length) { await this.sendTextUpdate(sessionId, 'No agents are currently connected.'); } else {