diff --git a/docs/AGENT-GUIDE.md b/docs/AGENT-GUIDE.md index 86a28b7..dc796c2 100644 --- a/docs/AGENT-GUIDE.md +++ b/docs/AGENT-GUIDE.md @@ -547,6 +547,30 @@ This means a DID:web agent with a crafted long or unusual domain path could crea - **Use `ephemeral: true` for sensitive payloads.** Ephemeral messages have their body permanently deleted on ack. Use `ttl` for time-sensitive secrets. - **Use correlation IDs for request-response.** Set `correlation_id` on messages you send; use the `/reply` endpoint to send a correlated response. +### Delivery Guarantees: `retain_until_acked` and `auto_ack_on_pull` + +ADMP supports two opt-in delivery modes beyond the default lease-and-ack flow: + +#### `retain_until_acked` (sender-set, per-message) + +Set `retain_until_acked: true` in the send body to require explicit acknowledgment. This overrides the recipient's `auto_ack_on_pull` preference — the message will never be silently consumed. + +**`work_order` and `fix_request` always retain** — the hub sets `retain_until_acked: true` automatically for these types, regardless of what the sender passes. + +#### `auto_ack_on_pull` (recipient-set, at registration) + +Register with `auto_ack_on_pull: true` for fire-and-forget delivery. The hub immediately acks each message on pull — no explicit `POST .../ack` is required. + +The pull response includes `"auto_acked": true` when the hub auto-acked the message. In this case `lease_until` is `null` — do not call `POST .../ack` for auto-acked messages (it will return 400). + +`retain_until_acked` always wins over `auto_ack_on_pull` — work orders and retained messages require explicit ack even if the recipient opted into auto-ack. + +| Scenario | Recommended setting | +|----------|-------------------| +| Work orders, fix requests, critical tasks | `retain_until_acked: true` (automatic for `work_order`/`fix_request`) | +| Fire-and-forget notifications | Register recipient with `auto_ack_on_pull: true` | +| Default — explicit ack, retries on timeout | Neither flag | + ### Performance - **Pull in a loop** with a reasonable `visibility_timeout` (30-60s) to avoid re-processing. diff --git a/src/routes/agents.js b/src/routes/agents.js index 1a6c14f..03acf95 100644 --- a/src/routes/agents.js +++ b/src/routes/agents.js @@ -19,7 +19,11 @@ const router = express.Router(); */ router.post('/register', async (req, res) => { try { - const { agent_id, agent_type, metadata, webhook_url, webhook_secret, seed, public_key, tenant_id } = req.body; + const { agent_id, agent_type, metadata, webhook_url, webhook_secret, seed, public_key, tenant_id, auto_ack_on_pull } = req.body; + + if (auto_ack_on_pull !== undefined && typeof auto_ack_on_pull !== 'boolean') { + return res.status(400).json({ error: 'INVALID_INPUT', message: 'auto_ack_on_pull must be a boolean' }); + } // Convert seed from base64 string to Uint8Array if provided let seedBytes; @@ -35,7 +39,8 @@ router.post('/register', async (req, res) => { webhook_secret, seed: seedBytes, public_key, - tenant_id + tenant_id, + auto_ack_on_pull }); const response = { @@ -50,7 +55,8 @@ router.post('/register', async (req, res) => { tenant_id: agent.tenant_id, webhook_url: agent.webhook_url, webhook_secret: agent.webhook_secret, - heartbeat: agent.heartbeat + heartbeat: agent.heartbeat, + auto_ack_on_pull: agent.auto_ack_on_pull }; // Only include secret_key when available (legacy and seed-based modes) diff --git a/src/routes/inbox.js b/src/routes/inbox.js index 89994ad..2623888 100644 --- a/src/routes/inbox.js +++ b/src/routes/inbox.js @@ -16,7 +16,11 @@ const router = express.Router(); */ router.post('/:agentId/messages', async (req, res) => { try { - const { ephemeral, ttl, ...envelope } = req.body; + const { ephemeral, ttl, retain_until_acked, ...envelope } = req.body; + + if (retain_until_acked !== undefined && typeof retain_until_acked !== 'boolean') { + return res.status(400).json({ error: 'INVALID_INPUT', message: 'retain_until_acked must be a boolean' }); + } // Ensure to field matches URL if (!envelope.to) { @@ -25,7 +29,8 @@ router.post('/:agentId/messages', async (req, res) => { const message = await inboxService.send(envelope, { ephemeral: ephemeral || false, - ttl: ttl || null + ttl: ttl || null, + retain_until_acked }); res.status(201).json({ @@ -69,9 +74,15 @@ router.post('/:agentId/inbox/pull', authenticateHttpSignature, async (req, res) try { const { visibility_timeout } = req.body; - const message = await inboxService.pull(req.params.agentId, { - visibility_timeout - }); + // If req.agent is populated by HTTP Signature auth, pass the preference directly + // to avoid a redundant storage round-trip in the service. Falls back to a service- + // level lookup when req.agent is unavailable (e.g. legacy auth without signature). + const pullOpts = { visibility_timeout }; + if (req.agent?.auto_ack_on_pull !== undefined) { + pullOpts.auto_ack_on_pull = req.agent.auto_ack_on_pull; + } + + const message = await inboxService.pull(req.params.agentId, pullOpts); if (!message) { return res.status(204).send(); @@ -80,8 +91,10 @@ router.post('/:agentId/inbox/pull', authenticateHttpSignature, async (req, res) res.json({ message_id: message.id, envelope: message.envelope, - lease_until: message.lease_until, - attempts: message.attempts + // auto_acked messages are already acked in storage; lease_until is not meaningful. + lease_until: message.auto_acked ? null : message.lease_until, + attempts: message.attempts, + ...(message.auto_acked && { auto_acked: true }) }); } catch (error) { res.status(400).json({ diff --git a/src/server.test.js b/src/server.test.js index c38d239..4d44342 100644 --- a/src/server.test.js +++ b/src/server.test.js @@ -55,7 +55,7 @@ async function sendSignedMessage(sender, recipientId, options = {}) { envelope.signature.sig = 'invalid-signature'; } - // Build send body — envelope fields plus optional ephemeral/ttl + // Build send body — envelope fields plus optional ephemeral/ttl/retain_until_acked const sendBody = { ...envelope }; if (options.ephemeral !== undefined) { sendBody.ephemeral = options.ephemeral; @@ -63,6 +63,9 @@ async function sendSignedMessage(sender, recipientId, options = {}) { if (options.ttl !== undefined) { sendBody.ttl = options.ttl; } + if (options.retain_until_acked !== undefined) { + sendBody.retain_until_acked = options.retain_until_acked; + } const res = await request(app) .post(`/api/agents/${encodeURIComponent(recipientId)}/messages`) @@ -1130,6 +1133,175 @@ test('non-ephemeral messages behave as before (backward compat)', async () => { assert.equal(statusRes.body.status, 'acked'); }); +test('auto_ack_on_pull: message is immediately acked on pull, auto_acked returned in response', async () => { + const sender = await registerAgent('aap-sender'); + // Register recipient with auto_ack_on_pull: true + const suffix = `${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + const recipientRes = await request(app) + .post('/api/agents/register') + .send({ agent_id: `aap-recipient-${suffix}`, agent_type: 'test', auto_ack_on_pull: true }); + assert.equal(recipientRes.status, 201); + assert.equal(recipientRes.body.auto_ack_on_pull, true); + const recipient = recipientRes.body; + + // Send a notification (not a retain type) + const sendRes = await sendSignedMessage(sender, recipient.agent_id, { + type: 'notification', + subject: 'auto-ack-test', + body: { ping: true } + }); + assert.equal(sendRes.status, 201); + const messageId = sendRes.body.message_id; + + // Pull — hub should auto-ack immediately + const pullRes = await request(app) + .post(`/api/agents/${encodeURIComponent(recipient.agent_id)}/inbox/pull`) + .send({ visibility_timeout: 60 }); + + assert.equal(pullRes.status, 200); + assert.equal(pullRes.body.message_id, messageId); + assert.equal(pullRes.body.auto_acked, true); + assert.equal(pullRes.body.lease_until, null); + + // Message should be acked in storage — further ack should fail + const ackRes = await request(app) + .post(`/api/agents/${encodeURIComponent(recipient.agent_id)}/messages/${messageId}/ack`) + .send({}); + assert.equal(ackRes.status, 400); // already acked, not leased + + // Status confirms acked + const statusRes = await request(app).get(`/api/messages/${messageId}/status`); + assert.equal(statusRes.status, 200); + assert.equal(statusRes.body.status, 'acked'); +}); + +test('retain_until_acked overrides auto_ack_on_pull — explicit ack required', async () => { + const sender = await registerAgent('retain-sender'); + const suffix = `${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + const recipientRes = await request(app) + .post('/api/agents/register') + .send({ agent_id: `retain-recipient-${suffix}`, agent_type: 'test', auto_ack_on_pull: true }); + assert.equal(recipientRes.status, 201); + const recipient = recipientRes.body; + + // Send with retain_until_acked: true (overrides recipient auto_ack_on_pull) + const sendRes = await sendSignedMessage(sender, recipient.agent_id, { + type: 'notification', + subject: 'retain-test', + body: { important: true }, + retain_until_acked: true + }); + assert.equal(sendRes.status, 201); + const messageId = sendRes.body.message_id; + + // Pull — should be leased, NOT auto-acked + const pullRes = await request(app) + .post(`/api/agents/${encodeURIComponent(recipient.agent_id)}/inbox/pull`) + .send({ visibility_timeout: 60 }); + + assert.equal(pullRes.status, 200); + assert.equal(pullRes.body.message_id, messageId); + assert.equal(pullRes.body.auto_acked, undefined); // not auto-acked + assert.ok(pullRes.body.lease_until); // has a real lease + + // Explicit ack should succeed + const ackRes = await request(app) + .post(`/api/agents/${encodeURIComponent(recipient.agent_id)}/messages/${messageId}/ack`) + .send({}); + assert.equal(ackRes.status, 200); + assert.equal(ackRes.body.ok, true); +}); + +test('work_order type always sets retain_until_acked server-side', async () => { + const sender = await registerAgent('wo-retain-sender'); + const suffix = `${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + const recipientRes = await request(app) + .post('/api/agents/register') + .send({ agent_id: `wo-retain-recipient-${suffix}`, agent_type: 'test', auto_ack_on_pull: true }); + assert.equal(recipientRes.status, 201); + const recipient = recipientRes.body; + + // Send a work_order — server should set retain_until_acked automatically + const sendRes = await sendSignedMessage(sender, recipient.agent_id, { + type: 'work_order', + subject: 'retain-by-type', + body: { task: 'do something important' } + }); + assert.equal(sendRes.status, 201); + const messageId = sendRes.body.message_id; + + // Pull — should be leased (not auto-acked) because work_order always retains + const pullRes = await request(app) + .post(`/api/agents/${encodeURIComponent(recipient.agent_id)}/inbox/pull`) + .send({ visibility_timeout: 60 }); + + assert.equal(pullRes.status, 200); + assert.equal(pullRes.body.message_id, messageId); + assert.equal(pullRes.body.auto_acked, undefined); // not auto-acked despite agent preference + assert.ok(pullRes.body.lease_until); + + // Explicit ack required + const ackRes = await request(app) + .post(`/api/agents/${encodeURIComponent(recipient.agent_id)}/messages/${messageId}/ack`) + .send({}); + assert.equal(ackRes.status, 200); +}); + +test('fix_request type always sets retain_until_acked server-side', async () => { + const sender = await registerAgent('fr-retain-sender'); + const suffix = `${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + const recipientRes = await request(app) + .post('/api/agents/register') + .send({ agent_id: `fr-retain-recipient-${suffix}`, agent_type: 'test', auto_ack_on_pull: true }); + assert.equal(recipientRes.status, 201); + const recipient = recipientRes.body; + + const sendRes = await sendSignedMessage(sender, recipient.agent_id, { + type: 'fix_request', + subject: 'retain-fix-request', + body: { bug: 'critical' } + }); + assert.equal(sendRes.status, 201); + const messageId = sendRes.body.message_id; + + // Pull — should be leased, not auto-acked (fix_request is in RETAIN_TYPES) + const pullRes = await request(app) + .post(`/api/agents/${encodeURIComponent(recipient.agent_id)}/inbox/pull`) + .send({ visibility_timeout: 60 }); + + assert.equal(pullRes.status, 200); + assert.equal(pullRes.body.message_id, messageId); + assert.equal(pullRes.body.auto_acked, undefined); + assert.ok(pullRes.body.lease_until); + + const ackRes = await request(app) + .post(`/api/agents/${encodeURIComponent(recipient.agent_id)}/messages/${messageId}/ack`) + .send({}); + assert.equal(ackRes.status, 200); +}); + +test('auto_ack_on_pull: invalid non-boolean value rejected at registration', async () => { + const suffix = `${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + const res = await request(app) + .post('/api/agents/register') + .send({ agent_id: `aap-invalid-${suffix}`, agent_type: 'test', auto_ack_on_pull: 'yes' }); + assert.equal(res.status, 400); + assert.equal(res.body.error, 'INVALID_INPUT'); +}); + +test('retain_until_acked: invalid non-boolean value rejected on send', async () => { + const sender = await registerAgent('rua-invalid-sender'); + const recipient = await registerAgent('rua-invalid-recipient'); + const sendRes = await sendSignedMessage(sender, recipient.agent_id, { + type: 'notification', + subject: 'rua-invalid', + body: { test: true }, + retain_until_acked: 'yes' + }); + assert.equal(sendRes.status, 400); + assert.equal(sendRes.body.error, 'INVALID_INPUT'); +}); + test('groups: owner can add member; non-member cannot add members', async () => { const owner = await registerAgent('add-owner'); const nonMember = await registerAgent('add-nonmember'); diff --git a/src/services/agent.service.js b/src/services/agent.service.js index 75153e5..7a34fc6 100644 --- a/src/services/agent.service.js +++ b/src/services/agent.service.js @@ -27,7 +27,7 @@ export class AgentService { * @param {string} params.tenant_id - Tenant ID (required for seed-based) * @returns {Object} Agent with keypair */ - async register({ agent_id, agent_type = 'generic', metadata = {}, webhook_url, webhook_secret, seed, public_key, tenant_id }) { + async register({ agent_id, agent_type = 'generic', metadata = {}, webhook_url, webhook_secret, seed, public_key, tenant_id, auto_ack_on_pull = false }) { // Generate agent_id if not provided if (!agent_id) { agent_id = `agent-${uuid()}`; @@ -126,7 +126,8 @@ export class AgentService { timeout_ms: parseInt(process.env.HEARTBEAT_TIMEOUT_MS) || 300000 }, trusted_agents: [], - blocked_agents: [] + blocked_agents: [], + auto_ack_on_pull }; // Apply registration policy. diff --git a/src/services/inbox.service.js b/src/services/inbox.service.js index 581e0d0..3e2442c 100644 --- a/src/services/inbox.service.js +++ b/src/services/inbox.service.js @@ -12,6 +12,11 @@ import { webhookService } from './webhook.service.js'; // Safe agent identifier patterns — module-level constants so they are compiled once, // not recreated on every message send. const SAFE_CHARS = /^[a-zA-Z0-9._:-]+$/; + +// Message types that always require explicit ack, regardless of the recipient's +// auto_ack_on_pull preference. Losing a work order or fix request silently is +// always worse than a queue backlog. +const RETAIN_TYPES = new Set(['work_order', 'fix_request']); // VALID_AGENT_URI is NOT a subset of SAFE_CHARS — agent://foo contains slashes which // are not in the allowlist. It is the only branch that accepts legacy agent:// URIs. // Do not delete it assuming it is a no-op; doing so would silently break backward @@ -129,6 +134,11 @@ export class InboxService { } } + // retain_until_acked: message must be explicitly acked before removal, regardless + // of the recipient's auto_ack_on_pull preference. Work orders and fix requests + // always retain — losing them silently is worse than a queue backlog. + const retainUntilAcked = options.retain_until_acked || RETAIN_TYPES.has(envelope.type) || false; + // Create message record const message = { id: envelope.id || uuid(), @@ -141,7 +151,8 @@ export class InboxService { attempts: 0, ephemeral, ephemeral_ttl_sec: ephemeralTTLSec, - expires_at: ephemeralTTLSec ? Date.now() + (ephemeralTTLSec * 1000) : null + expires_at: ephemeralTTLSec ? Date.now() + (ephemeralTTLSec * 1000) : null, + retain_until_acked: retainUntilAcked }; const created = await storage.createMessage(message); @@ -192,6 +203,22 @@ export class InboxService { async pull(agentId, options = {}) { const visibility_timeout = options.visibility_timeout || 60; + // Prefer caller-provided preference (avoids a storage round-trip when the route + // already has the authenticated agent via HTTP Signature middleware). + // Falls back to a storage lookup for legacy auth paths and test paths that + // do not send a Signature header — this adds one extra sequential read on those + // paths. The cost is acceptable because (a) auto_ack_on_pull agents are rare, + // (b) the lookup is cheap for the memory backend, and (c) the alternative + // (always looking up) was worse. Future: if the Mech backend proves slow here, + // thread auto_ack_on_pull through the auth middleware for all auth methods. + let autoAck; + if (options.auto_ack_on_pull !== undefined) { + autoAck = options.auto_ack_on_pull; + } else { + const agent = await agentService.getAgent(agentId); + autoAck = agent?.auto_ack_on_pull ?? false; + } + // Get available messages const now = Date.now(); let messages = await storage.getInbox(agentId, 'queued'); @@ -206,7 +233,20 @@ export class InboxService { // Get oldest message (FIFO) const message = messages.sort((a, b) => a.created_at - b.created_at)[0]; - // Lease the message + // auto_ack_on_pull: skip the lease and immediately ack. This avoids the state + // inconsistency of returning a 'leased' snapshot while storage holds 'acked'. + // If the ack write throws, it propagates — the message stays queued for retry. + // retain_until_acked overrides auto_ack_on_pull regardless of agent preference. + if (autoAck && !message.retain_until_acked) { + const acked = await storage.updateMessage(message.id, { + status: 'acked', + acked_at: Date.now(), + attempts: message.attempts + 1 + }); + return { ...acked, auto_acked: true }; + } + + // Standard lease path const leaseUntil = Date.now() + (visibility_timeout * 1000); const leased = await storage.updateMessage(message.id, {