Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions docs/AGENT-GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,30 @@ This means a DID:web agent with a crafted long or unusual domain path could crea
- **Use `ephemeral: true` for sensitive payloads.** Ephemeral messages have their body permanently deleted on ack. Use `ttl` for time-sensitive secrets.
- **Use correlation IDs for request-response.** Set `correlation_id` on messages you send; use the `/reply` endpoint to send a correlated response.

### Delivery Guarantees: `retain_until_acked` and `auto_ack_on_pull`

ADMP supports two opt-in delivery modes beyond the default lease-and-ack flow:

#### `retain_until_acked` (sender-set, per-message)

Set `retain_until_acked: true` in the send body to require explicit acknowledgment. This overrides the recipient's `auto_ack_on_pull` preference — the message will never be silently consumed.

**`work_order` and `fix_request` always retain** — the hub sets `retain_until_acked: true` automatically for these types, regardless of what the sender passes.

#### `auto_ack_on_pull` (recipient-set, at registration)

Register with `auto_ack_on_pull: true` for fire-and-forget delivery. The hub immediately acks each message on pull — no explicit `POST .../ack` is required.

The pull response includes `"auto_acked": true` when the hub auto-acked the message. In this case `lease_until` is `null` — do not call `POST .../ack` for auto-acked messages (it will return 400).

`retain_until_acked` always wins over `auto_ack_on_pull` — work orders and retained messages require explicit ack even if the recipient opted into auto-ack.

| Scenario | Recommended setting |
|----------|-------------------|
| Work orders, fix requests, critical tasks | `retain_until_acked: true` (automatic for `work_order`/`fix_request`) |
| Fire-and-forget notifications | Register recipient with `auto_ack_on_pull: true` |
| Default — explicit ack, retries on timeout | Neither flag |

### Performance

