PgAdapter.subscribe: multiplex LISTEN over a shared client #4660
Conversation
Each call to PgAdapter.listen() opens its own dedicated Client
because LISTEN/NOTIFY is unreliable on Pool connections (see
node-postgres#1543). With four LISTEN-using callers planned (file
changes, registry reconciler, pg-queue jobs, and CS-10952's module
cache invalidation), an N-instance fleet pays N*4 listener
connections against `max_connections` for what is observably one
event stream per process.
Add `subscribe(channel, handler) -> { unsubscribe() }` that lazily
opens a single shared Client on first subscribe, multiplexes any
number of channels and per-channel handlers onto it, dispatches
incoming notifications by `notification.channel`, sends
LISTEN/UNLISTEN only on the first/last subscribe per channel, and
closes the shared client in `close()`. Concurrent subscribes await
the same in-flight client-start promise. A handler that throws is
caught so it can't break siblings.
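The multiplexing behavior described above (dispatch by channel, first/last LISTEN bookkeeping, and handler isolation) can be sketched with an in-memory stand-in for the shared client. This is a hedged sketch: `fakeClient`, the `subscribers` map, and the handler shapes are illustrative, not the adapter's real internals.

```typescript
import { EventEmitter } from 'node:events';

type Handler = (payload: string) => void;

// stand-in for the shared pg Client's notification stream
const fakeClient = new EventEmitter();
const subscribers = new Map<string, Set<Handler>>();

fakeClient.on('notification', (msg: { channel: string; payload: string }) => {
  // dispatch by channel; a throwing handler is caught so it can't break siblings
  for (const handler of subscribers.get(msg.channel) ?? new Set<Handler>()) {
    try {
      handler(msg.payload);
    } catch {
      // logged in the real adapter; swallowed here
    }
  }
});

function subscribe(channel: string, handler: Handler) {
  let set = subscribers.get(channel);
  if (!set) {
    set = new Set();
    subscribers.set(channel, set); // first subscriber: real code sends LISTEN here
  }
  set.add(handler);
  return {
    unsubscribe() {
      set!.delete(handler);
      if (set!.size === 0) {
        subscribers.delete(channel); // last subscriber: real code sends UNLISTEN
      }
    },
  };
}

// two channels, one handler that throws
const seen: string[] = [];
subscribe('a', () => {
  throw new Error('boom');
});
subscribe('a', (p) => seen.push(`a:${p}`));
subscribe('b', (p) => seen.push(`b:${p}`));
fakeClient.emit('notification', { channel: 'a', payload: '1' });
fakeClient.emit('notification', { channel: 'b', payload: '2' });
console.log(seen.join(',')); // a:1,b:2
```

Note the throwing handler on channel `a` does not stop its sibling from receiving the same notification.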
Migrate RealmFileChangesListener (drops its WorkLoop entirely — it
had no periodic work) and RealmRegistryReconciler (keeps its loop
for the periodic reconcile pass; just uses `subscribe` for the LISTEN
half). Both now `await start()` and call `unsubscribe()` on shutdown.
main.ts awaits the new async `start()` calls.
The legacy `listen()` API is kept (with a deprecation comment) so
pg-queue's job runner doesn't have to migrate in the same change;
that's a follow-up.
Reconnect-on-disconnect for the shared client is explicitly out of
scope: the legacy listen() API has the same hazard today, and current
production has not seen it. The shared client's `error` event is
logged loudly so a future occurrence is visible.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pull request overview
This PR refactors Postgres LISTEN handling so multiple realm-server listeners can share one dedicated notification client instead of each opening its own connection. It fits the server’s cross-process coordination work by reducing steady-state Postgres connection usage while preserving the existing LISTEN/NOTIFY behavior for migrated listeners.
Changes:
- Adds `PgAdapter.subscribe()` to multiplex channel handlers over one shared dedicated `Client`.
- Migrates `RealmFileChangesListener` and `RealmRegistryReconciler` to the new subscription API and makes their `start()` methods async.
- Adds end-to-end tests for the new subscription API and updates existing listener tests to await startup.
Findings
- `packages/postgres/pg-adapter.ts`: `close()` does not handle an in-flight `#notificationClientStarting`, so shutdown can still leave the shared LISTEN client connected after the adapter is marked closed.
- `packages/postgres/pg-adapter.ts`: concurrent same-channel `subscribe()` calls can get stuck in a broken state if the first `LISTEN` fails after a second handler was already added to `#subscribers`.
- `packages/postgres/pg-adapter.ts`: storing handlers in a `Set` breaks the advertised per-call unsubscribe semantics when the same function reference is subscribed twice on the same channel.
- `packages/realm-server/lib/realm-file-changes-listener.ts`: `shutDown()` does not wait for an in-flight `start()`, so shutdown during startup can still leave a live subscription behind.
- `packages/realm-server/lib/realm-registry-reconciler.ts`: the reconciler has the same startup/shutdown race and can retain its LISTEN subscription after shutdown is requested.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `packages/realm-server/tests/realm-file-changes-listener-test.ts` | Updates listener tests to await async startup instead of sleeping. |
| `packages/realm-server/tests/pg-adapter-subscribe-test.ts` | Adds new end-to-end coverage for multiplexed LISTEN subscriptions. |
| `packages/realm-server/tests/index.ts` | Registers the new subscribe test file in the realm-server suite. |
| `packages/realm-server/main.ts` | Awaits the reconciler and file-change listener startup during boot. |
| `packages/realm-server/lib/realm-registry-reconciler.ts` | Switches the reconciler from dedicated `listen()` usage to shared `subscribe()`. |
| `packages/realm-server/lib/realm-file-changes-listener.ts` | Replaces the old WorkLoop-backed LISTEN client with direct shared subscription handling. |
| `packages/postgres/pg-adapter.ts` | Introduces the shared notification client, multiplexed subscriptions, and legacy `listen()` deprecation path. |
Host Test Results: 1 files, 1 suites, 1h 42m 27s ⏱️. Results for commit 99260ba.

Realm Server Test Results: 1 files ±0, 1 suites ±0, 16m 56s ⏱️ (-39s). Results for commit 99260ba. ± Comparison against earlier commit 5a5c74d.
Address Copilot review on #4660. Five concurrency bugs share the same root cause: lazy/concurrent state isn't fully serialized against starts and shutdowns. Reworked into a coherent design.

Per-channel state changes from `Set<NotificationHandler>` to `{ handlers: HandlerEntry[]; establishment: Promise<void> }`. Each `subscribe()` pushes its own `HandlerEntry` and holds the reference, so unsubscribe removes the exact entry. This fixes the case where the same function reference is subscribed twice on the same channel and a `Set` silently collapses them into one (#4660 comment 3).

Concurrent subscribes on the same empty channel now share the same in-flight LISTEN-establishment promise. If that LISTEN rejects, every racing subscribe rejects atomically and the channel is torn down for a fresh retry, instead of leaving later subscribers with their handlers in the map but no actual LISTEN backing them (#4660 comment 2). After awaiting establishment, subscribe re-checks that the channel state it joined is still the live one, and retries from the top if a concurrent unsubscribe tore it down; that is the only teardown window in which an entry could otherwise be left in place yet never receive notifications.

`close()` now awaits any in-flight `#notificationClientStarting` before ending the resulting `Client`. Without this, a `close()` that races a first `subscribe()`'s connect() returns while the connect is still in flight; subscribe then assigns the `Client` into `#notificationClient` after close already ran, and the connection lives on under a closed adapter (#4660 comment 1). `subscribe()` also re-checks `isClosed` after awaiting connect so it fails fast against a closing adapter.

`RealmFileChangesListener.shutDown()` and `RealmRegistryReconciler.shutDown()` both await any in-flight `start()` before tearing down, so a shutdown that fires during startup waits for `#subscription` to be installed and then unsubscribes it (#4660 comments 4 and 5). The reconciler now also tracks `#starting` so `shutDown` can join it.
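The reworked per-channel state can be sketched as follows. This is a hedged sketch under stated assumptions: the names (`ChannelState`, `HandlerEntry`) follow the commit message, but the bookkeeping here is simplified, and the real `subscribe()` additionally awaits `establishment` (the shared in-flight LISTEN) and re-checks liveness before returning.

```typescript
type Handler = (payload: string) => void;
interface HandlerEntry {
  handler: Handler;
}
interface ChannelState {
  handlers: HandlerEntry[]; // one entry per subscribe() call
  establishment: Promise<void>; // shared by concurrent first subscribes
}

const channels = new Map<string, ChannelState>();

function subscribe(channel: string, handler: Handler) {
  let state = channels.get(channel);
  if (!state) {
    // First subscriber creates the state; real code kicks off LISTEN here and
    // tears the channel down if that promise rejects, so racing subscribes
    // fail atomically and a later subscribe retries from scratch.
    state = { handlers: [], establishment: Promise.resolve() };
    channels.set(channel, state);
  }
  const entry: HandlerEntry = { handler }; // fresh entry even for an identical fn ref
  state.handlers.push(entry);
  const s = state;
  return {
    unsubscribe() {
      const i = s.handlers.indexOf(entry); // removes exactly this entry
      if (i !== -1) s.handlers.splice(i, 1);
      if (s.handlers.length === 0) {
        channels.delete(channel); // real code also sends UNLISTEN
      }
    },
  };
}

// the same function reference subscribed twice yields independent entries
const received: string[] = [];
const fn = (p: string) => received.push(p);
const a = subscribe('jobs', fn);
subscribe('jobs', fn);
a.unsubscribe();
console.log(channels.get('jobs')!.handlers.length); // 1: a Set keyed by fn would have dropped both
```

Because each call holds its own `HandlerEntry` object, `indexOf` finds exactly the entry that call created, which is what makes the duplicate-function-reference case work.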
Tests:
- New "concurrent subscribes on the same empty channel both receive notifications": exercises the shared-establishment path under `Promise.all`.
- New "subscribing the same handler reference twice yields independent subscriptions": locks in the array-of-entries fix; A.unsubscribe() must leave B receiving even though their fn refs are identical.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
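The shutdown-joins-startup fix (comments 4 and 5) can be sketched like this. The class and field names are illustrative stand-ins, not the listeners' exact code; the point is that `shutDown()` awaits the recorded in-flight start before unsubscribing.

```typescript
class ListenerSketch {
  #starting: Promise<void> | undefined;
  #subscription: { unsubscribe(): void } | undefined;

  start(subscribe: () => Promise<{ unsubscribe(): void }>): Promise<void> {
    // record the in-flight start so shutDown can join it
    this.#starting = subscribe().then((sub) => {
      this.#subscription = sub;
    });
    return this.#starting;
  }

  async shutDown() {
    await this.#starting; // a shutdown during startup waits for #subscription
    this.#subscription?.unsubscribe();
    this.#subscription = undefined;
  }
}

// race shutDown against a slow start: unsubscribe still runs
let unsubscribed = false;
const listener = new ListenerSketch();
const started = listener.start(async () => {
  await new Promise((r) => setTimeout(r, 10)); // stand-in for a slow connect/LISTEN
  return {
    unsubscribe: () => {
      unsubscribed = true;
    },
  };
});
const stopped = listener.shutDown(); // fires "during" startup
Promise.all([started, stopped]).then(() => console.log(unsubscribed)); // true
```

Without the `await this.#starting` join, the shutdown in this race would find `#subscription` still undefined and return, leaving the subscription installed afterwards.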
Realm.#notifyFileChange runs in both Node/pg (realm-server) and browser/SQLite (host) contexts. The SQL hits `pg_notify`, which SQLite doesn't expose, raising `no such function: pg_notify` on every write. The host runs a single in-process realm, so there are no peers to notify; short-circuit the call entirely.

Surfaced as a flake in the merged main code (LISTEN/multiplex PR #4660 widened the call site without gating SQLite).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Realm.#notifyFileChange was issuing raw `SELECT pg_notify(...)` SQL against `this.#dbAdapter`. In the host/browser context the adapter is SQLite, which has no pg_notify function, so every write logged `SQLITE_ERROR: no such function: pg_notify`. The host runs a single in-process realm with no peers to notify, so issuing the SQL was also semantically a no-op even when it didn't error.

Push the cross-instance broadcast capability down into the existing DBAdapter abstraction:
- Add `notify(channel, payload): Promise<void>` to the DBAdapter interface, documented as best-effort cache-coherency.
- PgAdapter implements via `SELECT pg_notify($1, $2)`.
- SQLiteAdapter implements as a no-op (no pub/sub primitive, no peers).
- Realm.#notifyFileChange calls `this.#dbAdapter.notify(...)` and no longer embeds pg-specific SQL.
- Update the realm-server listener test to drive notifications via the new method (also gives PgAdapter.notify direct CI coverage).
- Add `notify` to the inline DBAdapter mocks in bot-runner / realm-server / runtime-common tests.

Surfaced as flaky test-log noise after PR #4660 (CS-10892: realm_file_changes NOTIFY channel) widened the call site without gating SQLite. Original diagnosis and the `kind === 'sqlite'` guard fix from Hassan Abdel-Rahman on cs-11036-indexer-resume-progress.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
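The push-down described above can be sketched as follows. The `notify` signature follows the commit message; the adapter class bodies and the query plumbing here are illustrative stand-ins (the real PgAdapter runs the SQL through its pool, not an injected function).

```typescript
interface DBAdapter {
  // best-effort cross-instance cache-coherency broadcast
  notify(channel: string, payload: string): Promise<void>;
}

class PgAdapterSketch implements DBAdapter {
  constructor(private query: (sql: string, params: string[]) => Promise<void>) {}
  async notify(channel: string, payload: string) {
    // parameterized so payloads are never interpolated into SQL
    await this.query('SELECT pg_notify($1, $2)', [channel, payload]);
  }
}

class SQLiteAdapterSketch implements DBAdapter {
  async notify(_channel: string, _payload: string) {
    // SQLite has no pub/sub primitive and the host has no peers: no-op
  }
}

// the realm-side caller no longer embeds pg-specific SQL
async function notifyFileChange(adapter: DBAdapter, path: string) {
  await adapter.notify('realm_file_changes', path);
}

const sent: string[] = [];
const pg = new PgAdapterSketch(async (sql, params) => {
  sent.push(`${sql} -- ${params.join(', ')}`);
});
const done = Promise.all([
  notifyFileChange(pg, 'cards/person.json'),
  notifyFileChange(new SQLiteAdapterSketch(), 'cards/person.json'), // silently a no-op
]);
done.then(() => console.log(sent.length)); // only the pg adapter emitted SQL
```

The design choice is that callers express intent ("broadcast this change") and each adapter decides whether a broadcast mechanism even exists, which is what removes the `pg_notify` error from the SQLite path.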
Why

Each call to `PgAdapter.listen()` opens its own dedicated `Client` because LISTEN/NOTIFY is unreliable on Pool connections (see node-postgres#1543). With four LISTEN-using callers planned:

- `RealmFileChangesListener`
- `RealmRegistryReconciler`
- the `pg-queue` jobs runner
- CS-10952's module cache invalidation

an N-instance fleet pays N × 4 listener connections against Postgres `max_connections` for what is observably one notification stream per process. Copilot flagged this on #4644; this PR is the underlying refactor so that PR (and CS-10953's sibling listener) drop in as additional channel handlers on the shared client rather than as additional dedicated connections.

What

Add `PgAdapter.subscribe(channel, handler) → { unsubscribe() }`:

- Lazily opens a single shared `Client` on the first subscribe.
- Dispatches incoming notifications by `notification.channel`.
- Sends `LISTEN <channel>` only on the first subscriber, `UNLISTEN <channel>` only when the last unsubscribes.
- Closes the shared client in `PgAdapter.close()`.

Migrations in this PR:

- `RealmFileChangesListener` drops its `WorkLoop` entirely; it had no periodic work and only used the loop to keep the dedicated LISTEN connection alive. `start()` is now `async` and awaits the subscription; `shutDown()` calls `unsubscribe()`.
- `RealmRegistryReconciler` keeps its `WorkLoop` for the periodic `reconcile()` pass; it uses `subscribe()` only for the LISTEN half. `start()` is now `async`; `shutDown()` shuts the loop down then unsubscribes.
- `main.ts` awaits both newly-async `start()` calls.

What this does NOT do

- `pg-queue`'s `listen('jobs')` is not migrated. Its loop body does periodic job processing and is internal to `@cardstack/postgres`; migrating it touches more surface than I wanted in one refactor PR. The legacy `listen()` API is kept (with a deprecation comment) so it keeps working unchanged.
- Reconnect-on-disconnect for the shared client is out of scope: the legacy `listen()` API has the same hazard today, and current production has not seen it. The shared client's `error` event is logged loudly so a future occurrence is visible. Reconnect is a separate concern (and a wider change than this).

Test plan

- Existing listener tests updated to await the now-async startup (`realm-file-changes-listener-test`, `realm-registry-reconciler-test`).
- New `pg-adapter-subscribe-test`: covers single subscriber, two subscribers on the same channel, two channels, partial unsubscribe, re-subscribe after the channel was emptied, idempotent unsubscribe, and a throwing handler that doesn't break siblings.

Stacks with

- #4644, whose listener can use `subscribe()` instead of standing up its own dedicated `Client`.
- CS-10953's sibling listener, which uses `subscribe()` rather than `listen()`.

🤖 Generated with Claude Code