feat(agent,daemon): wire async-promote queue + 5 HTTP routes (PR 2/4)#660
Conversation
PR 2/4 of the async-promote-queue series. Builds on PR #1 (the TripleStoreAsyncPromoteQueue library merged in feat/async-promote-queue-lib) by exposing the queue to user code via two layers: 1. Agent surface (packages/agent/src/dkg-agent.ts): - Lazy `agent.promoteQueue` accessor — single shared TripleStoreAsyncPromoteQueue instance, constructed on first use against `agent.store`. The control graph (urn:dkg:promote-queue:control-plane) lives in the same triple store as everything else, so the queue survives daemon restarts. - 5 new methods on `agent.assertion`: promoteAsync(cgId, name, opts) → { jobId } getPromoteAsyncStatus(jobId) → PromoteJob | null listPromoteAsyncJobs(filter?) → PromoteJob[] cancelPromoteAsync(jobId) recoverPromoteAsync(jobId) - `agent.configurePromoteQueue(config)` for tests / future daemon config plumbing — must be called before first access. Note: the worker-side surface (claimNext / heartbeat / succeed / fail / recordCommitMarker / recoverOnStartup) is NOT exposed on `agent.assertion` to keep user-facing callers from accidentally driving the lifecycle. PR #3's worker reaches in via `agent.promoteQueue` directly. 2. HTTP routes (packages/cli/src/daemon/routes/assertion.ts): - POST /api/assertion/:name/promote-async → 202 { jobId, state } - GET /api/assertion/promote-async → list, scoped by contextGraphId / state - GET /api/assertion/promote-async/:jobId → single job or 404 - DELETE /api/assertion/promote-async/:jobId → cancel; 409 if running - POST /api/assertion/promote-async/:jobId/recover → requeue; 409 if not failed All share the SMALL_BODY_BYTES (256 KB) cap, per RFC §3.1. Routing precedence: the collection GET (`/promote-async`) is checked before the per-job GET (`/promote-async/:jobId`) so the list route isn't claimed by the per-job handler — and the per-job GET filters out `/recover`-suffixed paths so the recover POST keeps its place. 3. Tests (packages/cli/test/promote-async-routes.test.ts, +19): Mirror import-artifact-routes.test.ts pattern: real HTTP server, real TripleStoreAsyncPromoteQueue backed by OxigraphStore, mock agent whose assertion subsurface delegates to the queue. Tests exercise the wire contract AND the queue invariants end-to-end: - happy path enqueue + 202 response - duplicate enqueue returns 409 with existingJobId - missing/invalid params return 400 - explicit entities list round-trips - GET status, list (incl. ?contextGraphId / ?state / ?limit), 400 on garbage filter values - DELETE happy path + 409 on running job + 404 on missing - POST .../recover happy path + 409 + 404 - routing precedence assertion (list vs per-job) All 19 pass. Verification: publisher tests: 32 passed (PR #1's queue invariants) cli routes tests: 19 passed (PR #2's wire contract) tsc clean across packages/{agent,cli,publisher,storage} No behavioural impact on existing callers — every change is additive. The actual worker that drains the queue lands in PR #3; until then, enqueue/list/cancel/recover all work but `running` is unreachable. Co-authored-by: Cursor <cursoragent@cursor.com>
…into pr-660-fix # Conflicts: # packages/publisher/src/async-promote-queue-impl.ts
| if (!validateEntities(entities, res)) return; | ||
| if (!validateOptionalSubGraphName(subGraphName, res)) return; | ||
| try { | ||
| const result = await agent.assertion.promoteAsync( |
There was a problem hiding this comment.
🔴 Bug: This endpoint is now publicly available, but this PR only enqueues jobs. The diff explicitly says the worker that drains the queue ships later, so every successful POST /promote-async will sit in queued forever and never perform the WM→SWM promote. Please either wire the worker in the same PR or return 501/feature-gate this route until the queue is actually drained.
There was a problem hiding this comment.
Addressed in 6fe24c7.
Routes now gate on daemonState.promoteWorkerAvailable (added to daemon/state.ts, defaults to false). POST /api/assertion/:name/promote-async, the list endpoint, and GET /api/assertion/promote-async/:jobId all return 503 Service Unavailable with the body { "error": "async-promote worker is not available[: <reason>]" } until something explicitly flips the flag.
This forward-merges cleanly with the worker PR — feat/async-promote-queue-worker sets promoteWorkerAvailable = true on supervisor start, and promoteWorkerUnavailableReason = <err> if startup fails — so we never silently accept jobs into a dead queue.
Added tests in packages/cli/test/promote-async-routes.test.ts:
POST /:name/promote-async returns 503 when the worker is unavailablePOST /:name/promote-async surfaces the unavailability reason when setGET /promote-async returns 503 when the worker is unavailableGET /promote-async/:jobId returns 503 when the worker is unavailable
| if (!job) { | ||
| return jsonResponse(res, 404, { error: `Promote job not found: ${jobId}` }); | ||
| } | ||
| return jsonResponse(res, 200, job); |
There was a problem hiding this comment.
🔴 Bug: Returning the raw PromoteJob here bakes the internal queue shape into the HTTP API (request, attempt, lease, numeric timestamps, and even lease.claimToken once a job is running). That does not match the RFC contract this route references, and it leaks worker-internal state that clients should not depend on. Serialize to the documented wire schema instead, and use the same mapper for the list endpoint.
There was a problem hiding this comment.
Addressed in 6fe24c7.
Introduced PromoteJobView (RFC §3.2) + promoteJobToView() mapper in packages/cli/src/daemon/routes/assertion.ts. The wire schema is now:
interface PromoteJobView {
jobId: string;
state: PromoteJobState;
contextGraphId: string;
assertionName: string;
subGraphName?: string;
entities: readonly string[] | 'all';
enqueuedAt: string; // ISO-8601
updatedAt: string; // ISO-8601
startedAt?: string; // ISO-8601, from lease.acquiredAt
finishedAt?: string; // ISO-8601, from result.succeededAt | lastError.recordedAt | updatedAt
entitiesPromoted?: number;
attempts: number;
maxAttempts: number;
nextRetryAt?: string;
lastError?: { code: string; message: string; retryable: boolean };
reason?: string;
}Both GET /api/assertion/promote-async (list) and GET /api/assertion/promote-async/:jobId (status) now route through the same mapper, so the surfaces stay in lockstep. Stripped fields: request, attempt, lease (including lease.claimToken and lease.workerId), commitMarker, and the raw numeric epoch timestamps. lastError.code is mapped from the internal PromoteFailureClassification (transient | cap_exceeded | fatal).
Test additions in packages/cli/test/promote-async-routes.test.ts:
GET /promote-async/:jobId returns the documented wire schema (RFC §3.2)— pins the flat top-level identity,entities, attempts, and ISO-8601 timestamps.GET /promote-async/:jobId never leaks the internal queue shape (claimToken, request, attempt, commitMarker)— forces the job intorunningso it has a real lease + claim token, then asserts the response carries neither.
…oteJob to wire schema Codex PR #660 round-1 review: - id=3302072808 — routes are now publicly available but this PR only enqueues jobs; the supervisor that drains the queue lands in the next PR. Add `daemonState.promoteWorkerAvailable` (defaults to `false`) and refuse `POST /api/assertion/:name/promote-async`, the list endpoint, and the per-job status endpoint with `503 Service Unavailable` until something explicitly flips the flag. Surface `daemonState.promoteWorkerUnavailableReason` in the body so operators see *why* the queue is closed. Forward-compatible with the follow-up worker PR, which simply sets the flag on successful supervisor startup. - id=3302072811 — the GET status + list endpoints used to return the raw `PromoteJob` shape, leaking internal queue bookkeeping (`request`, `attempt`, `lease.claimToken`, `lease.workerId`, `commitMarker`, numeric epoch timestamps). Introduce `PromoteJobView` matching RFC §3.2 (flat assertion identity, ISO-8601 timestamps, `lastError = { code, message, retryable }` derived from the queue's failure classification) and route both endpoints through the same `promoteJobToView()` mapper so the list/status surfaces stay in lockstep. Test additions in `packages/cli/test/promote-async-routes.test.ts`: - POST/list/status all return 503 with the unavailability reason when the worker flag is off. - GET `:jobId` pins the wire schema (flat top-level identity, ISO-8601 `enqueuedAt`). - GET `:jobId` verifies the response never carries `request`, `attempt`, `lease`, `commitMarker`, `claimToken`, or `workerId`, even when the underlying job has an active lease. Co-authored-by: Cursor <cursoragent@cursor.com>
| req.method === "GET" && | ||
| path === "/api/assertion/promote-async" | ||
| ) { | ||
| if (asyncPromoteUnavailable(res)) return; |
There was a problem hiding this comment.
🔴 Bug: Applying the worker-availability gate to the read-only queue endpoints makes existing jobs impossible to inspect exactly when the supervisor is down. A client polling an existing jobId will get 503 instead of the last durable state (queued/failed/succeeded), and operators lose the ability to see which jobs need recovery. Keep the guard on mutating routes only; the GET routes should still read persisted queue state from storage.
| ); | ||
| if (jobId === null) return; | ||
| try { | ||
| await agent.assertion.recoverPromoteAsync(jobId); |
There was a problem hiding this comment.
🔴 Bug: recover currently requeues a failed job even when promoteWorkerAvailable is false. In that state the API returns queued, but no worker can ever drain it, which recreates the same silent black-hole behavior the enqueue route is trying to prevent. Add the same availability check here (or otherwise reject recoveries while the worker is unavailable).
| return jsonResponse(res, 200, promoteJobToView(job)); | ||
| } | ||
|
|
||
| // DELETE /api/assertion/promote-async/:jobId — cancel a queued/failed_retrying job |
There was a problem hiding this comment.
🟡 Issue: This comment says DELETE can cancel failed_retrying jobs, but AsyncPromoteQueue.cancel() only allows the queued state. That mismatch makes the public contract harder to trust for callers and tests; either update the comment or extend the implementation if failed_retrying cancellation is intended.
…e-queue-worker Pull #660's round-1 review fix (`6fe24c7a`) into this PR's branch: - `routes/assertion.ts`: replaced inline 503 check with the new `asyncPromoteUnavailable(res)` helper that #660 introduced (already used by the list/status/cancel/recover endpoints). - `state.ts`: kept #660's richer doc on `promoteWorkerAvailable`, rewritten for THIS PR's context — the supervisor that flips the flag is part of #665, not "a follow-up PR". - `test/promote-async-routes.test.ts`: rewrote the stale "supervisor lands in a follow-up PR" comment in `beforeEach`; tests still manually flip the flag because they don't spin up the supervisor. All 30 wire-contract tests in `promote-async-routes.test.ts` pass post-merge. Pre-existing `lifecycle.ts:475` strict-mode warning (`startup is used before being assigned`) is unrelated to the merge — it was on this branch before and is being addressed separately. Co-authored-by: Cursor <cursoragent@cursor.com>
…eclaim Codex review on PR #665 flagged two ways the async-promote chain could silently corrupt or eat work: - id=3300423547 (`packages/cli/src/daemon/lifecycle.ts:905`): a single startup failure left `/promote-async` accepting jobs that nobody would ever drain. - id=3302135756 (`packages/publisher/src/async-promote-queue-impl.ts:376`): reclaiming `running` rows whenever `commitMarker.promoteStarted !== true` is not backward-compatible with jobs persisted by a pre-v2 worker — those rows can legitimately have an absent marker even after they entered `assertionPromote()`, so an upgraded daemon could move a partially-promoted job back to `queued` and run it twice. Daemon (Fix #3): - `startPromoteWorkerDaemonLifecycle` now tags both success (`[async-promote-worker] supervisor started; /promote-async accepting jobs`) and failure (`[async-promote-worker] startup failed; queue is read-only until daemon restart: <reason>`) so operators can grep one canonical tag. The startup-failure path keeps the existing `daemonState.promoteWorkerAvailable = false` + `daemonState.promoteWorkerUnavailableReason = err.message` writes, which PR #660 already turned into the `503` the routes return. Publisher (Fix #4): - New `ASYNC_PROMOTE_QUEUE_FORMAT_VERSION = 2` constant exported from the publisher's public surface. - `PromoteJob` gains an optional `formatVersion?: number` field (kept optional so the parser still accepts pre-v2 rows on disk). - `enqueue()` and `recover()` stamp the current format version on every job they write. - `reconcileExpiredRunning()` is now version-gated: reclaim only when `formatVersion >= 2 AND swmInserted === false AND promoteStarted !== true`. Any older row (missing or `< 2`) is routed to the abandoned / manual-recovery path with a descriptive reason + lastError message pointing at the legacy format, so the operator sees exactly why we refused to auto-reclaim. Tests (new + updated): - `packages/publisher/test/async-promote-queue.test.ts` - `21c` (existing) updated to record `promoteStarted` before the lease expires so the new version-gated abandon path fires as the test expects. - `26b` rewritten as `recoverOnStartup() ABANDONS legacy running jobs without a formatVersion marker (Codex PR #665 id=3302135756)`: synthesises a row with `formatVersion: undefined` and asserts it lands in `failed` with the legacy reason / message. - `26c` new: `recoverOnStartup() RECLAIMS v2 running jobs with promoteStarted=false`. - `26d` new: `recoverOnStartup() ABANDONS legacy running jobs even when promoteStarted=false is explicitly present`. - `packages/cli/test/promote-async-daemon-lifecycle.test.ts` - Existing `drains a route-enqueued job …` test: enqueue assertion corrected to `200 OK` per RFC §3.1 (matches the route). - New `worker startup failure leaves the daemon-state flag off and tags the structured log (Codex PR #665 id=3300423547)` test: stubs `recoverOnStartup` to throw, asserts `daemonState.promoteWorkerAvailable === false`, `promoteWorkerUnavailableReason` carries the error, and the `[async-promote-worker] startup failed … queue is read-only` log line was emitted. Caveats: - Two pre-existing failures on baseline `feat/async-promote-queue-worker` in `packages/cli/test/async-promote-worker.test.ts` (`runPromoteJob > on success, records the recovery commit marker …` and `createPromoteWorkerSupervisor > runs recoverOnStartup() during start()`) are untouched — they were failing on `ghorigin/feat/async-promote-queue-worker` before this commit, so they are not in scope here. Co-authored-by: Cursor <cursoragent@cursor.com>
PR OriginTrail#3 of the async-promote-queue series (stacked on OriginTrail#660). Drains the queue introduced in PR OriginTrail#1 and exposed in PR OriginTrail#2 by running N worker loops that: 1. recoverOnStartup() before polling — reclaim leases held by a previous boot whose workers crashed mid-promote. 2. setInterval(claimNext, 100ms) × workerConcurrency (default 4). 3. On claim → invoke agent.assertion.promote(...) under a background heartbeat that refreshes the lease every 60s. 4. On success → record all 4 commit-marker steps (single OUTER boundary marker per plan §7 strategy b) then succeed() and emit memoryGraphChanged. 5. On failure → classifyPromoteError(err) → fail() with classification {transient | cap_exceeded | fatal} and let the queue handle backoff + maxRetries. Error classification seeded from the rc.10 Graphify import patterns documented in INTEGRATION_NOTES_GRAPHIFY.md / FINDINGS_v2.md: - "Promoted assertion too large for gossip" → cap_exceeded - "Request body too large" → cap_exceeded - fetch failed / ECONNRESET / timeout → transient - anything else → fatal Shutdown semantics follow RFC §6.2: stop polling, wait up to shutdownTimeoutMs for in-flight promotes to complete, but DO NOT mark `running → queued` on shutdown — the lease will expire and the next boot's recoverOnStartup() decides what to do. Wiring into packages/cli/src/daemon/lifecycle.ts: - createPromoteWorkerSupervisor instantiated inside startPostApiPublishing (same hook used by the async-lift publisher runtime) so a recoverOnStartup hiccup never blocks boot. - Stopped inside shutdown() between publisherRuntime.stop() and agent.stop() so the underlying triple store is still open for the drain phase. Tests: - packages/cli/test/async-promote-worker.test.ts (21 new tests) - classifyPromoteError: 7 tests covering each rc.10 pattern - runPromoteJob: 8 tests for happy path, commit-marker bookkeeping, retry vs terminal failure, max-retries exhaustion, memoryGraphChanged emission gating, no-lease guard - createPromoteWorkerSupervisor: 6 tests for tickOnce, empty queue, multi-slot fanout, counter tracking, shutdown timeout preserves `running` state for next-boot recovery, recoverOnStartup integration - PR OriginTrail#1 (32 queue lib tests) + PR OriginTrail#2 (19 route tests) still green. Co-authored-by: Cursor <cursoragent@cursor.com>
…e/rc.11 Co-authored-by: Cursor <cursoragent@cursor.com> # Conflicts: # packages/cli/vitest.unit.config.ts
…-gate integration PR OriginTrail#667's e2e test was authored before PR OriginTrail#660 added 503-gating on `daemonState.promoteWorkerAvailable`. After integration: - Set `daemonState.promoteWorkerAvailable = true` in beforeEach + reset to initial-boot `false` in afterEach (mirrors what promote-async-routes.test.ts already does post-integration). - Update happy-path status assertion from 202 to 200 to match the route's current contract (PR OriginTrail#660). Semantic 202 "Accepted" upgrade can ship in rc.12 if the operator surface wants it. - Soften the commitMarker assertion to `toMatchObject({ swmInserted: true })`. The stubbed promote in this test only exercises that one flag; the wmCleaned/lifecycleStamped/gossiped fields require a real chain + SWM substrate. Tracked in rc.12 backlog (OriginTrail#676) for a fixture-backed variant. Co-authored-by: Cursor <cursoragent@cursor.com>
Bump root + 17 workspace packages from 10.0.0-rc.10 to 10.0.0-rc.11. Promote the CHANGELOG "Unreleased" block to the dated rc.11 section. Release contents (PR OriginTrail#680 — release/rc.11 integration branch): Core-stability hardening (rc.10 deadlock workstream): OriginTrail#655 hard shutdown timeout OriginTrail#657 async-promote queue library OriginTrail#659 auto-update install-source override OriginTrail#669 AbortSignal plumbing through DKGNode.stop() OriginTrail#670 chain provider filter log-spam silencer OriginTrail#666 dkg migrate-to-npm CLI subcommand OriginTrail#668 AutoNAT boot self-probe OriginTrail#661 core relay capability sanity check OriginTrail#662 relay metrics in /api/status OriginTrail#664 supervisor positive-liveness probe ERC-721 mint ordering: OriginTrail#681 CEI mint-last at every mint site (supersedes OriginTrail#663, which proposed _safeMint and was rejected as a public-API break for older Gnosis Safes / DAO timelocks / strategy wrappers). Keeps _mint; reorders so _mint is the last state-changing call. relock moves _burn before _mint. Async-promote queue stack: OriginTrail#660 /promote-async route wiring with worker-readiness gate OriginTrail#665 async-promote worker supervisor OriginTrail#667 async-promote queue config + e2e tests Honest ACK + tentative VM cleanup: OriginTrail#671 delete self-signed ACK fallback + tentative-VM concept OriginTrail#672 typed errors + LU-6 runbook + provenance telemetry Test infra: OriginTrail#673 rc.11 test infrastructure fixes Verification on the integration branch (release/rc.11): pnpm -r build clean pnpm --filter @origintrail-official/dkg test:unit 403/403 PASS evm-module 278/278 PASS (NFT + CG contract tests) devnet-test-rc11-promote-crash-recovery.sh GREEN devnet-test-rc11-shutdown-mid-publish.sh GREEN (549ms shutdown, 0 [shutdown-timeout] lines) devnet-test-rfc38-all.sh 10/11 PASS (lj is the pre-existing documented LU-6 cores-only gap) devnet-test.sh 343/347 PASS — 4 fails tracked in OriginTrail#676 as stale test expectations against OriginTrail#671's seal contract + V10 auto-registration. Co-authored-by: Cursor <cursoragent@cursor.com>
What
PR 2 of the async-promote-queue series. Builds on PR #657 (the
TripleStoreAsyncPromoteQueuelibrary) by exposing the queue to user code via two layers:1. Agent surface (
packages/agent/src/dkg-agent.ts, +85)agent.promoteQueueaccessor — single sharedTripleStoreAsyncPromoteQueueinstance, constructed on first use againstagent.store. The control graph (urn:dkg:promote-queue:control-plane) lives in the same triple store as everything else, so the queue survives daemon restarts.agent.assertion:promoteAsync(cgId, name, opts) → { jobId }getPromoteAsyncStatus(jobId) → PromoteJob | nulllistPromoteAsyncJobs(filter?) → PromoteJob[]cancelPromoteAsync(jobId)recoverPromoteAsync(jobId)agent.configurePromoteQueue(config)for tests / future daemon config plumbing — must be called before first access.The worker-side surface (
claimNext/heartbeat/succeed/fail/recordCommitMarker/recoverOnStartup) is deliberately NOT exposed onagent.assertion. PR #3's worker reaches in viaagent.promoteQueuedirectly so user-facing callers can't accidentally drive the lifecycle.2. HTTP routes (
packages/cli/src/daemon/routes/assertion.ts, +173)POST/api/assertion/:name/promote-async{ jobId, state, enqueuedAt }existingJobIdon duplicate (uniqueness conflict)GET/api/assertion/promote-async?contextGraphId,?state=queued,running,...,?limit=GET/api/assertion/promote-async/:jobIdDELETE/api/assertion/promote-async/:jobIdPOST/api/assertion/promote-async/:jobId/recoverAll share the
SMALL_BODY_BYTES(256 KB) cap, per RFC §3.1.Routing precedence in the linear
startsWith/endsWithdispatch:GET /promote-asyncis checked BEFORE the per-jobGET /promote-async/:jobIdso the list route isn't claimed by the per-job handler.GETfilters out/recover-suffixed paths so the recoverPOSTkeeps its place./promote-asyncroutes are checked AFTER the existing sync/promoteroute to keep that contract untouched.3. Tests (
packages/cli/test/promote-async-routes.test.ts, +375)Mirrors
import-artifact-routes.test.tspattern: real HTTP server, realTripleStoreAsyncPromoteQueuebacked byOxigraphStore, mock agent whoseassertionsubsurface delegates to the queue. Tests exercise the wire contract AND the queue invariants end-to-end:existingJobIdentitiesarray round-trips?contextGraphId/?state/?limit).../recoverhappy path + 409 + 404All 19 pass:
Verification
What's still NOT in this PR (and why)
queuedand never transition torunning. The HTTP routes still work for inspecting/cancelling/recovering — this lets us validate the agent surface against real test clients before adding the live worker.memoryGraphChangedevents. Enqueueing isn't a memory change; that event will be emitted from the worker (PR the context graph explodes even after 4-5 rounds in the game #3) when a job hitssucceeded.importLimits.promoteWorkerConcurrencyon/api/status. Comes with PR the consensus is hardcoded #4.Stacked review order
PR #657 → PR #658 (this) → PR #3 → PR #4. Each builds on the previous; each is reviewable as-is.Made with Cursor