- **Pull in a loop** with a reasonable `visibility_timeout` (30-60s) to avoid re-processing.
Expand Down
12 changes: 9 additions & 3 deletions src/routes/agents.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ const router = express.Router();
*/
router.post('/register', async (req, res) => {
try {
const { agent_id, agent_type, metadata, webhook_url, webhook_secret, seed, public_key, tenant_id } = req.body;
const { agent_id, agent_type, metadata, webhook_url, webhook_secret, seed, public_key, tenant_id, auto_ack_on_pull } = req.body;

if (auto_ack_on_pull !== undefined && typeof auto_ack_on_pull !== 'boolean') {
return res.status(400).json({ error: 'INVALID_INPUT', message: 'auto_ack_on_pull must be a boolean' });
}

// Convert seed from base64 string to Uint8Array if provided
let seedBytes;
Expand All @@ -35,7 +39,8 @@ router.post('/register', async (req, res) => {
webhook_secret,
seed: seedBytes,
public_key,
tenant_id
tenant_id,
auto_ack_on_pull
});

const response = {
Expand All @@ -50,7 +55,8 @@ router.post('/register', async (req, res) => {
tenant_id: agent.tenant_id,
webhook_url: agent.webhook_url,
webhook_secret: agent.webhook_secret,
heartbeat: agent.heartbeat
heartbeat: agent.heartbeat,
auto_ack_on_pull: agent.auto_ack_on_pull
};

// Only include secret_key when available (legacy and seed-based modes)
Expand Down
27 changes: 20 additions & 7 deletions src/routes/inbox.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ const router = express.Router();
*/
router.post('/:agentId/messages', async (req, res) => {
try {
const { ephemeral, ttl, ...envelope } = req.body;
const { ephemeral, ttl, retain_until_acked, ...envelope } = req.body;

if (retain_until_acked !== undefined && typeof retain_until_acked !== 'boolean') {
return res.status(400).json({ error: 'INVALID_INPUT', message: 'retain_until_acked must be a boolean' });
}

// Ensure to field matches URL
if (!envelope.to) {
Expand All @@ -25,7 +29,8 @@ router.post('/:agentId/messages', async (req, res) => {

const message = await inboxService.send(envelope, {
ephemeral: ephemeral || false,
ttl: ttl || null
ttl: ttl || null,
retain_until_acked
});

res.status(201).json({
Expand Down Expand Up @@ -69,9 +74,15 @@ router.post('/:agentId/inbox/pull', authenticateHttpSignature, async (req, res)
try {
const { visibility_timeout } = req.body;

const message = await inboxService.pull(req.params.agentId, {
visibility_timeout
});
// If req.agent is populated by HTTP Signature auth, pass the preference directly
// to avoid a redundant storage round-trip in the service. Falls back to a service-
// level lookup when req.agent is unavailable (e.g. legacy auth without signature).
const pullOpts = { visibility_timeout };
if (req.agent?.auto_ack_on_pull !== undefined) {
pullOpts.auto_ack_on_pull = req.agent.auto_ack_on_pull;
}

const message = await inboxService.pull(req.params.agentId, pullOpts);

if (!message) {
return res.status(204).send();
Expand All @@ -80,8 +91,10 @@ router.post('/:agentId/inbox/pull', authenticateHttpSignature, async (req, res)
res.json({
message_id: message.id,
envelope: message.envelope,
lease_until: message.lease_until,
attempts: message.attempts
// auto_acked messages are already acked in storage; lease_until is not meaningful.
lease_until: message.auto_acked ? null : message.lease_until,
attempts: message.attempts,
...(message.auto_acked && { auto_acked: true })
});
} catch (error) {
res.status(400).json({
Expand Down
174 changes: 173 additions & 1 deletion src/server.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,17 @@ async function sendSignedMessage(sender, recipientId, options = {}) {
envelope.signature.sig = 'invalid-signature';
}

// Build send body — envelope fields plus optional ephemeral/ttl
// Build send body — envelope fields plus optional ephemeral/ttl/retain_until_acked
const sendBody = { ...envelope };
if (options.ephemeral !== undefined) {
sendBody.ephemeral = options.ephemeral;
}
if (options.ttl !== undefined) {
sendBody.ttl = options.ttl;
}
if (options.retain_until_acked !== undefined) {
sendBody.retain_until_acked = options.retain_until_acked;
}

const res = await request(app)
.post(`/api/agents/${encodeURIComponent(recipientId)}/messages`)
Expand Down Expand Up @@ -1130,6 +1133,175 @@ test('non-ephemeral messages behave as before (backward compat)', async () => {
assert.equal(statusRes.body.status, 'acked');
});

test('auto_ack_on_pull: message is immediately acked on pull, auto_acked returned in response', async () => {
const sender = await registerAgent('aap-sender');
// Register recipient with auto_ack_on_pull: true
const suffix = `${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
const recipientRes = await request(app)
.post('/api/agents/register')
.send({ agent_id: `aap-recipient-${suffix}`, agent_type: 'test', auto_ack_on_pull: true });
assert.equal(recipientRes.status, 201);
assert.equal(recipientRes.body.auto_ack_on_pull, true);
const recipient = recipientRes.body;

// Send a notification (not a retain type)
const sendRes = await sendSignedMessage(sender, recipient.agent_id, {
type: 'notification',
subject: 'auto-ack-test',
body: { ping: true }
});
assert.equal(sendRes.status, 201);
const messageId = sendRes.body.message_id;

// Pull — hub should auto-ack immediately
const pullRes = await request(app)
.post(`/api/agents/${encodeURIComponent(recipient.agent_id)}/inbox/pull`)
.send({ visibility_timeout: 60 });

assert.equal(pullRes.status, 200);
assert.equal(pullRes.body.message_id, messageId);
assert.equal(pullRes.body.auto_acked, true);
assert.equal(pullRes.body.lease_until, null);

// Message should be acked in storage — further ack should fail
const ackRes = await request(app)
.post(`/api/agents/${encodeURIComponent(recipient.agent_id)}/messages/${messageId}/ack`)
.send({});
assert.equal(ackRes.status, 400); // already acked, not leased

// Status confirms acked
const statusRes = await request(app).get(`/api/messages/${messageId}/status`);
assert.equal(statusRes.status, 200);
assert.equal(statusRes.body.status, 'acked');
});

test('retain_until_acked overrides auto_ack_on_pull — explicit ack required', async () => {
const sender = await registerAgent('retain-sender');
const suffix = `${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
const recipientRes = await request(app)
.post('/api/agents/register')
.send({ agent_id: `retain-recipient-${suffix}`, agent_type: 'test', auto_ack_on_pull: true });
assert.equal(recipientRes.status, 201);
const recipient = recipientRes.body;

// Send with retain_until_acked: true (overrides recipient auto_ack_on_pull)
const sendRes = await sendSignedMessage(sender, recipient.agent_id, {
type: 'notification',
subject: 'retain-test',
body: { important: true },
retain_until_acked: true
});
assert.equal(sendRes.status, 201);
const messageId = sendRes.body.message_id;

// Pull — should be leased, NOT auto-acked
const pullRes = await request(app)
.post(`/api/agents/${encodeURIComponent(recipient.agent_id)}/inbox/pull`)
.send({ visibility_timeout: 60 });

assert.equal(pullRes.status, 200);
assert.equal(pullRes.body.message_id, messageId);
assert.equal(pullRes.body.auto_acked, undefined); // not auto-acked
assert.ok(pullRes.body.lease_until); // has a real lease

// Explicit ack should succeed
const ackRes = await request(app)
.post(`/api/agents/${encodeURIComponent(recipient.agent_id)}/messages/${messageId}/ack`)
.send({});
assert.equal(ackRes.status, 200);
assert.equal(ackRes.body.ok, true);
});

test('work_order type always sets retain_until_acked server-side', async () => {
const sender = await registerAgent('wo-retain-sender');
const suffix = `${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
const recipientRes = await request(app)
.post('/api/agents/register')
.send({ agent_id: `wo-retain-recipient-${suffix}`, agent_type: 'test', auto_ack_on_pull: true });
assert.equal(recipientRes.status, 201);
const recipient = recipientRes.body;

// Send a work_order — server should set retain_until_acked automatically
const sendRes = await sendSignedMessage(sender, recipient.agent_id, {
type: 'work_order',
subject: 'retain-by-type',
body: { task: 'do something important' }
});
assert.equal(sendRes.status, 201);
const messageId = sendRes.body.message_id;

// Pull — should be leased (not auto-acked) because work_order always retains
const pullRes = await request(app)
.post(`/api/agents/${encodeURIComponent(recipient.agent_id)}/inbox/pull`)
.send({ visibility_timeout: 60 });

assert.equal(pullRes.status, 200);
assert.equal(pullRes.body.message_id, messageId);
assert.equal(pullRes.body.auto_acked, undefined); // not auto-acked despite agent preference
assert.ok(pullRes.body.lease_until);

// Explicit ack required
const ackRes = await request(app)
.post(`/api/agents/${encodeURIComponent(recipient.agent_id)}/messages/${messageId}/ack`)
.send({});
assert.equal(ackRes.status, 200);
});

test('fix_request type always sets retain_until_acked server-side', async () => {
const sender = await registerAgent('fr-retain-sender');
const suffix = `${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
const recipientRes = await request(app)
.post('/api/agents/register')
.send({ agent_id: `fr-retain-recipient-${suffix}`, agent_type: 'test', auto_ack_on_pull: true });
assert.equal(recipientRes.status, 201);
const recipient = recipientRes.body;

const sendRes = await sendSignedMessage(sender, recipient.agent_id, {
type: 'fix_request',
subject: 'retain-fix-request',
body: { bug: 'critical' }
});
assert.equal(sendRes.status, 201);
const messageId = sendRes.body.message_id;

// Pull — should be leased, not auto-acked (fix_request is in RETAIN_TYPES)
const pullRes = await request(app)
.post(`/api/agents/${encodeURIComponent(recipient.agent_id)}/inbox/pull`)
.send({ visibility_timeout: 60 });

assert.equal(pullRes.status, 200);
assert.equal(pullRes.body.message_id, messageId);
assert.equal(pullRes.body.auto_acked, undefined);
assert.ok(pullRes.body.lease_until);

const ackRes = await request(app)
.post(`/api/agents/${encodeURIComponent(recipient.agent_id)}/messages/${messageId}/ack`)
.send({});
assert.equal(ackRes.status, 200);
});

test('auto_ack_on_pull: invalid non-boolean value rejected at registration', async () => {
const suffix = `${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
const res = await request(app)
.post('/api/agents/register')
.send({ agent_id: `aap-invalid-${suffix}`, agent_type: 'test', auto_ack_on_pull: 'yes' });
assert.equal(res.status, 400);
assert.equal(res.body.error, 'INVALID_INPUT');
});

test('retain_until_acked: invalid non-boolean value rejected on send', async () => {
const sender = await registerAgent('rua-invalid-sender');
const recipient = await registerAgent('rua-invalid-recipient');
const sendRes = await sendSignedMessage(sender, recipient.agent_id, {
type: 'notification',
subject: 'rua-invalid',
body: { test: true },
retain_until_acked: 'yes'
});
assert.equal(sendRes.status, 400);
assert.equal(sendRes.body.error, 'INVALID_INPUT');
});

test('groups: owner can add member; non-member cannot add members', async () => {
const owner = await registerAgent('add-owner');
const nonMember = await registerAgent('add-nonmember');
Expand Down
5 changes: 3 additions & 2 deletions src/services/agent.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class AgentService {
* @param {string} params.tenant_id - Tenant ID (required for seed-based)
* @returns {Object} Agent with keypair
*/
async register({ agent_id, agent_type = 'generic', metadata = {}, webhook_url, webhook_secret, seed, public_key, tenant_id }) {
async register({ agent_id, agent_type = 'generic', metadata = {}, webhook_url, webhook_secret, seed, public_key, tenant_id, auto_ack_on_pull = false }) {
// Generate agent_id if not provided
if (!agent_id) {
agent_id = `agent-${uuid()}`;
Expand Down Expand Up @@ -126,7 +126,8 @@ export class AgentService {
timeout_ms: parseInt(process.env.HEARTBEAT_TIMEOUT_MS) || 300000
},
trusted_agents: [],
blocked_agents: []
blocked_agents: [],
auto_ack_on_pull
};

// Apply registration policy.
Expand Down
Loading