Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 146 additions & 46 deletions packages/acp-bridge/src/acp-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,17 @@ class CircularDedupeCache {
* ACP Agent that bridges to Agent Relay
*/
export class RelayACPAgent implements acp.Agent {
private static readonly RECONNECT_COOLDOWN_MS = 10_000;

private readonly config: ACPBridgeConfig;
private relayClient: RelayClient | null = null;
private connection: acp.AgentSideConnection | null = null;
private sessions = new Map<string, SessionState>();
private messageBuffer = new Map<string, RelayMessage[]>();
private dedupeCache = new CircularDedupeCache(2000);
private closedSessionIds = new Set<string>();
private reconnectPromise: Promise<boolean> | null = null;
private lastReconnectAttempt = 0;

constructor(config: ACPBridgeConfig) {
this.config = config;
Expand All @@ -88,8 +92,60 @@ export class RelayACPAgent implements acp.Agent {
}

this.relayClient = new RelayClient(relayConfig);
this.setupRelayHandlers();

try {
await this.relayClient.connect();
this.debug('Connected to relay daemon via SDK');

// Subscribe to #general channel to receive broadcast messages
this.relayClient.subscribe('#general');
this.debug('Subscribed to #general channel');
} catch (err) {
this.debug('Failed to connect to relay daemon via SDK:', err);
// Continue anyway - we can still function without relay
}

// Create ACP connection over stdio using ndJsonStream
const readable = this.nodeToWebReadable(process.stdin);
const writable = this.nodeToWebWritable(process.stdout);
const stream = acp.ndJsonStream(writable, readable);

// Create connection with agent factory
this.connection = new acp.AgentSideConnection((conn) => {
// Store connection reference for later use
this.connection = conn;
return this;
}, stream);

this.debug('ACP agent started');

// Keep alive by waiting for connection to close
await this.connection.closed;
}

/**
* Stop the agent
*/
async stop(): Promise<void> {
// Clean up all sessions to prevent memory leaks
this.sessions.clear();
this.messageBuffer.clear();
this.dedupeCache.clear();
this.closedSessionIds.clear();

this.relayClient?.destroy();
this.relayClient = null;
this.connection = null;
this.debug('ACP agent stopped');
}

/**
* Wire up message, channel, state change, and error handlers on the relay client.
*/
private setupRelayHandlers(): void {
if (!this.relayClient) return;

// Set up message handlers
this.relayClient.onMessage = (from, payload, messageId) => {
if (typeof payload.body !== 'string') {
return;
Expand All @@ -105,11 +161,9 @@ export class RelayACPAgent implements acp.Agent {
});
};

// Handle channel messages (e.g., #general)
this.relayClient.onChannelMessage = (from, channel, body) => {
this.debug('Received channel message:', from, channel, body.substring(0, 50));

// Route channel messages to all sessions
this.handleRelayMessage({
id: `channel-${randomUUID()}`,
from: `${from} [${channel}]`,
Expand All @@ -120,56 +174,101 @@ export class RelayACPAgent implements acp.Agent {

this.relayClient.onStateChange = (state) => {
this.debug('Relay client state:', state);

// Re-subscribe to #general on successful reconnect
if (state === 'READY' && this.relayClient) {
this.relayClient.subscribe('#general');
this.debug('Re-subscribed to #general after reconnect');
}
};

this.relayClient.onError = (error) => {
this.debug('Relay client error:', error);
};
}

try {
await this.relayClient.connect();
this.debug('Connected to relay daemon via SDK');
/**
* Attempt to reconnect to the relay daemon.
* Creates a fresh RelayClient to reset reconnect attempt counters.
*/
private async reconnectToRelay(): Promise<boolean> {
this.debug('Attempting to reconnect to relay daemon...');

// Subscribe to #general channel to receive broadcast messages
this.relayClient.subscribe('#general');
this.debug('Subscribed to #general channel');
} catch (err) {
this.debug('Failed to connect to relay daemon via SDK:', err);
// Continue anyway - we can still function without relay
if (this.relayClient) {
this.relayClient.destroy();
this.relayClient = null;
}

// Create ACP connection over stdio using ndJsonStream
const readable = this.nodeToWebReadable(process.stdin);
const writable = this.nodeToWebWritable(process.stdout);
const stream = acp.ndJsonStream(writable, readable);
const relayConfig: Partial<ClientConfig> = {
agentName: this.config.agentName,
program: '@agent-relay/acp-bridge',
cli: 'acp-bridge',
quiet: true,
};

// Create connection with agent factory
this.connection = new acp.AgentSideConnection((conn) => {
// Store connection reference for later use
this.connection = conn;
return this;
}, stream);
if (this.config.socketPath) {
relayConfig.socketPath = this.config.socketPath;
}

this.debug('ACP agent started');
this.relayClient = new RelayClient(relayConfig);
this.setupRelayHandlers();

// Keep alive by waiting for connection to close
await this.connection.closed;
try {
await this.relayClient.connect();
this.debug('Reconnected to relay daemon');
this.relayClient.subscribe('#general');
this.debug('Re-subscribed to #general channel');
return true;
} catch (err) {
this.debug('Failed to reconnect to relay daemon:', err);
return false;
}
}

/**
* Stop the agent
* Ensure the relay client is connected and ready.
* If disconnected (SDK exhausted reconnect attempts), creates a fresh client.
* If in a transient state, waits briefly for READY.
*/
async stop(): Promise<void> {
// Clean up all sessions to prevent memory leaks
this.sessions.clear();
this.messageBuffer.clear();
this.dedupeCache.clear();
this.closedSessionIds.clear();
private async ensureRelayReady(): Promise<boolean> {
if (this.relayClient?.state === 'READY') {
return true;
}

this.relayClient?.destroy();
this.relayClient = null;
this.connection = null;
this.debug('ACP agent stopped');
if (!this.relayClient || this.relayClient.state === 'DISCONNECTED') {
// Deduplicate concurrent reconnect attempts
if (this.reconnectPromise) {
return this.reconnectPromise;
}

// Enforce cooldown to avoid rapid reconnect attempts
const elapsed = Date.now() - this.lastReconnectAttempt;
if (elapsed < RelayACPAgent.RECONNECT_COOLDOWN_MS) {
return false;
}

this.lastReconnectAttempt = Date.now();
this.reconnectPromise = this.reconnectToRelay().finally(() => {
this.reconnectPromise = null;
});
return this.reconnectPromise;
}

// In a transient state (CONNECTING, HANDSHAKING, BACKOFF) — wait briefly.
// State changes asynchronously during await, so re-check the full type.
const client = this.relayClient;
const waitMs = 5000;
const startWait = Date.now();
while (Date.now() - startWait < waitMs) {
const currentState = client.state as string;
if (currentState === 'READY') return true;
if (currentState === 'DISCONNECTED') {
return this.ensureRelayReady();
}
await this.sleep(100);
}

return false;
}

/**
Expand Down Expand Up @@ -364,8 +463,7 @@ export class RelayACPAgent implements acp.Agent {
};
}

if (!this.relayClient || this.relayClient.state !== 'READY') {
// If not connected to relay, return a helpful message
if (!await this.ensureRelayReady()) {
await this.connection.sessionUpdate({
sessionId,
update: {
Expand Down Expand Up @@ -426,16 +524,18 @@ export class RelayACPAgent implements acp.Agent {
});

// Send to specific agents or broadcast
// relayClient is guaranteed non-null after ensureRelayReady() succeeds
const relay = this.relayClient!;
let sent = false;
if (hasTargets) {
// Send to each mentioned agent
for (const target of targets) {
const result = this.relayClient.sendMessage(target, cleanMessage, 'message', undefined, session.id);
const result = relay.sendMessage(target, cleanMessage, 'message', undefined, session.id);
if (result) sent = true;
}
} else {
// Broadcast to all agents
sent = this.relayClient.sendMessage('*', userMessage, 'message', undefined, session.id);
sent = relay.sendMessage('*', userMessage, 'message', undefined, session.id);
}

if (!sent) {
Expand Down Expand Up @@ -670,7 +770,7 @@ export class RelayACPAgent implements acp.Agent {
return true;
}

if (!this.relayClient || this.relayClient.state !== 'READY') {
if (!await this.ensureRelayReady()) {
await this.sendTextUpdate(sessionId, 'Relay daemon is not connected (cannot spawn).');
return true;
}
Expand All @@ -679,7 +779,7 @@ export class RelayACPAgent implements acp.Agent {
await this.sendTextUpdate(sessionId, `Spawning ${name} (${cli})${task ? `: ${task}` : ''}`);

try {
const result = await this.relayClient.spawn({
const result = await this.relayClient!.spawn({
name,
cli,
task,
Expand All @@ -706,15 +806,15 @@ export class RelayACPAgent implements acp.Agent {
return true;
}

if (!this.relayClient || this.relayClient.state !== 'READY') {
if (!await this.ensureRelayReady()) {
await this.sendTextUpdate(sessionId, 'Relay daemon is not connected (cannot release).');
return true;
}

await this.sendTextUpdate(sessionId, `Releasing ${name}...`);

try {
const result = await this.relayClient.release(name);
const result = await this.relayClient!.release(name);
if (result.success) {
await this.sendTextUpdate(sessionId, `Released ${name}.`);
} else {
Expand All @@ -728,13 +828,13 @@ export class RelayACPAgent implements acp.Agent {
}

private async handleListAgentsCommand(sessionId: string): Promise<boolean> {
if (!this.relayClient || this.relayClient.state !== 'READY') {
if (!await this.ensureRelayReady()) {
await this.sendTextUpdate(sessionId, 'Relay daemon is not connected (cannot list agents).');
return true;
}

try {
const agents = await this.relayClient.listConnectedAgents();
const agents = await this.relayClient!.listConnectedAgents();
if (!agents.length) {
await this.sendTextUpdate(sessionId, 'No agents are currently connected.');
} else {
Expand Down
Loading