From f0ef93119c46d3ce20ab9dc78c1d05eef6963747 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Jun 2026 15:51:14 +0000 Subject: [PATCH 01/15] feat: expose durable delivery inbox and ack/fail/defer API Add the public engine/API/SDK surface for server-backed durable delivery on top of the existing per-recipient `deliveries` model (issue #154). Engine: - GET /v1/deliveries lists an agent's queued items (accepted + deferred by default, oldest first) with the message payload so offline consumers can replay on reconnect. - POST /v1/deliveries/:id/ack|fail|defer idempotently transition a delivery to delivered / failed (error + retryable) / deferred (available_at + reason), emitting delivery.delivered / delivery.failed / delivery.deferred to the agent. Types: - New Delivery / DeliveryItem / request schemas. - Typed delivery.accepted, delivery.delivered, delivery.deferred, and delivery.failed server events wired into the event unions. SDK: - agent.deliveries() / ackDelivery() / failDelivery() / deferDelivery() and delivery.* event handlers so downstream SDKs can report durable delivery support honestly. Docs: README + openapi.yaml document the new endpoints and schemas. Also align the engine's @relaycast/types dependency to the workspace version so it links the local package instead of a stale published copy. --- README.md | 14 + openapi.yaml | 249 ++++++++++++++++++ package-lock.json | 12 +- packages/engine/package.json | 2 +- .../__tests__/conformance/delivery.test.ts | 178 +++++++++++++ packages/engine/src/engine.ts | 2 + packages/engine/src/engine/delivery.ts | 198 ++++++++++++++ packages/engine/src/routes/delivery.ts | 169 ++++++++++++ packages/sdk-typescript/src/agent.ts | 51 ++++ packages/sdk-typescript/src/types.ts | 10 + packages/types/src/__tests__/types.test.ts | 58 +++- packages/types/src/delivery.ts | 64 +++++ packages/types/src/events.ts | 44 ++++ packages/types/src/index.ts | 1 + 14 files changed, 1040 insertions(+), 12 deletions(-) create mode 100644 packages/engine/src/__tests__/conformance/delivery.test.ts create mode 100644 packages/engine/src/engine/delivery.ts create mode 100644 packages/engine/src/routes/delivery.ts create mode 100644 packages/types/src/delivery.ts diff --git a/README.md b/README.md index b7fe2db7..684171ec 100644 --- a/README.md +++ b/README.md @@ -309,6 +309,20 @@ GET /inbox GET /search ``` +Durable delivery (server-backed, per-recipient delivery contract): + +```text +GET /deliveries List queued deliveries for the agent (accepted + deferred) +POST /deliveries/:id/ack Acknowledge a delivery (-> delivered) +POST /deliveries/:id/fail Record a failed delivery (error + retryable) +POST /deliveries/:id/defer Defer a delivery until available_at +``` + +Relaycast creates a per-recipient delivery row for every channel message, DM, group DM, and +thread reply, and emits `delivery.accepted`, `delivery.delivered`, `delivery.deferred`, and +`delivery.failed` events to the recipient. Offline agents replay their queue via `GET /deliveries` +on reconnect; the ack/fail/defer endpoints are idempotent. + A2A (Agent-to-Agent) gateway endpoints: ```text diff --git a/openapi.yaml b/openapi.yaml index 6bad3898..8c07396b 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -336,6 +336,78 @@ components: items: $ref: '#/components/schemas/Message' + Delivery: + type: object + description: A per-recipient durable delivery record. + properties: + id: + type: string + message_id: + type: string + channel_id: + type: string + agent_id: + type: string + status: + type: string + enum: [accepted, delivered, deferred, failed] + mode: + type: string + reason: + type: string + nullable: true + priority: + type: string + retryable: + type: boolean + nullable: true + error: + type: string + nullable: true + available_at: + type: string + format: date-time + nullable: true + deadline: + type: string + format: date-time + nullable: true + created_at: + type: string + format: date-time + updated_at: + type: string + format: date-time + nullable: true + + DeliveryItem: + allOf: + - $ref: '#/components/schemas/Delivery' + - type: object + properties: + message: + nullable: true + type: object + properties: + id: + type: string + channel_id: + type: string + agent_id: + type: string + nullable: true + agent_name: + type: string + nullable: true + text: + type: string + thread_id: + type: string + nullable: true + created_at: + type: string + format: date-time + SearchResult: type: object properties: @@ -2175,6 +2247,183 @@ paths: schema: $ref: '#/components/schemas/SuccessResponse' + /deliveries: + get: + summary: List durable deliveries + description: > + List durable delivery items queued for the calling agent. Defaults to + the non-terminal queue (`accepted` + `deferred`) so an offline consumer + can replay what it missed after reconnect, oldest first. Each item + carries the message payload. + tags: + - Deliveries + security: + - agentToken: [] + parameters: + - name: status + in: query + required: false + description: Filter by a single delivery status. + schema: + type: string + enum: [accepted, delivered, deferred, failed] + - name: limit + in: query + required: false + description: Maximum number of items to return (1-200, default 100). + schema: + type: integer + minimum: 1 + maximum: 200 + responses: + '200': + description: Queued delivery items + content: + application/json: + schema: + type: object + properties: + ok: + type: boolean + data: + type: array + items: + $ref: '#/components/schemas/DeliveryItem' + + /deliveries/{id}/ack: + post: + summary: Acknowledge a delivery + description: Idempotently transition a delivery to `delivered`. + tags: + - Deliveries + security: + - agentToken: [] + parameters: + - name: id + in: path + required: true + schema: + type: string + responses: + '200': + description: Updated delivery + content: + application/json: + schema: + type: object + properties: + ok: + type: boolean + data: + $ref: '#/components/schemas/Delivery' + '404': + description: Delivery not found + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + + /deliveries/{id}/fail: + post: + summary: Fail a delivery + description: > + Idempotently record a delivery as `failed`, capturing error text and + whether a retry is sane. + tags: + - Deliveries + security: + - agentToken: [] + parameters: + - name: id + in: path + required: true + schema: + type: string + requestBody: + required: false + content: + application/json: + schema: + type: object + properties: + error: + type: string + retryable: + type: boolean + responses: + '200': + description: Updated delivery + content: + application/json: + schema: + type: object + properties: + ok: + type: boolean + data: + $ref: '#/components/schemas/Delivery' + '404': + description: Delivery not found + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + + /deliveries/{id}/defer: + post: + summary: Defer a delivery + description: > + Idempotently record a delivery as `deferred` with the time it next + becomes available for retry. + tags: + - Deliveries + security: + - agentToken: [] + parameters: + - name: id + in: path + required: true + schema: + type: string + requestBody: + required: true + content: + application/json: + schema: + type: object + required: + - available_at + properties: + available_at: + type: string + format: date-time + reason: + type: string + responses: + '200': + description: Updated delivery + content: + application/json: + schema: + type: object + properties: + ok: + type: boolean + data: + $ref: '#/components/schemas/Delivery' + '400': + description: Invalid available_at + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '404': + description: Delivery not found + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + /files/upload: post: summary: Create file upload diff --git a/package-lock.json b/package-lock.json index dcb760d3..59792160 100644 --- a/package-lock.json +++ b/package-lock.json @@ -206,7 +206,7 @@ }, "node_modules/@clack/prompts/node_modules/is-unicode-supported": { "version": "1.3.0", - "dev": true, + "extraneous": true, "inBundle": true, "license": "MIT", "engines": { @@ -10498,7 +10498,7 @@ "dependencies": { "@hono/node-server": "^1.13.7", "@relaycast/a2a": "1.1.7", - "@relaycast/types": "1.1.7", + "@relaycast/types": "2.0.0", "better-sqlite3": "^11.10.0", "drizzle-orm": "^0.45.1", "hono": "^4.11.9", @@ -10524,14 +10524,6 @@ "zod": "^4.3.6" } }, - "packages/engine/node_modules/@relaycast/types": { - "version": "1.1.7", - "resolved": "https://registry.npmjs.org/@relaycast/types/-/types-1.1.7.tgz", - "integrity": "sha512-CzLik3qI5f+X0aQeFEITd26/Dn1W07XPHgxE6MwSj2idZen3IkprqdiTBfl/xj1JRqiQH+yq3pW7DeqJx8YC6g==", - "dependencies": { - "zod": "^4.3.6" - } - }, "packages/mcp": { "name": "@relaycast/mcp", "version": "2.0.0", diff --git a/packages/engine/package.json b/packages/engine/package.json index 167c1026..5d95354b 100644 --- a/packages/engine/package.json +++ b/packages/engine/package.json @@ -44,7 +44,7 @@ "dependencies": { "@hono/node-server": "^1.13.7", "@relaycast/a2a": "1.1.7", - "@relaycast/types": "1.1.7", + "@relaycast/types": "2.0.0", "better-sqlite3": "^11.10.0", "drizzle-orm": "^0.45.1", "hono": "^4.11.9", diff --git a/packages/engine/src/__tests__/conformance/delivery.test.ts b/packages/engine/src/__tests__/conformance/delivery.test.ts new file mode 100644 index 00000000..9c94e1d3 --- /dev/null +++ b/packages/engine/src/__tests__/conformance/delivery.test.ts @@ -0,0 +1,178 @@ +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { + makeNodeStack, + createWorkspace, + registerAgent, + FakeSocket, + type TestStack, +} from './harness.js'; + +/** + * Durable delivery API conformance: listing the queued inbox, and the + * idempotent ack / fail / defer transitions over the public engine routes. + */ +describe('durable delivery api', () => { + let stack: TestStack; + beforeEach(() => { stack = makeNodeStack({ ttlMs: 60_000 }); }); + afterEach(() => stack.close()); + + /** Stand up a workspace + channel with alice and bob joined, alice posts one message. */ + async function seed() { + const ws = await createWorkspace(stack.app, 'delivery-ws'); + const alice = await registerAgent(stack.app, ws.workspaceKey, 'alice'); + const bob = await registerAgent(stack.app, ws.workspaceKey, 'bob'); + + const createRes = await stack.app.request('/v1/channels', { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${ws.workspaceKey}` }, + body: JSON.stringify({ name: 'team-chat' }), + }); + expect(createRes.status).toBeLessThan(300); + for (const token of [alice.token, bob.token]) { + const joinRes = await stack.app.request('/v1/channels/team-chat/join', { + method: 'POST', + headers: { authorization: `Bearer ${token}` }, + }); + expect(joinRes.status).toBeLessThan(300); + } + + const postRes = await stack.app.request('/v1/channels/team-chat/messages', { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${alice.token}` }, + body: JSON.stringify({ text: 'hello bob' }), + }); + expect(postRes.status).toBeLessThan(300); + const messageId = ((await postRes.json()) as { data: { id: string } }).data.id; + + return { ws, alice, bob, messageId }; + } + + async function listDeliveries(token: string, query = '') { + const res = await stack.app.request(`/v1/deliveries${query}`, { + headers: { authorization: `Bearer ${token}` }, + }); + expect(res.status).toBe(200); + return ((await res.json()) as { data: Array> }).data; + } + + it('lists the queued delivery for a recipient with the message payload', async () => { + const { bob, messageId } = await seed(); + + const items = await listDeliveries(bob.token); + expect(items).toHaveLength(1); + const item = items[0]; + expect(item.message_id).toBe(messageId); + expect(item.status).toBe('accepted'); + expect(item.agent_id).toBe(bob.agentId); + expect((item.message as { text: string }).text).toBe('hello bob'); + expect((item.message as { agent_name: string }).agent_name).toBe('alice'); + }); + + it('does not expose deliveries to non-recipients', async () => { + const { alice } = await seed(); + // alice is the sender, so she has no delivery row of her own + const items = await listDeliveries(alice.token); + expect(items).toHaveLength(0); + }); + + it('acks a delivery idempotently and removes it from the default queue', async () => { + const { bob } = await seed(); + const [item] = await listDeliveries(bob.token); + const deliveryId = item.id as string; + + const ack1 = await stack.app.request(`/v1/deliveries/${deliveryId}/ack`, { + method: 'POST', + headers: { authorization: `Bearer ${bob.token}` }, + }); + expect(ack1.status).toBe(200); + expect(((await ack1.json()) as { data: { status: string } }).data.status).toBe('delivered'); + + // Idempotent: second ack still 200 + delivered. + const ack2 = await stack.app.request(`/v1/deliveries/${deliveryId}/ack`, { + method: 'POST', + headers: { authorization: `Bearer ${bob.token}` }, + }); + expect(ack2.status).toBe(200); + expect(((await ack2.json()) as { data: { status: string } }).data.status).toBe('delivered'); + + // Delivered items drop out of the default (accepted+deferred) queue. + expect(await listDeliveries(bob.token)).toHaveLength(0); + // But are still listable by explicit status filter. + expect(await listDeliveries(bob.token, '?status=delivered')).toHaveLength(1); + }); + + it('records a failed delivery with error text and retryability', async () => { + const { bob } = await seed(); + const [item] = await listDeliveries(bob.token); + + const res = await stack.app.request(`/v1/deliveries/${item.id}/fail`, { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${bob.token}` }, + body: JSON.stringify({ error: 'handler threw', retryable: true }), + }); + expect(res.status).toBe(200); + const data = ((await res.json()) as { data: Record }).data; + expect(data.status).toBe('failed'); + expect(data.error).toBe('handler threw'); + expect(data.retryable).toBe(true); + }); + + it('defers a delivery with available_at and keeps it in the queue', async () => { + const { bob } = await seed(); + const [item] = await listDeliveries(bob.token); + // Timestamps are stored at second precision (unixepoch), so align to the second. + const availableAt = new Date(Math.floor((Date.now() + 60_000) / 1000) * 1000).toISOString(); + + const res = await stack.app.request(`/v1/deliveries/${item.id}/defer`, { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${bob.token}` }, + body: JSON.stringify({ available_at: availableAt, reason: 'busy' }), + }); + expect(res.status).toBe(200); + const data = ((await res.json()) as { data: Record }).data; + expect(data.status).toBe('deferred'); + expect(data.available_at).toBe(availableAt); + expect(data.reason).toBe('busy'); + + // Deferred items remain in the default queue for later retry. + const queued = await listDeliveries(bob.token); + expect(queued).toHaveLength(1); + expect(queued[0].status).toBe('deferred'); + }); + + it('rejects an invalid defer payload', async () => { + const { bob } = await seed(); + const [item] = await listDeliveries(bob.token); + const res = await stack.app.request(`/v1/deliveries/${item.id}/defer`, { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${bob.token}` }, + body: JSON.stringify({ available_at: 'not-a-date' }), + }); + expect(res.status).toBe(400); + }); + + it('returns 404 for an unknown or unowned delivery', async () => { + const { alice } = await seed(); + const res = await stack.app.request('/v1/deliveries/del_does_not_exist/ack', { + method: 'POST', + headers: { authorization: `Bearer ${alice.token}` }, + }); + expect(res.status).toBe(404); + }); + + it('emits delivery.delivered to the recipient on ack', async () => { + const { ws, bob } = await seed(); + const [item] = await listDeliveries(bob.token); + + const bobSock = new FakeSocket(); + stack.runtime.realtime.attachAgentSocket(ws.workspaceId, bob.agentId, bobSock); + + const res = await stack.app.request(`/v1/deliveries/${item.id}/ack`, { + method: 'POST', + headers: { authorization: `Bearer ${bob.token}` }, + }); + expect(res.status).toBe(200); + await new Promise((r) => setTimeout(r, 50)); + expect(bobSock.ofType('delivery.delivered').length).toBeGreaterThanOrEqual(1); + }); +}); diff --git a/packages/engine/src/engine.ts b/packages/engine/src/engine.ts index 80991c16..436c0763 100644 --- a/packages/engine/src/engine.ts +++ b/packages/engine/src/engine.ts @@ -27,6 +27,7 @@ import { reactionRoutes } from './routes/reaction.js'; import { searchRoutes } from './routes/search.js'; import { inboxRoutes } from './routes/inbox.js'; import { receiptRoutes } from './routes/receipt.js'; +import { deliveryRoutes } from './routes/delivery.js'; import { fileRoutes } from './routes/file.js'; import { presenceRoutes } from './routes/presence.js'; import { systemPromptRoutes } from './routes/systemPrompt.js'; @@ -179,6 +180,7 @@ export function createEngine(deps: EngineDeps): Hono { v1.route('/', searchRoutes); v1.route('/', inboxRoutes); v1.route('/', receiptRoutes); + v1.route('/', deliveryRoutes); v1.route('/', fileRoutes); v1.route('/', inboundWebhookRoutes); v1.route('/', eventSubscriptionRoutes); diff --git a/packages/engine/src/engine/delivery.ts b/packages/engine/src/engine/delivery.ts new file mode 100644 index 00000000..f1f094a6 --- /dev/null +++ b/packages/engine/src/engine/delivery.ts @@ -0,0 +1,198 @@ +import { eq, and, asc, inArray } from 'drizzle-orm'; +import type { getDb } from '../db/index.js'; +import { deliveries, messages, agents } from '../db/schema.js'; +import type { DeliveryStatus } from '@relaycast/types'; + +type Db = ReturnType; + +type DeliveryRow = typeof deliveries.$inferSelect; + +function toIso(value: Date | null | undefined): string | null { + return value ? value.toISOString() : null; +} + +function serializeDelivery(row: DeliveryRow) { + return { + id: row.id, + message_id: row.messageId, + channel_id: '', // filled in by callers that have the message row + agent_id: row.agentId, + status: row.status as DeliveryStatus, + mode: row.mode, + reason: row.reason, + priority: row.priority, + retryable: row.retryable ?? null, + error: row.error, + available_at: toIso(row.availableAt), + deadline: toIso(row.deadline), + created_at: toIso(row.createdAt) ?? new Date(0).toISOString(), + updated_at: toIso(row.updatedAt), + }; +} + +/** + * List durable delivery items for an agent. Defaults to the non-terminal + * (`accepted` + `deferred`) queue so an offline consumer can replay what it + * missed on reconnect, oldest first (FIFO). Each item carries the message + * payload so the consumer does not need a second round-trip. + */ +export async function listDeliveries( + db: Db, + workspaceId: string, + agentId: string, + opts: { status?: DeliveryStatus; limit?: number } = {}, +) { + const limit = Math.min(Math.max(opts.limit ?? 100, 1), 200); + const statusFilter = opts.status + ? eq(deliveries.status, opts.status) + : inArray(deliveries.status, ['accepted', 'deferred']); + + const rows = await db + .select() + .from(deliveries) + .where( + and( + eq(deliveries.workspaceId, workspaceId), + eq(deliveries.agentId, agentId), + statusFilter, + ), + ) + .orderBy(asc(deliveries.createdAt), asc(deliveries.id)) + .limit(limit); + + if (rows.length === 0) return []; + + const messageIds = [...new Set(rows.map((r) => r.messageId))]; + const msgRows = await db + .select({ + id: messages.id, + channelId: messages.channelId, + agentId: messages.agentId, + agentName: agents.name, + body: messages.body, + threadId: messages.threadId, + createdAt: messages.createdAt, + }) + .from(messages) + .leftJoin(agents, eq(messages.agentId, agents.id)) + .where(inArray(messages.id, messageIds)); + const msgById = new Map(msgRows.map((m) => [m.id, m])); + + return rows.map((row) => { + const msg = msgById.get(row.messageId); + return { + ...serializeDelivery(row), + channel_id: msg?.channelId ?? '', + message: msg + ? { + id: msg.id, + channel_id: msg.channelId, + agent_id: msg.agentId ?? null, + agent_name: msg.agentName ?? null, + text: msg.body, + thread_id: msg.threadId ?? null, + created_at: msg.createdAt.toISOString(), + } + : null, + }; + }); +} + +/** Fetch a single delivery owned by the agent, or null. */ +async function getOwnedDelivery( + db: Db, + workspaceId: string, + agentId: string, + deliveryId: string, +): Promise { + const [row] = await db + .select() + .from(deliveries) + .where( + and( + eq(deliveries.id, deliveryId), + eq(deliveries.workspaceId, workspaceId), + eq(deliveries.agentId, agentId), + ), + ); + return row ?? null; +} + +async function reloadDelivery(db: Db, deliveryId: string): Promise { + const [row] = await db.select().from(deliveries).where(eq(deliveries.id, deliveryId)); + return row; +} + +/** + * Idempotently transition a delivery to `delivered`. Repeated calls return the + * same delivered record. Returns null if the delivery is not found / not owned. + */ +export async function ackDelivery( + db: Db, + workspaceId: string, + agentId: string, + deliveryId: string, +) { + const existing = await getOwnedDelivery(db, workspaceId, agentId, deliveryId); + if (!existing) return null; + + if (existing.status !== 'delivered') { + await db + .update(deliveries) + .set({ status: 'delivered', updatedAt: new Date() }) + .where(eq(deliveries.id, deliveryId)); + } + return serializeDelivery(await reloadDelivery(db, deliveryId)); +} + +/** + * Idempotently record a delivery as `failed`, capturing error text and + * retryability. Returns null if the delivery is not found / not owned. + */ +export async function failDelivery( + db: Db, + workspaceId: string, + agentId: string, + deliveryId: string, + opts: { error?: string; retryable?: boolean } = {}, +) { + const existing = await getOwnedDelivery(db, workspaceId, agentId, deliveryId); + if (!existing) return null; + + await db + .update(deliveries) + .set({ + status: 'failed', + error: opts.error ?? null, + retryable: opts.retryable ?? null, + updatedAt: new Date(), + }) + .where(eq(deliveries.id, deliveryId)); + return serializeDelivery(await reloadDelivery(db, deliveryId)); +} + +/** + * Idempotently record a delivery as `deferred` with the time it next becomes + * available. Returns null if the delivery is not found / not owned. + */ +export async function deferDelivery( + db: Db, + workspaceId: string, + agentId: string, + deliveryId: string, + opts: { availableAt: Date; reason?: string }, +) { + const existing = await getOwnedDelivery(db, workspaceId, agentId, deliveryId); + if (!existing) return null; + + await db + .update(deliveries) + .set({ + status: 'deferred', + availableAt: opts.availableAt, + reason: opts.reason ?? existing.reason, + updatedAt: new Date(), + }) + .where(eq(deliveries.id, deliveryId)); + return serializeDelivery(await reloadDelivery(db, deliveryId)); +} diff --git a/packages/engine/src/routes/delivery.ts b/packages/engine/src/routes/delivery.ts new file mode 100644 index 00000000..be8292bb --- /dev/null +++ b/packages/engine/src/routes/delivery.ts @@ -0,0 +1,169 @@ +import { Hono } from 'hono'; +import { z } from 'zod'; +import type { AppEnv } from '../env.js'; +import { requireAgentToken } from '../middleware/auth.js'; +import { rateLimit } from '../middleware/rateLimit.js'; +import * as deliveryEngine from '../engine/delivery.js'; +import { fanoutToAgents } from './fanout.js'; +import { runInBackground } from './background.js'; +import { errorResponse } from '../lib/httpError.js'; +import { DeliveryStatusSchema, FailDeliveryRequestSchema, DeferDeliveryRequestSchema } from '@relaycast/types'; + +export const deliveryRoutes = new Hono(); + +const listQuerySchema = z.object({ + status: DeliveryStatusSchema.optional(), + limit: z.coerce.number().int().positive().max(200).optional(), +}); + +// GET /v1/deliveries - durable delivery queue for the calling agent. +// Defaults to non-terminal items (accepted + deferred) so offline consumers +// can replay what they missed after reconnect. +deliveryRoutes.get( + '/deliveries', + requireAgentToken, + rateLimit, + async (c) => { + try { + const parsed = listQuerySchema.safeParse({ + status: c.req.query('status'), + limit: c.req.query('limit'), + }); + if (!parsed.success) { + return c.json({ + ok: false, + error: { code: 'invalid_request', message: 'Invalid status or limit' }, + }, 400); + } + const db = c.get('db'); + const workspace = c.get('workspace'); + const agent = c.get('agent'); + const result = await deliveryEngine.listDeliveries(db, workspace.id, agent!.id, parsed.data); + return c.json({ ok: true, data: result }); + } catch (err: unknown) { + return errorResponse(c, err); + } + }, +); + +// POST /v1/deliveries/:id/ack - idempotently mark a delivery delivered. +deliveryRoutes.post( + '/deliveries/:id/ack', + requireAgentToken, + rateLimit, + async (c) => { + try { + const db = c.get('db'); + const workspace = c.get('workspace'); + const agent = c.get('agent'); + const id = c.req.param('id'); + const result = await deliveryEngine.ackDelivery(db, workspace.id, agent!.id, id); + if (!result) { + return c.json({ + ok: false, + error: { code: 'delivery_not_found', message: 'Delivery not found' }, + }, 404); + } + runInBackground( + c, + fanoutToAgents(c, [agent!.id], 'delivery.delivered', { + delivery_id: result.id, + message_id: result.message_id, + }), + 'fanout delivery.delivered', + ); + return c.json({ ok: true, data: result }); + } catch (err: unknown) { + return errorResponse(c, err); + } + }, +); + +// POST /v1/deliveries/:id/fail - idempotently record a failed delivery. +deliveryRoutes.post( + '/deliveries/:id/fail', + requireAgentToken, + rateLimit, + async (c) => { + try { + const raw = await c.req.json().catch(() => ({})); + const parsed = FailDeliveryRequestSchema.safeParse(raw ?? {}); + if (!parsed.success) { + return c.json({ + ok: false, + error: { code: 'invalid_request', message: 'Invalid fail payload' }, + }, 400); + } + const db = c.get('db'); + const workspace = c.get('workspace'); + const agent = c.get('agent'); + const id = c.req.param('id'); + const result = await deliveryEngine.failDelivery(db, workspace.id, agent!.id, id, parsed.data); + if (!result) { + return c.json({ + ok: false, + error: { code: 'delivery_not_found', message: 'Delivery not found' }, + }, 404); + } + runInBackground( + c, + fanoutToAgents(c, [agent!.id], 'delivery.failed', { + delivery_id: result.id, + message_id: result.message_id, + error: result.error, + retryable: result.retryable, + }), + 'fanout delivery.failed', + ); + return c.json({ ok: true, data: result }); + } catch (err: unknown) { + return errorResponse(c, err); + } + }, +); + +// POST /v1/deliveries/:id/defer - idempotently defer a delivery to available_at. +deliveryRoutes.post( + '/deliveries/:id/defer', + requireAgentToken, + rateLimit, + async (c) => { + try { + const raw = await c.req.json().catch(() => null); + const parsed = DeferDeliveryRequestSchema.safeParse(raw); + if (!parsed.success) { + return c.json({ + ok: false, + error: { code: 'invalid_request', message: 'available_at must be an ISO-8601 timestamp' }, + }, 400); + } + const db = c.get('db'); + const workspace = c.get('workspace'); + const agent = c.get('agent'); + const id = c.req.param('id'); + const result = await deliveryEngine.deferDelivery(db, workspace.id, agent!.id, id, { + availableAt: new Date(parsed.data.available_at), + reason: parsed.data.reason, + }); + if (!result) { + return c.json({ + ok: false, + error: { code: 'delivery_not_found', message: 'Delivery not found' }, + }, 404); + } + runInBackground( + c, + fanoutToAgents(c, [agent!.id], 'delivery.deferred', { + delivery_id: result.id, + message_id: result.message_id, + available_at: result.available_at, + reason: result.reason, + }), + 'fanout delivery.deferred', + ); + return c.json({ ok: true, data: result }); + } catch (err: unknown) { + return errorResponse(c, err); + } + }, +); diff --git a/packages/sdk-typescript/src/agent.ts b/packages/sdk-typescript/src/agent.ts index b7b92e80..2d492730 100644 --- a/packages/sdk-typescript/src/agent.ts +++ b/packages/sdk-typescript/src/agent.ts @@ -56,6 +56,15 @@ import type { ActionInvokedEvent, ActionCompletedEvent, ActionFailedEvent, + Delivery, + DeliveryItem, + DeliveryStatus, + FailDeliveryRequest, + DeferDeliveryRequest, + DeliveryAcceptedEvent, + DeliveryDeliveredEvent, + DeliveryDeferredEvent, + DeliveryFailedEvent, WsReconnectingEvent, WsPermanentlyDisconnectedEvent, } from './types.js'; @@ -350,6 +359,11 @@ export class AgentClient { actionInvoked: (handler: (e: ActionInvokedEvent) => void): (() => void) => this.onEvent('action.invoked', handler), actionCompleted: (handler: (e: ActionCompletedEvent) => void): (() => void) => this.onEvent('action.completed', handler), actionFailed: (handler: (e: ActionFailedEvent) => void): (() => void) => this.onEvent('action.failed', handler), + // Durable delivery lifecycle + deliveryAccepted: (handler: (e: DeliveryAcceptedEvent) => void): (() => void) => this.onEvent('delivery.accepted', handler), + deliveryDelivered: (handler: (e: DeliveryDeliveredEvent) => void): (() => void) => this.onEvent('delivery.delivered', handler), + deliveryDeferred: (handler: (e: DeliveryDeferredEvent) => void): (() => void) => this.onEvent('delivery.deferred', handler), + deliveryFailed: (handler: (e: DeliveryFailedEvent) => void): (() => void) => this.onEvent('delivery.failed', handler), // Lifecycle connected: (handler: () => void): (() => void) => this.onEvent('open', handler as (e: never) => void), disconnected: (handler: () => void): (() => void) => this.onEvent('close', handler as (e: never) => void), @@ -717,6 +731,43 @@ export class AgentClient { ); } + // === Durable Delivery === + + /** + * List durable delivery items queued for this agent. Defaults to the + * non-terminal queue (accepted + deferred) so an offline consumer can replay + * what it missed on reconnect. Each item carries the message payload. + */ + async deliveries(options?: { status?: DeliveryStatus; limit?: number }): Promise { + const params: Record = {}; + if (options?.status) params.status = options.status; + if (options?.limit != null) params.limit = String(options.limit); + return this.client.get('/v1/deliveries', params); + } + + /** Idempotently acknowledge a delivery, transitioning it to `delivered`. */ + async ackDelivery(deliveryId: string): Promise { + return this.client.post( + `/v1/deliveries/${encodeURIComponent(deliveryId)}/ack`, + ); + } + + /** Idempotently record a delivery as `failed` with optional error/retryability. */ + async failDelivery(deliveryId: string, options?: FailDeliveryRequest): Promise { + return this.client.post( + `/v1/deliveries/${encodeURIComponent(deliveryId)}/fail`, + options ?? {}, + ); + } + + /** Idempotently defer a delivery until `availableAt`. */ + async deferDelivery(deliveryId: string, options: DeferDeliveryRequest): Promise { + return this.client.post( + `/v1/deliveries/${encodeURIComponent(deliveryId)}/defer`, + options, + ); + } + // === Actions === actions = { diff --git a/packages/sdk-typescript/src/types.ts b/packages/sdk-typescript/src/types.ts index c3793415..5933c229 100644 --- a/packages/sdk-typescript/src/types.ts +++ b/packages/sdk-typescript/src/types.ts @@ -494,6 +494,16 @@ export type UploadResponse = Camelize; export type ActionInvokedEvent = Camelize; export type ActionCompletedEvent = Camelize; export type ActionFailedEvent = Camelize; +export type Delivery = Camelize; +export type DeliveryItem = Camelize; +export type DeliveryMessage = Camelize; +export type DeliveryStatus = Raw.DeliveryStatus; +export type FailDeliveryRequest = Camelize; +export type DeferDeliveryRequest = Camelize; +export type DeliveryAcceptedEvent = Camelize; +export type DeliveryDeliveredEvent = Camelize; +export type DeliveryDeferredEvent = Camelize; +export type DeliveryFailedEvent = Camelize; export type Webhook = Camelize; export type WebhookReceivedEvent = Camelize; export type WebhookTriggerRequest = Camelize; diff --git a/packages/types/src/__tests__/types.test.ts b/packages/types/src/__tests__/types.test.ts index b0ed429d..305b4a34 100644 --- a/packages/types/src/__tests__/types.test.ts +++ b/packages/types/src/__tests__/types.test.ts @@ -1,4 +1,5 @@ -import { describe, it, expectTypeOf } from 'vitest'; +import { describe, it, expect, expectTypeOf } from 'vitest'; +import { DeliverySchema, DeliveryItemSchema, ServerEventSchema } from '../index.js'; import type { Workspace, CreateWorkspaceRequest, @@ -302,4 +303,59 @@ describe('Type definitions', () => { } handleEvent({ type: 'pong' }); }); + + // ============================================ + // Durable delivery + // ============================================ + it('DeliverySchema validates a delivery record', () => { + const parsed = DeliverySchema.safeParse({ + id: 'del_1', + message_id: 'm_1', + channel_id: 'c_1', + agent_id: 'a_1', + status: 'accepted', + mode: 'immediate', + reason: 'mention', + priority: 'normal', + retryable: null, + error: null, + available_at: null, + deadline: null, + created_at: '2026-06-01T00:00:00.000Z', + updated_at: null, + }); + expect(parsed.success).toBe(true); + }); + + it('DeliverySchema rejects an unknown status', () => { + const parsed = DeliverySchema.safeParse({ + id: 'del_1', message_id: 'm_1', channel_id: 'c_1', agent_id: 'a_1', + status: 'bogus', mode: 'immediate', reason: null, priority: 'normal', + retryable: null, error: null, available_at: null, deadline: null, + created_at: '2026-06-01T00:00:00.000Z', updated_at: null, + }); + expect(parsed.success).toBe(false); + }); + + it('DeliveryItemSchema carries an optional message payload', () => { + const parsed = DeliveryItemSchema.safeParse({ + id: 'del_1', message_id: 'm_1', channel_id: 'c_1', agent_id: 'a_1', + status: 'deferred', mode: 'immediate', reason: null, priority: 'normal', + retryable: null, error: null, available_at: '2026-06-01T00:01:00.000Z', + deadline: null, created_at: '2026-06-01T00:00:00.000Z', updated_at: null, + message: null, + }); + expect(parsed.success).toBe(true); + }); + + it('ServerEventSchema parses durable delivery events', () => { + for (const event of [ + { type: 'delivery.accepted', delivery_id: 'del_1', message_id: 'm_1', channel_id: 'c_1', reason: 'message' }, + { type: 'delivery.delivered', delivery_id: 'del_1', message_id: 'm_1' }, + { type: 'delivery.deferred', delivery_id: 'del_1', message_id: 'm_1', available_at: '2026-06-01T00:01:00.000Z' }, + { type: 'delivery.failed', delivery_id: 'del_1', message_id: 'm_1', error: 'boom', retryable: true }, + ]) { + expect(ServerEventSchema.safeParse(event).success).toBe(true); + } + }); }); diff --git a/packages/types/src/delivery.ts b/packages/types/src/delivery.ts new file mode 100644 index 00000000..ac9913c7 --- /dev/null +++ b/packages/types/src/delivery.ts @@ -0,0 +1,64 @@ +import { z } from 'zod'; + +// Durable delivery status lifecycle: +// accepted -> queued for the recipient, awaiting handling +// delivered -> recipient acked (terminal success) +// deferred -> recipient asked to retry no earlier than available_at +// failed -> recipient failed to handle; retryable indicates whether a retry is sane +export const DeliveryStatusSchema = z.enum(['accepted', 'delivered', 'deferred', 'failed']); +export type DeliveryStatus = z.infer; + +// The message payload carried alongside a queued delivery item so an offline +// consumer can recover what it missed without a second round-trip. +export const DeliveryMessageSchema = z.object({ + id: z.string(), + channel_id: z.string(), + agent_id: z.string().nullable(), + agent_name: z.string().nullable(), + text: z.string(), + thread_id: z.string().nullable(), + created_at: z.string(), +}); +export type DeliveryMessage = z.infer; + +export const DeliverySchema = z.object({ + id: z.string(), + message_id: z.string(), + channel_id: z.string(), + agent_id: z.string(), + status: DeliveryStatusSchema, + mode: z.string(), + reason: z.string().nullable(), + priority: z.string(), + retryable: z.boolean().nullable(), + error: z.string().nullable(), + available_at: z.string().nullable(), + deadline: z.string().nullable(), + created_at: z.string(), + updated_at: z.string().nullable(), +}); +export type Delivery = z.infer; + +// A queued delivery item: the delivery record plus the message it carries. +export const DeliveryItemSchema = DeliverySchema.extend({ + message: DeliveryMessageSchema.nullable(), +}); +export type DeliveryItem = z.infer; + +export const ListDeliveriesQuerySchema = z.object({ + status: DeliveryStatusSchema.optional(), + limit: z.number().int().positive().max(200).optional(), +}); +export type ListDeliveriesQuery = z.infer; + +export const FailDeliveryRequestSchema = z.object({ + error: z.string().optional(), + retryable: z.boolean().optional(), +}); +export type FailDeliveryRequest = z.infer; + +export const DeferDeliveryRequestSchema = z.object({ + available_at: z.string().datetime(), + reason: z.string().optional(), +}); +export type DeferDeliveryRequest = z.infer; diff --git a/packages/types/src/events.ts b/packages/types/src/events.ts index 59183f81..da54300d 100644 --- a/packages/types/src/events.ts +++ b/packages/types/src/events.ts @@ -236,6 +236,42 @@ export const ActionFailedEventSchema = z.object({ }); export type ActionFailedEvent = z.infer; +// Durable delivery lifecycle events. Delivered to the recipient agent so an +// offline consumer can reconcile queued delivery state on reconnect. +export const DeliveryAcceptedEventSchema = z.object({ + type: z.literal('delivery.accepted'), + delivery_id: z.string(), + message_id: z.string(), + channel_id: z.string().nullable().optional(), + reason: z.string().nullable().optional(), +}); +export type DeliveryAcceptedEvent = z.infer; + +export const DeliveryDeliveredEventSchema = z.object({ + type: z.literal('delivery.delivered'), + delivery_id: z.string(), + message_id: z.string(), +}); +export type DeliveryDeliveredEvent = z.infer; + +export const DeliveryDeferredEventSchema = z.object({ + type: z.literal('delivery.deferred'), + delivery_id: z.string(), + message_id: z.string(), + available_at: z.string().nullable(), + reason: z.string().nullable().optional(), +}); +export type DeliveryDeferredEvent = z.infer; + +export const DeliveryFailedEventSchema = z.object({ + type: z.literal('delivery.failed'), + delivery_id: z.string(), + message_id: z.string(), + error: z.string().nullable().optional(), + retryable: z.boolean().nullable().optional(), +}); +export type DeliveryFailedEvent = z.infer; + // WebSocket client events (emitted by WsClient, not from server) export const WsOpenEventSchema = z.object({ type: z.literal('open'), @@ -290,6 +326,10 @@ export const ServerEventSchema = z.discriminatedUnion('type', [ ActionInvokedEventSchema, ActionCompletedEventSchema, ActionFailedEventSchema, + DeliveryAcceptedEventSchema, + DeliveryDeliveredEventSchema, + DeliveryDeferredEventSchema, + DeliveryFailedEventSchema, PongEventSchema, ]); @@ -332,6 +372,10 @@ export const WsClientEventSchema = z.discriminatedUnion('type', [ ActionInvokedEventSchema, ActionCompletedEventSchema, ActionFailedEventSchema, + DeliveryAcceptedEventSchema, + DeliveryDeliveredEventSchema, + DeliveryDeferredEventSchema, + DeliveryFailedEventSchema, PongEventSchema, // Client-only events WsOpenEventSchema, diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index 875e1a07..9ceadba5 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -5,6 +5,7 @@ export * from './message.js'; export * from './reaction.js'; export * from './file.js'; export * from './receipt.js'; +export * from './delivery.js'; export * from './dm.js'; export * from './events.js'; export * from './api.js'; From 5ae080d2cccf09aa77fe2666f172454521f11601 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Jun 2026 15:55:07 +0000 Subject: [PATCH 02/15] fix(engine): populate channel_id on delivery transitions and use .returning() Address review feedback on the durable delivery engine: - getOwnedDelivery now joins messages so ack/fail/defer responses return the delivery's channel_id instead of an empty string (matching the Delivery schema). - Use Drizzle .returning() on the update queries to read back the transitioned row in a single round-trip, removing the reloadDelivery helper. - ackDelivery short-circuits when already delivered. - Tests assert channel_id is populated on ack/fail/defer responses. --- .../__tests__/conformance/delivery.test.ts | 8 ++- packages/engine/src/engine/delivery.ts | 49 ++++++++++--------- 2 files changed, 32 insertions(+), 25 deletions(-) diff --git a/packages/engine/src/__tests__/conformance/delivery.test.ts b/packages/engine/src/__tests__/conformance/delivery.test.ts index 9c94e1d3..74c3d703 100644 --- a/packages/engine/src/__tests__/conformance/delivery.test.ts +++ b/packages/engine/src/__tests__/conformance/delivery.test.ts @@ -85,7 +85,11 @@ describe('durable delivery api', () => { headers: { authorization: `Bearer ${bob.token}` }, }); expect(ack1.status).toBe(200); - expect(((await ack1.json()) as { data: { status: string } }).data.status).toBe('delivered'); + const ack1Data = ((await ack1.json()) as { data: { status: string; channel_id: string } }).data; + expect(ack1Data.status).toBe('delivered'); + // channel_id is populated on the transition response, matching the queued item. + expect(ack1Data.channel_id).toBe(item.channel_id); + expect(ack1Data.channel_id).not.toBe(''); // Idempotent: second ack still 200 + delivered. const ack2 = await stack.app.request(`/v1/deliveries/${deliveryId}/ack`, { @@ -115,6 +119,7 @@ describe('durable delivery api', () => { expect(data.status).toBe('failed'); expect(data.error).toBe('handler threw'); expect(data.retryable).toBe(true); + expect(data.channel_id).toBe(item.channel_id); }); it('defers a delivery with available_at and keeps it in the queue', async () => { @@ -133,6 +138,7 @@ describe('durable delivery api', () => { expect(data.status).toBe('deferred'); expect(data.available_at).toBe(availableAt); expect(data.reason).toBe('busy'); + expect(data.channel_id).toBe(item.channel_id); // Deferred items remain in the default queue for later retry. const queued = await listDeliveries(bob.token); diff --git a/packages/engine/src/engine/delivery.ts b/packages/engine/src/engine/delivery.ts index f1f094a6..8f24aee8 100644 --- a/packages/engine/src/engine/delivery.ts +++ b/packages/engine/src/engine/delivery.ts @@ -11,11 +11,11 @@ function toIso(value: Date | null | undefined): string | null { return value ? value.toISOString() : null; } -function serializeDelivery(row: DeliveryRow) { +function serializeDelivery(row: DeliveryRow & { channelId?: string }) { return { id: row.id, message_id: row.messageId, - channel_id: '', // filled in by callers that have the message row + channel_id: row.channelId ?? '', agent_id: row.agentId, status: row.status as DeliveryStatus, mode: row.mode, @@ -98,16 +98,20 @@ export async function listDeliveries( }); } -/** Fetch a single delivery owned by the agent, or null. */ +/** + * Fetch a single delivery owned by the agent, joined to its message so the + * caller has the `channelId` for serialization. Returns null if not found. + */ async function getOwnedDelivery( db: Db, workspaceId: string, agentId: string, deliveryId: string, -): Promise { +): Promise<(DeliveryRow & { channelId: string }) | null> { const [row] = await db - .select() + .select({ delivery: deliveries, channelId: messages.channelId }) .from(deliveries) + .innerJoin(messages, eq(deliveries.messageId, messages.id)) .where( and( eq(deliveries.id, deliveryId), @@ -115,12 +119,7 @@ async function getOwnedDelivery( eq(deliveries.agentId, agentId), ), ); - return row ?? null; -} - -async function reloadDelivery(db: Db, deliveryId: string): Promise { - const [row] = await db.select().from(deliveries).where(eq(deliveries.id, deliveryId)); - return row; + return row ? { ...row.delivery, channelId: row.channelId } : null; } /** @@ -135,14 +134,14 @@ export async function ackDelivery( ) { const existing = await getOwnedDelivery(db, workspaceId, agentId, deliveryId); if (!existing) return null; + if (existing.status === 'delivered') return serializeDelivery(existing); - if (existing.status !== 'delivered') { - await db - .update(deliveries) - .set({ status: 'delivered', updatedAt: new Date() }) - .where(eq(deliveries.id, deliveryId)); - } - return serializeDelivery(await reloadDelivery(db, deliveryId)); + const [updated] = await db + .update(deliveries) + .set({ status: 'delivered', updatedAt: new Date() }) + .where(eq(deliveries.id, deliveryId)) + .returning(); + return serializeDelivery({ ...updated, channelId: existing.channelId }); } /** @@ -159,7 +158,7 @@ export async function failDelivery( const existing = await getOwnedDelivery(db, workspaceId, agentId, deliveryId); if (!existing) return null; - await db + const [updated] = await db .update(deliveries) .set({ status: 'failed', @@ -167,8 +166,9 @@ export async function failDelivery( retryable: opts.retryable ?? null, updatedAt: new Date(), }) - .where(eq(deliveries.id, deliveryId)); - return serializeDelivery(await reloadDelivery(db, deliveryId)); + .where(eq(deliveries.id, deliveryId)) + .returning(); + return serializeDelivery({ ...updated, channelId: existing.channelId }); } /** @@ -185,7 +185,7 @@ export async function deferDelivery( const existing = await getOwnedDelivery(db, workspaceId, agentId, deliveryId); if (!existing) return null; - await db + const [updated] = await db .update(deliveries) .set({ status: 'deferred', @@ -193,6 +193,7 @@ export async function deferDelivery( reason: opts.reason ?? existing.reason, updatedAt: new Date(), }) - .where(eq(deliveries.id, deliveryId)); - return serializeDelivery(await reloadDelivery(db, deliveryId)); + .where(eq(deliveries.id, deliveryId)) + .returning(); + return serializeDelivery({ ...updated, channelId: existing.channelId }); } From 9466059e6fd4113fee073db895fadbdc59e7a03e Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Jun 2026 15:57:01 +0000 Subject: [PATCH 03/15] fix(engine): treat delivered as terminal in fail/defer transitions A late fail/defer could overwrite an already-acked (delivered) delivery, resurrecting it into the default queue. Guard the transitions so delivered is terminal: short-circuit when already delivered, and add `status != 'delivered'` to the update WHERE clause to close the read->write race against a concurrent ack (re-reading current state when the guard rejects the write). --- .../__tests__/conformance/delivery.test.ts | 32 ++++++++++++++++ packages/engine/src/engine/delivery.ts | 38 +++++++++++++++---- 2 files changed, 63 insertions(+), 7 deletions(-) diff --git a/packages/engine/src/__tests__/conformance/delivery.test.ts b/packages/engine/src/__tests__/conformance/delivery.test.ts index 74c3d703..42dcfec0 100644 --- a/packages/engine/src/__tests__/conformance/delivery.test.ts +++ b/packages/engine/src/__tests__/conformance/delivery.test.ts @@ -146,6 +146,38 @@ describe('durable delivery api', () => { expect(queued[0].status).toBe('deferred'); }); + it('does not resurrect a delivered (terminal) delivery via defer or fail', async () => { + const { bob } = await seed(); + const [item] = await listDeliveries(bob.token); + + const ack = await stack.app.request(`/v1/deliveries/${item.id}/ack`, { + method: 'POST', + headers: { authorization: `Bearer ${bob.token}` }, + }); + expect(ack.status).toBe(200); + + // A late defer must not move a delivered record back into the queue. + const defer = await stack.app.request(`/v1/deliveries/${item.id}/defer`, { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${bob.token}` }, + body: JSON.stringify({ available_at: new Date(Date.now() + 60_000).toISOString() }), + }); + expect(defer.status).toBe(200); + expect(((await defer.json()) as { data: { status: string } }).data.status).toBe('delivered'); + + // Same for a late fail. + const fail = await stack.app.request(`/v1/deliveries/${item.id}/fail`, { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${bob.token}` }, + body: JSON.stringify({ error: 'too late' }), + }); + expect(fail.status).toBe(200); + expect(((await fail.json()) as { data: { status: string } }).data.status).toBe('delivered'); + + // Stays out of the default queue. + expect(await listDeliveries(bob.token)).toHaveLength(0); + }); + it('rejects an invalid defer payload', async () => { const { bob } = await seed(); const [item] = await listDeliveries(bob.token); diff --git a/packages/engine/src/engine/delivery.ts b/packages/engine/src/engine/delivery.ts index 8f24aee8..4583e77e 100644 --- a/packages/engine/src/engine/delivery.ts +++ b/packages/engine/src/engine/delivery.ts @@ -1,4 +1,4 @@ -import { eq, and, asc, inArray } from 'drizzle-orm'; +import { eq, ne, and, asc, inArray } from 'drizzle-orm'; import type { getDb } from '../db/index.js'; import { deliveries, messages, agents } from '../db/schema.js'; import type { DeliveryStatus } from '@relaycast/types'; @@ -146,7 +146,9 @@ export async function ackDelivery( /** * Idempotently record a delivery as `failed`, capturing error text and - * retryability. Returns null if the delivery is not found / not owned. + * retryability. `delivered` is terminal, so a fail never resurrects an + * already-acked delivery (the WHERE guard also closes the read→write race + * against a concurrent ack). Returns null if not found / not owned. */ export async function failDelivery( db: Db, @@ -157,6 +159,7 @@ export async function failDelivery( ) { const existing = await getOwnedDelivery(db, workspaceId, agentId, deliveryId); if (!existing) return null; + if (existing.status === 'delivered') return serializeDelivery(existing); const [updated] = await db .update(deliveries) @@ -166,14 +169,16 @@ export async function failDelivery( retryable: opts.retryable ?? null, updatedAt: new Date(), }) - .where(eq(deliveries.id, deliveryId)) + .where(and(eq(deliveries.id, deliveryId), ne(deliveries.status, 'delivered'))) .returning(); - return serializeDelivery({ ...updated, channelId: existing.channelId }); + return resolveTransition(db, workspaceId, agentId, deliveryId, updated, existing.channelId); } /** * Idempotently record a delivery as `deferred` with the time it next becomes - * available. Returns null if the delivery is not found / not owned. + * available. `delivered` is terminal, so a defer never resurrects an + * already-acked delivery (the WHERE guard also closes the read→write race + * against a concurrent ack). Returns null if not found / not owned. */ export async function deferDelivery( db: Db, @@ -184,6 +189,7 @@ export async function deferDelivery( ) { const existing = await getOwnedDelivery(db, workspaceId, agentId, deliveryId); if (!existing) return null; + if (existing.status === 'delivered') return serializeDelivery(existing); const [updated] = await db .update(deliveries) @@ -193,7 +199,25 @@ export async function deferDelivery( reason: opts.reason ?? existing.reason, updatedAt: new Date(), }) - .where(eq(deliveries.id, deliveryId)) + .where(and(eq(deliveries.id, deliveryId), ne(deliveries.status, 'delivered'))) .returning(); - return serializeDelivery({ ...updated, channelId: existing.channelId }); + return resolveTransition(db, workspaceId, agentId, deliveryId, updated, existing.channelId); +} + +/** + * Resolve the result of a status-guarded transition: use the updated row when + * the write landed, otherwise (a concurrent ack won the race and the row is now + * terminal) re-read and return the current state without resurrecting it. + */ +async function resolveTransition( + db: Db, + workspaceId: string, + agentId: string, + deliveryId: string, + updated: DeliveryRow | undefined, + channelId: string, +) { + if (updated) return serializeDelivery({ ...updated, channelId }); + const current = await getOwnedDelivery(db, workspaceId, agentId, deliveryId); + return current ? serializeDelivery(current) : null; } From be44755a0c295e8e1f4192ac8e2b811e7c6bfb0c Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Jun 2026 16:01:47 +0000 Subject: [PATCH 04/15] refactor: address review nitpicks on durable delivery - Reuse the shared ListDeliveriesQuerySchema in the engine route (now coercing on limit) instead of a local duplicate, avoiding schema drift. - Modernize DeferDeliveryRequestSchema to Zod 4's z.iso.datetime(). - Add a realtime-first SDK example (live delivery events + reconnect replay) to the durable delivery README section. The terminal-state guard for fail/defer flagged by reviewers was already added in an earlier commit. --- README.md | 21 +++++++++++++++++++++ packages/engine/src/routes/delivery.ts | 10 ++-------- packages/types/src/delivery.ts | 5 +++-- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 684171ec..3d23e14b 100644 --- a/README.md +++ b/README.md @@ -323,6 +323,27 @@ thread reply, and emits `delivery.accepted`, `delivery.delivered`, `delivery.def `delivery.failed` events to the recipient. Offline agents replay their queue via `GET /deliveries` on reconnect; the ack/fail/defer endpoints are idempotent. +Realtime-first usage with the TypeScript SDK — react to delivery events live, and replay the +durable queue on reconnect instead of polling: + +```typescript +// React to durable delivery state as it changes. +agent.on.deliveryAccepted((e) => console.log(`queued ${e.deliveryId} for ${e.messageId}`)); +agent.on.deliveryDelivered((e) => console.log(`acked ${e.deliveryId}`)); + +// On (re)connect, drain anything queued while offline, then ack each item. +agent.on.connected(async () => { + for (const item of await agent.deliveries({ status: 'accepted' })) { + try { + await handle(item.message); // your handler + await agent.ackDelivery(item.id); // -> delivered + } catch (err) { + await agent.failDelivery(item.id, { error: String(err), retryable: true }); + } + } +}); +``` + A2A (Agent-to-Agent) gateway endpoints: ```text diff --git a/packages/engine/src/routes/delivery.ts b/packages/engine/src/routes/delivery.ts index be8292bb..0ee8b748 100644 --- a/packages/engine/src/routes/delivery.ts +++ b/packages/engine/src/routes/delivery.ts @@ -1,5 +1,4 @@ import { Hono } from 'hono'; -import { z } from 'zod'; import type { AppEnv } from '../env.js'; import { requireAgentToken } from '../middleware/auth.js'; import { rateLimit } from '../middleware/rateLimit.js'; @@ -7,15 +6,10 @@ import * as deliveryEngine from '../engine/delivery.js'; import { fanoutToAgents } from './fanout.js'; import { runInBackground } from './background.js'; import { errorResponse } from '../lib/httpError.js'; -import { DeliveryStatusSchema, FailDeliveryRequestSchema, DeferDeliveryRequestSchema } from '@relaycast/types'; +import { ListDeliveriesQuerySchema, FailDeliveryRequestSchema, DeferDeliveryRequestSchema } from '@relaycast/types'; export const deliveryRoutes = new Hono(); -const listQuerySchema = z.object({ - status: DeliveryStatusSchema.optional(), - limit: z.coerce.number().int().positive().max(200).optional(), -}); - // GET /v1/deliveries - durable delivery queue for the calling agent. // Defaults to non-terminal items (accepted + deferred) so offline consumers // can replay what they missed after reconnect. @@ -25,7 +19,7 @@ deliveryRoutes.get( rateLimit, async (c) => { try { - const parsed = listQuerySchema.safeParse({ + const parsed = ListDeliveriesQuerySchema.safeParse({ status: c.req.query('status'), limit: c.req.query('limit'), }); diff --git a/packages/types/src/delivery.ts b/packages/types/src/delivery.ts index ac9913c7..d63ba7b5 100644 --- a/packages/types/src/delivery.ts +++ b/packages/types/src/delivery.ts @@ -47,7 +47,8 @@ export type DeliveryItem = z.infer; export const ListDeliveriesQuerySchema = z.object({ status: DeliveryStatusSchema.optional(), - limit: z.number().int().positive().max(200).optional(), + // Coerce so the same schema validates both typed callers and raw HTTP query strings. + limit: z.coerce.number().int().positive().max(200).optional(), }); export type ListDeliveriesQuery = z.infer; @@ -58,7 +59,7 @@ export const FailDeliveryRequestSchema = z.object({ export type FailDeliveryRequest = z.infer; export const DeferDeliveryRequestSchema = z.object({ - available_at: z.string().datetime(), + available_at: z.iso.datetime(), reason: z.string().optional(), }); export type DeferDeliveryRequest = z.infer; From 079f0cf1dffc5aa4316dd3cc057805b3a7ad03ae Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Jun 2026 16:02:45 +0000 Subject: [PATCH 05/15] test(sdk): cover durable delivery methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add SDK tests for deliveries(), ackDelivery(), failDelivery(), and deferDelivery() — asserting URL construction, delivery-id URL encoding, query params, response camelization, and the availableAt -> available_at body decamelization. --- .../src/__tests__/deliveries.test.ts | 115 ++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 packages/sdk-typescript/src/__tests__/deliveries.test.ts diff --git a/packages/sdk-typescript/src/__tests__/deliveries.test.ts b/packages/sdk-typescript/src/__tests__/deliveries.test.ts new file mode 100644 index 00000000..0278baca --- /dev/null +++ b/packages/sdk-typescript/src/__tests__/deliveries.test.ts @@ -0,0 +1,115 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { AgentClient } from '../agent.js'; +import { HttpClient } from '../client.js'; + +const mockFetch = vi.fn(); +vi.stubGlobal('fetch', mockFetch); + +function mockResponse(data: unknown, apiOk = true, status = 200) { + return Promise.resolve({ + ok: true, + status, + json: () => Promise.resolve(apiOk ? { ok: true, data } : { ok: false, error: data }), + }); +} + +describe('AgentClient durable delivery', () => { + let me: AgentClient; + + beforeEach(() => { + mockFetch.mockReset(); + me = new AgentClient(new HttpClient({ apiKey: 'at_live_test123' })); + }); + + describe('deliveries()', () => { + it('GETs /v1/deliveries with no query by default', async () => { + mockFetch.mockImplementation(() => mockResponse([])); + + await me.deliveries(); + + const [url, init] = mockFetch.mock.calls[0]!; + expect(url).toBe('https://gateway.relaycast.dev/v1/deliveries'); + expect(init.method).toBe('GET'); + }); + + it('passes status and limit as query params', async () => { + mockFetch.mockImplementation(() => mockResponse([])); + + await me.deliveries({ status: 'accepted', limit: 25 }); + + const [url] = mockFetch.mock.calls[0]!; + const parsed = new URL(url as string); + expect(parsed.pathname).toBe('/v1/deliveries'); + expect(parsed.searchParams.get('status')).toBe('accepted'); + expect(parsed.searchParams.get('limit')).toBe('25'); + }); + + it('camelizes the delivery items in the response', async () => { + mockFetch.mockImplementation(() => mockResponse([ + { id: 'del_1', message_id: 'm_1', channel_id: 'c_1', available_at: null, message: { agent_name: 'bob' } }, + ])); + + const items = await me.deliveries(); + + expect(items[0]).toMatchObject({ id: 'del_1', messageId: 'm_1', channelId: 'c_1', availableAt: null }); + expect(items[0]!.message).toMatchObject({ agentName: 'bob' }); + }); + }); + + describe('ackDelivery()', () => { + it('POSTs to /v1/deliveries/:id/ack', async () => { + mockFetch.mockImplementation(() => mockResponse({ id: 'del_1', status: 'delivered' })); + + await me.ackDelivery('del_1'); + + const [url, init] = mockFetch.mock.calls[0]!; + expect(url).toBe('https://gateway.relaycast.dev/v1/deliveries/del_1/ack'); + expect(init.method).toBe('POST'); + }); + + it('URL-encodes the delivery id', async () => { + mockFetch.mockImplementation(() => mockResponse({ id: 'del/1' })); + + await me.ackDelivery('del/1'); + + const [url] = mockFetch.mock.calls[0]!; + expect(url).toBe('https://gateway.relaycast.dev/v1/deliveries/del%2F1/ack'); + }); + }); + + describe('failDelivery()', () => { + it('POSTs error and retryable to /v1/deliveries/:id/fail', async () => { + mockFetch.mockImplementation(() => mockResponse({ id: 'del_1', status: 'failed' })); + + await me.failDelivery('del_1', { error: 'boom', retryable: true }); + + const [url, init] = mockFetch.mock.calls[0]!; + expect(url).toBe('https://gateway.relaycast.dev/v1/deliveries/del_1/fail'); + expect(init.method).toBe('POST'); + expect(JSON.parse(init.body)).toEqual({ error: 'boom', retryable: true }); + }); + + it('sends an empty body when no options are given', async () => { + mockFetch.mockImplementation(() => mockResponse({ id: 'del_1', status: 'failed' })); + + await me.failDelivery('del_1'); + + const [, init] = mockFetch.mock.calls[0]!; + expect(JSON.parse(init.body)).toEqual({}); + }); + }); + + describe('deferDelivery()', () => { + it('decamelizes availableAt -> available_at in the body', async () => { + mockFetch.mockImplementation(() => mockResponse({ id: 'del_1', status: 'deferred' })); + const availableAt = '2026-06-01T00:01:00.000Z'; + + await me.deferDelivery('del_1', { availableAt, reason: 'busy' }); + + const [url, init] = mockFetch.mock.calls[0]!; + expect(url).toBe('https://gateway.relaycast.dev/v1/deliveries/del_1/defer'); + expect(init.method).toBe('POST'); + expect(JSON.parse(init.body)).toEqual({ available_at: availableAt, reason: 'busy' }); + }); + }); +}); From 2f423e54f72ced92290a17e70eeae8efee1bb33c Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Jun 2026 16:03:45 +0000 Subject: [PATCH 06/15] fix(engine): handle no-row race in ackDelivery via resolveTransition If the delivery is deleted between the ownership read and the UPDATE, RETURNING yields no row and serializeDelivery({ ...undefined }) produced a malformed object. Route ackDelivery through the same resolveTransition helper as fail/defer so it re-reads and returns the current state (or null) instead. --- packages/engine/src/engine/delivery.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/engine/src/engine/delivery.ts b/packages/engine/src/engine/delivery.ts index 4583e77e..5d28954c 100644 --- a/packages/engine/src/engine/delivery.ts +++ b/packages/engine/src/engine/delivery.ts @@ -141,7 +141,7 @@ export async function ackDelivery( .set({ status: 'delivered', updatedAt: new Date() }) .where(eq(deliveries.id, deliveryId)) .returning(); - return serializeDelivery({ ...updated, channelId: existing.channelId }); + return resolveTransition(db, workspaceId, agentId, deliveryId, updated, existing.channelId); } /** From 288d9e4811cd5b460eb5bfc0a5b6a9ed84a5cb15 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Jun 2026 16:08:21 +0000 Subject: [PATCH 07/15] fix(engine): idempotent delivery transitions without duplicate events Address review findings on durable delivery idempotency: - Transitions now report whether they actually changed state; the routes fan out delivery.* events only on a real change, so idempotent retries no longer emit duplicate notifications. - failDelivery treats `failed` as settled: repeated calls are no-ops that preserve the original error/retryable and don't churn updatedAt. - deferDelivery is a no-op when re-deferred to the same available_at/reason. - markRead now clears both accepted and deferred deliveries, so a deferred delivery doesn't linger in the replay queue once the message is read. Adds conformance coverage for single-emit ack, fail-metadata preservation, and read-clears-deferred. --- .../__tests__/conformance/delivery.test.ts | 59 ++++++++++++++-- packages/engine/src/engine/delivery.ts | 65 +++++++++++------ packages/engine/src/engine/receipt.ts | 9 ++- packages/engine/src/routes/delivery.ts | 70 +++++++++++-------- 4 files changed, 144 insertions(+), 59 deletions(-) diff --git a/packages/engine/src/__tests__/conformance/delivery.test.ts b/packages/engine/src/__tests__/conformance/delivery.test.ts index 42dcfec0..07691123 100644 --- a/packages/engine/src/__tests__/conformance/delivery.test.ts +++ b/packages/engine/src/__tests__/conformance/delivery.test.ts @@ -198,19 +198,70 @@ describe('durable delivery api', () => { expect(res.status).toBe(404); }); - it('emits delivery.delivered to the recipient on ack', async () => { + it('emits delivery.delivered once on ack, not on idempotent retries', async () => { const { ws, bob } = await seed(); const [item] = await listDeliveries(bob.token); const bobSock = new FakeSocket(); stack.runtime.realtime.attachAgentSocket(ws.workspaceId, bob.agentId, bobSock); - const res = await stack.app.request(`/v1/deliveries/${item.id}/ack`, { + const ackOnce = () => stack.app.request(`/v1/deliveries/${item.id}/ack`, { method: 'POST', headers: { authorization: `Bearer ${bob.token}` }, }); - expect(res.status).toBe(200); + + expect((await ackOnce()).status).toBe(200); + // A second ack is a no-op and must not re-emit the event. + expect((await ackOnce()).status).toBe(200); await new Promise((r) => setTimeout(r, 50)); - expect(bobSock.ofType('delivery.delivered').length).toBeGreaterThanOrEqual(1); + expect(bobSock.ofType('delivery.delivered')).toHaveLength(1); + }); + + it('preserves failure metadata across repeated fail calls (idempotent)', async () => { + const { bob } = await seed(); + const [item] = await listDeliveries(bob.token); + + const first = await stack.app.request(`/v1/deliveries/${item.id}/fail`, { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${bob.token}` }, + body: JSON.stringify({ error: 'boom', retryable: true }), + }); + expect(first.status).toBe(200); + + // A second fail (even with an empty body) must not null out the recorded + // error/retryable — the first failure is preserved. + const second = await stack.app.request(`/v1/deliveries/${item.id}/fail`, { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${bob.token}` }, + body: JSON.stringify({}), + }); + expect(second.status).toBe(200); + const data = ((await second.json()) as { data: Record }).data; + expect(data.status).toBe('failed'); + expect(data.error).toBe('boom'); + expect(data.retryable).toBe(true); + }); + + it('clears a deferred delivery from the replay queue when the message is read', async () => { + const { bob, messageId } = await seed(); + const [item] = await listDeliveries(bob.token); + + const defer = await stack.app.request(`/v1/deliveries/${item.id}/defer`, { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${bob.token}` }, + body: JSON.stringify({ available_at: new Date(Date.now() + 60_000).toISOString() }), + }); + expect(defer.status).toBe(200); + expect(await listDeliveries(bob.token)).toHaveLength(1); + + const read = await stack.app.request(`/v1/messages/${messageId}/read`, { + method: 'POST', + headers: { authorization: `Bearer ${bob.token}` }, + }); + expect(read.status).toBe(200); + + // The deferred item is now delivered and out of the default queue. + expect(await listDeliveries(bob.token)).toHaveLength(0); + expect(await listDeliveries(bob.token, '?status=delivered')).toHaveLength(1); }); }); diff --git a/packages/engine/src/engine/delivery.ts b/packages/engine/src/engine/delivery.ts index 5d28954c..c6e0ab22 100644 --- a/packages/engine/src/engine/delivery.ts +++ b/packages/engine/src/engine/delivery.ts @@ -122,19 +122,29 @@ async function getOwnedDelivery( return row ? { ...row.delivery, channelId: row.channelId } : null; } +// The outcome of a transition: the (possibly unchanged) delivery plus whether +// this call actually mutated state. Callers fan out lifecycle events only when +// `changed` is true so idempotent retries don't emit duplicate notifications. +export type TransitionResult = { delivery: ReturnType; changed: boolean }; + +function unixSeconds(value: Date | null | undefined): number | null { + return value ? Math.floor(value.getTime() / 1000) : null; +} + /** - * Idempotently transition a delivery to `delivered`. Repeated calls return the - * same delivered record. Returns null if the delivery is not found / not owned. + * Idempotently transition a delivery to `delivered`. `delivered` is terminal, + * so repeated acks are no-ops (reported as `changed: false`). Returns null if + * the delivery is not found / not owned. */ export async function ackDelivery( db: Db, workspaceId: string, agentId: string, deliveryId: string, -) { +): Promise { const existing = await getOwnedDelivery(db, workspaceId, agentId, deliveryId); if (!existing) return null; - if (existing.status === 'delivered') return serializeDelivery(existing); + if (existing.status === 'delivered') return { delivery: serializeDelivery(existing), changed: false }; const [updated] = await db .update(deliveries) @@ -146,9 +156,11 @@ export async function ackDelivery( /** * Idempotently record a delivery as `failed`, capturing error text and - * retryability. `delivered` is terminal, so a fail never resurrects an - * already-acked delivery (the WHERE guard also closes the read→write race - * against a concurrent ack). Returns null if not found / not owned. + * retryability. Both `delivered` and `failed` are treated as settled: once a + * delivery has failed, repeated calls are no-ops that preserve the original + * failure metadata (no `null` overwrite, no `updatedAt` churn, no duplicate + * event). The WHERE guard also closes the read→write race against a concurrent + * ack. Returns null if not found / not owned. */ export async function failDelivery( db: Db, @@ -156,10 +168,12 @@ export async function failDelivery( agentId: string, deliveryId: string, opts: { error?: string; retryable?: boolean } = {}, -) { +): Promise { const existing = await getOwnedDelivery(db, workspaceId, agentId, deliveryId); if (!existing) return null; - if (existing.status === 'delivered') return serializeDelivery(existing); + if (existing.status === 'delivered' || existing.status === 'failed') { + return { delivery: serializeDelivery(existing), changed: false }; + } const [updated] = await db .update(deliveries) @@ -176,9 +190,11 @@ export async function failDelivery( /** * Idempotently record a delivery as `deferred` with the time it next becomes - * available. `delivered` is terminal, so a defer never resurrects an - * already-acked delivery (the WHERE guard also closes the read→write race - * against a concurrent ack). Returns null if not found / not owned. + * available. A re-defer to the same `available_at`/reason is a no-op (reported + * as `changed: false`); deferring to a new time is a real change. `delivered` + * is terminal, so a defer never resurrects an already-acked delivery (the WHERE + * guard also closes the read→write race against a concurrent ack). Returns null + * if not found / not owned. */ export async function deferDelivery( db: Db, @@ -186,17 +202,23 @@ export async function deferDelivery( agentId: string, deliveryId: string, opts: { availableAt: Date; reason?: string }, -) { +): Promise { const existing = await getOwnedDelivery(db, workspaceId, agentId, deliveryId); if (!existing) return null; - if (existing.status === 'delivered') return serializeDelivery(existing); + if (existing.status === 'delivered') return { delivery: serializeDelivery(existing), changed: false }; + + const targetReason = opts.reason ?? existing.reason; + const unchanged = existing.status === 'deferred' + && unixSeconds(existing.availableAt) === unixSeconds(opts.availableAt) + && existing.reason === targetReason; + if (unchanged) return { delivery: serializeDelivery(existing), changed: false }; const [updated] = await db .update(deliveries) .set({ status: 'deferred', availableAt: opts.availableAt, - reason: opts.reason ?? existing.reason, + reason: targetReason, updatedAt: new Date(), }) .where(and(eq(deliveries.id, deliveryId), ne(deliveries.status, 'delivered'))) @@ -205,9 +227,10 @@ export async function deferDelivery( } /** - * Resolve the result of a status-guarded transition: use the updated row when - * the write landed, otherwise (a concurrent ack won the race and the row is now - * terminal) re-read and return the current state without resurrecting it. + * Resolve the result of a status-guarded transition: when the write landed, + * report the updated row as changed. When it did not (the row was deleted, or a + * concurrent ack won the race and the row is now terminal), re-read and return + * the current state as unchanged — never resurrecting it or emitting an event. */ async function resolveTransition( db: Db, @@ -216,8 +239,8 @@ async function resolveTransition( deliveryId: string, updated: DeliveryRow | undefined, channelId: string, -) { - if (updated) return serializeDelivery({ ...updated, channelId }); +): Promise { + if (updated) return { delivery: serializeDelivery({ ...updated, channelId }), changed: true }; const current = await getOwnedDelivery(db, workspaceId, agentId, deliveryId); - return current ? serializeDelivery(current) : null; + return current ? { delivery: serializeDelivery(current), changed: false } : null; } diff --git a/packages/engine/src/engine/receipt.ts b/packages/engine/src/engine/receipt.ts index 57e7d93f..b8c9182a 100644 --- a/packages/engine/src/engine/receipt.ts +++ b/packages/engine/src/engine/receipt.ts @@ -1,4 +1,4 @@ -import { eq, and, sql, isNull } from 'drizzle-orm'; +import { eq, and, sql, isNull, inArray } from 'drizzle-orm'; import type { getDb } from '../db/index.js'; import { readReceipts, @@ -55,7 +55,10 @@ export async function markRead( .values({ messageId, agentId }) .onConflictDoNothing(); - // Transition delivery status: accepted → delivered + // Transition delivery status to delivered. Reading consumes the message, so + // clear both still-queued states (accepted and deferred) — otherwise a + // deferred delivery would linger in the durable replay queue after the agent + // has already seen the message. `delivered`/`failed` are left untouched. await db .update(deliveries) .set({ status: 'delivered', updatedAt: new Date() }) @@ -63,7 +66,7 @@ export async function markRead( and( eq(deliveries.messageId, messageId), eq(deliveries.agentId, agentId), - eq(deliveries.status, 'accepted'), + inArray(deliveries.status, ['accepted', 'deferred']), ), ); diff --git a/packages/engine/src/routes/delivery.ts b/packages/engine/src/routes/delivery.ts index 0ee8b748..cf74d63c 100644 --- a/packages/engine/src/routes/delivery.ts +++ b/packages/engine/src/routes/delivery.ts @@ -58,15 +58,19 @@ deliveryRoutes.post( error: { code: 'delivery_not_found', message: 'Delivery not found' }, }, 404); } - runInBackground( - c, - fanoutToAgents(c, [agent!.id], 'delivery.delivered', { - delivery_id: result.id, - message_id: result.message_id, - }), - 'fanout delivery.delivered', - ); - return c.json({ ok: true, data: result }); + // Only fan out when this call actually transitioned the delivery, so + // idempotent retries don't emit duplicate notifications. + if (result.changed) { + runInBackground( + c, + fanoutToAgents(c, [agent!.id], 'delivery.delivered', { + delivery_id: result.delivery.id, + message_id: result.delivery.message_id, + }), + 'fanout delivery.delivered', + ); + } + return c.json({ ok: true, data: result.delivery }); } catch (err: unknown) { return errorResponse(c, err); } @@ -99,17 +103,19 @@ deliveryRoutes.post( error: { code: 'delivery_not_found', message: 'Delivery not found' }, }, 404); } - runInBackground( - c, - fanoutToAgents(c, [agent!.id], 'delivery.failed', { - delivery_id: result.id, - message_id: result.message_id, - error: result.error, - retryable: result.retryable, - }), - 'fanout delivery.failed', - ); - return c.json({ ok: true, data: result }); + if (result.changed) { + runInBackground( + c, + fanoutToAgents(c, [agent!.id], 'delivery.failed', { + delivery_id: result.delivery.id, + message_id: result.delivery.message_id, + error: result.delivery.error, + retryable: result.delivery.retryable, + }), + 'fanout delivery.failed', + ); + } + return c.json({ ok: true, data: result.delivery }); } catch (err: unknown) { return errorResponse(c, err); } @@ -145,17 +151,19 @@ deliveryRoutes.post( error: { code: 'delivery_not_found', message: 'Delivery not found' }, }, 404); } - runInBackground( - c, - fanoutToAgents(c, [agent!.id], 'delivery.deferred', { - delivery_id: result.id, - message_id: result.message_id, - available_at: result.available_at, - reason: result.reason, - }), - 'fanout delivery.deferred', - ); - return c.json({ ok: true, data: result }); + if (result.changed) { + runInBackground( + c, + fanoutToAgents(c, [agent!.id], 'delivery.deferred', { + delivery_id: result.delivery.id, + message_id: result.delivery.message_id, + available_at: result.delivery.available_at, + reason: result.delivery.reason, + }), + 'fanout delivery.deferred', + ); + } + return c.json({ ok: true, data: result.delivery }); } catch (err: unknown) { return errorResponse(c, err); } From 5fbe6d0feecb65fe50c71ed32a4caef930260482 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Jun 2026 16:15:43 +0000 Subject: [PATCH 08/15] fix(engine): make delivery transitions concurrency-safe via WHERE preconditions Move the "is this a real transition?" predicate into each UPDATE's WHERE clause and derive `changed` from whether RETURNING matched a row, so the database decides atomically instead of the application-level pre-read. SQLite serializes writes, so concurrent calls can no longer both report changed: - ack: WHERE status != 'delivered' - fail: WHERE status NOT IN ('delivered','failed') (failed is settled) - defer: WHERE not already deferred to the same (available_at, reason) This closes the read->write races flagged in review: duplicate delivered/failed/ deferred events under concurrent calls, and failure-metadata overwrite. Adds a test asserting a re-defer to the same time emits no second event. --- .../__tests__/conformance/delivery.test.ts | 21 ++++++++++ packages/engine/src/engine/delivery.ts | 41 +++++++++++++------ 2 files changed, 49 insertions(+), 13 deletions(-) diff --git a/packages/engine/src/__tests__/conformance/delivery.test.ts b/packages/engine/src/__tests__/conformance/delivery.test.ts index 07691123..6daaa38c 100644 --- a/packages/engine/src/__tests__/conformance/delivery.test.ts +++ b/packages/engine/src/__tests__/conformance/delivery.test.ts @@ -217,6 +217,27 @@ describe('durable delivery api', () => { expect(bobSock.ofType('delivery.delivered')).toHaveLength(1); }); + it('does not re-emit delivery.deferred when re-deferred to the same time', async () => { + const { ws, bob } = await seed(); + const [item] = await listDeliveries(bob.token); + const availableAt = new Date(Date.now() + 60_000).toISOString(); + + const bobSock = new FakeSocket(); + stack.runtime.realtime.attachAgentSocket(ws.workspaceId, bob.agentId, bobSock); + + const deferOnce = () => stack.app.request(`/v1/deliveries/${item.id}/defer`, { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${bob.token}` }, + body: JSON.stringify({ available_at: availableAt }), + }); + + expect((await deferOnce()).status).toBe(200); + // Re-deferring to the identical available_at is a no-op — no second event. + expect((await deferOnce()).status).toBe(200); + await new Promise((r) => setTimeout(r, 50)); + expect(bobSock.ofType('delivery.deferred')).toHaveLength(1); + }); + it('preserves failure metadata across repeated fail calls (idempotent)', async () => { const { bob } = await seed(); const [item] = await listDeliveries(bob.token); diff --git a/packages/engine/src/engine/delivery.ts b/packages/engine/src/engine/delivery.ts index c6e0ab22..dfd4e31e 100644 --- a/packages/engine/src/engine/delivery.ts +++ b/packages/engine/src/engine/delivery.ts @@ -1,4 +1,4 @@ -import { eq, ne, and, asc, inArray } from 'drizzle-orm'; +import { eq, ne, and, not, asc, isNull, inArray, notInArray } from 'drizzle-orm'; import type { getDb } from '../db/index.js'; import { deliveries, messages, agents } from '../db/schema.js'; import type { DeliveryStatus } from '@relaycast/types'; @@ -127,10 +127,6 @@ async function getOwnedDelivery( // `changed` is true so idempotent retries don't emit duplicate notifications. export type TransitionResult = { delivery: ReturnType; changed: boolean }; -function unixSeconds(value: Date | null | undefined): number | null { - return value ? Math.floor(value.getTime() / 1000) : null; -} - /** * Idempotently transition a delivery to `delivered`. `delivered` is terminal, * so repeated acks are no-ops (reported as `changed: false`). Returns null if @@ -146,10 +142,14 @@ export async function ackDelivery( if (!existing) return null; if (existing.status === 'delivered') return { delivery: serializeDelivery(existing), changed: false }; + // The `status != 'delivered'` predicate lives in the UPDATE so the DB decides + // atomically whether this call transitioned the row. Under concurrent acks + // only one update matches (SQLite serializes writes); the loser sees no row + // and reports `changed: false`, so the delivered event fires exactly once. const [updated] = await db .update(deliveries) .set({ status: 'delivered', updatedAt: new Date() }) - .where(eq(deliveries.id, deliveryId)) + .where(and(eq(deliveries.id, deliveryId), ne(deliveries.status, 'delivered'))) .returning(); return resolveTransition(db, workspaceId, agentId, deliveryId, updated, existing.channelId); } @@ -175,6 +175,10 @@ export async function failDelivery( return { delivery: serializeDelivery(existing), changed: false }; } + // Both `delivered` and `failed` are settled, so the UPDATE only matches a + // not-yet-settled row. Under concurrent fails the DB lets exactly one win; + // the loser matches no row, preserves the first failure's metadata, and + // reports `changed: false` (no duplicate event). const [updated] = await db .update(deliveries) .set({ @@ -183,7 +187,7 @@ export async function failDelivery( retryable: opts.retryable ?? null, updatedAt: new Date(), }) - .where(and(eq(deliveries.id, deliveryId), ne(deliveries.status, 'delivered'))) + .where(and(eq(deliveries.id, deliveryId), notInArray(deliveries.status, ['delivered', 'failed']))) .returning(); return resolveTransition(db, workspaceId, agentId, deliveryId, updated, existing.channelId); } @@ -208,11 +212,18 @@ export async function deferDelivery( if (existing.status === 'delivered') return { delivery: serializeDelivery(existing), changed: false }; const targetReason = opts.reason ?? existing.reason; - const unchanged = existing.status === 'deferred' - && unixSeconds(existing.availableAt) === unixSeconds(opts.availableAt) - && existing.reason === targetReason; - if (unchanged) return { delivery: serializeDelivery(existing), changed: false }; - + const reasonMatches = targetReason === null + ? isNull(deliveries.reason) + : eq(deliveries.reason, targetReason); + // A real change means: not terminal-delivered, and not already deferred to + // this exact (available_at, reason). Encoding the no-op predicate in the + // UPDATE makes it atomic — identical concurrent defers match no row on the + // loser and report `changed: false`, so no duplicate event fires. + const isNoop = and( + eq(deliveries.status, 'deferred'), + eq(deliveries.availableAt, opts.availableAt), + reasonMatches, + )!; const [updated] = await db .update(deliveries) .set({ @@ -221,7 +232,11 @@ export async function deferDelivery( reason: targetReason, updatedAt: new Date(), }) - .where(and(eq(deliveries.id, deliveryId), ne(deliveries.status, 'delivered'))) + .where(and( + eq(deliveries.id, deliveryId), + ne(deliveries.status, 'delivered'), + not(isNoop), + )) .returning(); return resolveTransition(db, workspaceId, agentId, deliveryId, updated, existing.channelId); } From ca928406e0f77a4b7a5cfb29247779d6aacd16f4 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Jun 2026 16:31:10 +0000 Subject: [PATCH 09/15] fix(engine): normalize legacy 'pending' deliveries; tidy lockfile MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add migration 0014 to promote any historical deliveries.status = 'pending' rows (the old column default from migration 0010) to 'accepted'. The public DeliveryStatus contract has no 'pending' state and GET /v1/deliveries lists accepted + deferred, so legacy rows would otherwise be invisible and outside the typed enum. Idempotent. - Restore the @clack/prompts bundled dep lockfile metadata to match main (dev, not extraneous) — incidental churn from an earlier reinstall. --- package-lock.json | 2 +- .../db/migrations/0014_normalize_delivery_pending.sql | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) create mode 100644 packages/engine/src/db/migrations/0014_normalize_delivery_pending.sql diff --git a/package-lock.json b/package-lock.json index 59792160..4af64a79 100644 --- a/package-lock.json +++ b/package-lock.json @@ -206,7 +206,7 @@ }, "node_modules/@clack/prompts/node_modules/is-unicode-supported": { "version": "1.3.0", - "extraneous": true, + "dev": true, "inBundle": true, "license": "MIT", "engines": { diff --git a/packages/engine/src/db/migrations/0014_normalize_delivery_pending.sql b/packages/engine/src/db/migrations/0014_normalize_delivery_pending.sql new file mode 100644 index 00000000..1b9ce6f1 --- /dev/null +++ b/packages/engine/src/db/migrations/0014_normalize_delivery_pending.sql @@ -0,0 +1,9 @@ +-- Normalize legacy delivery rows to the public status contract. +-- The deliveries table (migration 0010) defaulted `status` to 'pending', but the +-- public DeliveryStatus contract is accepted | delivered | deferred | failed, and +-- every insert path now writes 'accepted' explicitly. Any historical 'pending' +-- rows (created before that, via the old column default) would be invisible to +-- GET /v1/deliveries (which lists accepted + deferred) and would fall outside the +-- typed status enum. Promote them to 'accepted' so upgraded queues stay visible +-- and consistent. Idempotent: a no-op once no 'pending' rows remain. +UPDATE deliveries SET status = 'accepted' WHERE status = 'pending'; From b1c35f0972cacb59128a00fc5ff8c0ab48a6ea32 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Jun 2026 16:41:18 +0000 Subject: [PATCH 10/15] docs(delivery): codify that only `delivered` is terminal; `failed` is recoverable A reviewer suggested making `failed` terminal for late ack/defer. That conflicts with the retryable-failure contract: a retryable failure should be able to recover by acking (retry succeeded) or deferring (retry later). Document the intended lifecycle on DeliveryStatus and add a conformance test that a failed delivery can still be acked. No behavior change. --- .../__tests__/conformance/delivery.test.ts | 21 +++++++++++++++++++ packages/types/src/delivery.ts | 5 +++++ 2 files changed, 26 insertions(+) diff --git a/packages/engine/src/__tests__/conformance/delivery.test.ts b/packages/engine/src/__tests__/conformance/delivery.test.ts index 6daaa38c..49ecad0d 100644 --- a/packages/engine/src/__tests__/conformance/delivery.test.ts +++ b/packages/engine/src/__tests__/conformance/delivery.test.ts @@ -178,6 +178,27 @@ describe('durable delivery api', () => { expect(await listDeliveries(bob.token)).toHaveLength(0); }); + it('allows recovering a failed delivery via ack (retryable failures are not terminal)', async () => { + const { bob } = await seed(); + const [item] = await listDeliveries(bob.token); + + const fail = await stack.app.request(`/v1/deliveries/${item.id}/fail`, { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${bob.token}` }, + body: JSON.stringify({ error: 'transient', retryable: true }), + }); + expect(fail.status).toBe(200); + expect(((await fail.json()) as { data: { status: string } }).data.status).toBe('failed'); + + // A retry that succeeds can ack the previously-failed delivery. + const ack = await stack.app.request(`/v1/deliveries/${item.id}/ack`, { + method: 'POST', + headers: { authorization: `Bearer ${bob.token}` }, + }); + expect(ack.status).toBe(200); + expect(((await ack.json()) as { data: { status: string } }).data.status).toBe('delivered'); + }); + it('rejects an invalid defer payload', async () => { const { bob } = await seed(); const [item] = await listDeliveries(bob.token); diff --git a/packages/types/src/delivery.ts b/packages/types/src/delivery.ts index d63ba7b5..1bdf0537 100644 --- a/packages/types/src/delivery.ts +++ b/packages/types/src/delivery.ts @@ -5,6 +5,11 @@ import { z } from 'zod'; // delivered -> recipient acked (terminal success) // deferred -> recipient asked to retry no earlier than available_at // failed -> recipient failed to handle; retryable indicates whether a retry is sane +// +// `delivered` is the only terminal state — nothing reopens an acked delivery. +// `failed` is intentionally NOT terminal: a retryable failure may later be acked +// (the retry succeeded) or deferred (retry later). `failDelivery` is still +// idempotent — repeating a fail preserves the original error/retryable. export const DeliveryStatusSchema = z.enum(['accepted', 'delivered', 'deferred', 'failed']); export type DeliveryStatus = z.infer; From 3930c7ac108799bb6b1290bb0ed24ac2679c09c3 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Jun 2026 16:56:01 +0000 Subject: [PATCH 11/15] fix(engine): include deferred deliveries in agent pending_deliveries GET /v1/agents/:name listed only `accepted` rows in pending_deliveries, but the durable delivery contract treats `accepted + deferred` as the replayable queue (matching GET /v1/deliveries). Include deferred so the two surfaces agree. Adds conformance coverage. --- .../__tests__/conformance/delivery.test.ts | 23 +++++++++++++++++++ packages/engine/src/engine/agent.ts | 4 +++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/packages/engine/src/__tests__/conformance/delivery.test.ts b/packages/engine/src/__tests__/conformance/delivery.test.ts index 49ecad0d..918e7a77 100644 --- a/packages/engine/src/__tests__/conformance/delivery.test.ts +++ b/packages/engine/src/__tests__/conformance/delivery.test.ts @@ -199,6 +199,29 @@ describe('durable delivery api', () => { expect(((await ack.json()) as { data: { status: string } }).data.status).toBe('delivered'); }); + it('includes deferred deliveries in the agent detail pending_deliveries', async () => { + const { ws, bob } = await seed(); + const [item] = await listDeliveries(bob.token); + + const defer = await stack.app.request(`/v1/deliveries/${item.id}/defer`, { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${bob.token}` }, + body: JSON.stringify({ available_at: new Date(Date.now() + 60_000).toISOString() }), + }); + expect(defer.status).toBe(200); + + const res = await stack.app.request('/v1/agents/bob', { + headers: { authorization: `Bearer ${ws.workspaceKey}` }, + }); + expect(res.status).toBe(200); + const pending = ((await res.json()) as { + data: { pending_deliveries: Array<{ id: string; status: string }> }; + }).data.pending_deliveries; + const deferred = pending.find((d) => d.id === item.id); + expect(deferred).toBeDefined(); + expect(deferred!.status).toBe('deferred'); + }); + it('rejects an invalid defer payload', async () => { const { bob } = await seed(); const [item] = await listDeliveries(bob.token); diff --git a/packages/engine/src/engine/agent.ts b/packages/engine/src/engine/agent.ts index d6ab8e80..d32e09ed 100644 --- a/packages/engine/src/engine/agent.ts +++ b/packages/engine/src/engine/agent.ts @@ -172,7 +172,9 @@ export async function getAgentByName(db: Db, workspaceId: string, name: string) createdAt: deliveries.createdAt, }) .from(deliveries) - .where(and(eq(deliveries.agentId, agent.id), eq(deliveries.status, 'accepted'))) + // Mirror the GET /v1/deliveries replay queue: accepted + deferred are the + // non-terminal, still-pending states. + .where(and(eq(deliveries.agentId, agent.id), inArray(deliveries.status, ['accepted', 'deferred']))) .orderBy(deliveries.createdAt) .limit(50), ]); From 98a86fb25a54bf639e158c38d53c57843bdca82f Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Tue, 2 Jun 2026 07:57:44 -0400 Subject: [PATCH 12/15] chore: record conflict resolution trajectory --- .../completed/2026-06/traj_1pjavtw1yit2.json | 49 ++ .../completed/2026-06/traj_1pjavtw1yit2.md | 21 + .../2026-06/traj_1pjavtw1yit2.trace.json | 698 ++++++++++++++++++ .trajectories/index.json | 12 +- 4 files changed, 778 insertions(+), 2 deletions(-) create mode 100644 .trajectories/completed/2026-06/traj_1pjavtw1yit2.json create mode 100644 .trajectories/completed/2026-06/traj_1pjavtw1yit2.md create mode 100644 .trajectories/completed/2026-06/traj_1pjavtw1yit2.trace.json diff --git a/.trajectories/completed/2026-06/traj_1pjavtw1yit2.json b/.trajectories/completed/2026-06/traj_1pjavtw1yit2.json new file mode 100644 index 00000000..610ad3da --- /dev/null +++ b/.trajectories/completed/2026-06/traj_1pjavtw1yit2.json @@ -0,0 +1,49 @@ +{ + "id": "traj_1pjavtw1yit2", + "version": 1, + "task": { + "title": "Fix PR merge conflicts" + }, + "status": "completed", + "startedAt": "2026-06-02T11:53:41.161Z", + "agents": [], + "chapters": [], + "commits": [ + "82a6195", + "e52b06e", + "066c218", + "85f1e21", + "8e167e5", + "5f36d69", + "2620f09", + "82a5b8c" + ], + "filesChanged": [ + ".github/workflows/publish-engine.yml", + ".github/workflows/publish-npm.yml", + ".trajectories/index.json", + "package-lock.json", + "packages/a2a/package.json", + "packages/cli/package.json", + "packages/engine/package.json", + "packages/mcp/package.json", + "packages/observer-dashboard/package.json", + "packages/openclaw/package.json", + "packages/react/package.json", + "packages/sdk-typescript/package.json", + "packages/types/package.json" + ], + "projectId": "/Users/will/Projects/AgentWorkforce/relaycast", + "tags": [], + "_trace": { + "startRef": "3930c7ac108799bb6b1290bb0ed24ac2679c09c3", + "endRef": "82a61959d2246ce4a16efb5eda0bfff60d9a046d", + "traceId": "f241730b-39b5-4093-91d7-841fb6f27500" + }, + "completedAt": "2026-06-02T11:57:15.470Z", + "retrospective": { + "summary": "Merged origin/main into PR #155, resolved package version lockfile conflicts, verified GitHub mergeStateStatus is CLEAN, and passed @relaycast/engine tests and build.", + "approach": "Standard approach", + "confidence": 0.92 + } +} \ No newline at end of file diff --git a/.trajectories/completed/2026-06/traj_1pjavtw1yit2.md b/.trajectories/completed/2026-06/traj_1pjavtw1yit2.md new file mode 100644 index 00000000..a178f775 --- /dev/null +++ b/.trajectories/completed/2026-06/traj_1pjavtw1yit2.md @@ -0,0 +1,21 @@ +# Trajectory: Fix PR merge conflicts + +> **Status:** ✅ Completed +> **Confidence:** 92% +> **Started:** June 2, 2026 at 07:53 AM +> **Completed:** June 2, 2026 at 07:57 AM + +--- + +## Summary + +Merged origin/main into PR #155, resolved package version lockfile conflicts, verified GitHub mergeStateStatus is CLEAN, and passed @relaycast/engine tests and build. + +**Approach:** Standard approach + +--- + +## Artifacts + +**Commits:** 82a6195, e52b06e, 066c218, 85f1e21, 8e167e5, 5f36d69, 2620f09, 82a5b8c +**Files changed:** 13 diff --git a/.trajectories/completed/2026-06/traj_1pjavtw1yit2.trace.json b/.trajectories/completed/2026-06/traj_1pjavtw1yit2.trace.json new file mode 100644 index 00000000..dfb2e454 --- /dev/null +++ b/.trajectories/completed/2026-06/traj_1pjavtw1yit2.trace.json @@ -0,0 +1,698 @@ +{ + "version": "1.0.0", + "id": "f241730b-39b5-4093-91d7-841fb6f27500", + "timestamp": "2026-06-02T11:57:15.544Z", + "trajectory": "traj_1pjavtw1yit2", + "files": [ + { + "path": ".github/workflows/publish-engine.yml", + "conversations": [ + { + "contributor": { + "type": "ai" + }, + "ranges": [] + } + ] + }, + { + "path": ".github/workflows/publish-npm.yml", + "conversations": [ + { + "contributor": { + "type": "ai" + }, + "ranges": [ + { + "start_line": 88, + "end_line": 99, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 114, + "end_line": 138, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 145, + "end_line": 158, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 165, + "end_line": 171, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 191, + "end_line": 197, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 321, + "end_line": 327, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 333, + "end_line": 339, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + } + ] + } + ] + }, + { + "path": ".trajectories/index.json", + "conversations": [ + { + "contributor": { + "type": "ai" + }, + "ranges": [ + { + "start_line": 1, + "end_line": 5, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + } + ] + } + ] + }, + { + "path": "package-lock.json", + "conversations": [ + { + "contributor": { + "type": "ai" + }, + "ranges": [ + { + "start_line": 1323, + "end_line": 1328, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1528, + "end_line": 1555, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1733, + "end_line": 1741, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1747, + "end_line": 1755, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1761, + "end_line": 1769, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1775, + "end_line": 1783, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1789, + "end_line": 1797, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1803, + "end_line": 1811, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1817, + "end_line": 1825, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1831, + "end_line": 1839, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1845, + "end_line": 1853, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1859, + "end_line": 1867, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1873, + "end_line": 1881, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1887, + "end_line": 1895, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1901, + "end_line": 1909, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1915, + "end_line": 1923, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1929, + "end_line": 1937, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1943, + "end_line": 1951, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1957, + "end_line": 1965, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1971, + "end_line": 1979, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1985, + "end_line": 1993, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 1999, + "end_line": 2007, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 2013, + "end_line": 2021, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 2027, + "end_line": 2035, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 2041, + "end_line": 2049, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 2055, + "end_line": 2063, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 2069, + "end_line": 2077, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 2661, + "end_line": 2673, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 2697, + "end_line": 2705, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 3042, + "end_line": 3057, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 3060, + "end_line": 3072, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 3087, + "end_line": 3095, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 3100, + "end_line": 3112, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 3114, + "end_line": 3127, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 3130, + "end_line": 3138, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 3140, + "end_line": 3152, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 3167, + "end_line": 3197, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 3254, + "end_line": 3269, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 3287, + "end_line": 3314, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 3479, + "end_line": 3500, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 4531, + "end_line": 4536, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 4732, + "end_line": 4762, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 4919, + "end_line": 4924, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 5251, + "end_line": 5266, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 5567, + "end_line": 5576, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 6992, + "end_line": 7017, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 7617, + "end_line": 7625, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 7715, + "end_line": 7736, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 7968, + "end_line": 7980, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 7984, + "end_line": 8017, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 8121, + "end_line": 8151, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 9394, + "end_line": 9424, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 9465, + "end_line": 9473, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 9621, + "end_line": 9629, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10203, + "end_line": 10221, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10243, + "end_line": 10254, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10336, + "end_line": 10341, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10536, + "end_line": 10552, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10557, + "end_line": 10569, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10581, + "end_line": 10610, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10618, + "end_line": 10628, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10651, + "end_line": 10663, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10668, + "end_line": 10683, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10686, + "end_line": 10694, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10699, + "end_line": 10711, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10714, + "end_line": 10726, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10729, + "end_line": 10737, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10742, + "end_line": 10754, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10797, + "end_line": 10817, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10841, + "end_line": 10848, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10871, + "end_line": 10883, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10899, + "end_line": 10909, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10923, + "end_line": 10937, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10940, + "end_line": 10948, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10953, + "end_line": 10965, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10968, + "end_line": 10980, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10983, + "end_line": 10991, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 10996, + "end_line": 11008, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 11052, + "end_line": 11071, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 11095, + "end_line": 11102, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 11125, + "end_line": 11137, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 11153, + "end_line": 11161, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 11164, + "end_line": 11172, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + } + ] + } + ] + }, + { + "path": "packages/a2a/package.json", + "conversations": [ + { + "contributor": { + "type": "ai" + }, + "ranges": [ + { + "start_line": 1, + "end_line": 6, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + } + ] + } + ] + }, + { + "path": "packages/cli/package.json", + "conversations": [ + { + "contributor": { + "type": "ai" + }, + "ranges": [ + { + "start_line": 1, + "end_line": 6, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 21, + "end_line": 27, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + } + ] + } + ] + }, + { + "path": "packages/engine/package.json", + "conversations": [ + { + "contributor": { + "type": "ai" + }, + "ranges": [ + { + "start_line": 1, + "end_line": 6, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 43, + "end_line": 50, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + } + ] + } + ] + }, + { + "path": "packages/mcp/package.json", + "conversations": [ + { + "contributor": { + "type": "ai" + }, + "ranges": [ + { + "start_line": 1, + "end_line": 6, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 13, + "end_line": 20, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + } + ] + } + ] + }, + { + "path": "packages/observer-dashboard/package.json", + "conversations": [ + { + "contributor": { + "type": "ai" + }, + "ranges": [ + { + "start_line": 1, + "end_line": 6, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 8, + "end_line": 16, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + } + ] + } + ] + }, + { + "path": "packages/openclaw/package.json", + "conversations": [ + { + "contributor": { + "type": "ai" + }, + "ranges": [ + { + "start_line": 1, + "end_line": 6, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 36, + "end_line": 43, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + } + ] + } + ] + }, + { + "path": "packages/react/package.json", + "conversations": [ + { + "contributor": { + "type": "ai" + }, + "ranges": [ + { + "start_line": 1, + "end_line": 6, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 33, + "end_line": 40, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + } + ] + } + ] + }, + { + "path": "packages/sdk-typescript/package.json", + "conversations": [ + { + "contributor": { + "type": "ai" + }, + "ranges": [ + { + "start_line": 1, + "end_line": 6, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + }, + { + "start_line": 27, + "end_line": 33, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + } + ] + } + ] + }, + { + "path": "packages/types/package.json", + "conversations": [ + { + "contributor": { + "type": "ai" + }, + "ranges": [ + { + "start_line": 1, + "end_line": 6, + "revision": "82a61959d2246ce4a16efb5eda0bfff60d9a046d" + } + ] + } + ] + } + ] +} \ No newline at end of file diff --git a/.trajectories/index.json b/.trajectories/index.json index f5e85359..ffebca5d 100644 --- a/.trajectories/index.json +++ b/.trajectories/index.json @@ -1,5 +1,13 @@ { "version": 1, - "lastUpdated": "2026-06-01T04:23:22.030Z", - "trajectories": {} + "lastUpdated": "2026-06-02T11:57:15.658Z", + "trajectories": { + "traj_1pjavtw1yit2": { + "title": "Fix PR merge conflicts", + "status": "completed", + "startedAt": "2026-06-02T11:53:41.161Z", + "completedAt": "2026-06-02T11:57:15.470Z", + "path": "/Users/will/Projects/AgentWorkforce/relaycast/.trajectories/completed/2026-06/traj_1pjavtw1yit2.json" + } + } } \ No newline at end of file From f6cc1bddc7ad41130463a776aaec8a9b93314123 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 12:02:06 +0000 Subject: [PATCH 13/15] fix(engine): 400 on malformed JSON in fail endpoint The fail body is optional, but `c.req.json().catch(() => ({}))` silently coerced malformed JSON into {} and still marked the delivery failed (200), inconsistent with the defer endpoint. Now read the raw body: an empty body still defaults to {} (valid), but non-empty malformed JSON returns 400 without mutating state. Adds conformance coverage for both cases. --- .../__tests__/conformance/delivery.test.ts | 29 +++++++++++++++++++ packages/engine/src/routes/delivery.ts | 18 ++++++++++-- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/packages/engine/src/__tests__/conformance/delivery.test.ts b/packages/engine/src/__tests__/conformance/delivery.test.ts index 918e7a77..3c1454d4 100644 --- a/packages/engine/src/__tests__/conformance/delivery.test.ts +++ b/packages/engine/src/__tests__/conformance/delivery.test.ts @@ -222,6 +222,35 @@ describe('durable delivery api', () => { expect(deferred!.status).toBe('deferred'); }); + it('rejects a malformed JSON fail body with 400 (no state change)', async () => { + const { bob } = await seed(); + const [item] = await listDeliveries(bob.token); + + const res = await stack.app.request(`/v1/deliveries/${item.id}/fail`, { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${bob.token}` }, + body: '{"error": "boom"', // missing closing brace + }); + expect(res.status).toBe(400); + + // The delivery must remain queued (accepted), not flipped to failed. + const queued = await listDeliveries(bob.token); + expect(queued).toHaveLength(1); + expect(queued[0].status).toBe('accepted'); + }); + + it('accepts an empty fail body (optional metadata)', async () => { + const { bob } = await seed(); + const [item] = await listDeliveries(bob.token); + + const res = await stack.app.request(`/v1/deliveries/${item.id}/fail`, { + method: 'POST', + headers: { authorization: `Bearer ${bob.token}` }, + }); + expect(res.status).toBe(200); + expect(((await res.json()) as { data: { status: string } }).data.status).toBe('failed'); + }); + it('rejects an invalid defer payload', async () => { const { bob } = await seed(); const [item] = await listDeliveries(bob.token); diff --git a/packages/engine/src/routes/delivery.ts b/packages/engine/src/routes/delivery.ts index cf74d63c..5e334773 100644 --- a/packages/engine/src/routes/delivery.ts +++ b/packages/engine/src/routes/delivery.ts @@ -84,8 +84,22 @@ deliveryRoutes.post( rateLimit, async (c) => { try { - const raw = await c.req.json().catch(() => ({})); - const parsed = FailDeliveryRequestSchema.safeParse(raw ?? {}); + // The fail body is optional (error/retryable), so an empty body is valid + // and defaults to {}. But a non-empty malformed JSON body is a client + // error and must 400 — don't silently coerce it and mutate state. + const bodyText = await c.req.text(); + let raw: unknown = {}; + if (bodyText.trim().length > 0) { + try { + raw = JSON.parse(bodyText); + } catch { + return c.json({ + ok: false, + error: { code: 'invalid_request', message: 'Invalid JSON body' }, + }, 400); + } + } + const parsed = FailDeliveryRequestSchema.safeParse(raw); if (!parsed.success) { return c.json({ ok: false, From b9fbf6c906adb711a7e6a10eda720ab023f28c14 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 12:02:06 +0000 Subject: [PATCH 14/15] chore: store repo-relative trajectory path in index The recorded trajectory index baked in an absolute local path (/Users/will/...), which isn't portable across machines/CI and leaks local workspace details. Store it repo-relative. --- .trajectories/index.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.trajectories/index.json b/.trajectories/index.json index ffebca5d..49afc5e5 100644 --- a/.trajectories/index.json +++ b/.trajectories/index.json @@ -7,7 +7,7 @@ "status": "completed", "startedAt": "2026-06-02T11:53:41.161Z", "completedAt": "2026-06-02T11:57:15.470Z", - "path": "/Users/will/Projects/AgentWorkforce/relaycast/.trajectories/completed/2026-06/traj_1pjavtw1yit2.json" + "path": ".trajectories/completed/2026-06/traj_1pjavtw1yit2.json" } } } \ No newline at end of file From 1d5f674cae2a7af4b4bf932d90dce7111dc8c7f7 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 12:04:31 +0000 Subject: [PATCH 15/15] fix(engine): don't inherit acceptance reason on defer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit deferDelivery defaulted the reason to the existing row's value, so deferring without a reason inherited the acceptance reason (message/mention/dm/...), making deferred records and delivery.deferred events carry a misleading reason. The defer reason is its own concept — default to null when none is provided. Adds a conformance test. --- .../src/__tests__/conformance/delivery.test.ts | 17 +++++++++++++++++ packages/engine/src/engine/delivery.ts | 5 ++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/packages/engine/src/__tests__/conformance/delivery.test.ts b/packages/engine/src/__tests__/conformance/delivery.test.ts index 3c1454d4..9b76c122 100644 --- a/packages/engine/src/__tests__/conformance/delivery.test.ts +++ b/packages/engine/src/__tests__/conformance/delivery.test.ts @@ -251,6 +251,23 @@ describe('durable delivery api', () => { expect(((await res.json()) as { data: { status: string } }).data.status).toBe('failed'); }); + it('does not inherit the acceptance reason when deferring without a reason', async () => { + const { bob } = await seed(); + const [item] = await listDeliveries(bob.token); + // The queued item carries an acceptance reason (e.g. "message"). + expect(item.reason).toBeTruthy(); + + const res = await stack.app.request(`/v1/deliveries/${item.id}/defer`, { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${bob.token}` }, + body: JSON.stringify({ available_at: new Date(Date.now() + 60_000).toISOString() }), + }); + expect(res.status).toBe(200); + const data = ((await res.json()) as { data: { status: string; reason: string | null } }).data; + expect(data.status).toBe('deferred'); + expect(data.reason).toBeNull(); + }); + it('rejects an invalid defer payload', async () => { const { bob } = await seed(); const [item] = await listDeliveries(bob.token); diff --git a/packages/engine/src/engine/delivery.ts b/packages/engine/src/engine/delivery.ts index dfd4e31e..f91272d1 100644 --- a/packages/engine/src/engine/delivery.ts +++ b/packages/engine/src/engine/delivery.ts @@ -211,7 +211,10 @@ export async function deferDelivery( if (!existing) return null; if (existing.status === 'delivered') return { delivery: serializeDelivery(existing), changed: false }; - const targetReason = opts.reason ?? existing.reason; + // The defer reason is its own concept; don't inherit the acceptance reason + // (message/mention/dm/...) when the caller omits one, or deferred records and + // delivery.deferred events would carry a misleading reason. Default to null. + const targetReason = opts.reason ?? null; const reasonMatches = targetReason === null ? isNull(deliveries.reason) : eq(deliveries.reason, targetReason);