From 87f6a63749e739f0f38551a4461a0ad4c7c16569 Mon Sep 17 00:00:00 2001 From: Khaliq Date: Fri, 26 Dec 2025 12:23:47 +0100 Subject: [PATCH 1/3] backend resume work --- .beads/issues.jsonl | 2 +- src/daemon/connection.test.ts | 34 ++++++- src/daemon/connection.ts | 71 ++++++++++++-- src/daemon/router.test.ts | 72 ++++++++++++++ src/daemon/router.ts | 52 +++++++++- src/daemon/server.ts | 86 +++++++++++++---- src/dashboard/needs-attention.test.ts | 6 +- src/storage/adapter.ts | 51 +++++++++- src/storage/sqlite-adapter.test.ts | 79 +++++++++++++++ src/storage/sqlite-adapter.ts | 133 ++++++++++++++++++++++++-- src/wrapper/client.ts | 31 ++++++ src/wrapper/tmux-wrapper.ts | 26 +++++ 12 files changed, 596 insertions(+), 47 deletions(-) diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index 5a622b08..074ed307 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -78,7 +78,7 @@ {"id":"agent-relay-fb3","title":"Add configurable relay prefix for Gemini compatibility","description":"Gemini CLI uses @ for file references, conflicting with @relay:. Add --prefix flag to CLI and relayPrefix config option. Auto-detect CLI type and use appropriate default (\u003e\u003e for Gemini, @relay: for Claude/Codex). Update parser to use dynamic prefix. See docs/TMUX_IMPROVEMENTS.md for full implementation plan.","status":"closed","priority":2,"issue_type":"feature","assignee":"Coordinator","created_at":"2025-12-20T21:28:52.685002+01:00","updated_at":"2025-12-20T21:40:13.066766+01:00","closed_at":"2025-12-20T21:40:13.066766+01:00"} {"id":"agent-relay-ghy","title":"Team config: auto-spawn agents or auto-assign names from teams.json","description":"When a project has a teams.json config file, agent-relay up should either:\n\n1. **Auto-spawn option**: Automatically kick off terminal sessions for each agent defined in teams.json\n2. **Auto-assign option**: When users manually start sessions with `agent-relay -n \u003cname\u003e claude`, validate the name against teams.json and auto-assign roles/permissions\n\n## teams.json format\n```json\n{\n \"team\": \"my-project\",\n \"agents\": [\n {\"name\": \"Coordinator\", \"cli\": \"claude\", \"role\": \"coordinator\"},\n {\"name\": \"LeadDev\", \"cli\": \"claude\", \"role\": \"developer\"},\n {\"name\": \"Reviewer\", \"cli\": \"claude\", \"role\": \"reviewer\"}\n ],\n \"autoSpawn\": false\n}\n```\n\n## Behavior\n- If `autoSpawn: true`, `agent-relay up` spawns tmux sessions for each agent\n- If `autoSpawn: false`, validate names against config when agents connect\n- Store teams.json in project root or .agent-relay/teams.json\n\n## Commands\n- `agent-relay up --spawn` - force spawn all agents\n- `agent-relay up --no-spawn` - just start daemon, manual agent starts","status":"open","priority":2,"issue_type":"feature","created_at":"2025-12-19T23:13:02.482971+01:00","updated_at":"2025-12-22T22:11:38.639588+01:00"} {"id":"agent-relay-go9","title":"PostgreSQL storage adapter not implemented","description":"In storage/adapter.ts:152-162, PostgreSQL is listed as a storage option but throws 'not yet implemented'. For production multi-node deployments, SQLite won't scale. Implement PostgreSQL adapter for distributed storage.","status":"open","priority":2,"issue_type":"feature","created_at":"2025-12-20T00:17:45.065487+01:00","updated_at":"2025-12-20T00:17:45.065487+01:00"} -{"id":"agent-relay-hgd","title":"Reliable delivery: resume/replay via cursor/checkpoint","description":"Make ACK/RESUME real: persist a per-agent delivery cursor (last delivered/acked seq) and on reconnect replay missed messages from storage. Use existing delivery.seq as stream position, similar to swarm-mail DurableCursor checkpointing.","acceptance_criteria":"- Client can reconnect and receive missed messages\\n- Duplicate suppression is deterministic (id/seq based)\\n- Cursor persists across daemon restarts","status":"in_progress","priority":1,"issue_type":"task","assignee":"BE-Dev","created_at":"2025-12-20T21:44:02.016428+01:00","updated_at":"2025-12-24T14:29:48.822052+01:00","labels":["durability","protocol"]} +{"id":"agent-relay-hgd","title":"Reliable delivery: resume/replay via cursor/checkpoint","description":"Make ACK/RESUME real: persist a per-agent delivery cursor (last delivered/acked seq) and on reconnect replay missed messages from storage. Use existing delivery.seq as stream position, similar to swarm-mail DurableCursor checkpointing.","acceptance_criteria":"- Client can reconnect and receive missed messages\\n- Duplicate suppression is deterministic (id/seq based)\\n- Cursor persists across daemon restarts","notes":"Persistent resume tokens with stored cursors; replay unacked messages on reconnect; ack tracking + dedup; tests passing.","status":"in_progress","priority":1,"issue_type":"task","assignee":"BE-Dev","created_at":"2025-12-20T21:44:02.016428+01:00","updated_at":"2025-12-26T12:13:54.547885+01:00","labels":["durability","protocol"]} {"id":"agent-relay-hgn","title":"Agent team hierarchies with specialized sub-agents","description":"Each main agent should be able to spawn and coordinate a team of specialized sub-agents:\n\n- **Lead** → Operations Consultant (monitors behavior, provides retrospective feedback on coordination)\n- **Implementer** → Code Reviewer (reviews code before commits, suggests improvements)\n- **Designer** → UX Reviewer (validates design decisions)\n- **Architect** → Security Auditor (checks for vulnerabilities)\n\nImplementation ideas:\n1. Define agent profiles with 'team' configurations in .claude/agents/\n2. When an agent starts, it can auto-spawn its team members\n3. Sub-agents monitor the parent agent's work and provide feedback\n4. At session end, sub-agents provide retrospective summaries\n5. Use -\u003erelay:spawn to create sub-agents with specific roles\n\nThis enables each agent to have a dedicated support team for quality assurance and improvement.","status":"open","priority":2,"issue_type":"feature","created_at":"2025-12-23T15:03:56.608847+01:00","updated_at":"2025-12-23T15:03:56.608847+01:00"} {"id":"agent-relay-hgw","title":"PR-9 Review: Add reconnection logic to MultiProjectClient","description":"","status":"closed","priority":2,"issue_type":"task","created_at":"2025-12-22T21:54:09.712003+01:00","updated_at":"2025-12-24T11:50:55.708361+01:00","closed_at":"2025-12-24T11:50:55.708361+01:00"} {"id":"agent-relay-hks","title":"Increase test coverage for daemon/server.ts, dashboard, and CLI","description":"Current coverage is only 39% overall. Key files with 0% coverage: daemon/server.ts, dashboard/server.ts, cli/index.ts, wrapper/client.ts, wrapper/tmux-wrapper.ts, utils/project-namespace.ts. Need integration tests for the daemon startup/shutdown lifecycle and CLI commands.","status":"closed","priority":2,"issue_type":"task","assignee":"Lead","created_at":"2025-12-20T00:17:29.603137+01:00","updated_at":"2025-12-22T17:14:41.611717+01:00","closed_at":"2025-12-22T17:14:41.611717+01:00"} diff --git a/src/daemon/connection.test.ts b/src/daemon/connection.test.ts index 0f9f2247..e91cfce3 100644 --- a/src/daemon/connection.test.ts +++ b/src/daemon/connection.test.ts @@ -1,8 +1,8 @@ import { describe, it, expect, vi } from 'vitest'; import type { Socket } from 'node:net'; import { Connection } from './connection.js'; -import { encodeFrame } from '../protocol/framing.js'; -import { PROTOCOL_VERSION, type Envelope, type HelloPayload } from '../protocol/types.js'; +import { encodeFrame, FrameParser } from '../protocol/framing.js'; +import { PROTOCOL_VERSION, type Envelope, type HelloPayload, type WelcomePayload } from '../protocol/types.js'; class MockSocket { private handlers: Map void>> = new Map(); @@ -82,6 +82,36 @@ describe('Connection', () => { expect(socket.destroyed).toBe(true); }); + it('accepts resume token when resumeHandler returns a session', async () => { + const socket = new MockSocket(); + const parser = new FrameParser(); + const resumeHandler = vi.fn().mockResolvedValue({ + sessionId: 'session-resume', + resumeToken: 'token-abc', + seedSequences: [{ topic: 'chat', peer: 'peerA', seq: 5 }], + }); + const connection = new Connection(socket as unknown as Socket, { heartbeatMs: 50, resumeHandler }); + const onActive = vi.fn(); + connection.onActive = onActive; + + const hello = makeHello('agent-a'); + hello.payload.session = { resume_token: 'token-abc' }; + socket.emit('data', encodeFrame(hello)); + + await new Promise((r) => setTimeout(r, 0)); + + expect(connection.state).toBe('ACTIVE'); + expect(connection.sessionId).toBe('session-resume'); + expect(connection.resumeToken).toBe('token-abc'); + expect(connection.isResumed).toBe(true); + expect(connection.getNextSeq('chat', 'peerA')).toBe(6); // seeded to 5, next is 6 + expect(onActive).toHaveBeenCalledTimes(1); + + const welcome = parser.push(socket.written[0])?.[0] as Envelope; + expect(welcome.payload.resume_token).toBe('token-abc'); + expect(welcome.payload.session_id).toBe('session-resume'); + }); + describe('heartbeat timeout configuration', () => { it('uses configurable heartbeatTimeoutMultiplier', async () => { const socket = new MockSocket(); diff --git a/src/daemon/connection.ts b/src/daemon/connection.ts index 2e4823ce..869ac572 100644 --- a/src/daemon/connection.ts +++ b/src/daemon/connection.ts @@ -29,6 +29,12 @@ export interface ConnectionConfig { heartbeatMs: number; /** Multiplier for heartbeat timeout (timeout = heartbeatMs * multiplier). Default: 6 (30s with 5s heartbeat) */ heartbeatTimeoutMultiplier: number; + /** Optional handler to validate resume tokens and provide session state */ + resumeHandler?: (params: { agent: string; resumeToken: string }) => Promise<{ + sessionId: string; + resumeToken?: string; + seedSequences?: Array<{ topic?: string; peer: string; seq: number }>; + } | null>; } export const DEFAULT_CONFIG: ConnectionConfig = { @@ -53,6 +59,7 @@ export class Connection { private _workingDirectory?: string; private _sessionId: string; private _resumeToken: string; + private _isResumed = false; private heartbeatTimer?: NodeJS.Timeout; private lastPongReceived?: number; @@ -112,6 +119,14 @@ export class Connection { return this._sessionId; } + get resumeToken(): string { + return this._resumeToken; + } + + get isResumed(): boolean { + return this._isResumed; + } + private setupSocketHandlers(): void { this.socket.on('data', (data) => this.handleData(data)); this.socket.on('close', () => this.handleClose()); @@ -122,7 +137,10 @@ export class Connection { try { const frames = this.parser.push(data); for (const frame of frames) { - this.processFrame(frame); + this.processFrame(frame).catch((err) => { + this.sendError('BAD_REQUEST', `Frame error: ${err}`, true); + this.close(); + }); } } catch (err) { this.sendError('BAD_REQUEST', `Frame error: ${err}`, true); @@ -130,10 +148,10 @@ export class Connection { } } - private processFrame(envelope: Envelope): void { + private async processFrame(envelope: Envelope): Promise { switch (envelope.type) { case 'HELLO': - this.handleHello(envelope as Envelope); + await this.handleHello(envelope as Envelope); break; case 'SEND': this.handleSend(envelope as Envelope); @@ -156,7 +174,7 @@ export class Connection { } } - private handleHello(envelope: Envelope): void { + private async handleHello(envelope: Envelope): Promise { if (this._state !== 'HANDSHAKING') { this.sendError('BAD_REQUEST', 'Unexpected HELLO', false); return; @@ -170,15 +188,36 @@ export class Connection { this._workingDirectory = envelope.payload.workingDirectory; // Check for session resume - if (envelope.payload.session?.resume_token) { - // Resume tokens are not persisted; tell client to start a fresh session. - this.sendError('RESUME_TOO_OLD', 'Session resume not yet supported; starting new session', false); + const resumeToken = envelope.payload.session?.resume_token; + if (resumeToken) { + if (this.config.resumeHandler) { + try { + const resumeState = await this.config.resumeHandler({ + agent: this._agentName, + resumeToken, + }); + + if (resumeState) { + this._sessionId = resumeState.sessionId; + this._resumeToken = resumeState.resumeToken ?? resumeToken; + this._isResumed = true; + + // Seed sequence counters so new deliveries continue from last known seq per stream + for (const seed of resumeState.seedSequences ?? []) { + this.seedSequence(seed.topic ?? 'default', seed.peer, seed.seq); + } + } else { + this.sendError('RESUME_TOO_OLD', 'Resume token rejected; starting new session', false); + } + } catch (err: any) { + this.sendError('RESUME_TOO_OLD', `Resume validation failed: ${err?.message ?? err}`, false); + } + } else { + this.sendError('RESUME_TOO_OLD', 'Session resume not configured; starting new session', false); + } } // Send WELCOME - // Note: resume_token is omitted because session resume is not yet implemented. - // Sending a token would cause clients to attempt resume on reconnect, - // triggering a RESUME_TOO_OLD -> new token -> reconnect loop. const welcome: Envelope = { v: PROTOCOL_VERSION, type: 'WELCOME', @@ -186,6 +225,7 @@ export class Connection { ts: Date.now(), payload: { session_id: this._sessionId, + resume_token: this._resumeToken, server: { max_frame_bytes: this.config.maxFrameBytes, heartbeat_ms: this.config.heartbeatMs, @@ -256,6 +296,17 @@ export class Connection { } } + /** + * Seed a sequence counter for a stream so that the next value continues from the provided seq. + */ + seedSequence(topic: string | undefined, peer: string, seq: number): void { + const key = `${topic ?? 'default'}:${peer}`; + const current = this.sequences.get(key) ?? 0; + if (seq > current) { + this.sequences.set(key, seq); + } + } + /** * Get next sequence number for a stream. */ diff --git a/src/daemon/router.test.ts b/src/daemon/router.test.ts index bb6ca2f2..8bd3ee40 100644 --- a/src/daemon/router.test.ts +++ b/src/daemon/router.test.ts @@ -331,6 +331,41 @@ describe('Router', () => { router.unregister(receiver); expect(router.pendingDeliveryCount).toBe(0); }); + + it('persists ACK status to storage when handler is available', () => { + const updateMessageStatus = vi.fn(); + router = new Router({ + storage: { + init: async () => {}, + saveMessage: async () => {}, + getMessages: async () => [], + updateMessageStatus, + }, + }); + + const sender = new MockConnection('conn-1', 'agent1'); + const receiver = new MockConnection('conn-2', 'agent2'); + router.register(sender); + router.register(receiver); + + const envelope = createSendEnvelope('agent1', 'agent2'); + router.route(sender, envelope); + const deliverId = receiver.sentEnvelopes[0].id; + + const ackEnvelope: Envelope = { + v: 1, + type: 'ACK', + id: 'ack-storage', + ts: Date.now(), + payload: { + ack_id: deliverId, + seq: 1, + }, + }; + + router.handleAck(receiver, ackEnvelope); + expect(updateMessageStatus).toHaveBeenCalledWith(deliverId, 'acked'); + }); }); describe('Direct routing', () => { @@ -763,6 +798,43 @@ describe('Router', () => { }); }); + describe('Replay on resume', () => { + it('replays pending messages to a resumed connection', async () => { + const pending: StoredMessage[] = [{ + id: 'deliver-1', + ts: Date.now(), + from: 'agent1', + to: 'agent2', + topic: 'chat', + kind: 'message', + body: 'missed you', + status: 'unread', + is_urgent: false, + deliverySeq: 3, + deliverySessionId: 'session-resume', + sessionId: 'session-resume', + }]; + + const storage: StorageAdapter = { + init: async () => {}, + saveMessage: async () => {}, + getMessages: async () => [], + getPendingMessagesForSession: async () => pending, + }; + + router = new Router({ storage }); + const receiver = new MockConnection('conn-2', 'agent2', 'session-resume'); + + await router.replayPending(receiver); + + expect(receiver.sentEnvelopes).toHaveLength(1); + const deliver = receiver.sentEnvelopes[0] as DeliverEnvelope; + expect(deliver.id).toBe('deliver-1'); + expect(deliver.delivery.seq).toBe(3); + expect(deliver.from).toBe('agent1'); + }); + }); + describe('Edge cases', () => { it('should handle routing with undefined topic', () => { const sender = new MockConnection('conn-1', 'agent1'); diff --git a/src/daemon/router.ts b/src/daemon/router.ts index 87693f0f..8eca5fa5 100644 --- a/src/daemon/router.ts +++ b/src/daemon/router.ts @@ -258,12 +258,14 @@ export class Router { kind: envelope.payload.kind, body: envelope.payload.body, data: envelope.payload.data, + payloadMeta: envelope.payload_meta, + thread: envelope.payload.thread, deliverySeq: envelope.delivery.seq, deliverySessionId: envelope.delivery.session_id, sessionId: envelope.delivery.session_id, status: 'unread', is_urgent: false, - is_broadcast: isBroadcast, + is_broadcast: isBroadcast || envelope.to === '*', }).catch((err) => { console.error('[router] Failed to persist message', err); }); @@ -309,6 +311,12 @@ export class Router { clearTimeout(pending.timer); } this.pendingDeliveries.delete(ackId); + const statusUpdate = this.storage?.updateMessageStatus?.(ackId, 'acked'); + if (statusUpdate instanceof Promise) { + statusUpdate.catch(err => { + console.error('[router] Failed to record ACK status', err); + }); + } console.log(`[router] ACK received for ${ackId}`); } @@ -376,4 +384,46 @@ export class Router { pending.timer = this.scheduleRetry(deliverId); }, this.deliveryOptions.ackTimeoutMs); } + + /** + * Replay any pending (unacked) messages for a resumed session. + */ + async replayPending(connection: RoutableConnection): Promise { + if (!this.storage?.getPendingMessagesForSession || !connection.agentName) { + return; + } + + const pending = await this.storage.getPendingMessagesForSession(connection.agentName, connection.sessionId); + if (!pending.length) return; + + console.log(`[router] Replaying ${pending.length} messages to ${connection.agentName}`); + + for (const msg of pending) { + const deliver: DeliverEnvelope = { + v: PROTOCOL_VERSION, + type: 'DELIVER', + id: msg.id, + ts: msg.ts, + from: msg.from, + to: msg.to, + topic: msg.topic, + payload: { + kind: msg.kind, + body: msg.body, + data: msg.data, + thread: msg.thread, + }, + payload_meta: msg.payloadMeta, + delivery: { + seq: msg.deliverySeq ?? connection.getNextSeq(msg.topic ?? 'default', msg.from), + session_id: msg.deliverySessionId ?? connection.sessionId, + }, + }; + + const sent = connection.send(deliver); + if (sent) { + this.trackDelivery(connection, deliver); + } + } + } } diff --git a/src/daemon/server.ts b/src/daemon/server.ts index 5ca7b4e0..b580ff39 100644 --- a/src/daemon/server.ts +++ b/src/daemon/server.ts @@ -178,7 +178,30 @@ export class Daemon { private handleConnection(socket: net.Socket): void { console.log('[daemon] New connection'); - const connection = new Connection(socket, this.config); + const resumeHandler = this.storage?.getSessionByResumeToken + ? async ({ agent, resumeToken }: { agent: string; resumeToken: string }) => { + const session = await this.storage!.getSessionByResumeToken!(resumeToken); + if (!session || session.agentName !== agent) return null; + + let seedSequences: Array<{ topic?: string; peer: string; seq: number }> | undefined; + if (this.storage?.getMaxSeqByStream) { + const streams = await this.storage.getMaxSeqByStream(agent, session.id); + seedSequences = streams.map(s => ({ + topic: s.topic ?? 'default', + peer: s.peer, + seq: s.maxSeq, + })); + } + + return { + sessionId: session.id, + resumeToken: session.resumeToken ?? resumeToken, + seedSequences, + }; + } + : undefined; + + const connection = new Connection(socket, { ...this.config, resumeHandler }); this.connections.add(connection); connection.onMessage = (envelope: Envelope) => { @@ -198,32 +221,53 @@ export class Daemon { // Register agent when connection becomes active (after successful handshake) connection.onActive = () => { - if (connection.agentName) { - this.router.register(connection); - console.log(`[daemon] Agent registered: ${connection.agentName}`); - // Registry handles persistence internally via save() - this.registry?.registerOrUpdate({ - name: connection.agentName, - cli: connection.cli, - program: connection.program, - model: connection.model, - task: connection.task, - workingDirectory: connection.workingDirectory, - }); + if (connection.agentName) { + this.router.register(connection); + console.log(`[daemon] Agent registered: ${connection.agentName}`); + // Registry handles persistence internally via save() + this.registry?.registerOrUpdate({ + name: connection.agentName, + cli: connection.cli, + program: connection.program, + model: connection.model, + task: connection.task, + workingDirectory: connection.workingDirectory, + }); // Record session start if (this.storage instanceof SqliteStorageAdapter) { const projectPaths = getProjectPaths(); - this.storage.startSession({ - id: connection.sessionId, - agentName: connection.agentName, - cli: connection.cli, - projectId: projectPaths.projectId, - projectRoot: projectPaths.projectRoot, - startedAt: Date.now(), - }).catch(err => console.error('[daemon] Failed to record session start:', err)); + const storage = this.storage as SqliteStorageAdapter; + const persistSession = async (): Promise => { + let startedAt = Date.now(); + if (connection.isResumed && storage.getSessionByResumeToken) { + const existing = await storage.getSessionByResumeToken(connection.resumeToken); + if (existing?.startedAt) { + startedAt = existing.startedAt; + } + } + + await storage.startSession({ + id: connection.sessionId, + agentName: connection.agentName!, + cli: connection.cli, + projectId: projectPaths.projectId, + projectRoot: projectPaths.projectRoot, + startedAt, + resumeToken: connection.resumeToken, + }); + }; + + persistSession().catch(err => console.error('[daemon] Failed to record session start:', err)); } } + + // Replay pending deliveries for resumed sessions + if (connection.isResumed) { + this.router.replayPending(connection).catch(err => { + console.error('[daemon] Failed to replay pending messages', err); + }); + } }; connection.onClose = () => { diff --git a/src/dashboard/needs-attention.test.ts b/src/dashboard/needs-attention.test.ts index 87526227..0091a43c 100644 --- a/src/dashboard/needs-attention.test.ts +++ b/src/dashboard/needs-attention.test.ts @@ -45,7 +45,8 @@ describe('computeNeedsAttention', () => { const threadId = 'thread-123'; const messages: AttentionMessage[] = [ msg({ from: 'Alice', to: 'Bob', thread: threadId, timestamp: new Date(baseTs).toISOString() }), - msg({ from: 'Bob', to: '*', thread: threadId, timestamp: new Date(baseTs + 2000).toISOString() }), + // Broadcast reply stored with individual recipient but isBroadcast=true + msg({ from: 'Bob', to: 'Alice', thread: threadId, isBroadcast: true, timestamp: new Date(baseTs + 2000).toISOString() }), ]; const result = computeNeedsAttention(messages); @@ -55,7 +56,8 @@ describe('computeNeedsAttention', () => { it('broadcasts clear attention (agent is actively participating)', () => { const messages: AttentionMessage[] = [ msg({ from: 'Alice', to: 'Bob', timestamp: new Date(baseTs).toISOString() }), - msg({ from: 'Bob', to: '*', timestamp: new Date(baseTs + 1000).toISOString() }), + // Broadcast message stored with individual recipient but isBroadcast=true + msg({ from: 'Bob', to: 'Alice', isBroadcast: true, timestamp: new Date(baseTs + 1000).toISOString() }), ]; const result = computeNeedsAttention(messages); diff --git a/src/storage/adapter.ts b/src/storage/adapter.ts index 7390ed7f..78866b01 100644 --- a/src/storage/adapter.ts +++ b/src/storage/adapter.ts @@ -1,4 +1,4 @@ -import type { PayloadKind } from '../protocol/types.js'; +import type { PayloadKind, SendMeta } from '../protocol/types.js'; export type MessageStatus = 'unread' | 'read' | 'acked'; @@ -11,6 +11,8 @@ export interface StoredMessage { kind: PayloadKind; body: string; data?: Record; + /** Optional metadata (importance, replyTo, etc.) */ + payloadMeta?: SendMeta; /** Optional thread ID for grouping related messages */ thread?: string; deliverySeq?: number; @@ -49,6 +51,7 @@ export interface StoredSession { endedAt?: number; messageCount: number; summary?: string; + resumeToken?: string; /** How the session was closed: 'agent' (explicit), 'disconnect', 'error', or undefined (still active) */ closedBy?: 'agent' | 'disconnect' | 'error'; } @@ -76,6 +79,7 @@ export interface StorageAdapter { saveMessage(message: StoredMessage): Promise; getMessages(query?: MessageQuery): Promise; getMessageById?(id: string): Promise; + updateMessageStatus?(id: string, status: MessageStatus): Promise; close?(): Promise; // Session management (optional - for adapters that support it) @@ -84,11 +88,16 @@ export interface StorageAdapter { getSessions?(query?: SessionQuery): Promise; getRecentSessions?(limit?: number): Promise; incrementSessionMessageCount?(sessionId: string): Promise; + getSessionByResumeToken?(resumeToken: string): Promise; // Agent summaries (optional - for adapters that support it) saveAgentSummary?(summary: Omit): Promise; getAgentSummary?(agentName: string): Promise; getAllAgentSummaries?(): Promise; + + // Delivery resume helpers (optional) + getPendingMessagesForSession?(agentName: string, sessionId: string): Promise; + getMaxSeqByStream?(agentName: string, sessionId: string): Promise>; } /** @@ -160,6 +169,46 @@ export class MemoryStorageAdapter implements StorageAdapter { return this.messages.find(m => m.id === id || m.id.startsWith(id)) ?? null; } + async updateMessageStatus(id: string, status: MessageStatus): Promise { + const msg = this.messages.find(m => m.id === id || m.id.startsWith(id)); + if (msg) { + msg.status = status; + } + } + + async getPendingMessagesForSession(agentName: string, sessionId: string): Promise { + return this.messages + .filter(m => m.to === agentName && m.deliverySessionId === sessionId && m.status !== 'acked') + .sort((a, b) => { + const seqA = mSeq(a); + const seqB = mSeq(b); + return seqA === seqB ? a.ts - b.ts : seqA - seqB; + }); + + function mSeq(msg: StoredMessage): number { + return msg.deliverySeq ?? 0; + } + } + + async getMaxSeqByStream(agentName: string, sessionId: string): Promise> { + const aggregates = new Map(); + + for (const msg of this.messages) { + if (msg.to !== agentName) continue; + if (msg.deliverySessionId !== sessionId) continue; + if (msg.deliverySeq === undefined || msg.deliverySeq === null) continue; + + const topic = msg.topic ?? 'default'; + const key = `${topic}:${msg.from}`; + const current = aggregates.get(key); + if (!current || msg.deliverySeq > current.maxSeq) { + aggregates.set(key, { peer: msg.from, topic: msg.topic, maxSeq: msg.deliverySeq }); + } + } + + return Array.from(aggregates.values()); + } + async close(): Promise { this.messages = []; } diff --git a/src/storage/sqlite-adapter.test.ts b/src/storage/sqlite-adapter.test.ts index 00f3d12d..aa1e2c55 100644 --- a/src/storage/sqlite-adapter.test.ts +++ b/src/storage/sqlite-adapter.test.ts @@ -14,12 +14,14 @@ const makeMessage = (overrides: Partial = {}): StoredMessage => ( kind: overrides.kind ?? 'message', body: overrides.body ?? 'hello', data: overrides.data, + payloadMeta: overrides.payloadMeta, thread: overrides.thread, deliverySeq: overrides.deliverySeq, deliverySessionId: overrides.deliverySessionId, sessionId: overrides.sessionId, status: overrides.status ?? 'unread', is_urgent: overrides.is_urgent ?? false, + is_broadcast: overrides.is_broadcast ?? false, }); describe('SqliteStorageAdapter', () => { @@ -189,6 +191,21 @@ describe('SqliteStorageAdapter', () => { expect(sessions[0].messageCount).toBe(3); }); + it('stores and retrieves a resume token', async () => { + const token = 'resume-123'; + await adapter.startSession({ + id: 'resume-session', + agentName: 'ResumeAgent', + startedAt: Date.now(), + resumeToken: token, + }); + + const session = await adapter.getSessionByResumeToken(token); + expect(session).not.toBeNull(); + expect(session?.id).toBe('resume-session'); + expect(session?.resumeToken).toBe(token); + }); + it('filters sessions by agentName and projectId', async () => { const now = Date.now(); await adapter.startSession({ id: 's1', agentName: 'Alice', projectId: 'p1', startedAt: now - 2000 }); @@ -217,6 +234,68 @@ describe('SqliteStorageAdapter', () => { }); }); + describe('Resume helpers', () => { + it('returns pending messages and max seq per stream', async () => { + const sessionId = 'sess-1'; + await adapter.saveMessage({ + id: 'm1', + ts: Date.now(), + from: 'Alice', + to: 'Bob', + kind: 'message', + body: 'one', + status: 'unread', + is_urgent: false, + deliverySeq: 1, + deliverySessionId: sessionId, + sessionId, + is_broadcast: false, + }); + + await adapter.saveMessage({ + id: 'm2', + ts: Date.now() + 10, + from: 'Alice', + to: 'Bob', + kind: 'message', + body: 'two', + status: 'unread', + is_urgent: false, + deliverySeq: 2, + deliverySessionId: sessionId, + sessionId, + is_broadcast: false, + }); + + await adapter.saveMessage({ + id: 'm3', + ts: Date.now() + 20, + from: 'Carol', + to: 'Bob', + kind: 'message', + body: 'three', + status: 'unread', + is_urgent: false, + deliverySeq: 1, + deliverySessionId: sessionId, + sessionId, + is_broadcast: false, + }); + + await adapter.updateMessageStatus?.('m2', 'acked'); + + const pending = await adapter.getPendingMessagesForSession?.('Bob', sessionId); + expect(pending?.map(p => p.id)).toEqual(['m1', 'm3']); + + const seqs = await adapter.getMaxSeqByStream?.('Bob', sessionId); + const sorted = (seqs ?? []).sort((a, b) => a.peer.localeCompare(b.peer)); + expect(sorted).toEqual([ + { peer: 'Alice', topic: undefined, maxSeq: 2 }, + { peer: 'Carol', topic: undefined, maxSeq: 1 }, + ]); + }); + }); + describe('Agent Summaries', () => { it('saves and retrieves an agent summary', async () => { await adapter.saveAgentSummary({ diff --git a/src/storage/sqlite-adapter.ts b/src/storage/sqlite-adapter.ts index 9f173bb9..b01cc7ee 100644 --- a/src/storage/sqlite-adapter.ts +++ b/src/storage/sqlite-adapter.ts @@ -133,6 +133,7 @@ export class SqliteStorageAdapter implements StorageAdapter { kind TEXT NOT NULL, body TEXT NOT NULL, data TEXT, + payload_meta TEXT, thread TEXT, delivery_seq INTEGER, delivery_session_id TEXT, @@ -158,6 +159,9 @@ export class SqliteStorageAdapter implements StorageAdapter { this.db.exec('ALTER TABLE messages ADD COLUMN thread TEXT'); this.db.exec('CREATE INDEX IF NOT EXISTS idx_messages_thread ON messages (thread)'); } + if (!columnNames.has('payload_meta')) { + this.db.exec('ALTER TABLE messages ADD COLUMN payload_meta TEXT'); + } if (!columnNames.has('status')) { this.db.exec("ALTER TABLE messages ADD COLUMN status TEXT NOT NULL DEFAULT 'unread'"); this.db.exec('CREATE INDEX IF NOT EXISTS idx_messages_status ON messages (status)'); @@ -183,13 +187,23 @@ export class SqliteStorageAdapter implements StorageAdapter { ended_at INTEGER, message_count INTEGER DEFAULT 0, summary TEXT, + resume_token TEXT, closed_by TEXT ); CREATE INDEX IF NOT EXISTS idx_sessions_agent ON sessions (agent_name); CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions (started_at); CREATE INDEX IF NOT EXISTS idx_sessions_project ON sessions (project_id); + CREATE INDEX IF NOT EXISTS idx_sessions_resume_token ON sessions (resume_token); `); + // Migrate existing sessions table to add resume_token if missing + const sessionColumns = this.db.prepare("PRAGMA table_info(sessions)").all() as { name: string }[]; + const sessionColumnNames = new Set(sessionColumns.map(c => c.name)); + if (!sessionColumnNames.has('resume_token')) { + this.db.exec('ALTER TABLE sessions ADD COLUMN resume_token TEXT'); + this.db.exec('CREATE INDEX IF NOT EXISTS idx_sessions_resume_token ON sessions (resume_token)'); + } + // Create agent_summaries table (IF NOT EXISTS is safe here - no new columns to migrate) this.db.exec(` CREATE TABLE IF NOT EXISTS agent_summaries ( @@ -231,8 +245,8 @@ export class SqliteStorageAdapter implements StorageAdapter { this.insertStmt = this.db.prepare(` INSERT OR REPLACE INTO messages - (id, ts, sender, recipient, topic, kind, body, data, thread, delivery_seq, delivery_session_id, session_id, status, is_urgent, is_broadcast) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + (id, ts, sender, recipient, topic, kind, body, data, payload_meta, thread, delivery_seq, delivery_session_id, session_id, status, is_urgent, is_broadcast) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `); // Start automatic cleanup if enabled @@ -313,6 +327,7 @@ export class SqliteStorageAdapter implements StorageAdapter { message.kind, message.body, message.data ? JSON.stringify(message.data) : null, + message.payloadMeta ? JSON.stringify(message.payloadMeta) : null, message.thread ?? null, message.deliverySeq ?? null, message.deliverySessionId ?? null, @@ -365,7 +380,7 @@ export class SqliteStorageAdapter implements StorageAdapter { const limit = query.limit ?? 200; const stmt = this.db.prepare(` - SELECT id, ts, sender, recipient, topic, kind, body, data, thread, delivery_seq, delivery_session_id, session_id, status, is_urgent, is_broadcast + SELECT id, ts, sender, recipient, topic, kind, body, data, payload_meta, thread, delivery_seq, delivery_session_id, session_id, status, is_urgent, is_broadcast FROM messages ${where} ORDER BY ts ${order} @@ -382,6 +397,7 @@ export class SqliteStorageAdapter implements StorageAdapter { kind: row.kind, body: row.body, data: row.data ? JSON.parse(row.data) : undefined, + payloadMeta: row.payload_meta ? JSON.parse(row.payload_meta) : undefined, thread: row.thread ?? undefined, deliverySeq: row.delivery_seq ?? undefined, deliverySessionId: row.delivery_session_id ?? undefined, @@ -407,7 +423,7 @@ export class SqliteStorageAdapter implements StorageAdapter { // Support both exact match and prefix match (for short IDs like "06eb33da") const stmt = this.db.prepare(` - SELECT id, ts, sender, recipient, topic, kind, body, data, thread, delivery_seq, delivery_session_id, session_id, status, is_urgent, is_broadcast + SELECT id, ts, sender, recipient, topic, kind, body, data, payload_meta, thread, delivery_seq, delivery_session_id, session_id, status, is_urgent, is_broadcast FROM messages WHERE id = ? OR id LIKE ? ORDER BY ts DESC @@ -426,6 +442,7 @@ export class SqliteStorageAdapter implements StorageAdapter { kind: row.kind, body: row.body, data: row.data ? JSON.parse(row.data) : undefined, + payloadMeta: row.payload_meta ? JSON.parse(row.payload_meta) : undefined, thread: row.thread ?? undefined, deliverySeq: row.delivery_seq ?? undefined, deliverySessionId: row.delivery_session_id ?? undefined, @@ -436,6 +453,59 @@ export class SqliteStorageAdapter implements StorageAdapter { }; } + async getPendingMessagesForSession(agentName: string, sessionId: string): Promise { + if (!this.db) { + throw new Error('SqliteStorageAdapter not initialized'); + } + + const stmt = this.db.prepare(` + SELECT id, ts, sender, recipient, topic, kind, body, data, payload_meta, thread, delivery_seq, delivery_session_id, session_id, status, is_urgent, is_broadcast + FROM messages + WHERE recipient = ? AND delivery_session_id = ? AND status != 'acked' + ORDER BY delivery_seq ASC, ts ASC + `); + + const rows = stmt.all(agentName, sessionId); + return rows.map((row: any) => ({ + id: row.id, + ts: row.ts, + from: row.sender, + to: row.recipient, + topic: row.topic ?? undefined, + kind: row.kind, + body: row.body, + data: row.data ? JSON.parse(row.data) : undefined, + payloadMeta: row.payload_meta ? JSON.parse(row.payload_meta) : undefined, + thread: row.thread ?? undefined, + deliverySeq: row.delivery_seq ?? undefined, + deliverySessionId: row.delivery_session_id ?? undefined, + sessionId: row.session_id ?? undefined, + status: row.status ?? 'unread', + is_urgent: row.is_urgent === 1, + is_broadcast: row.is_broadcast === 1, + })); + } + + async getMaxSeqByStream(agentName: string, sessionId: string): Promise> { + if (!this.db) { + throw new Error('SqliteStorageAdapter not initialized'); + } + + const stmt = this.db.prepare(` + SELECT sender, topic, MAX(delivery_seq) as max_seq + FROM messages + WHERE recipient = ? AND delivery_session_id = ? AND delivery_seq IS NOT NULL + GROUP BY sender, topic + `); + + const rows = stmt.all(agentName, sessionId) as Array<{ sender: string; topic: string | null; max_seq: number }>; + return rows.map(row => ({ + peer: row.sender, + topic: row.topic ?? undefined, + maxSeq: row.max_seq, + })); + } + async close(): Promise { // Stop cleanup timer if (this.cleanupTimer) { @@ -457,9 +527,20 @@ export class SqliteStorageAdapter implements StorageAdapter { } const stmt = this.db.prepare(` - INSERT OR REPLACE INTO sessions - (id, agent_name, cli, project_id, project_root, started_at, ended_at, message_count, summary) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + INSERT INTO sessions + (id, agent_name, cli, project_id, project_root, started_at, ended_at, message_count, summary, resume_token, closed_by) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + agent_name = excluded.agent_name, + cli = COALESCE(excluded.cli, sessions.cli), + project_id = COALESCE(excluded.project_id, sessions.project_id), + project_root = COALESCE(excluded.project_root, sessions.project_root), + started_at = COALESCE(sessions.started_at, excluded.started_at), + ended_at = excluded.ended_at, + message_count = COALESCE(sessions.message_count, excluded.message_count), + summary = COALESCE(excluded.summary, sessions.summary), + resume_token = COALESCE(excluded.resume_token, sessions.resume_token), + closed_by = excluded.closed_by `); stmt.run( @@ -471,7 +552,9 @@ export class SqliteStorageAdapter implements StorageAdapter { session.startedAt, session.endedAt ?? null, 0, - session.summary ?? null + session.summary ?? null, + session.resumeToken ?? null, + null ); } @@ -542,7 +625,7 @@ export class SqliteStorageAdapter implements StorageAdapter { const limit = query.limit ?? 50; const stmt = this.db.prepare(` - SELECT id, agent_name, cli, project_id, project_root, started_at, ended_at, message_count, summary, closed_by + SELECT id, agent_name, cli, project_id, project_root, started_at, ended_at, message_count, summary, resume_token, closed_by FROM sessions ${where} ORDER BY started_at DESC @@ -560,6 +643,7 @@ export class SqliteStorageAdapter implements StorageAdapter { endedAt: row.ended_at ?? undefined, messageCount: row.message_count, summary: row.summary ?? undefined, + resumeToken: row.resume_token ?? undefined, closedBy: row.closed_by ?? undefined, })); } @@ -568,6 +652,37 @@ export class SqliteStorageAdapter implements StorageAdapter { return this.getSessions({ limit }); } + async getSessionByResumeToken(resumeToken: string): Promise { + if (!this.db) { + throw new Error('SqliteStorageAdapter not initialized'); + } + + const row = this.db.prepare(` + SELECT id, agent_name, cli, project_id, project_root, started_at, ended_at, message_count, summary, resume_token, closed_by + FROM sessions + WHERE resume_token = ? + LIMIT 1 + `).get(resumeToken) as any; + + if (!row) { + return null; + } + + return { + id: row.id, + agentName: row.agent_name, + cli: row.cli ?? undefined, + projectId: row.project_id ?? undefined, + projectRoot: row.project_root ?? undefined, + startedAt: row.started_at, + endedAt: row.ended_at ?? undefined, + messageCount: row.message_count, + summary: row.summary ?? undefined, + resumeToken: row.resume_token ?? undefined, + closedBy: row.closed_by ?? undefined, + }; + } + // ============ Agent Summaries ============ async saveAgentSummary(summary: { diff --git a/src/wrapper/client.ts b/src/wrapper/client.ts index cc8a7358..aa167f46 100644 --- a/src/wrapper/client.ts +++ b/src/wrapper/client.ts @@ -66,6 +66,9 @@ export class RelayClient { private reconnectDelay: number; private reconnectTimer?: NodeJS.Timeout; private _destroyed = false; + private deliveredIds: Set = new Set(); + private deliveredOrder: string[] = []; + private readonly deliveredCacheLimit = 2000; // Event handlers onMessage?: (from: string, payload: SendPayload, messageId: string, meta?: SendMeta) => void; @@ -360,6 +363,11 @@ export class RelayClient { }, }); + const duplicate = this.markDelivered(envelope.id); + if (duplicate) { + return; + } + // Notify handler if (this.onMessage && envelope.from) { this.onMessage(envelope.from, envelope.payload, envelope.id, envelope.payload_meta); @@ -437,4 +445,27 @@ export class RelayClient { }); }, delay); } + + /** + * Track delivered message IDs to provide deterministic deduplication when messages are replayed. + * @returns true if the message has already been seen. + */ + private markDelivered(id: string): boolean { + if (this.deliveredIds.has(id)) { + return true; + } + + this.deliveredIds.add(id); + this.deliveredOrder.push(id); + + // Simple FIFO eviction to keep memory bounded + if (this.deliveredOrder.length > this.deliveredCacheLimit) { + const oldest = this.deliveredOrder.shift(); + if (oldest) { + this.deliveredIds.delete(oldest); + } + } + + return false; + } } diff --git a/src/wrapper/tmux-wrapper.ts b/src/wrapper/tmux-wrapper.ts index 3df34c3b..4a2f53e4 100644 --- a/src/wrapper/tmux-wrapper.ts +++ b/src/wrapper/tmux-wrapper.ts @@ -127,6 +127,9 @@ export class TmuxWrapper { private readonly MAX_PENDING_RELAY_COMMANDS = 50; private processedSpawnCommands: Set = new Set(); // Dedup spawn commands private processedReleaseCommands: Set = new Set(); // Dedup release commands + private receivedMessageIdSet: Set = new Set(); + private receivedMessageIdOrder: string[] = []; + private readonly MAX_RECEIVED_MESSAGES = 2000; constructor(config: TmuxWrapperConfig) { this.config = { @@ -878,6 +881,11 @@ export class TmuxWrapper { * Handle incoming message from relay */ private handleIncomingMessage(from: string, payload: SendPayload, messageId: string, meta?: SendMeta): void { + if (this.hasSeenIncoming(messageId)) { + this.logStderr(`← ${from}: duplicate delivery (${messageId.substring(0, 8)})`); + return; + } + const truncatedBody = payload.body.substring(0, Math.min(DEBUG_LOG_TRUNCATE_LENGTH, payload.body.length)); this.logStderr(`← ${from}: ${truncatedBody}...`); @@ -1012,6 +1020,24 @@ export class TmuxWrapper { } } + private hasSeenIncoming(messageId: string): boolean { + if (this.receivedMessageIdSet.has(messageId)) { + return true; + } + + this.receivedMessageIdSet.add(messageId); + this.receivedMessageIdOrder.push(messageId); + + if (this.receivedMessageIdOrder.length > this.MAX_RECEIVED_MESSAGES) { + const oldest = this.receivedMessageIdOrder.shift(); + if (oldest) { + this.receivedMessageIdSet.delete(oldest); + } + } + + return false; + } + /** * Send special keys to tmux */ From f2c40ebdadf6335328b2d21d87b498063e7ec5b2 Mon Sep 17 00:00:00 2001 From: Khaliq Date: Fri, 26 Dec 2025 12:25:27 +0100 Subject: [PATCH 2/3] fix migration --- src/storage/sqlite-adapter.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/storage/sqlite-adapter.ts b/src/storage/sqlite-adapter.ts index b01cc7ee..b2a35735 100644 --- a/src/storage/sqlite-adapter.ts +++ b/src/storage/sqlite-adapter.ts @@ -175,7 +175,8 @@ export class SqliteStorageAdapter implements StorageAdapter { } } - // Create sessions table (IF NOT EXISTS is safe here - no new columns to migrate) + // Create sessions table (IF NOT EXISTS is safe here) + // Note: Don't create resume_token index here - it's created after migration check this.db.exec(` CREATE TABLE IF NOT EXISTS sessions ( id TEXT PRIMARY KEY, @@ -193,7 +194,6 @@ export class SqliteStorageAdapter implements StorageAdapter { CREATE INDEX IF NOT EXISTS idx_sessions_agent ON sessions (agent_name); CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions (started_at); CREATE INDEX IF NOT EXISTS idx_sessions_project ON sessions (project_id); - CREATE INDEX IF NOT EXISTS idx_sessions_resume_token ON sessions (resume_token); `); // Migrate existing sessions table to add resume_token if missing @@ -201,8 +201,9 @@ export class SqliteStorageAdapter implements StorageAdapter { const sessionColumnNames = new Set(sessionColumns.map(c => c.name)); if (!sessionColumnNames.has('resume_token')) { this.db.exec('ALTER TABLE sessions ADD COLUMN resume_token TEXT'); - this.db.exec('CREATE INDEX IF NOT EXISTS idx_sessions_resume_token ON sessions (resume_token)'); } + // Create index after ensuring column exists (either from CREATE TABLE or migration) + this.db.exec('CREATE INDEX IF NOT EXISTS idx_sessions_resume_token ON sessions (resume_token)'); // Create agent_summaries table (IF NOT EXISTS is safe here - no new columns to migrate) this.db.exec(` From bd5f6512e074e8d8c490a4214a957d0841944200 Mon Sep 17 00:00:00 2001 From: Khaliq Date: Fri, 26 Dec 2025 13:04:16 +0100 Subject: [PATCH 3/3] Fix: Multi-line relay message parsing - parser now captures plain continuation lines [agent-relay-6dl8] --- .beads/issues.jsonl | 3 ++- src/wrapper/parser.test.ts | 38 +++++++++++++++++++++++++++++++++++--- src/wrapper/parser.ts | 3 +++ 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index 074ed307..fd81a592 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -29,6 +29,7 @@ {"id":"agent-relay-5fa","title":"Add exponential backoff for daemon reconnection","description":"Implement graceful reconnection with exponential backoff delays [100, 500, 1000, 2000, 5000ms]. After max attempts, operate offline gracefully. See docs/TMUX_IMPROVEMENTS.md for implementation details.","status":"closed","priority":2,"issue_type":"feature","assignee":"LeadDev","created_at":"2025-12-20T21:28:48.055013+01:00","updated_at":"2025-12-20T21:33:42.229756+01:00","closed_at":"2025-12-20T21:33:42.229756+01:00"} {"id":"agent-relay-5g0","title":"Heartbeat timeout could be more configurable","description":"In connection.ts:196, heartbeat timeout is hardcoded as 2x heartbeatMs. This should be independently configurable. Also, heartbeat failures immediately kill the connection - could implement exponential backoff for transient issues.","status":"closed","priority":2,"issue_type":"task","created_at":"2025-12-20T00:18:03.556614+01:00","updated_at":"2025-12-23T23:03:07.563273+01:00","closed_at":"2025-12-23T23:03:07.563273+01:00"} {"id":"agent-relay-68a","title":"Ask pattern: request/response messaging (replyTo + deferred)","description":"Implement an RPC-like ask() over agent-relay: sender creates a short-lived deferred handle, sends message with replyTo, receiver responds resolving the deferred. Similar to swarm-mail DurableDeferred + ask/respond.","acceptance_criteria":"- ask() returns response or times out\\n- Deferreds are persisted with TTL\\n- Works across process restarts/reconnects","status":"open","priority":2,"issue_type":"task","created_at":"2025-12-20T21:44:33.930531+01:00","updated_at":"2025-12-20T21:44:33.930531+01:00","labels":["coordination","protocol"]} +{"id":"agent-relay-6dl8","title":"Multi-line relay messages truncated in dashboard display","description":"Multi-line messages sent via -\u003erelay: are being cut off after first line when displayed in dashboard. This blocks communication between agents. Need to: (1) Verify daemon sends full message, (2) Fix dashboard rendering to handle newlines properly.","status":"closed","priority":1,"issue_type":"bug","created_at":"2025-12-26T12:28:39.746942+01:00","updated_at":"2025-12-26T12:38:36.844034+01:00","closed_at":"2025-12-26T12:38:36.844034+01:00"} {"id":"agent-relay-6mo","title":"Lead spawn capability not working (@relay:spawn pattern)","description":"The lead command advertises spawn capability via @relay:spawn pattern but it's not fully implemented. Agents started with 'agent-relay lead' should be able to spawn workers, but the parser extension mentioned in the code is not implemented.","status":"closed","priority":1,"issue_type":"bug","created_at":"2025-12-23T12:14:59.492237+01:00","updated_at":"2025-12-23T12:18:14.114911+01:00","closed_at":"2025-12-23T12:18:14.114911+01:00"} {"id":"agent-relay-6nx","title":"Add activity state tracking for better injection timing","description":"Track active/idle/disconnected state with timestamps. When session goes idle (30s no activity), trigger message injection opportunity. Improves injection timing vs current fixed 1.5s wait. See docs/TMUX_IMPROVEMENTS.md for implementation details.","status":"closed","priority":2,"issue_type":"feature","assignee":"LeadDev","created_at":"2025-12-20T21:28:46.993856+01:00","updated_at":"2025-12-20T21:33:42.223042+01:00","closed_at":"2025-12-20T21:33:42.223042+01:00"} {"id":"agent-relay-6ny","title":"Fix message truncation: store full message in SQLite first, include ID in relay","description":"","status":"closed","priority":1,"issue_type":"bug","created_at":"2025-12-19T22:04:36.168862+01:00","updated_at":"2025-12-19T22:08:28.207532+01:00","closed_at":"2025-12-19T22:08:28.207532+01:00"} @@ -130,7 +131,7 @@ {"id":"agent-relay-ytt","title":"Implement threaded conversations (Slack-style)","description":"Add thread replies with parent_message_id tracking in storage, nested thread UI display in dashboard, thread notification badges, and expand/collapse thread views. Backend: add parent_message_id column, thread reply routing. Frontend: inline thread expansion, reply count badges, thread-specific compose.","status":"closed","priority":1,"issue_type":"feature","created_at":"2025-12-23T12:00:00Z","updated_at":"2025-12-23T16:37:22.987643+01:00","closed_at":"2025-12-23T16:37:22.987643+01:00"} {"id":"agent-relay-ytu","title":"[Control Plane] Design Control API (REST + WebSocket)","description":"Design API for human control plane: POST /tasks, GET /agents, POST /agents/:id/msg, WS /stream. OpenAPI spec. Authentication endpoints. See ai-maestro's manager/worker pattern for reference.","status":"open","priority":2,"issue_type":"task","created_at":"2025-12-21T14:00:00Z","updated_at":"2025-12-21T14:00:00Z"} {"id":"agent-relay-ytv","title":"[Control Plane] Implement Lead Agent orchestration","description":"Lead Agent receives human intent, breaks into tasks, assigns to specialized agents based on skills, monitors progress, escalates decisions. Can be AI-powered or rule-based initially.","status":"open","priority":2,"issue_type":"task","created_at":"2025-12-21T14:00:00Z","updated_at":"2025-12-21T14:00:00Z"} -{"id":"agent-relay-ytw","title":"[Control Plane] Build web dashboard v2 (fleet control)","description":"Next.js dashboard with: fleet overview, task assignment UI, agent status cards, trajectory viewer, decision queue. Learn from ai-maestro's hierarchical naming and color coding.","status":"open","priority":2,"issue_type":"task","created_at":"2025-12-21T14:00:00Z","updated_at":"2025-12-21T14:00:00Z"} +{"id":"agent-relay-ytw","title":"[Control Plane] Build web dashboard v2 (fleet control)","description":"Next.js dashboard with: fleet overview, task assignment UI, agent status cards, trajectory viewer, decision queue. Learn from ai-maestro's hierarchical naming and color coding.","status":"in_progress","priority":2,"issue_type":"task","created_at":"2025-12-21T14:00:00Z","updated_at":"2025-12-26T12:39:19.789712+01:00"} {"id":"agent-relay-ytx","title":"[Control Plane] Human authentication (OAuth/magic link)","description":"Secure human access to control plane. OAuth2 (GitHub/Google) or magic link email auth. JWT tokens for API access. Role-based permissions (admin/operator/viewer).","status":"open","priority":2,"issue_type":"task","created_at":"2025-12-21T14:00:00Z","updated_at":"2025-12-21T14:00:00Z"} {"id":"agent-relay-yty","title":"[Control Plane] Push notification service","description":"Real-time notifications to mobile/web: APNs for iOS, FCM for Android/web. Notify on: task completion, agent questions, escalations, errors.","status":"open","priority":3,"issue_type":"task","created_at":"2025-12-21T14:00:00Z","updated_at":"2025-12-21T14:00:00Z"} {"id":"agent-relay-ytz","title":"[Control Plane] iPhone app MVP","description":"React Native or Swift app: fleet status view, task list, agent messaging, push notifications, decision queue with quick actions. Minimal viable version.","status":"open","priority":3,"issue_type":"task","created_at":"2025-12-21T14:00:00Z","updated_at":"2025-12-21T14:00:00Z"} diff --git a/src/wrapper/parser.test.ts b/src/wrapper/parser.test.ts index 70a8f19f..fdcbcea3 100644 --- a/src/wrapper/parser.test.ts +++ b/src/wrapper/parser.test.ts @@ -87,6 +87,15 @@ describe('OutputParser', () => { expect(result.output).toBe(''); }); + it('captures plain multi-line inline command without punctuation or indentation', () => { + const input = '->relay:agent2 First line\nSecond line\nThird line\n'; + const result = parser.parse(input); + + expect(result.commands).toHaveLength(1); + expect(result.commands[0].body).toBe('First line\nSecond line\nThird line'); + expect(result.output).toBe(''); + }); + it('does not swallow subsequent inline command after indented continuation', () => { const result = parser.parse('->relay:agent1 First line\n Second line\n->relay:agent2 Next\n'); @@ -114,6 +123,17 @@ describe('OutputParser', () => { expect(result.output).toBe('\nNext output\n'); }); + it('captures plain multi-line message without punctuation until blank line', () => { + // This is the key fix for agent-relay-6dl8: multi-line messages that don't + // end with continuation punctuation or have indentation should still be captured + const input = '->relay:Lead Hello world\nThis is the second line\nAnd third line\n\nRegular output\n'; + const result = parser.parse(input); + + expect(result.commands).toHaveLength(1); + expect(result.commands[0].body).toBe('Hello world\nThis is the second line\nAnd third line'); + expect(result.output).toBe('\nRegular output\n'); + }); + it('stops continuation at prompt-ish line', () => { const input = '->relay:agent2 Message body\n> \nFollow-up\n'; const result = parser.parse(input); @@ -381,11 +401,14 @@ describe('OutputParser', () => { }); it('mixes relay commands with regular output', () => { - const input = 'Output 1\n->relay:agent2 Message\nOutput 2\n'; + // With multi-line continuation enabled, "Output 2" is captured as continuation + // until a blank line or other stop condition. Use a blank line to separate. + const input = 'Output 1\n->relay:agent2 Message\n\nOutput 2\n'; const result = parser.parse(input); expect(result.commands).toHaveLength(1); - expect(result.output).toBe('Output 1\nOutput 2\n'); + expect(result.commands[0].body).toBe('Message'); + expect(result.output).toBe('Output 1\n\nOutput 2\n'); }); it('handles incomplete block at flush', () => { @@ -450,6 +473,8 @@ describe('OutputParser', () => { describe('Complex scenarios', () => { it('handles multiple commands in one parse call', () => { + // With multi-line continuation, "Regular output" becomes part of command 1 + // since it's followed by another relay command (which stops continuation). const input = `->relay:agent1 First Regular output ->relay:agent2 Second @@ -460,9 +485,10 @@ More output expect(result.commands).toHaveLength(3); expect(result.commands[0].to).toBe('agent1'); + // "Regular output" is captured as continuation of "First" + expect(result.commands[0].body).toBe('First\nRegular output'); expect(result.commands[1].to).toBe('agent2'); expect(result.commands[2].to).toBe('agent3'); - expect(result.output).toContain('Regular output'); expect(result.output).toContain('More output'); }); @@ -493,10 +519,14 @@ More output }); it('preserves order of commands and output', () => { + // With multi-line continuation, lines after relay commands are captured + // until a stop condition. Use blank lines to separate output from messages. const input = `Out1 ->relay:agent1 Msg1 + Out2 ->relay:agent2 Msg2 + Out3 `; const result = parser.parse(input); @@ -504,7 +534,9 @@ Out3 const outputLines = result.output.split('\n').filter(l => l.trim()); expect(outputLines).toEqual(['Out1', 'Out2', 'Out3']); expect(result.commands[0].to).toBe('agent1'); + expect(result.commands[0].body).toBe('Msg1'); expect(result.commands[1].to).toBe('agent2'); + expect(result.commands[1].body).toBe('Msg2'); }); }); diff --git a/src/wrapper/parser.ts b/src/wrapper/parser.ts index a38c93a9..fa172c3a 100644 --- a/src/wrapper/parser.ts +++ b/src/wrapper/parser.ts @@ -357,6 +357,9 @@ export class OutputParser { if (prevSuggestsContinuation) return true; // If we've already continued once, allow subsequent lines until a stop condition if (continuationCount > 0) return true; + // Allow plain non-empty lines as continuation so multi-line messages + // without indentation or trailing punctuation are captured fully. + if (stripped.trim() !== '') return true; return false; };