
[C2] Build CDC-driven enrichment consumer (separate worker) #892

@jakebromberg

Problem

Enrichment is wired inline at every insert site (flowsheet.controller.ts:addEntry, internal.route.ts:flowsheet-webhook) and runs fire-and-forget inside the BS Express process, so an in-flight enrichment Promise is lost on restart. No single component is responsible for "this row needs enrichment."

Per docs/cdc.md, BS already broadcasts every INSERT/UPDATE/DELETE on flowsheet via the CDC WebSocket. So the infrastructure for "subscribe to row inserts" is already running; today it is consumed only by the reconciliation monitor.

Consumer cardinality

Decision: N×N with an idempotent claim. Multiple BS instances each run the consumer. The same CDC event delivered to N instances is safe: the first instance to win the atomic claim does the LML work, and the others see 0 rows updated and move on.

The claim is a single statement:

UPDATE wxyc_schema.flowsheet
SET metadata_status = 'enriching', enriching_since = now()
WHERE id = $1 AND metadata_status = 'pending'
RETURNING id;

If RETURNING yields no row, a sibling consumer won: log and skip. If it returns the row, this consumer owns the enrichment: it calls LML, then UPDATEs the row to its final state ('enriched_match', 'enriched_no_match', or 'failed_no_retry').
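A minimal TypeScript sketch of the claim-then-finalize flow, assuming a hypothetical FlowsheetStore seam (claim runs the UPDATE ... RETURNING above and reports whether a row came back; finalize writes the final state) and a hypothetical lookup function standing in for @wxyc/lml-client. Mapping any LML error straight to 'failed_no_retry' is also an assumption made to keep the sketch short; the real retry policy may differ.

```typescript
// Final states named in this issue.
type FinalStatus = "enriched_match" | "enriched_no_match" | "failed_no_retry";

// Hypothetical seam over wxyc_schema.flowsheet.
interface FlowsheetStore {
  // Runs CLAIM_SQL with $1 = id; true only if RETURNING produced a row,
  // i.e. this consumer flipped the row from 'pending' to 'enriching'.
  claim(id: number): Promise<boolean>;
  // Writes metadata plus the final metadata_status for a row we own.
  finalize(id: number, status: FinalStatus, metadata: unknown): Promise<void>;
}

// Hypothetical stand-in for the @wxyc/lml-client lookup call.
type Lookup = (id: number) => Promise<{ matched: boolean; metadata: unknown }>;

const CLAIM_SQL = `
UPDATE wxyc_schema.flowsheet
SET metadata_status = 'enriching', enriching_since = now()
WHERE id = $1 AND metadata_status = 'pending'
RETURNING id`;

async function handleInsert(
  id: number,
  store: FlowsheetStore,
  lookup: Lookup,
  log: (msg: string) => void = console.log,
): Promise<FinalStatus | "skipped"> {
  if (!(await store.claim(id))) {
    // Zero rows updated: a sibling consumer (or an earlier delivery) won.
    log(`flowsheet ${id}: already claimed, skipping`);
    return "skipped";
  }
  let status: FinalStatus;
  let metadata: unknown = null;
  try {
    const result = await lookup(id);
    status = result.matched ? "enriched_match" : "enriched_no_match";
    metadata = result.metadata;
  } catch {
    // Assumption for this sketch: any LML error is terminal.
    status = "failed_no_retry";
  }
  // If the process dies before this line, the row stays 'enriching'
  // until the C6 sweep recovers it.
  await store.finalize(id, status, metadata);
  return status;
}
```

Run against an in-memory store, delivering the same id twice yields one enrichment and one "skipped", which is the shape the idempotency acceptance test checks.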

Process death between claim and final UPDATE strands the row at 'enriching'. The Epic C6 cron sweep handles recovery (see C1 for the index that makes the recovery query cheap and the contract that gives stale claims a 60s grace).
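To make the 60s grace concrete, a hypothetical sketch of the staleness rule: a row becomes eligible for recovery only after it has sat at 'enriching' longer than the grace window. The query text, constant, and helper names are assumptions; C1 and C6 own the real index and sweep.

```typescript
// Grace window from the C1 contract: a claim younger than this is assumed
// to belong to a live consumer and must not be touched.
const STALE_CLAIM_GRACE_MS = 60_000;

// Hypothetical query shape for the C6 sweep; C1's index is what would keep
// the metadata_status / enriching_since filter cheap.
const FIND_STALE_CLAIMS_SQL = `
SELECT id FROM wxyc_schema.flowsheet
WHERE metadata_status = 'enriching'
  AND enriching_since < now() - interval '60 seconds'`;

// Same rule as the WHERE clause above, evaluated in application code.
function isStaleClaim(
  status: string,
  enrichingSince: Date | null,
  now: Date,
  graceMs: number = STALE_CLAIM_GRACE_MS,
): boolean {
  if (status !== "enriching" || enrichingSince === null) return false;
  return now.getTime() - enrichingSince.getTime() > graceMs;
}
```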

Why not N×1 with leader election? The leader is a single point of failure that needs its own liveness story. A cron-style single runner is operationally simpler, but it introduces a routing decision we don't need: the claim is already idempotent.

CDC fan-out preconditions

The N×N cardinality decision above depends on every BS instance receiving every relevant CDC event independently. Today the only CDC consumer is the reconciliation monitor (scripts/sync/reconcile.ts) — single-process. The pattern hasn't been exercised with N simultaneous listeners.

Postgres delivers each NOTIFY (pg_notify) to every connection that has issued a matching LISTEN on that channel. So if every BS process opens its own LISTEN on the cdc channel at startup, every process should receive every notification. This must be confirmed end-to-end before C5 deletes the runtime fire-and-forget: if any deduplication exists upstream of the consumer (in shared/database/src/cdc-listener.ts, in the WS server at apps/backend/services/cdc/cdc-websocket.ts, or in any pool-shared LISTEN connection), N×N delivery is broken and the consumer will silently miss events.

Audit items, to land before this child closes:

  1. Confirm cdc-listener.ts opens a per-process LISTEN connection (not a shared/pooled one). If it shares one connection across the BS instance's worker threads, that's fine: NOTIFY is delivered per connection, not per thread. But verify that no pooling collapses N instances into 1 effective LISTEN.
  2. Confirm cdc-websocket.ts does not deduplicate or coalesce events upstream of the enrichment consumer. (It probably doesn't — its job is to fan out to WS clients — but confirm.)
  3. Document the audited fan-out path inline in the enrichment consumer's file header so the next reader can trust it.
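For audit item 1, the shape to look for is roughly the following: each process registers a 'notification' handler on its own dedicated node-postgres Client (which emits 'notification' events carrying channel and payload) and then issues LISTEN. This is a sketch, not the contents of cdc-listener.ts; ListenClient is a structural stand-in for pg.Client, and the JSON payload shape (table, op, row) is an assumption.

```typescript
// Structural view of the pg.Client surface used here, so the sketch does
// not hard-depend on the pg package.
interface PgNotification {
  channel: string;
  payload?: string;
}
interface ListenClient {
  query(text: string): Promise<unknown>;
  on(event: "notification", listener: (msg: PgNotification) => void): unknown;
}

// Hypothetical payload shape; the real one is defined by cdc-listener.ts.
interface CdcEvent {
  table: string;
  op: "INSERT" | "UPDATE" | "DELETE";
  row: Record<string, unknown>;
}

// Call once per process with a dedicated client (not a Pool checkout), so
// Postgres delivers every NOTIFY on the channel to every process.
async function listenForCdc(
  client: ListenClient,
  onEvent: (event: CdcEvent) => void,
  channel = "cdc",
): Promise<void> {
  // Register the handler before LISTEN so no early notification is missed.
  client.on("notification", (msg) => {
    if (msg.channel !== channel || msg.payload === undefined) return;
    let event: CdcEvent;
    try {
      event = JSON.parse(msg.payload) as CdcEvent;
    } catch {
      console.error(`ignoring malformed CDC payload on channel ${channel}`);
      return;
    }
    onEvent(event);
  });
  await client.query(`LISTEN ${channel}`);
}
```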

If the audit reveals upstream dedup that's hard to remove, the cardinality decision falls back to N×1 with leader election (originally rejected as too complex but the only correct response to upstream dedup). That should be filed as a separate sub-issue at audit time, not pre-emptively.

Proposal

Build an enrichment consumer that subscribes to CDC events:

/cdc?key=...
  ↓ INSERT events on flowsheet WHERE metadata_status='pending' AND entry_type='track'
EnrichmentConsumer
  ↓ via @wxyc/lml-client (the shared workspace; BS#1000 merged)
LML /api/v1/lookup
  ↓ response
DB UPDATE: write metadata + flip metadata_status
  ↓ liveFs:update SSE broadcast (closes BS#628)
SSE → iOS/dj-site clients re-render
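The filter on the first arrow could be a small predicate like the one below, assuming the same hypothetical JSON payload shape (table, op, row) and a numeric id. It is only a cheap pre-filter: a row that slips through, or a redelivered event, is still stopped by the atomic claim.

```typescript
// Hypothetical CDC payload shape; the real one is whatever
// shared/database/src/cdc-listener.ts emits.
interface CdcEvent {
  table: string;
  op: "INSERT" | "UPDATE" | "DELETE";
  row: Record<string, unknown>;
}

// Returns the flowsheet id to enrich, or null if the event is not an
// INSERT of a pending track. The claim remains the real guard.
function enrichmentTarget(event: CdcEvent): number | null {
  if (event.table !== "flowsheet" || event.op !== "INSERT") return null;
  const { id, metadata_status, entry_type } = event.row;
  if (metadata_status !== "pending" || entry_type !== "track") return null;
  return typeof id === "number" ? id : null;
}
```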

Two deployment shapes to choose from:

Option A — separate worker process: Standalone Node app, apps/enrichment-worker/, runs alongside BS on the same EC2. Independent restart cycle, doesn't compete with HTTP traffic for the event loop. Better isolation, more infra.

Option B — in-BS background task: Lives inside apps/backend/, started by the same npm start. Simpler infra, but shares the event loop with HTTP handlers; a slow LML call can starve other requests if not careful.

Recommend Option A, following the bolt-tightening principle of one component per concern.

Idempotency

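The consumer must handle the same event arriving twice (CDC delivery is at-least-once). It gets this from the same idempotent-WHERE pattern as the cron's enrich.ts:applyEnrichment, applied at the claim: UPDATE ... WHERE id = $row.id AND metadata_status = 'pending'. The length of .returning() tells whether we won or raced; a redelivered event finds the row already 'enriching' or in a final state, updates 0 rows, and is skipped.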

Backpressure

LML has a 50/min Discogs rate-limit ceiling and 5 concurrent slots, and the consumer must respect both: wire a local semaphore matching LML's 5 slots, plus a leaky-bucket rate limiter at 50/min. If load exceeds capacity, queue events in memory; if the queue fills, drop the oldest events and let the cron sweep pick them up (that is the safety net's job).
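A sketch of the three backpressure pieces, under stated assumptions: the 5 permits and 50/min rate come from the paragraabove, while the burst capacity, queue size, and class names are hypothetical. The leaky bucket is the meter form (a level that drains continuously at the configured rate) and takes an injected clock so it can be tested without real time.

```typescript
// Counting semaphore: at most `permits` LML calls in flight.
class Semaphore {
  private permits: number;
  private readonly waiters: Array<() => void> = [];
  constructor(permits: number) {
    this.permits = permits;
  }
  async acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return;
    }
    await new Promise<void>((resolve) => this.waiters.push(resolve));
  }
  release(): void {
    const next = this.waiters.shift();
    if (next) next(); // hand the permit straight to the oldest waiter
    else this.permits++;
  }
}

// Leaky bucket as a meter: each call adds 1; the level drains at perMinute.
class LeakyBucket {
  private level = 0;
  private lastMs: number | null = null;
  private readonly leakPerMs: number;
  private readonly capacity: number;
  constructor(perMinute: number, capacity: number) {
    this.leakPerMs = perMinute / 60_000;
    this.capacity = capacity;
  }
  tryAcquire(nowMs: number): boolean {
    if (this.lastMs !== null) {
      this.level = Math.max(0, this.level - (nowMs - this.lastMs) * this.leakPerMs);
    }
    this.lastMs = nowMs;
    if (this.level + 1 > this.capacity) return false;
    this.level += 1;
    return true;
  }
}

// Bounded in-memory queue that evicts the oldest item when full.
class DropOldestQueue<T> {
  private readonly items: T[] = [];
  private readonly capacity: number;
  dropped = 0; // evicted events; the cron sweep is the safety net for these
  constructor(capacity: number) {
    this.capacity = capacity;
  }
  push(item: T): T | undefined {
    let evicted: T | undefined;
    if (this.items.length >= this.capacity) {
      evicted = this.items.shift();
      this.dropped++;
    }
    this.items.push(item);
    return evicted;
  }
  shift(): T | undefined {
    return this.items.shift();
  }
  get size(): number {
    return this.items.length;
  }
}
```

A drain loop would shift the next event from the queue, wait until tryAcquire succeeds, and hold a semaphore permit around the LML call. Assuming events are queued before they are claimed, anything evicted stays 'pending' for the cron sweep.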

Acceptance

  • apps/enrichment-worker/ builds and deploys.
  • CDC fan-out audit complete: per-process LISTEN confirmed in cdc-listener.ts; no upstream dedup in cdc-websocket.ts or pooling layer; audited path documented in the worker's file header. (Pre-condition for the N×N cardinality decision above.)
  • Integration test, two concurrent consumer instances: run two consumer processes against the same DB; insert a row; assert (a) both processes receive the CDC event, (b) exactly one wins the idempotent claim, (c) the loser logs the no-op cleanly without erroring.
  • CDC subscription receives flowsheet INSERT events.
  • For each event, the consumer calls LML and writes metadata + flips metadata_status.
  • Idempotency test: deliver the same event twice; only one DB UPDATE lands.
  • Backpressure test: 1000 events in 1 second; the consumer drains them within the rate limit (at 50/min, roughly 20 minutes) and no events are lost (verified via the metric in G3).

Related

Metadata

Assignees

No one assigned

    Labels

    enhancement (New feature or request), status:blocked (Cannot start until a dependency closes)

    Type

    No type

    Projects

    Status

    Todo

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
