Skip to content

feat(enrichment-worker): scaffold CDC consumer + claim primitive (BS#892 PR-1)#1007

Merged
jakebromberg merged 3 commits into
mainfrom
feat/892-enrichment-consumer
May 22, 2026
Merged

feat(enrichment-worker): scaffold CDC consumer + claim primitive (BS#892 PR-1)#1007
jakebromberg merged 3 commits into
mainfrom
feat/892-enrichment-consumer

Conversation

@jakebromberg
Copy link
Copy Markdown
Member

First PR of BS#892 (Epic C C2). Stands up the apps/enrichment-worker/ package with the log-only CDC dispatcher and the atomic claim primitive that the N×N consumer cardinality depends on. No DB writes; no LML calls — that's PR-2.

Why split

The keystone consumer is too big for one PR (acceptance: package + dispatcher + claim + LML wiring + finalize + SSE broadcast + two-consumer integration test). Splitting lets the dispatch path be verified in prod (deploy this PR, watch the would-enrich logs flow on real CDC traffic) before any write-side risk.

PR-2 (follow-up): swap the log-only handler for claim@wxyc/lml-client.lookupMetadata → finalize UPDATE → SSE broadcast (also closes #893/#628). The two-consumer integration test from #892 acceptance also lands in PR-2 since it needs the full pipeline.

What's in this PR

  • apps/enrichment-worker/claim.tsclaimRowForEnrichment(id). Atomic UPDATE flowsheet SET metadata_status='enriching', enriching_since=now() WHERE id=$1 AND metadata_status='pending'. Race-free across N consumer instances: the first claimer wins, every sibling sees 0 rows and skips. Returns { claimed: true, id } or { claimed: false }.
  • apps/enrichment-worker/cdc-subscriber.tsfilterForEnrichment(event) + makeLogOnlyHandler(). The filter is the perimeter for which CDC events the consumer acts on: flowsheet INSERT, entry_type='track', metadata_status='pending', non-empty artist_name, numeric id.
  • apps/enrichment-worker/worker.ts — entrypoint. Starts the LISTEN connection via @wxyc/database's shared startCdcListener() and registers the log-only handler. Graceful SIGTERM/SIGINT shutdown closes the LISTEN connection and the DB pool cleanly.
  • Package scaffoldpackage.json (@wxyc/enrichment-worker), tsconfig.json (references shared/database), tsup.config.ts (ESM, node20, single-file output).
  • tests/mocks/database.mock.ts — adds metadata_status + enriching_since to the flowsheet mock so the new primitive's .set() typechecks under the existing Drizzle mock moduleNameMapper.
  • CLAUDE.md — new package row.

CDC fan-out audit (acceptance pre-condition)

Documented inline in cdc-subscriber.ts's file header so the next reader can trust the dispatch path:

  • cdc-listener.ts:43-49 opens a per-process postgres() client — no pool collapse across N BS instances or N workers. Each Node process gets its own LISTEN connection; PG pg_notify broadcasts to every listener.
  • cdc-websocket.ts:89-99 is pure fan-out — no upstream dedup or coalescing of events.
  • pg_notify is fire-and-forget per docs/cdc.md:25; a worker that drops its LISTEN connection misses events until reconnect with no replay endpoint. The C6 cron is the mandatory complement, not optional safety net.

This validates the N×N cardinality decision in #892's body. No fallback to N×1 leader election needed.

Tests

  • tests/unit/apps/enrichment-worker/claim.test.ts — 5 tests pinning the claim contract: pending row claimed; WHERE narrows by id + metadata_status='pending'; sibling-claimed no-op; terminal-state no-op; DB errors propagate.
  • tests/unit/apps/enrichment-worker/cdc-subscriber.test.ts — 11 tests pinning the filter perimeter: happy path + every skip case from the documented criteria (non-flowsheet, UPDATE, DELETE, null data, non-track entry_type, already-claimed metadata_status, null/empty artist_name, non-number id, null/undefined album_title/track_title coercion). Plus 1 test for makeLogOnlyHandler behavior.

Pre-flight

  • npm run typecheck — clean
  • npm run lint — 0 errors, 422 warnings (none new)
  • npm run format:check — clean
  • npm run build --workspace=@wxyc/enrichment-worker — clean
  • npm run test:unit — 2070/2070 pass

Out of scope (PR-2)

Related

  • Parent: #877 (Epic C)
  • Sibling children blocked on the full consumer: #893 (C3 SSE), #894 (C5 drop fire-and-forget), #895 (C6 cron retune), #896 (C7 ETL CDC verification)
  • Predecessor: BS#891 / PR #1004metadata_status enum column (merged 2026-05-22)
  • Spine context: post-launch service hardening project #32

…mitive (BS#892 PR-1)

First PR of Epic C C2: stand up the apps/enrichment-worker/ package with the
log-only CDC dispatcher and the atomic claim primitive that the N×N consumer
cardinality depends on. No DB writes; no LML calls.

Components:
- claim.ts: claimRowForEnrichment(id) — UPDATE flowsheet SET metadata_status =
  'enriching', enriching_since = now() WHERE id = $1 AND metadata_status =
  'pending'. The narrow WHERE makes this race-free across N consumer instances
  (the first claimer wins, every sibling sees 0 rows and skips).
- cdc-subscriber.ts: filterForEnrichment + makeLogOnlyHandler. The filter is
  the perimeter that decides which CDC events the consumer acts on
  (flowsheet INSERT, entry_type=track, metadata_status=pending, non-empty
  artist_name).
- worker.ts: entrypoint. Starts the LISTEN connection and registers the
  log-only handler. Graceful SIGTERM/SIGINT shutdown.

CDC fan-out audit (acceptance pre-condition for the cardinality decision):
- cdc-listener.ts:43-49 opens a per-process postgres() client — no pool
  collapse across N BS instances or N workers.
- cdc-websocket.ts:89-99 is pure fan-out — no upstream dedup or coalescing.
- pg_notify is fire-and-forget; missed events fall to the C6 (#895) cron.
- The audit is documented in cdc-subscriber.ts's file header so the next
  reader can trust the dispatch path.

PR-2 will swap the log-only handler for claim → @wxyc/lml-client.lookupMetadata
→ finalize UPDATE → SSE broadcast (also closes #893/#628). The split lets the
dispatch path be verified in prod (deploy this PR, watch the would-enrich logs
flow on real CDC traffic) before any write-side risk.

Tests: 16 unit tests pin the claim contract (5: pending row claimed; WHERE
narrows by id + status; sibling-claimed no-op; terminal-state no-op; DB
errors propagate) and the filter perimeter (11: happy path + every skip
case from the documented criteria). Suite: 2070/2070 pass.

Mock: tests/mocks/database.mock.ts adds metadata_status + enriching_since
to the flowsheet mock so the new primitive's .set() typechecks under the
mock's Drizzle moduleNameMapper.

CLAUDE.md: new package row in the monorepo table.
…ry guard

Review feedback on PR-1:
- claim.ts: WHERE switches from raw `sql\`\`` template to `and(eq(flowsheet.id, id), eq(flowsheet.metadata_status, 'pending'))`. The typed builders give compile-time checking on the metadata_status enum literal (the 5-state column from BS#891) and drop hand-quoted column names. Matches the backend services convention; the closest analog (jobs/flowsheet-metadata-backfill/enrich.ts) uses raw SQL but for the same idempotency guard, typed builders are the more defensive shape.
- worker.ts: shutdown handler now latches on first signal. SIGTERM+SIGINT (or a duplicate signal) racing through stopCdcListener + closeDatabaseConnection in parallel was previously possible; the inner functions are idempotent so this was safe but not robust.
- claim.test.ts: 'narrows the WHERE' assertion swapped from SQL-string-matching to a structural check on the .where() call. The id + 'pending' contract is now compile-time-enforced by the typed builders; the runtime behavior is still pinned by the sibling-claimed / terminal-state / DB-error tests.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Development

Successfully merging this pull request may close these issues.

[C3] Broadcast liveFs:update SSE event after enrichment lands (closes #628)

1 participant