Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .beads/issues.jsonl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
{"id":"agent-relay-5fa","title":"Add exponential backoff for daemon reconnection","description":"Implement graceful reconnection with exponential backoff delays [100, 500, 1000, 2000, 5000ms]. After max attempts, operate offline gracefully. See docs/TMUX_IMPROVEMENTS.md for implementation details.","status":"closed","priority":2,"issue_type":"feature","assignee":"LeadDev","created_at":"2025-12-20T21:28:48.055013+01:00","updated_at":"2025-12-20T21:33:42.229756+01:00","closed_at":"2025-12-20T21:33:42.229756+01:00"}
{"id":"agent-relay-5g0","title":"Heartbeat timeout could be more configurable","description":"In connection.ts:196, heartbeat timeout is hardcoded as 2x heartbeatMs. This should be independently configurable. Also, heartbeat failures immediately kill the connection - could implement exponential backoff for transient issues.","status":"closed","priority":2,"issue_type":"task","created_at":"2025-12-20T00:18:03.556614+01:00","updated_at":"2025-12-23T23:03:07.563273+01:00","closed_at":"2025-12-23T23:03:07.563273+01:00"}
{"id":"agent-relay-68a","title":"Ask pattern: request/response messaging (replyTo + deferred)","description":"Implement an RPC-like ask() over agent-relay: sender creates a short-lived deferred handle, sends message with replyTo, receiver responds resolving the deferred. Similar to swarm-mail DurableDeferred + ask/respond.","acceptance_criteria":"- ask() returns response or times out\\n- Deferreds are persisted with TTL\\n- Works across process restarts/reconnects","status":"open","priority":2,"issue_type":"task","created_at":"2025-12-20T21:44:33.930531+01:00","updated_at":"2025-12-20T21:44:33.930531+01:00","labels":["coordination","protocol"]}
{"id":"agent-relay-6dl8","title":"Multi-line relay messages truncated in dashboard display","description":"Multi-line messages sent via -\u003erelay: are being cut off after first line when displayed in dashboard. This blocks communication between agents. Need to: (1) Verify daemon sends full message, (2) Fix dashboard rendering to handle newlines properly.","status":"closed","priority":1,"issue_type":"bug","created_at":"2025-12-26T12:28:39.746942+01:00","updated_at":"2025-12-26T12:38:36.844034+01:00","closed_at":"2025-12-26T12:38:36.844034+01:00"}
{"id":"agent-relay-6mo","title":"Lead spawn capability not working (@relay:spawn pattern)","description":"The lead command advertises spawn capability via @relay:spawn pattern but it's not fully implemented. Agents started with 'agent-relay lead' should be able to spawn workers, but the parser extension mentioned in the code is not implemented.","status":"closed","priority":1,"issue_type":"bug","created_at":"2025-12-23T12:14:59.492237+01:00","updated_at":"2025-12-23T12:18:14.114911+01:00","closed_at":"2025-12-23T12:18:14.114911+01:00"}
{"id":"agent-relay-6nx","title":"Add activity state tracking for better injection timing","description":"Track active/idle/disconnected state with timestamps. When session goes idle (30s no activity), trigger message injection opportunity. Improves injection timing vs current fixed 1.5s wait. See docs/TMUX_IMPROVEMENTS.md for implementation details.","status":"closed","priority":2,"issue_type":"feature","assignee":"LeadDev","created_at":"2025-12-20T21:28:46.993856+01:00","updated_at":"2025-12-20T21:33:42.223042+01:00","closed_at":"2025-12-20T21:33:42.223042+01:00"}
{"id":"agent-relay-6ny","title":"Fix message truncation: store full message in SQLite first, include ID in relay","description":"","status":"closed","priority":1,"issue_type":"bug","created_at":"2025-12-19T22:04:36.168862+01:00","updated_at":"2025-12-19T22:08:28.207532+01:00","closed_at":"2025-12-19T22:08:28.207532+01:00"}
Expand Down Expand Up @@ -78,7 +79,7 @@
{"id":"agent-relay-fb3","title":"Add configurable relay prefix for Gemini compatibility","description":"Gemini CLI uses @ for file references, conflicting with @relay:. Add --prefix flag to CLI and relayPrefix config option. Auto-detect CLI type and use appropriate default (\u003e\u003e for Gemini, @relay: for Claude/Codex). Update parser to use dynamic prefix. See docs/TMUX_IMPROVEMENTS.md for full implementation plan.","status":"closed","priority":2,"issue_type":"feature","assignee":"Coordinator","created_at":"2025-12-20T21:28:52.685002+01:00","updated_at":"2025-12-20T21:40:13.066766+01:00","closed_at":"2025-12-20T21:40:13.066766+01:00"}
{"id":"agent-relay-ghy","title":"Team config: auto-spawn agents or auto-assign names from teams.json","description":"When a project has a teams.json config file, agent-relay up should either:\n\n1. **Auto-spawn option**: Automatically kick off terminal sessions for each agent defined in teams.json\n2. **Auto-assign option**: When users manually start sessions with `agent-relay -n \u003cname\u003e claude`, validate the name against teams.json and auto-assign roles/permissions\n\n## teams.json format\n```json\n{\n \"team\": \"my-project\",\n \"agents\": [\n {\"name\": \"Coordinator\", \"cli\": \"claude\", \"role\": \"coordinator\"},\n {\"name\": \"LeadDev\", \"cli\": \"claude\", \"role\": \"developer\"},\n {\"name\": \"Reviewer\", \"cli\": \"claude\", \"role\": \"reviewer\"}\n ],\n \"autoSpawn\": false\n}\n```\n\n## Behavior\n- If `autoSpawn: true`, `agent-relay up` spawns tmux sessions for each agent\n- If `autoSpawn: false`, validate names against config when agents connect\n- Store teams.json in project root or .agent-relay/teams.json\n\n## Commands\n- `agent-relay up --spawn` - force spawn all agents\n- `agent-relay up --no-spawn` - just start daemon, manual agent starts","status":"open","priority":2,"issue_type":"feature","created_at":"2025-12-19T23:13:02.482971+01:00","updated_at":"2025-12-22T22:11:38.639588+01:00"}
{"id":"agent-relay-go9","title":"PostgreSQL storage adapter not implemented","description":"In storage/adapter.ts:152-162, PostgreSQL is listed as a storage option but throws 'not yet implemented'. For production multi-node deployments, SQLite won't scale. Implement PostgreSQL adapter for distributed storage.","status":"open","priority":2,"issue_type":"feature","created_at":"2025-12-20T00:17:45.065487+01:00","updated_at":"2025-12-20T00:17:45.065487+01:00"}
{"id":"agent-relay-hgd","title":"Reliable delivery: resume/replay via cursor/checkpoint","description":"Make ACK/RESUME real: persist a per-agent delivery cursor (last delivered/acked seq) and on reconnect replay missed messages from storage. Use existing delivery.seq as stream position, similar to swarm-mail DurableCursor checkpointing.","acceptance_criteria":"- Client can reconnect and receive missed messages\\n- Duplicate suppression is deterministic (id/seq based)\\n- Cursor persists across daemon restarts","status":"in_progress","priority":1,"issue_type":"task","assignee":"BE-Dev","created_at":"2025-12-20T21:44:02.016428+01:00","updated_at":"2025-12-24T14:29:48.822052+01:00","labels":["durability","protocol"]}
{"id":"agent-relay-hgd","title":"Reliable delivery: resume/replay via cursor/checkpoint","description":"Make ACK/RESUME real: persist a per-agent delivery cursor (last delivered/acked seq) and on reconnect replay missed messages from storage. Use existing delivery.seq as stream position, similar to swarm-mail DurableCursor checkpointing.","acceptance_criteria":"- Client can reconnect and receive missed messages\\n- Duplicate suppression is deterministic (id/seq based)\\n- Cursor persists across daemon restarts","notes":"Persistent resume tokens with stored cursors; replay unacked messages on reconnect; ack tracking + dedup; tests passing.","status":"in_progress","priority":1,"issue_type":"task","assignee":"BE-Dev","created_at":"2025-12-20T21:44:02.016428+01:00","updated_at":"2025-12-26T12:13:54.547885+01:00","labels":["durability","protocol"]}
{"id":"agent-relay-hgn","title":"Agent team hierarchies with specialized sub-agents","description":"Each main agent should be able to spawn and coordinate a team of specialized sub-agents:\n\n- **Lead** → Operations Consultant (monitors behavior, provides retrospective feedback on coordination)\n- **Implementer** → Code Reviewer (reviews code before commits, suggests improvements)\n- **Designer** → UX Reviewer (validates design decisions)\n- **Architect** → Security Auditor (checks for vulnerabilities)\n\nImplementation ideas:\n1. Define agent profiles with 'team' configurations in .claude/agents/\n2. When an agent starts, it can auto-spawn its team members\n3. Sub-agents monitor the parent agent's work and provide feedback\n4. At session end, sub-agents provide retrospective summaries\n5. Use -\u003erelay:spawn to create sub-agents with specific roles\n\nThis enables each agent to have a dedicated support team for quality assurance and improvement.","status":"open","priority":2,"issue_type":"feature","created_at":"2025-12-23T15:03:56.608847+01:00","updated_at":"2025-12-23T15:03:56.608847+01:00"}
{"id":"agent-relay-hgw","title":"PR-9 Review: Add reconnection logic to MultiProjectClient","description":"","status":"closed","priority":2,"issue_type":"task","created_at":"2025-12-22T21:54:09.712003+01:00","updated_at":"2025-12-24T11:50:55.708361+01:00","closed_at":"2025-12-24T11:50:55.708361+01:00"}
{"id":"agent-relay-hks","title":"Increase test coverage for daemon/server.ts, dashboard, and CLI","description":"Current coverage is only 39% overall. Key files with 0% coverage: daemon/server.ts, dashboard/server.ts, cli/index.ts, wrapper/client.ts, wrapper/tmux-wrapper.ts, utils/project-namespace.ts. Need integration tests for the daemon startup/shutdown lifecycle and CLI commands.","status":"closed","priority":2,"issue_type":"task","assignee":"Lead","created_at":"2025-12-20T00:17:29.603137+01:00","updated_at":"2025-12-22T17:14:41.611717+01:00","closed_at":"2025-12-22T17:14:41.611717+01:00"}
Expand Down Expand Up @@ -130,7 +131,7 @@
{"id":"agent-relay-ytt","title":"Implement threaded conversations (Slack-style)","description":"Add thread replies with parent_message_id tracking in storage, nested thread UI display in dashboard, thread notification badges, and expand/collapse thread views. Backend: add parent_message_id column, thread reply routing. Frontend: inline thread expansion, reply count badges, thread-specific compose.","status":"closed","priority":1,"issue_type":"feature","created_at":"2025-12-23T12:00:00Z","updated_at":"2025-12-23T16:37:22.987643+01:00","closed_at":"2025-12-23T16:37:22.987643+01:00"}
{"id":"agent-relay-ytu","title":"[Control Plane] Design Control API (REST + WebSocket)","description":"Design API for human control plane: POST /tasks, GET /agents, POST /agents/:id/msg, WS /stream. OpenAPI spec. Authentication endpoints. See ai-maestro's manager/worker pattern for reference.","status":"open","priority":2,"issue_type":"task","created_at":"2025-12-21T14:00:00Z","updated_at":"2025-12-21T14:00:00Z"}
{"id":"agent-relay-ytv","title":"[Control Plane] Implement Lead Agent orchestration","description":"Lead Agent receives human intent, breaks into tasks, assigns to specialized agents based on skills, monitors progress, escalates decisions. Can be AI-powered or rule-based initially.","status":"open","priority":2,"issue_type":"task","created_at":"2025-12-21T14:00:00Z","updated_at":"2025-12-21T14:00:00Z"}
{"id":"agent-relay-ytw","title":"[Control Plane] Build web dashboard v2 (fleet control)","description":"Next.js dashboard with: fleet overview, task assignment UI, agent status cards, trajectory viewer, decision queue. Learn from ai-maestro's hierarchical naming and color coding.","status":"open","priority":2,"issue_type":"task","created_at":"2025-12-21T14:00:00Z","updated_at":"2025-12-21T14:00:00Z"}
{"id":"agent-relay-ytw","title":"[Control Plane] Build web dashboard v2 (fleet control)","description":"Next.js dashboard with: fleet overview, task assignment UI, agent status cards, trajectory viewer, decision queue. Learn from ai-maestro's hierarchical naming and color coding.","status":"in_progress","priority":2,"issue_type":"task","created_at":"2025-12-21T14:00:00Z","updated_at":"2025-12-26T12:39:19.789712+01:00"}
{"id":"agent-relay-ytx","title":"[Control Plane] Human authentication (OAuth/magic link)","description":"Secure human access to control plane. OAuth2 (GitHub/Google) or magic link email auth. JWT tokens for API access. Role-based permissions (admin/operator/viewer).","status":"open","priority":2,"issue_type":"task","created_at":"2025-12-21T14:00:00Z","updated_at":"2025-12-21T14:00:00Z"}
{"id":"agent-relay-yty","title":"[Control Plane] Push notification service","description":"Real-time notifications to mobile/web: APNs for iOS, FCM for Android/web. Notify on: task completion, agent questions, escalations, errors.","status":"open","priority":3,"issue_type":"task","created_at":"2025-12-21T14:00:00Z","updated_at":"2025-12-21T14:00:00Z"}
{"id":"agent-relay-ytz","title":"[Control Plane] iPhone app MVP","description":"React Native or Swift app: fleet status view, task list, agent messaging, push notifications, decision queue with quick actions. Minimal viable version.","status":"open","priority":3,"issue_type":"task","created_at":"2025-12-21T14:00:00Z","updated_at":"2025-12-21T14:00:00Z"}
Expand Down
34 changes: 32 additions & 2 deletions src/daemon/connection.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { describe, it, expect, vi } from 'vitest';
import type { Socket } from 'node:net';
import { Connection } from './connection.js';
import { encodeFrame } from '../protocol/framing.js';
import { PROTOCOL_VERSION, type Envelope, type HelloPayload } from '../protocol/types.js';
import { encodeFrame, FrameParser } from '../protocol/framing.js';
import { PROTOCOL_VERSION, type Envelope, type HelloPayload, type WelcomePayload } from '../protocol/types.js';

