fix: route publisher CLI commands through daemon API to prevent store corruption #114
branarakic merged 3 commits into v10-rc
Conversation
… corruption

The publisher CLI commands (enqueue, jobs, job, stats) were opening separate in-memory store connections to the same persistence file that the daemon was using. Since OxigraphStore is in-memory with file-based flush, concurrent access caused race conditions where the daemon's flush would overwrite CLI-written data (and vice versa), making publisher jobs disappear.

Fix: add publisher API endpoints to the daemon (/api/publisher/*) and update CLI commands to route through the daemon's HTTP API when it is running, falling back to direct store access when the daemon is not available. This ensures a single consistent view of the publisher job queue.

Fixes publisher-cli-smoke.test.ts, which was failing on v10-rc since the async publisher (PR #97) was merged.

Made-with: Cursor
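The daemon-first routing the commit describes can be sketched as a small helper; a minimal sketch only, where `fetchFromDaemon` and `readFromLocalStore` are illustrative stand-ins rather than the PR's actual functions:

```typescript
// Hypothetical sketch of "try the daemon's HTTP API first, fall back to
// direct store access" -- the function names are assumptions, not the PR's API.
type JobStats = { accepted: number; failed: number };

async function getStats(
  fetchFromDaemon: () => Promise<JobStats>,
  readFromLocalStore: () => Promise<JobStats>,
): Promise<JobStats> {
  try {
    // Prefer the daemon's view so the CLI and daemon share one store.
    return await fetchFromDaemon();
  } catch {
    // Daemon not running: fall back to opening the store file directly.
    return await readFromLocalStore();
  }
}
```

The key property is that only one of the two backends is consulted per invocation, so the CLI never mixes daemon state with a stale local snapshot.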
      }
      const jobId = await publisherInspector.publisher.lift(request);
      return jsonResponse(res, 200, { jobId });
    } catch (err: any) {
🔴 Bug: This blanket 500 handler masks client-side errors. Invalid JSON, oversized bodies, or an object-shaped but malformed request now come back as server faults, even though the outer request handler already maps SyntaxError/payload errors correctly and lift() expects a full LiftRequest. Let parse/validation errors bubble, or validate the full request shape here and return 400 for bad input.
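The separation the reviewer asks for can be sketched as a handler that classifies failures before choosing a status code; `lift` and the response shape here are stand-ins, not the daemon's real signatures:

```typescript
// Sketch: map client-side problems (bad JSON, missing request object) to 400
// and reserve 500 for genuine lift() failures. Names are illustrative.
type HttpResult = { status: number; body: unknown };

async function handleEnqueue(
  rawBody: string,
  lift: (req: object) => Promise<string>,
): Promise<HttpResult> {
  let parsed: unknown;
  try {
    parsed = JSON.parse(rawBody);
  } catch {
    // Malformed JSON is the caller's fault: 400, not 500.
    return { status: 400, body: { error: 'Invalid JSON' } };
  }
  const request = (parsed as { request?: unknown })?.request;
  if (typeof request !== 'object' || request === null) {
    return { status: 400, body: { error: 'Missing request object' } };
  }
  try {
    const jobId = await lift(request);
    return { status: 200, body: { jobId } };
  } catch (err) {
    // Only failures inside lift() itself surface as server faults.
    return { status: 500, body: { error: String(err) } };
  }
}
```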
      const client = await ApiClient.connect();
      const result = await client.publisherEnqueue(request);
      jobId = result.jobId;
    } catch {
🔴 Bug: Falling back on any exception here can enqueue the same publish twice. If the daemon accepted /api/publisher/enqueue but the client lost the response, this branch will call lift() again locally and create a duplicate job. Restrict the fallback to "daemon unreachable before request" cases, or make enqueue idempotent with a caller-supplied request key.
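The caller-supplied request key the reviewer suggests can be sketched as follows; the Map-backed queue is purely illustrative, standing in for the daemon's persisted job store:

```typescript
// Sketch of idempotent enqueue: the CLI supplies a request key, so retrying
// after a lost response returns the existing job instead of a duplicate.
// The in-memory Map is a stand-in for the real persisted queue.
const jobsByKey = new Map<string, string>();
let nextId = 1;

function enqueueIdempotent(requestKey: string, payload: object): string {
  const existing = jobsByKey.get(requestKey);
  if (existing !== undefined) {
    // Key already seen (e.g. the response to a successful enqueue was lost
    // and the caller retried): reuse the original job.
    return existing;
  }
  const jobId = `job-${nextId++}`;
  jobsByKey.set(requestKey, jobId);
  return jobId;
}
```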
    if (status && !['accepted', 'claimed', 'validated', 'broadcast', 'included', 'finalized', 'failed'].includes(status)) {
      console.error(`Invalid publisher job status: ${status}`);
      process.exit(1);
    const client = await ApiClient.connect();
🟡 Issue: These publisher read commands now use the daemon as their source of truth, but cancel, retry, and clear below still operate on a fresh local inspector only. With DKG_API_PORT pointing at another node or a different store config, the publisher subcommands become split-brain. Route the remaining control-plane commands through the same API, or keep the whole command group consistently local until the API is complete.
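One way to keep every subcommand on a single source of truth, as this comment asks, is to resolve the backend once and pass it to whatever command runs; `DaemonApi` below is a hypothetical interface, not the PR's ApiClient:

```typescript
// Sketch: resolve one backend (daemon or local) per invocation so read and
// control-plane subcommands can never split-brain across two stores.
// The interface and method names are assumptions for illustration.
interface DaemonApi {
  jobs(): Promise<string[]>;
  cancel(id: string): Promise<void>;
}

async function withBackend<T>(
  connect: () => Promise<DaemonApi>,
  local: DaemonApi,
  run: (api: DaemonApi) => Promise<T>,
): Promise<T> {
  let api: DaemonApi;
  try {
    api = await connect();
  } catch {
    // Daemon unavailable: every subcommand falls back together.
    api = local;
  }
  return run(api);
}
```

Because `jobs` and `cancel` go through the same resolved backend, a command can no longer read from the daemon while mutating a fresh local inspector.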
…tent API routing

1. daemon.ts: Separate JSON parse errors (400) from lift() failures (500) in /api/publisher/enqueue. Add /api/publisher/cancel, /retry, /clear endpoints with input validation.
2. cli.ts: Restrict API-to-local fallback to daemon-unreachable errors (ECONNREFUSED, not running) so a successful-but-lost API response never triggers a duplicate local enqueue against the same store file.
3. cli.ts: Route cancel/retry/clear through daemon API first (with the same scoped fallback), eliminating the split-brain where read commands used the daemon but control-plane commands bypassed it.

Made-with: Cursor
    import { ApiClient } from './api-client.js';
    import { parsePositiveMsOption } from './publisher-runner.js';

    function isDaemonUnreachable(err: unknown): boolean {
🔴 Bug: The daemon-first fallback only treats socket-level failures as "daemon unavailable". If a user upgrades the CLI while an older daemon is still running, the new /api/publisher/* calls will 404 and every publisher subcommand now fails instead of falling back to the local inspector path that still works. Treat 404/405/501 or an explicit "unsupported endpoint" response as a fallback condition here.
      return jsonResponse(res, 400, { error: 'Missing request object' });
    }
    try {
      const jobId = await publisherInspector.publisher.lift(request);
🟡 Issue: lift() does not validate the request shape; it only persists the job. With this new HTTP endpoint, payloads like { "request": {} } will return 200 and create accepted jobs that only fail later in the runner/inspection path. Validate the required LiftRequest fields here and return 400 before writing queue state.
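The up-front validation this comment asks for could look like the sketch below; the required field names are assumptions for illustration, since the actual LiftRequest shape is not shown in this hunk:

```typescript
// Sketch: reject malformed requests before any queue state is written, so an
// empty {} payload gets a 400 instead of a job that fails later in the runner.
// The required field list is an assumption, not the real LiftRequest contract.
function validateLiftRequest(req: unknown): string | null {
  if (typeof req !== 'object' || req === null) {
    return 'request must be an object';
  }
  const r = req as Record<string, unknown>;
  for (const field of ['content', 'options']) {
    if (!(field in r)) {
      return `missing required field: ${field}`;
    }
  }
  return null; // valid: safe to call lift()
}
```

The handler would return 400 with the message when this yields a string, and call lift() only on `null`.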
An older daemon without /api/publisher/* endpoints returns 404, which should trigger the local fallback rather than failing the command. Made-with: Cursor
      const code = (err as any)?.cause?.code ?? (err as any)?.code;
      if (code === 'ECONNREFUSED' || code === 'ECONNRESET') return true;
      if (msg.includes('ECONNREFUSED') || msg.includes('fetch failed')) return true;
      if (/HTTP (404|405|501)/.test(msg)) return true;
🔴 Bug: ApiClient throws the JSON error body ("Not found", etc.), not HTTP 404, so a running older daemon that lacks /api/publisher/* will never hit this fallback. If you later broaden the match, dropping into the file-backed inspector while the daemon is still alive reopens the .nq overwrite race this PR documents in the smoke test. Restrict fallback to transport-level failures only, and surface 404/405/501 as 'daemon too old' instead.
fix: route publisher CLI commands through daemon API to prevent store corruption
Attach httpStatus to errors thrown by ApiClient.get/post so the CLI's isDaemonUnreachable() can correctly detect HTTP 405/501 responses from older daemons. Without this, the status-based fallback check was dead code since Error objects never carried the response status. Made-with: Cursor
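The shape of this fix can be sketched as an error class that carries the response status, so the CLI's check reads a structured field instead of parsing the message; the `ApiError` class and `isDaemonTooOld` name are illustrative, not the PR's exact code:

```typescript
// Sketch of the httpStatus fix: errors thrown by the API client carry the
// HTTP status, so status-based checks are no longer dead code that greps
// the message body. Class and function names are assumptions.
class ApiError extends Error {
  httpStatus: number;
  constructor(message: string, httpStatus: number) {
    super(message);
    this.httpStatus = httpStatus;
  }
}

function isDaemonTooOld(err: unknown): boolean {
  const status = (err as ApiError)?.httpStatus;
  // 404/405/501 from a live daemon means the endpoint is missing (older
  // daemon), which is distinct from the daemon being unreachable.
  return status === 404 || status === 405 || status === 501;
}
```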
Summary

- The publisher CLI commands (`enqueue`, `jobs`, `job`, `stats`) were opening separate in-memory OxigraphStore connections to the same `store.nq` persistence file that the running daemon was using. Since OxigraphStore is in-memory with periodic file-based flush (50ms debounce), concurrent access caused race conditions where the daemon's flush would overwrite CLI-written data, making publisher jobs disappear immediately after being enqueued.
- Added publisher API endpoints to the daemon (`/api/publisher/enqueue`, `/api/publisher/jobs`, `/api/publisher/jobs/:id`, `/api/publisher/stats`) and updated CLI commands to route through the daemon's HTTP API when it is running, falling back to direct store access when the daemon is not available.
- Fixes the `publisher-cli-smoke.test.ts` failure that has been present on `v10-rc` since PR #97 (V10 publisher) was merged.

Changes

- `daemon.ts`: Added publisher queue API endpoints + `publisherInspector` parameter that reuses the daemon's own store (or runtime publisher when enabled)
- `api-client.ts`: Added `publisherEnqueue`, `publisherJobs`, `publisherJob`, `publisherJobPayload`, `publisherStats` methods
- `cli.ts`: Updated `publisher enqueue`, `publisher jobs`, `publisher job`, `publisher stats` commands to try API first, fall back to direct store access

Test plan

- `publisher-cli-smoke.test.ts` passes locally (was failing on v10-rc)

Made with Cursor