class MockSocket {
private handlers: Map<string, Array<(...args: any[]) => void>> = new Map();
Expand Down Expand Up @@ -82,6 +82,36 @@ describe('Connection', () => {
expect(socket.destroyed).toBe(true);
});

// Resume happy path: a HELLO carrying a resume token that the configured resumeHandler
// accepts should leave the connection ACTIVE with the handler-provided session state.
it('accepts resume token when resumeHandler returns a session', async () => {
const socket = new MockSocket();
const parser = new FrameParser();
// The mocked handler resolves to a session object, i.e. it accepts the token.
const resumeHandler = vi.fn().mockResolvedValue({
sessionId: 'session-resume',
resumeToken: 'token-abc',
seedSequences: [{ topic: 'chat', peer: 'peerA', seq: 5 }],
});
const connection = new Connection(socket as unknown as Socket, { heartbeatMs: 50, resumeHandler });
const onActive = vi.fn();
connection.onActive = onActive;

// Build a HELLO for 'agent-a' and attach the resume token to its session payload.
const hello = makeHello('agent-a');
hello.payload.session = { resume_token: 'token-abc' };
socket.emit('data', encodeFrame(hello));

// Yield one macrotask so the awaited resumeHandler promise settles before asserting.
await new Promise((r) => setTimeout(r, 0));

// Connection state should mirror what the handler returned.
expect(connection.state).toBe('ACTIVE');
expect(connection.sessionId).toBe('session-resume');
expect(connection.resumeToken).toBe('token-abc');
expect(connection.isResumed).toBe(true);
expect(connection.getNextSeq('chat', 'peerA')).toBe(6); // seeded to 5, next is 6
expect(onActive).toHaveBeenCalledTimes(1);

// The first frame written to the socket is treated as the WELCOME; it must carry
// the resumed session id and token.
const welcome = parser.push(socket.written[0])?.[0] as Envelope<WelcomePayload>;
expect(welcome.payload.resume_token).toBe('token-abc');
expect(welcome.payload.session_id).toBe('session-resume');
});

describe('heartbeat timeout configuration', () => {
it('uses configurable heartbeatTimeoutMultiplier', async () => {
const socket = new MockSocket();
Expand Down
71 changes: 61 additions & 10 deletions src/daemon/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ export interface ConnectionConfig {
heartbeatMs: number;
/** Multiplier for heartbeat timeout (timeout = heartbeatMs * multiplier). Default: 6 (30s with 5s heartbeat) */
heartbeatTimeoutMultiplier: number;
/** Optional handler to validate resume tokens and provide session state */
resumeHandler?: (params: { agent: string; resumeToken: string }) => Promise<{
sessionId: string;
resumeToken?: string;
seedSequences?: Array<{ topic?: string; peer: string; seq: number }>;
} | null>;
}

export const DEFAULT_CONFIG: ConnectionConfig = {
Expand All @@ -53,6 +59,7 @@ export class Connection {
private _workingDirectory?: string;
private _sessionId: string;
private _resumeToken: string;
private _isResumed = false;

private heartbeatTimer?: NodeJS.Timeout;
private lastPongReceived?: number;
Expand Down Expand Up @@ -112,6 +119,14 @@ export class Connection {
return this._sessionId;
}

/**
 * Resume token currently held by this connection.
 *
 * @returns The value of the private `_resumeToken` field.
 */
get resumeToken(): string {
return this._resumeToken;
}

/**
 * Whether this connection was established by resuming an earlier session.
 *
 * @returns The value of the private `_isResumed` flag, which starts out `false`.
 */
get isResumed(): boolean {
return this._isResumed;
}

private setupSocketHandlers(): void {
this.socket.on('data', (data) => this.handleData(data));
this.socket.on('close', () => this.handleClose());
Expand All @@ -122,18 +137,21 @@ export class Connection {
try {
const frames = this.parser.push(data);
for (const frame of frames) {
this.processFrame(frame);
this.processFrame(frame).catch((err) => {
this.sendError('BAD_REQUEST', `Frame error: ${err}`, true);
this.close();
});
}
} catch (err) {
this.sendError('BAD_REQUEST', `Frame error: ${err}`, true);
this.close();
}
}

private processFrame(envelope: Envelope): void {
private async processFrame(envelope: Envelope): Promise<void> {
switch (envelope.type) {
case 'HELLO':
this.handleHello(envelope as Envelope<HelloPayload>);
await this.handleHello(envelope as Envelope<HelloPayload>);
break;
case 'SEND':
this.handleSend(envelope as Envelope<SendPayload>);
Expand All @@ -156,7 +174,7 @@ export class Connection {
}
}

private handleHello(envelope: Envelope<HelloPayload>): void {
private async handleHello(envelope: Envelope<HelloPayload>): Promise<void> {
if (this._state !== 'HANDSHAKING') {
this.sendError('BAD_REQUEST', 'Unexpected HELLO', false);
return;
Expand All @@ -170,22 +188,44 @@ export class Connection {
this._workingDirectory = envelope.payload.workingDirectory;

// Check for session resume
if (envelope.payload.session?.resume_token) {
// Resume tokens are not persisted; tell client to start a fresh session.
this.sendError('RESUME_TOO_OLD', 'Session resume not yet supported; starting new session', false);
const resumeToken = envelope.payload.session?.resume_token;
if (resumeToken) {
if (this.config.resumeHandler) {
try {
const resumeState = await this.config.resumeHandler({
agent: this._agentName,
resumeToken,
});

if (resumeState) {
this._sessionId = resumeState.sessionId;
this._resumeToken = resumeState.resumeToken ?? resumeToken;
this._isResumed = true;

// Seed sequence counters so new deliveries continue from last known seq per stream
for (const seed of resumeState.seedSequences ?? []) {
this.seedSequence(seed.topic ?? 'default', seed.peer, seed.seq);
}
} else {
this.sendError('RESUME_TOO_OLD', 'Resume token rejected; starting new session', false);
}
} catch (err: any) {
this.sendError('RESUME_TOO_OLD', `Resume validation failed: ${err?.message ?? err}`, false);
}
} else {
this.sendError('RESUME_TOO_OLD', 'Session resume not configured; starting new session', false);
}
}

// Send WELCOME
// Note: resume_token is omitted because session resume is not yet implemented.
// Sending a token would cause clients to attempt resume on reconnect,
// triggering a RESUME_TOO_OLD -> new token -> reconnect loop.
const welcome: Envelope<WelcomePayload> = {
v: PROTOCOL_VERSION,
type: 'WELCOME',
id: uuid(),
ts: Date.now(),
payload: {
session_id: this._sessionId,
resume_token: this._resumeToken,
server: {
max_frame_bytes: this.config.maxFrameBytes,
heartbeat_ms: this.config.heartbeatMs,
Expand Down Expand Up @@ -256,6 +296,17 @@ export class Connection {
}
}

/**
 * Fast-forward the sequence counter of a stream to a known position.
 *
 * Counters are keyed by `<topic>:<peer>`, with the topic falling back to `'default'`
 * when it is undefined. The stored value is only ever raised: a seed that is not
 * strictly greater than the current value (treated as 0 when absent) is ignored.
 *
 * @param topic - Stream topic, or `undefined` for the default topic.
 * @param peer - Peer identifier forming the second half of the stream key.
 * @param seq - Sequence value to seed the counter with.
 */
seedSequence(topic: string | undefined, peer: string, seq: number): void {
  const streamKey = `${topic ?? 'default'}:${peer}`;
  const known = this.sequences.get(streamKey) ?? 0;

  // Keep the comparison as "strictly greater" so that a NaN seed is ignored too.
  const shouldAdvance = seq > known;
  if (!shouldAdvance) {
    return;
  }

  this.sequences.set(streamKey, seq);
}

/**
* Get next sequence number for a stream.
*/
Expand Down
Loading
Loading