From a43bb05ad3be1d7e35084b04935302f9fc1f8370 Mon Sep 17 00:00:00 2001 From: Luke Melia Date: Mon, 4 May 2026 16:30:13 -0400 Subject: [PATCH 1/4] CS-10952: cross-process invalidation broadcast for CachingDefinitionLookup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stacks on CS-10948's three-counter generation discard. In a single process the counters keep an in-flight prerender from re-INSERTing a row a concurrent invalidate just deleted. Across N processes that protection doesn't span instances: an invalidate on one process doesn't bump peer processes' counters, so a peer's mid-flight prerender can still re-persist pre-invalidation state. This change closes that gap by broadcasting invalidations over a new `module_cache_invalidated` Postgres NOTIFY channel. * runtime-common/definition-lookup.ts: emit one notify per URL in invalidate()'s fan-out (`module::`), one notify for clearRealmCache() (`realm:`), and one notify for clearAllModules() (`global`). Sequenced after the DELETE rather than wrapped in BEGIN/COMMIT — each existing path is a single autocommit DELETE, so sequential pg_notify has the same observable semantics as a same-tx notify. Best-effort try/catch with console.warn, mirroring Realm.#notifyFileChange. Promotes the bump helpers from private to public so the listener can replay them; adds bumpGlobalGeneration. Sqlite/in-memory adapters short-circuit before pg_notify. * realm-server/lib/module-cache-invalidation-listener.ts (new): WorkLoop + PgAdapter.listen pattern, mirroring RealmFileChangesListener exactly. Parses payloads, dispatches to the appropriate bump method on the attached CachingDefinitionLookup. 60s safety poll. Self-notify is idempotent — emitter already bumped synchronously before its DELETE. * realm-server/main.ts: construct + start alongside fileChangesListener, add to shutdown Promise.all. The listener lives in realm-server rather than the path the spec suggested (runtime-common/definition-lookup-coordination.ts) because runtime-common doesn't depend on @cardstack/postgres and adding the dep would be circular (postgres already imports DBAdapter from runtime-common). The MODULE_CACHE_INVALIDATED_CHANNEL constant is exported from runtime-common so consumers in other packages can match it. Behavior at N=1 (today's production) is inert: the emitter has already bumped its counter synchronously before the DELETE, and a self-notify replay is a second monotonic bump that any in-flight snapshot would have already mismatched on the first bump. At N>1 it closes the cross-process half of the persist-after-invalidate race; the only remaining window is the sub-millisecond NOTIFY latency, which self-heals on the next prerender of the same key. Tests: payload-parsing units, dispatch units, and end-to-end pg tests where two CachingDefinitionLookup instances share one PgAdapter and an A.invalidate / A.clearRealmCache / A.clearAllModules drives the right bump on B's recorder (mirroring realm-file-changes-listener-test.ts's shape). Plus a self-notify idempotence test. Linear: https://linear.app/cardstack/issue/CS-10952 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../lib/module-cache-invalidation-listener.ts | 162 ++++++ packages/realm-server/main.ts | 17 + packages/realm-server/tests/index.ts | 1 + ...module-cache-invalidation-listener-test.ts | 512 ++++++++++++++++++ packages/runtime-common/definition-lookup.ts | 94 +++- 5 files changed, 780 insertions(+), 6 deletions(-) create mode 100644 packages/realm-server/lib/module-cache-invalidation-listener.ts create mode 100644 packages/realm-server/tests/module-cache-invalidation-listener-test.ts diff --git a/packages/realm-server/lib/module-cache-invalidation-listener.ts b/packages/realm-server/lib/module-cache-invalidation-listener.ts new file mode 100644 index 0000000000..6619f84582 --- /dev/null +++ b/packages/realm-server/lib/module-cache-invalidation-listener.ts @@ -0,0 +1,162 @@ +import { + logger, + MODULE_CACHE_INVALIDATED_CHANNEL, + type CachingDefinitionLookup, +} from '@cardstack/runtime-common'; +import type { PgAdapter } from '@cardstack/postgres'; +import { WorkLoop } from '@cardstack/postgres'; + +const log = logger('realm-server:module-cache-invalidation-listener'); +const DEFAULT_POLL_INTERVAL_MS = 60_000; + +// Cross-instance module-cache invalidation broadcast (CS-10952). Peer +// realm-server processes emit `NOTIFY module_cache_invalidated, ''` +// from CachingDefinitionLookup.invalidate / clearRealmCache / clearAllModules +// after their DELETE commits; this listener parses the payload and replays +// the appropriate generation bump on the locally-attached +// CachingDefinitionLookup so its in-flight prerenders observe the +// invalidation at persist time and discard stale results instead of +// re-inserting the row a peer just deleted. +// +// Mirrors RealmFileChangesListener exactly: dedicated LISTEN connection +// (PgAdapter.listen uses a fresh Client to dodge pool-LISTEN reliability +// issues — see node-postgres#1543), WorkLoop for predictable shutdown, 60s +// safety poll. There's nothing to poll from the DB side — the entire +// dispatch is in the payload — so the wake-loop just sleeps until shutdown. +// +// Self-notify is harmless: the emitting process bumps its counter +// synchronously before its DELETE, so the listener's bump on receiving its +// own NOTIFY is a second bump on a counter that's only used for snapshot +// equality. Idempotent. +export interface ModuleCacheInvalidationListenerDeps { + dbAdapter: PgAdapter; + definitionLookup: CachingDefinitionLookup; + // Optional for tests. + pollIntervalMs?: number; +} + +export type ParsedModuleCacheInvalidation = + | { kind: 'module'; resolvedRealmURL: string; moduleURL: string } + | { kind: 'realm'; resolvedRealmURL: string } + | { kind: 'global' }; + +export class ModuleCacheInvalidationListener { + #deps: ModuleCacheInvalidationListenerDeps; + #loop: WorkLoop; + #started = false; + + constructor(deps: ModuleCacheInvalidationListenerDeps) { + this.#deps = deps; + this.#loop = new WorkLoop( + 'module-cache-invalidation', + deps.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS, + ); + } + + start(): void { + if (this.#started) { + return; + } + this.#started = true; + this.#loop.run(async (loop) => { + await this.#deps.dbAdapter.listen( + MODULE_CACHE_INVALIDATED_CHANNEL, + (notification: { payload?: string }) => { + this.#handleNotification(notification.payload); + }, + async () => { + while (!loop.shuttingDown) { + await loop.sleep(); + } + }, + ); + }); + } + + async shutDown(): Promise { + await this.#loop.shutDown(); + } + + // Exposed for tests; invoked internally by the LISTEN handler. + handleNotification(payload: string | undefined): void { + this.#handleNotification(payload); + } + + #handleNotification(payload: string | undefined): void { + if (!payload) { + return; + } + const parsed = parseModuleCacheInvalidationPayload(payload); + if (!parsed) { + log.warn( + `ignoring malformed ${MODULE_CACHE_INVALIDATED_CHANNEL} payload: ${payload}`, + ); + return; + } + try { + switch (parsed.kind) { + case 'module': + this.#deps.definitionLookup.bumpModuleGeneration( + parsed.resolvedRealmURL, + parsed.moduleURL, + ); + return; + case 'realm': + this.#deps.definitionLookup.bumpRealmGeneration( + parsed.resolvedRealmURL, + ); + return; + case 'global': + this.#deps.definitionLookup.bumpGlobalGeneration(); + return; + } + } catch (err: unknown) { + log.warn( + `bump failed for ${MODULE_CACHE_INVALIDATED_CHANNEL} payload "${payload}": ${String(err)}`, + ); + } + } +} + +// Payload formats emitted by CachingDefinitionLookup invalidation paths: +// `module::` +// `realm:` +// `global` +// +// Realm and module URLs always carry a scheme (`http://`, `https://`) and a +// trailing slash on the realm URL; the discriminator prefix is separated by +// the first `:` that immediately precedes a non-`/` character. We split on +// the first `:` after the kind keyword to keep parsing simple — the kind +// keyword is one of three known values and never contains `:`. +export function parseModuleCacheInvalidationPayload( + payload: string, +): ParsedModuleCacheInvalidation | undefined { + if (payload === 'global') { + return { kind: 'global' }; + } + if (payload.startsWith('realm:')) { + const resolvedRealmURL = payload.slice('realm:'.length); + if (!resolvedRealmURL) { + return undefined; + } + return { kind: 'realm', resolvedRealmURL }; + } + if (payload.startsWith('module:')) { + // After stripping `module:`, the rest is `:`. + // Realm URLs always end in `/`, so the separator is the first `:` that + // immediately follows a `/`. Mirrors realm-file-changes-listener + // parsePayload's separator approach. + const rest = payload.slice('module:'.length); + const match = /\/:/.exec(rest); + if (!match) { + return undefined; + } + const resolvedRealmURL = rest.slice(0, match.index + 1); + const moduleURL = rest.slice(match.index + 2); + if (!resolvedRealmURL || !moduleURL) { + return undefined; + } + return { kind: 'module', resolvedRealmURL, moduleURL }; + } + return undefined; +} diff --git a/packages/realm-server/main.ts b/packages/realm-server/main.ts index eac6fca992..0223266806 100644 --- a/packages/realm-server/main.ts +++ b/packages/realm-server/main.ts @@ -36,6 +36,7 @@ import { type RealmRegistryRow, } from './lib/realm-registry-reconciler'; import { RealmFileChangesListener } from './lib/realm-file-changes-listener'; +import { ModuleCacheInvalidationListener } from './lib/module-cache-invalidation-listener'; import { PUBLISHED_DIRECTORY_NAME } from '@cardstack/runtime-common'; let log = logger('main'); @@ -285,6 +286,9 @@ const getIndexHTML = async () => { let queue = new PgQueuePublisher(dbAdapter); let reconciler: RealmRegistryReconciler | undefined; let fileChangesListener: RealmFileChangesListener | undefined; + let moduleCacheInvalidationListener: + | ModuleCacheInvalidationListener + | undefined; if (workerManagerUrl) { await waitForWorkerManager(workerManagerUrl); @@ -496,6 +500,7 @@ const getIndexHTML = async () => { await Promise.all([ reconciler?.shutDown(), fileChangesListener?.shutDown(), + moduleCacheInvalidationListener?.shutDown(), ]); queue.destroy(); // warning this is async dbAdapter.close(); // warning this is async @@ -572,6 +577,18 @@ const getIndexHTML = async () => { }); await fileChangesListener.start(); + // Cross-instance module-cache invalidation (CS-10952). When a peer + // realm-server emits NOTIFY module_cache_invalidated, replay the bump on + // this instance's CachingDefinitionLookup so its in-flight prerenders + // observe the invalidation at persist time and discard stale results. + // Self-notify is harmless — the emitter already bumped synchronously + // before the DELETE; a second bump from the listener loop is idempotent. + moduleCacheInvalidationListener = new ModuleCacheInvalidationListener({ + dbAdapter, + definitionLookup, + }); + moduleCacheInvalidationListener.start(); + let actualPort = (httpServer.address() as import('net').AddressInfo | null)?.port ?? port; log.info(`Realm server listening on port ${actualPort} is serving realms:`); diff --git a/packages/realm-server/tests/index.ts b/packages/realm-server/tests/index.ts index b23b5df45b..d3609773b4 100644 --- a/packages/realm-server/tests/index.ts +++ b/packages/realm-server/tests/index.ts @@ -194,6 +194,7 @@ import './realm-registry-backfill-test'; import './realm-registry-reconciler-test'; import './realm-registry-writes-test'; import './realm-file-changes-listener-test'; +import './module-cache-invalidation-listener-test'; import './pg-adapter-subscribe-test'; import './realm-endpoints/directory-test'; import './realm-endpoints/info-test'; diff --git a/packages/realm-server/tests/module-cache-invalidation-listener-test.ts b/packages/realm-server/tests/module-cache-invalidation-listener-test.ts new file mode 100644 index 0000000000..8cc9cc6463 --- /dev/null +++ b/packages/realm-server/tests/module-cache-invalidation-listener-test.ts @@ -0,0 +1,512 @@ +import { module, test } from 'qunit'; +import { basename } from 'path'; +import type { PgAdapter } from '@cardstack/postgres'; +import { + CachingDefinitionLookup, + MODULE_CACHE_INVALIDATED_CHANNEL, + param, + query, + type Prerenderer, + type VirtualNetwork, + type RealmPermissions, +} from '@cardstack/runtime-common'; +import { setupDB } from './helpers'; +import { + ModuleCacheInvalidationListener, + parseModuleCacheInvalidationPayload, +} from '../lib/module-cache-invalidation-listener'; + +// Records bump calls on a stub-or-real CachingDefinitionLookup. Used by +// dispatch tests (stub form) and end-to-end tests (wrapping a real lookup +// to keep its bump methods, but recording the calls for assertion). +interface BumpRecorder { + module: Array<{ resolvedRealmURL: string; moduleURL: string }>; + realm: string[]; + global: number; +} +function newRecorder(): BumpRecorder { + return { module: [], realm: [], global: 0 }; +} + +// Minimal stand-in shaped like the bump surface the listener uses. Avoids +// constructing a full CachingDefinitionLookup for the unit-dispatch tests +// where we don't need the prerender / virtual-network plumbing. +function makeStubLookup( + recorder: BumpRecorder, +): CachingDefinitionLookup { + const stub = { + bumpModuleGeneration(resolvedRealmURL: string, moduleURL: string) { + recorder.module.push({ resolvedRealmURL, moduleURL }); + }, + bumpRealmGeneration(resolvedRealmURL: string) { + recorder.realm.push(resolvedRealmURL); + }, + bumpGlobalGeneration() { + recorder.global += 1; + }, + }; + return stub as unknown as CachingDefinitionLookup; +} + +// Real CachingDefinitionLookup wrapped so the originals still run AND each +// bump is recorded. End-to-end tests use this so we can prove the listener +// hit the lookup attached to *this* instance, with the right args. +function recordBumpsOn( + lookup: CachingDefinitionLookup, + recorder: BumpRecorder, +): void { + const originalModule = lookup.bumpModuleGeneration.bind(lookup); + const originalRealm = lookup.bumpRealmGeneration.bind(lookup); + const originalGlobal = lookup.bumpGlobalGeneration.bind(lookup); + lookup.bumpModuleGeneration = (resolvedRealmURL, moduleURL) => { + recorder.module.push({ resolvedRealmURL, moduleURL }); + originalModule(resolvedRealmURL, moduleURL); + }; + lookup.bumpRealmGeneration = (resolvedRealmURL) => { + recorder.realm.push(resolvedRealmURL); + originalRealm(resolvedRealmURL); + }; + lookup.bumpGlobalGeneration = () => { + recorder.global += 1; + originalGlobal(); + }; +} + +// Minimal Prerenderer / VirtualNetwork stubs — these tests never trigger +// lookupDefinition, so the methods only need to typecheck. +const stubPrerenderer: Prerenderer = { + async prerenderModule() { + throw new Error('prerenderModule not used in this test'); + }, + async prerenderVisit() { + throw new Error('prerenderVisit not used in this test'); + }, + async runCommand() { + throw new Error('runCommand not used in this test'); + }, +}; +const stubVirtualNetwork = { + fetch: (async () => { + throw new Error('fetch not used in this test'); + }) as typeof fetch, +} as unknown as VirtualNetwork; +const stubCreatePrerenderAuth = ( + _userId: string, + _permissions: RealmPermissions, +) => 'stub-auth'; + +function waitFor( + getValue: () => T | undefined, + timeoutMs = 3000, + pollMs = 20, +): Promise { + return new Promise((resolve, reject) => { + const started = Date.now(); + const tick = () => { + const value = getValue(); + if (value !== undefined) { + resolve(value); + return; + } + if (Date.now() - started > timeoutMs) { + reject(new Error(`timeout after ${timeoutMs}ms`)); + return; + } + setTimeout(tick, pollMs); + }; + tick(); + }); +} + +module(basename(__filename), function () { + module('parseModuleCacheInvalidationPayload', function () { + test('parses a module: payload with port-bearing realm url', function (assert) { + assert.deepEqual( + parseModuleCacheInvalidationPayload( + 'module:http://localhost:4201/luke/:http://localhost:4201/luke/cards/person.gts', + ), + { + kind: 'module', + resolvedRealmURL: 'http://localhost:4201/luke/', + moduleURL: 'http://localhost:4201/luke/cards/person.gts', + }, + ); + }); + + test('parses a module: payload without a port', function (assert) { + assert.deepEqual( + parseModuleCacheInvalidationPayload( + 'module:https://cardstack.com/base/:https://cardstack.com/base/card-api.gts', + ), + { + kind: 'module', + resolvedRealmURL: 'https://cardstack.com/base/', + moduleURL: 'https://cardstack.com/base/card-api.gts', + }, + ); + }); + + test('parses a realm: payload', function (assert) { + assert.deepEqual( + parseModuleCacheInvalidationPayload( + 'realm:http://localhost:4201/luke/', + ), + { kind: 'realm', resolvedRealmURL: 'http://localhost:4201/luke/' }, + ); + }); + + test('parses a global payload', function (assert) { + assert.deepEqual(parseModuleCacheInvalidationPayload('global'), { + kind: 'global', + }); + }); + + test('returns undefined for an unknown kind prefix', function (assert) { + assert.strictEqual( + parseModuleCacheInvalidationPayload('garbage:http://x/'), + undefined, + ); + }); + + test('returns undefined for module: without a /: separator', function (assert) { + assert.strictEqual( + parseModuleCacheInvalidationPayload('module:no-separator-here'), + undefined, + ); + }); + + test('returns undefined for an empty realm: payload', function (assert) { + assert.strictEqual( + parseModuleCacheInvalidationPayload('realm:'), + undefined, + ); + }); + }); + + module('ModuleCacheInvalidationListener (dispatch)', function () { + test('handleNotification with module: invokes bumpModuleGeneration with parsed args', function (assert) { + const recorder = newRecorder(); + const listener = new ModuleCacheInvalidationListener({ + dbAdapter: {} as unknown as PgAdapter, + definitionLookup: makeStubLookup(recorder), + }); + + listener.handleNotification( + 'module:http://x.test/r/:http://x.test/r/cards/foo.gts', + ); + + assert.deepEqual(recorder.module, [ + { + resolvedRealmURL: 'http://x.test/r/', + moduleURL: 'http://x.test/r/cards/foo.gts', + }, + ]); + assert.deepEqual(recorder.realm, []); + assert.strictEqual(recorder.global, 0); + }); + + test('handleNotification with realm: invokes bumpRealmGeneration', function (assert) { + const recorder = newRecorder(); + const listener = new ModuleCacheInvalidationListener({ + dbAdapter: {} as unknown as PgAdapter, + definitionLookup: makeStubLookup(recorder), + }); + + listener.handleNotification('realm:http://x.test/r/'); + + assert.deepEqual(recorder.realm, ['http://x.test/r/']); + assert.deepEqual(recorder.module, []); + assert.strictEqual(recorder.global, 0); + }); + + test('handleNotification with global invokes bumpGlobalGeneration', function (assert) { + const recorder = newRecorder(); + const listener = new ModuleCacheInvalidationListener({ + dbAdapter: {} as unknown as PgAdapter, + definitionLookup: makeStubLookup(recorder), + }); + + listener.handleNotification('global'); + + assert.strictEqual(recorder.global, 1); + assert.deepEqual(recorder.module, []); + assert.deepEqual(recorder.realm, []); + }); + + test('handleNotification ignores a malformed payload without throwing', function (assert) { + const recorder = newRecorder(); + const listener = new ModuleCacheInvalidationListener({ + dbAdapter: {} as unknown as PgAdapter, + definitionLookup: makeStubLookup(recorder), + }); + + listener.handleNotification('not-a-valid-payload'); + listener.handleNotification('module:no-separator'); + listener.handleNotification('realm:'); + + assert.deepEqual(recorder.module, []); + assert.deepEqual(recorder.realm, []); + assert.strictEqual(recorder.global, 0); + }); + + test('handleNotification ignores empty/undefined payloads', function (assert) { + const recorder = newRecorder(); + const listener = new ModuleCacheInvalidationListener({ + dbAdapter: {} as unknown as PgAdapter, + definitionLookup: makeStubLookup(recorder), + }); + + listener.handleNotification(undefined); + listener.handleNotification(''); + + assert.deepEqual(recorder.module, []); + assert.deepEqual(recorder.realm, []); + assert.strictEqual(recorder.global, 0); + }); + }); + + module( + 'ModuleCacheInvalidationListener (LISTEN end-to-end)', + function (hooks) { + let dbAdapter: PgAdapter; + + setupDB(hooks, { + beforeEach: async (adapter) => { + dbAdapter = adapter; + }, + }); + + // The end-to-end scenario: instance A and instance B share a DB. + // A calls invalidate(); A's invalidate() emits NOTIFY + // module_cache_invalidated. B's listener receives the notify and + // calls bumpModuleGeneration on B's CachingDefinitionLookup. Without + // CS-10952, B's counters would lag A's, and a B-side in-flight + // prerender could persist a row A just deleted. + test('A.invalidate(url) → B listener bumps B.bumpModuleGeneration', async function (assert) { + const realmURL = 'http://x.test/peer-invalidate/'; + const instanceB = new CachingDefinitionLookup( + dbAdapter, + stubPrerenderer, + stubVirtualNetwork, + stubCreatePrerenderAuth, + ); + instanceB.registerRealm({ + url: realmURL, + async getRealmOwnerUserId() { + return 'owner-b'; + }, + async visibility() { + return 'private'; + }, + }); + const recorderB = newRecorder(); + recordBumpsOn(instanceB, recorderB); + + const listenerB = new ModuleCacheInvalidationListener({ + dbAdapter, + definitionLookup: instanceB, + }); + listenerB.start(); + try { + // Give B's LISTEN connection a moment to subscribe before A + // emits the NOTIFY. Otherwise B will miss the notification + // entirely (and have to fall back to the 60s health poll, which + // doesn't resync state because there's nothing to poll — the + // payload IS the dispatch). + await new Promise((r) => setTimeout(r, 100)); + + const instanceA = new CachingDefinitionLookup( + dbAdapter, + stubPrerenderer, + stubVirtualNetwork, + stubCreatePrerenderAuth, + ); + instanceA.registerRealm({ + url: realmURL, + async getRealmOwnerUserId() { + return 'owner-a'; + }, + async visibility() { + return 'private'; + }, + }); + + await instanceA.invalidate(`${realmURL}cards/foo.gts`); + + const seen = await waitFor(() => + recorderB.module.length > 0 ? recorderB.module : undefined, + ); + // The first bump corresponds to invalidate's fan-out. A's + // moduleURLVariants pass produces multiple URL forms (`.gts`, + // `.ts`, `.gjs`, `.js`, extensionless). The exact URL we + // invalidated is in the list; assert that. + const targetURL = `${realmURL}cards/foo.gts`; + const matched = seen.find((b) => b.moduleURL === targetURL); + assert.ok( + matched, + `peer received bump for ${targetURL}; got ${JSON.stringify(seen)}`, + ); + assert.strictEqual(matched?.resolvedRealmURL, realmURL); + } finally { + await listenerB.shutDown(); + } + }); + + test('A.clearRealmCache(url) → B listener bumps B.bumpRealmGeneration', async function (assert) { + const realmURL = 'http://x.test/peer-clear-realm/'; + const instanceB = new CachingDefinitionLookup( + dbAdapter, + stubPrerenderer, + stubVirtualNetwork, + stubCreatePrerenderAuth, + ); + const recorderB = newRecorder(); + recordBumpsOn(instanceB, recorderB); + + const listenerB = new ModuleCacheInvalidationListener({ + dbAdapter, + definitionLookup: instanceB, + }); + listenerB.start(); + try { + await new Promise((r) => setTimeout(r, 100)); + + const instanceA = new CachingDefinitionLookup( + dbAdapter, + stubPrerenderer, + stubVirtualNetwork, + stubCreatePrerenderAuth, + ); + await instanceA.clearRealmCache(realmURL); + + const seen = await waitFor(() => + recorderB.realm.length > 0 ? recorderB.realm : undefined, + ); + assert.deepEqual(seen, [realmURL]); + } finally { + await listenerB.shutDown(); + } + }); + + test('A.clearAllModules() → B listener bumps B.bumpGlobalGeneration', async function (assert) { + const instanceB = new CachingDefinitionLookup( + dbAdapter, + stubPrerenderer, + stubVirtualNetwork, + stubCreatePrerenderAuth, + ); + const recorderB = newRecorder(); + recordBumpsOn(instanceB, recorderB); + + const listenerB = new ModuleCacheInvalidationListener({ + dbAdapter, + definitionLookup: instanceB, + }); + listenerB.start(); + try { + await new Promise((r) => setTimeout(r, 100)); + + const instanceA = new CachingDefinitionLookup( + dbAdapter, + stubPrerenderer, + stubVirtualNetwork, + stubCreatePrerenderAuth, + ); + await instanceA.clearAllModules(); + + await waitFor(() => (recorderB.global > 0 ? true : undefined)); + assert.strictEqual( + recorderB.global, + 1, + `peer's global counter bumped exactly once`, + ); + } finally { + await listenerB.shutDown(); + } + }); + + test('self-NOTIFY is harmless: emitter receives its own bump as an idempotent second bump', async function (assert) { + const realmURL = 'http://x.test/self-echo/'; + const instanceA = new CachingDefinitionLookup( + dbAdapter, + stubPrerenderer, + stubVirtualNetwork, + stubCreatePrerenderAuth, + ); + instanceA.registerRealm({ + url: realmURL, + async getRealmOwnerUserId() { + return 'owner-a'; + }, + async visibility() { + return 'private'; + }, + }); + const recorderA = newRecorder(); + recordBumpsOn(instanceA, recorderA); + + const listenerA = new ModuleCacheInvalidationListener({ + dbAdapter, + definitionLookup: instanceA, + }); + listenerA.start(); + try { + await new Promise((r) => setTimeout(r, 100)); + + // Synchronously, invalidate() bumps the module generation + // before awaiting the DELETE. Then the DELETE commits, the + // pg_notify fires, and the listener (this same instance) bumps + // again. So we expect to see the local-bump entries plus a + // listener-replay bump for the same URL. + const targetURL = `${realmURL}cards/self.gts`; + await instanceA.invalidate(targetURL); + + // Wait until the listener has had a chance to replay (i.e. + // there are MORE entries than the synchronous invalidate path + // produced). Synchronous invalidate fans out across module + // variants; the listener replay produces one entry per URL + // notified — same set as the synchronous fan-out. So the total + // count after replay is roughly 2x the fan-out, with ≥1 echo + // visible for the original URL. + const targetMatches = await waitFor(() => { + const matches = recorderA.module.filter( + (b) => b.moduleURL === targetURL, + ); + return matches.length >= 2 ? matches : undefined; + }); + assert.ok( + targetMatches.length >= 2, + `expected ≥2 bumps for ${targetURL} (1 synchronous + 1 listener echo); got ${targetMatches.length}`, + ); + } finally { + await listenerA.shutDown(); + } + }); + + test('listener receives a manually-emitted NOTIFY with the channel constant', async function (assert) { + const recorderA = newRecorder(); + const listenerA = new ModuleCacheInvalidationListener({ + dbAdapter, + definitionLookup: makeStubLookup(recorderA), + }); + listenerA.start(); + try { + await new Promise((r) => setTimeout(r, 100)); + + await query(dbAdapter, [ + `SELECT pg_notify(`, + param(MODULE_CACHE_INVALIDATED_CHANNEL), + `,`, + param('global'), + `)`, + ]); + + await waitFor(() => (recorderA.global > 0 ? true : undefined)); + assert.strictEqual(recorderA.global, 1); + } finally { + await listenerA.shutDown(); + } + }); + }, + ); +}); diff --git a/packages/runtime-common/definition-lookup.ts b/packages/runtime-common/definition-lookup.ts index e07cd38e37..5a9869ba1c 100644 --- a/packages/runtime-common/definition-lookup.ts +++ b/packages/runtime-common/definition-lookup.ts @@ -38,6 +38,18 @@ import type { VirtualNetwork } from './virtual-network'; const MODULES_TABLE = 'modules'; const PREFERRED_EXECUTABLE_EXTENSIONS = ['.gts', '.ts', '.gjs', '.js']; +// Postgres NOTIFY channel for cross-instance module-cache invalidation +// (CS-10952). Each invalidation path emits one or more notifications so +// peer realm-server processes can bump their in-memory generation counters +// in lockstep with the DB. Payloads: +// `module::` — invalidate(moduleURL) fan-out +// `realm:` — clearRealmCache +// `global` — clearAllModules +// Self-notify is idempotent: the emitting process already bumped its +// counter synchronously before the DB delete, and a second bump on +// listener receive is observationally equivalent (counters are monotonic +// and only used for snapshot equality). +export const MODULE_CACHE_INVALIDATED_CHANNEL = 'module_cache_invalidated'; // Cached module errors expire after this interval. When a stale error entry // is encountered, the prerenderer is called again to get a fresh result. // This prevents transient prerender failures from being permanently cached. @@ -459,10 +471,14 @@ export class CachingDefinitionLookup implements DefinitionLookup { ); } - private bumpModuleGeneration( - resolvedRealmURL: string, - moduleURL: string, - ): void { + // Public so the cross-instance ModuleCacheInvalidationListener (CS-10952) + // can replay an invalidation broadcast from a peer realm-server into this + // process's counters. Internal callers in invalidate() / clearRealmCache() + // use the same methods. Bumping is idempotent w.r.t. correctness — a + // double-bump from the self-notify echo is observationally indistinguishable + // from a single bump because in-flight prerenders only test for snapshot + // equality, not absolute value. + bumpModuleGeneration(resolvedRealmURL: string, moduleURL: string): void { let key = moduleGenerationKey(resolvedRealmURL, moduleURL); this.#moduleGenerations.set( key, @@ -470,13 +486,17 @@ export class CachingDefinitionLookup implements DefinitionLookup { ); } - private bumpRealmGeneration(resolvedRealmURL: string): void { + bumpRealmGeneration(resolvedRealmURL: string): void { this.#realmGenerations.set( resolvedRealmURL, (this.#realmGenerations.get(resolvedRealmURL) ?? 0) + 1, ); } + bumpGlobalGeneration(): void { + this.#globalGeneration += 1; + } + // Returns true if the cached entry has a top-level error and has exceeded // the error TTL. This causes the entry to be treated as a cache miss so // the prerenderer is called again to get a fresh result. This prevents @@ -627,6 +647,10 @@ export class CachingDefinitionLookup implements DefinitionLookup { } this.dropInFlightForRealm(resolvedRealmURL, uniqueInvalidations); await this.deleteModuleAliases(resolvedRealmURL, uniqueInvalidations); + await this.notifyModuleCacheInvalidations( + resolvedRealmURL, + uniqueInvalidations, + ); return uniqueInvalidations; } @@ -643,12 +667,70 @@ export class CachingDefinitionLookup implements DefinitionLookup { ['resolved_realm_url =', param(resolvedRealmURL)], ]) as Expression), ]); + await this.notifyRealmCacheInvalidation(resolvedRealmURL); } async clearAllModules(): Promise { - this.#globalGeneration += 1; + this.bumpGlobalGeneration(); this.#inFlight.clear(); await this.query(['DELETE FROM', MODULES_TABLE]); + await this.notifyGlobalCacheInvalidation(); + } + + // pg_notify emission helpers. Mirror Realm.#notifyFileChange's best-effort + // pattern: the local instance's counters and DB row are already updated + // synchronously before this runs, so a notify failure is a bounded + // cross-instance staleness window — peers self-heal on their next + // prerender of the same key. Sequenced after the DELETE rather than + // wrapped in BEGIN/COMMIT because each invalidation path is a single + // autocommit DELETE; sequential pg_notify after the DELETE has the same + // observable effect (peer sees notify only after delete commits). + // Suppressed for sqlite/in-memory DBAdapters where NOTIFY isn't + // available; the cross-instance scenario only applies to pg. + private async notifyModuleCacheInvalidations( + resolvedRealmURL: string, + moduleURLs: string[], + ): Promise { + if (this.#dbAdapter.kind !== 'pg' || moduleURLs.length === 0) { + return; + } + for (let moduleURL of moduleURLs) { + await this.bestEffortNotify(`module:${resolvedRealmURL}:${moduleURL}`); + } + } + + private async notifyRealmCacheInvalidation( + resolvedRealmURL: string, + ): Promise { + if (this.#dbAdapter.kind !== 'pg') { + return; + } + await this.bestEffortNotify(`realm:${resolvedRealmURL}`); + } + + private async notifyGlobalCacheInvalidation(): Promise { + if (this.#dbAdapter.kind !== 'pg') { + return; + } + await this.bestEffortNotify('global'); + } + + private async bestEffortNotify(payload: string): Promise { + try { + await this.query([ + 'SELECT pg_notify(', + param(MODULE_CACHE_INVALIDATED_CHANNEL), + ',', + param(payload), + ')', + ]); + } catch (err: unknown) { + // Local state is already consistent; cross-instance staleness is + // bounded and self-healing. Don't fail the invalidation. + console.warn( + `pg_notify ${MODULE_CACHE_INVALIDATED_CHANNEL} failed for "${payload}": ${String(err)}`, + ); + } } // Drops in-flight entries whose pending prerender result would no longer From 4a29595f6a1f8ee3e52794249a1d06fe30d633cc Mon Sep 17 00:00:00 2001 From: Luke Melia Date: Mon, 4 May 2026 17:27:14 -0400 Subject: [PATCH 2/4] CS-10952: fix prettier lint on makeStubLookup signature Lint job blocked the PR on a prettier nit (multi-line single-arg signature should be one line). No behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../tests/module-cache-invalidation-listener-test.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/realm-server/tests/module-cache-invalidation-listener-test.ts b/packages/realm-server/tests/module-cache-invalidation-listener-test.ts index 8cc9cc6463..06938aecdf 100644 --- a/packages/realm-server/tests/module-cache-invalidation-listener-test.ts +++ b/packages/realm-server/tests/module-cache-invalidation-listener-test.ts @@ -31,9 +31,7 @@ function newRecorder(): BumpRecorder { // Minimal stand-in shaped like the bump surface the listener uses. Avoids // constructing a full CachingDefinitionLookup for the unit-dispatch tests // where we don't need the prerender / virtual-network plumbing. -function makeStubLookup( - recorder: BumpRecorder, -): CachingDefinitionLookup { +function makeStubLookup(recorder: BumpRecorder): CachingDefinitionLookup { const stub = { bumpModuleGeneration(resolvedRealmURL: string, moduleURL: string) { recorder.module.push({ resolvedRealmURL, moduleURL }); From 99cf381a539e79985065c3f1dc7d225a9551392a Mon Sep 17 00:00:00 2001 From: Luke Melia Date: Tue, 5 May 2026 11:40:20 -0400 Subject: [PATCH 3/4] CS-10952: batch invalidation NOTIFYs as JSON URL arrays MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address Copilot review on #4644: notifyModuleCacheInvalidations issued one pg_notify round-trip per URL, which N+1's against invalidate()'s fan-out (the source URL plus extension variants plus transitive consumers from calculateInvalidations). Switch the wire format from `:<...>` strings to JSON. Module fan-out now packs URLs into a single payload of the form {"k":"module","r":,"m":[,...]} and the emitter chunks the list to stay under a 7000-byte safety budget below Postgres's 8000-byte NOTIFY cap. Common case is one notify per invalidate; worst case is a handful of chunks. With M peer processes that turns M*N listener wakeups (and N notification deliveries per peer) into M wakeups carrying the same N bumps. Listener does the same N bumps either way. Realm and global notifications also move to JSON ({"k":"realm",...}, {"k":"global"}) for parser uniformity. Listener parses JSON and dispatches; updated parsing/dispatch unit tests and the manual-NOTIFY end-to-end test cover both single- and multi-URL module payloads, malformed JSON, missing fields, and non-string array entries. Doing this in this PR (rather than a follow-up) so there is never a moment where peers running mixed code disagree on payload format — N=1 in production today, so no cross-process traffic exists yet. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../lib/module-cache-invalidation-listener.ts | 88 +++++++----- ...module-cache-invalidation-listener-test.ts | 132 ++++++++++++++---- packages/runtime-common/definition-lookup.ts | 51 ++++++- 3 files changed, 200 insertions(+), 71 deletions(-) diff --git a/packages/realm-server/lib/module-cache-invalidation-listener.ts b/packages/realm-server/lib/module-cache-invalidation-listener.ts index 6619f84582..2c1f8f0bab 100644 --- a/packages/realm-server/lib/module-cache-invalidation-listener.ts +++ b/packages/realm-server/lib/module-cache-invalidation-listener.ts @@ -36,7 +36,7 @@ export interface ModuleCacheInvalidationListenerDeps { } export type ParsedModuleCacheInvalidation = - | { kind: 'module'; resolvedRealmURL: string; moduleURL: string } + | { kind: 'module'; resolvedRealmURL: string; moduleURLs: string[] } | { kind: 'realm'; resolvedRealmURL: string } | { kind: 'global' }; @@ -96,10 +96,12 @@ export class ModuleCacheInvalidationListener { try { switch (parsed.kind) { case 'module': - this.#deps.definitionLookup.bumpModuleGeneration( - parsed.resolvedRealmURL, - parsed.moduleURL, - ); + for (const moduleURL of parsed.moduleURLs) { + this.#deps.definitionLookup.bumpModuleGeneration( + parsed.resolvedRealmURL, + moduleURL, + ); + } return; case 'realm': this.#deps.definitionLookup.bumpRealmGeneration( @@ -118,45 +120,57 @@ export class ModuleCacheInvalidationListener { } } -// Payload formats emitted by CachingDefinitionLookup invalidation paths: -// `module::` -// `realm:` -// `global` +// Payload formats emitted by CachingDefinitionLookup invalidation paths +// (JSON-encoded): +// {"k":"module","r":,"m":[,...]} +// {"k":"realm","r":} +// {"k":"global"} // -// Realm and module URLs always carry a scheme (`http://`, `https://`) and a -// trailing slash on the realm URL; the discriminator prefix is separated by -// the first `:` that immediately precedes a non-`/` character. We split on -// the first `:` after the kind keyword to keep parsing simple — the kind -// keyword is one of three known values and never contains `:`. +// Module fan-out is batched into a single payload (chunked at the emitter +// to stay under Postgres's 8000-byte NOTIFY payload cap) so one invalidate() +// produces one notify per chunk instead of one per URL. export function parseModuleCacheInvalidationPayload( payload: string, ): ParsedModuleCacheInvalidation | undefined { - if (payload === 'global') { - return { kind: 'global' }; + let parsed: unknown; + try { + parsed = JSON.parse(payload); + } catch { + return undefined; } - if (payload.startsWith('realm:')) { - const resolvedRealmURL = payload.slice('realm:'.length); - if (!resolvedRealmURL) { - return undefined; - } - return { kind: 'realm', resolvedRealmURL }; + if (!parsed || typeof parsed !== 'object') { + return undefined; } - if (payload.startsWith('module:')) { - // After stripping `module:`, the rest is `:`. - // Realm URLs always end in `/`, so the separator is the first `:` that - // immediately follows a `/`. Mirrors realm-file-changes-listener - // parsePayload's separator approach. - const rest = payload.slice('module:'.length); - const match = /\/:/.exec(rest); - if (!match) { - return undefined; + const obj = parsed as Record; + switch (obj.k) { + case 'module': { + const resolvedRealmURL = obj.r; + const moduleURLs = obj.m; + if (typeof resolvedRealmURL !== 'string' || !resolvedRealmURL) { + return undefined; + } + if (!Array.isArray(moduleURLs) || moduleURLs.length === 0) { + return undefined; + } + const urls: string[] = []; + for (const url of moduleURLs) { + if (typeof url !== 'string' || !url) { + return undefined; + } + urls.push(url); + } + return { kind: 'module', resolvedRealmURL, moduleURLs: urls }; } - const resolvedRealmURL = rest.slice(0, match.index + 1); - const moduleURL = rest.slice(match.index + 2); - if (!resolvedRealmURL || !moduleURL) { - return undefined; + case 'realm': { + const resolvedRealmURL = obj.r; + if (typeof resolvedRealmURL !== 'string' || !resolvedRealmURL) { + return undefined; + } + return { kind: 'realm', resolvedRealmURL }; } - return { kind: 'module', resolvedRealmURL, moduleURL }; + case 'global': + return { kind: 'global' }; + default: + return undefined; } - return undefined; } diff --git a/packages/realm-server/tests/module-cache-invalidation-listener-test.ts b/packages/realm-server/tests/module-cache-invalidation-listener-test.ts index 06938aecdf..eb8ec4a498 100644 --- a/packages/realm-server/tests/module-cache-invalidation-listener-test.ts +++ b/packages/realm-server/tests/module-cache-invalidation-listener-test.ts @@ -118,71 +118,114 @@ function waitFor( module(basename(__filename), function () { module('parseModuleCacheInvalidationPayload', function () { - test('parses a module: payload with port-bearing realm url', function (assert) { + test('parses a module payload carrying a single URL', function (assert) { assert.deepEqual( parseModuleCacheInvalidationPayload( - 'module:http://localhost:4201/luke/:http://localhost:4201/luke/cards/person.gts', + JSON.stringify({ + k: 'module', + r: 'http://localhost:4201/luke/', + m: ['http://localhost:4201/luke/cards/person.gts'], + }), ), { kind: 'module', resolvedRealmURL: 'http://localhost:4201/luke/', - moduleURL: 'http://localhost:4201/luke/cards/person.gts', + moduleURLs: ['http://localhost:4201/luke/cards/person.gts'], }, ); }); - test('parses a module: payload without a port', function (assert) { + test('parses a module payload carrying many URLs', function (assert) { + const urls = [ + 'https://cardstack.com/base/card-api.gts', + 'https://cardstack.com/base/string.gts', + 'https://cardstack.com/base/number.gts', + ]; assert.deepEqual( parseModuleCacheInvalidationPayload( - 'module:https://cardstack.com/base/:https://cardstack.com/base/card-api.gts', + JSON.stringify({ + k: 'module', + r: 'https://cardstack.com/base/', + m: urls, + }), ), { kind: 'module', resolvedRealmURL: 'https://cardstack.com/base/', - moduleURL: 'https://cardstack.com/base/card-api.gts', + moduleURLs: urls, }, ); }); - test('parses a realm: payload', function (assert) { + test('parses a realm payload', function (assert) { assert.deepEqual( parseModuleCacheInvalidationPayload( - 'realm:http://localhost:4201/luke/', + JSON.stringify({ k: 'realm', r: 'http://localhost:4201/luke/' }), ), { kind: 'realm', resolvedRealmURL: 'http://localhost:4201/luke/' }, ); }); test('parses a global payload', function (assert) { - assert.deepEqual(parseModuleCacheInvalidationPayload('global'), { - kind: 'global', - }); + assert.deepEqual( + parseModuleCacheInvalidationPayload(JSON.stringify({ k: 'global' })), + { kind: 'global' }, + ); + }); + + test('returns undefined for non-JSON', function (assert) { + assert.strictEqual( + parseModuleCacheInvalidationPayload('not-json'), + undefined, + ); }); - test('returns undefined for an unknown kind prefix', function (assert) { + test('returns undefined for an unknown kind', function (assert) { assert.strictEqual( - parseModuleCacheInvalidationPayload('garbage:http://x/'), + parseModuleCacheInvalidationPayload( + JSON.stringify({ k: 'garbage', r: 'http://x/' }), + ), + undefined, + ); + }); + + test('returns undefined for module payload missing realm url', function (assert) { + assert.strictEqual( + parseModuleCacheInvalidationPayload( + JSON.stringify({ k: 'module', r: '', m: ['http://x/foo.gts'] }), + ), + undefined, + ); + }); + + test('returns undefined for module payload with empty url list', function (assert) { + assert.strictEqual( + parseModuleCacheInvalidationPayload( + JSON.stringify({ k: 'module', r: 'http://x/', m: [] }), + ), undefined, ); }); - test('returns undefined for module: without a /: separator', function (assert) { + test('returns undefined for module payload with non-string url entry', function (assert) { assert.strictEqual( - parseModuleCacheInvalidationPayload('module:no-separator-here'), + parseModuleCacheInvalidationPayload( + JSON.stringify({ k: 'module', r: 'http://x/', m: [123] }), + ), undefined, ); }); - test('returns undefined for an empty realm: payload', function (assert) { + test('returns undefined for an empty realm payload', function (assert) { assert.strictEqual( - parseModuleCacheInvalidationPayload('realm:'), + parseModuleCacheInvalidationPayload(JSON.stringify({ k: 'realm' })), undefined, ); }); }); module('ModuleCacheInvalidationListener (dispatch)', function () { - test('handleNotification with module: invokes bumpModuleGeneration with parsed args', function (assert) { + test('handleNotification with single-URL module payload bumps once', function (assert) { const recorder = newRecorder(); const listener = new ModuleCacheInvalidationListener({ dbAdapter: {} as unknown as PgAdapter, @@ -190,7 +233,11 @@ module(basename(__filename), function () { }); listener.handleNotification( - 'module:http://x.test/r/:http://x.test/r/cards/foo.gts', + JSON.stringify({ + k: 'module', + r: 'http://x.test/r/', + m: ['http://x.test/r/cards/foo.gts'], + }), ); assert.deepEqual(recorder.module, [ @@ -203,28 +250,57 @@ module(basename(__filename), function () { assert.strictEqual(recorder.global, 0); }); - test('handleNotification with realm: invokes bumpRealmGeneration', function (assert) { + test('handleNotification with multi-URL module payload bumps once per URL', function (assert) { const recorder = newRecorder(); const listener = new ModuleCacheInvalidationListener({ dbAdapter: {} as unknown as PgAdapter, definitionLookup: makeStubLookup(recorder), }); - listener.handleNotification('realm:http://x.test/r/'); + const urls = [ + 'http://x.test/r/cards/foo.gts', + 'http://x.test/r/cards/bar.gts', + 'http://x.test/r/cards/baz.gts', + ]; + listener.handleNotification( + JSON.stringify({ k: 'module', r: 'http://x.test/r/', m: urls }), + ); + + assert.deepEqual( + recorder.module, + urls.map((moduleURL) => ({ + resolvedRealmURL: 'http://x.test/r/', + moduleURL, + })), + ); + assert.deepEqual(recorder.realm, []); + assert.strictEqual(recorder.global, 0); + }); + + test('handleNotification with realm payload bumps bumpRealmGeneration', function (assert) { + const recorder = newRecorder(); + const listener = new ModuleCacheInvalidationListener({ + dbAdapter: {} as unknown as PgAdapter, + definitionLookup: makeStubLookup(recorder), + }); + + listener.handleNotification( + JSON.stringify({ k: 'realm', r: 'http://x.test/r/' }), + ); assert.deepEqual(recorder.realm, ['http://x.test/r/']); assert.deepEqual(recorder.module, []); assert.strictEqual(recorder.global, 0); }); - test('handleNotification with global invokes bumpGlobalGeneration', function (assert) { + test('handleNotification with global payload bumps bumpGlobalGeneration', function (assert) { const recorder = newRecorder(); const listener = new ModuleCacheInvalidationListener({ dbAdapter: {} as unknown as PgAdapter, definitionLookup: makeStubLookup(recorder), }); - listener.handleNotification('global'); + listener.handleNotification(JSON.stringify({ k: 'global' })); assert.strictEqual(recorder.global, 1); assert.deepEqual(recorder.module, []); @@ -238,9 +314,11 @@ module(basename(__filename), function () { definitionLookup: makeStubLookup(recorder), }); - listener.handleNotification('not-a-valid-payload'); - listener.handleNotification('module:no-separator'); - listener.handleNotification('realm:'); + listener.handleNotification('not-json'); + listener.handleNotification( + JSON.stringify({ k: 'module', r: 'http://x.test/r/' }), + ); + listener.handleNotification(JSON.stringify({ k: 'realm' })); assert.deepEqual(recorder.module, []); assert.deepEqual(recorder.realm, []); @@ -495,7 +573,7 @@ module(basename(__filename), function () { `SELECT pg_notify(`, param(MODULE_CACHE_INVALIDATED_CHANNEL), `,`, - param('global'), + param(JSON.stringify({ k: 'global' })), `)`, ]); diff --git a/packages/runtime-common/definition-lookup.ts b/packages/runtime-common/definition-lookup.ts index 5a9869ba1c..2de6923020 100644 --- a/packages/runtime-common/definition-lookup.ts +++ b/packages/runtime-common/definition-lookup.ts @@ -41,15 +41,18 @@ const PREFERRED_EXECUTABLE_EXTENSIONS = ['.gts', '.ts', '.gjs', '.js']; // Postgres NOTIFY channel for cross-instance module-cache invalidation // (CS-10952). Each invalidation path emits one or more notifications so // peer realm-server processes can bump their in-memory generation counters -// in lockstep with the DB. Payloads: -// `module::` — invalidate(moduleURL) fan-out -// `realm:` — clearRealmCache -// `global` — clearAllModules +// in lockstep with the DB. Payload is JSON; one of: +// {"k":"module","r":,"m":[,...]} — invalidate fan-out +// {"k":"realm","r":} — clearRealmCache +// {"k":"global"} — clearAllModules // Self-notify is idempotent: the emitting process already bumped its // counter synchronously before the DB delete, and a second bump on // listener receive is observationally equivalent (counters are monotonic // and only used for snapshot equality). export const MODULE_CACHE_INVALIDATED_CHANNEL = 'module_cache_invalidated'; +// Postgres caps NOTIFY payloads at 8000 bytes; stay well under so JSON +// encoding overhead and pathological URL lengths don't blow the limit. +const NOTIFY_PAYLOAD_BUDGET = 7000; // Cached module errors expire after this interval. When a stale error entry // is encountered, the prerenderer is called again to get a fresh result. // This prevents transient prerender failures from being permanently cached. @@ -687,6 +690,15 @@ export class CachingDefinitionLookup implements DefinitionLookup { // observable effect (peer sees notify only after delete commits). // Suppressed for sqlite/in-memory DBAdapters where NOTIFY isn't // available; the cross-instance scenario only applies to pg. + // + // Payloads are JSON-encoded so a single invalidate() fan-out (which can + // include the source URL plus its extension variants plus transitive + // consumers from calculateInvalidations()) emits one pg_notify carrying + // the full URL list instead of one per URL. With M peer processes that + // turns M*N listener wakeups into M; the listener does the same N bumps + // either way. Postgres caps NOTIFY payloads at 8000 bytes, so the module + // emitter chunks the URL list to stay under a 7000-byte budget — common + // case is one notify; pathological fan-out becomes a handful. private async notifyModuleCacheInvalidations( resolvedRealmURL: string, moduleURLs: string[], @@ -694,9 +706,32 @@ export class CachingDefinitionLookup implements DefinitionLookup { if (this.#dbAdapter.kind !== 'pg' || moduleURLs.length === 0) { return; } + const wrapperBytes = JSON.stringify({ + k: 'module', + r: resolvedRealmURL, + m: [], + }).length; + const budget = NOTIFY_PAYLOAD_BUDGET - wrapperBytes; + let chunk: string[] = []; + let chunkBytes = 0; + const flush = async () => { + if (chunk.length === 0) return; + await this.bestEffortNotify( + JSON.stringify({ k: 'module', r: resolvedRealmURL, m: chunk }), + ); + chunk = []; + chunkBytes = 0; + }; for (let moduleURL of moduleURLs) { - await this.bestEffortNotify(`module:${resolvedRealmURL}:${moduleURL}`); + const encodedLen = JSON.stringify(moduleURL).length; + const addedCost = encodedLen + (chunk.length === 0 ? 0 : 1); + if (chunkBytes + addedCost > budget && chunk.length > 0) { + await flush(); + } + chunk.push(moduleURL); + chunkBytes += chunk.length === 1 ? encodedLen : addedCost; } + await flush(); } private async notifyRealmCacheInvalidation( @@ -705,14 +740,16 @@ export class CachingDefinitionLookup implements DefinitionLookup { if (this.#dbAdapter.kind !== 'pg') { return; } - await this.bestEffortNotify(`realm:${resolvedRealmURL}`); + await this.bestEffortNotify( + JSON.stringify({ k: 'realm', r: resolvedRealmURL }), + ); } private async notifyGlobalCacheInvalidation(): Promise { if (this.#dbAdapter.kind !== 'pg') { return; } - await this.bestEffortNotify('global'); + await this.bestEffortNotify(JSON.stringify({ k: 'global' })); } private async bestEffortNotify(payload: string): Promise { From 524b2b815f854e6797c3934557c1f9b3689cf93a Mon Sep 17 00:00:00 2001 From: Luke Melia Date: Tue, 5 May 2026 19:37:02 -0400 Subject: [PATCH 4/4] CS-10952: migrate listener to PgAdapter.subscribe (post-multiplex) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CS-10952 (multiplex LISTEN over a shared client) landed on main, so `PgAdapter.subscribe(channel, handler) → { unsubscribe }` now exists and `PgAdapter.listen(...)` is deprecated. `RealmFileChangesListener` in main has already moved to the new API; mirror the same shape here. Drops: - `WorkLoop` dependency (was only there to keep the dedicated Client connection alive between notifications). - `pollIntervalMs` / `DEFAULT_POLL_INTERVAL_MS` constructor option (no periodic work — the dispatch is in the payload). Adds: - `#subscription: NotificationSubscription | undefined` - `#starting: Promise | undefined` race guard so a `shutDown()` mid-startup waits for `subscribe()` to finish wiring up before unsubscribing (otherwise the racing start() installs a live subscription after we thought we were down). `start()` is now async — wait for `subscribe()` to complete the LISTEN before returning. `main.ts` and the end-to-end tests `await` it. Tests no longer need the 100 ms post-start sleep that was previously there to give the dedicated connection a chance to actually subscribe. Behavior is identical from outside the class. The self-notify-idempotence semantics in the doc comment still hold: the emitting process bumps synchronously before its DELETE; the listener's bump on receiving its own NOTIFY is a second monotonic bump on a snapshot-equality counter. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../lib/module-cache-invalidation-listener.ts | 62 ++++++++++--------- packages/realm-server/main.ts | 2 +- ...module-cache-invalidation-listener-test.ts | 25 ++------ 3 files changed, 39 insertions(+), 50 deletions(-) diff --git a/packages/realm-server/lib/module-cache-invalidation-listener.ts b/packages/realm-server/lib/module-cache-invalidation-listener.ts index 2c1f8f0bab..a13c012f99 100644 --- a/packages/realm-server/lib/module-cache-invalidation-listener.ts +++ b/packages/realm-server/lib/module-cache-invalidation-listener.ts @@ -3,11 +3,9 @@ import { MODULE_CACHE_INVALIDATED_CHANNEL, type CachingDefinitionLookup, } from '@cardstack/runtime-common'; -import type { PgAdapter } from '@cardstack/postgres'; -import { WorkLoop } from '@cardstack/postgres'; +import type { PgAdapter, NotificationSubscription } from '@cardstack/postgres'; const log = logger('realm-server:module-cache-invalidation-listener'); -const DEFAULT_POLL_INTERVAL_MS = 60_000; // Cross-instance module-cache invalidation broadcast (CS-10952). Peer // realm-server processes emit `NOTIFY module_cache_invalidated, ''` @@ -18,11 +16,10 @@ const DEFAULT_POLL_INTERVAL_MS = 60_000; // invalidation at persist time and discard stale results instead of // re-inserting the row a peer just deleted. // -// Mirrors RealmFileChangesListener exactly: dedicated LISTEN connection -// (PgAdapter.listen uses a fresh Client to dodge pool-LISTEN reliability -// issues — see node-postgres#1543), WorkLoop for predictable shutdown, 60s -// safety poll. There's nothing to poll from the DB side — the entire -// dispatch is in the payload — so the wake-loop just sleeps until shutdown. +// The LISTEN is backed by `PgAdapter.subscribe` (shared multiplexed +// notification client). There is no periodic work to run between +// notifications — the whole dispatch is in the payload — so we don't keep a +// WorkLoop here. Mirrors `RealmFileChangesListener`. // // Self-notify is harmless: the emitting process bumps its counter // synchronously before its DELETE, so the listener's bump on receiving its @@ -31,8 +28,6 @@ const DEFAULT_POLL_INTERVAL_MS = 60_000; export interface ModuleCacheInvalidationListenerDeps { dbAdapter: PgAdapter; definitionLookup: CachingDefinitionLookup; - // Optional for tests. - pollIntervalMs?: number; } export type ParsedModuleCacheInvalidation = @@ -42,39 +37,48 @@ export type ParsedModuleCacheInvalidation = export class ModuleCacheInvalidationListener { #deps: ModuleCacheInvalidationListenerDeps; - #loop: WorkLoop; - #started = false; + #subscription?: NotificationSubscription; + #starting?: Promise; constructor(deps: ModuleCacheInvalidationListenerDeps) { this.#deps = deps; - this.#loop = new WorkLoop( - 'module-cache-invalidation', - deps.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS, - ); } - start(): void { - if (this.#started) { + async start(): Promise { + if (this.#subscription || this.#starting) { + await this.#starting; return; } - this.#started = true; - this.#loop.run(async (loop) => { - await this.#deps.dbAdapter.listen( + this.#starting = (async () => { + this.#subscription = await this.#deps.dbAdapter.subscribe( MODULE_CACHE_INVALIDATED_CHANNEL, - (notification: { payload?: string }) => { + (notification) => { this.#handleNotification(notification.payload); }, - async () => { - while (!loop.shuttingDown) { - await loop.sleep(); - } - }, ); - }); + })(); + try { + await this.#starting; + } finally { + this.#starting = undefined; + } } async shutDown(): Promise { - await this.#loop.shutDown(); + // Wait for any in-flight start() to finish wiring up #subscription before + // tearing down. Otherwise shutDown can run while subscribe() is still + // awaiting the LISTEN, return early with #subscription still undefined, + // and the racing start() then installs a live subscription after we + // thought we were shut down. Swallow start() errors here — if startup + // failed, there's nothing for us to unsubscribe. + try { + await this.#starting; + } catch { + // ignore + } + const sub = this.#subscription; + this.#subscription = undefined; + await sub?.unsubscribe(); } // Exposed for tests; invoked internally by the LISTEN handler. diff --git a/packages/realm-server/main.ts b/packages/realm-server/main.ts index 0223266806..b153367741 100644 --- a/packages/realm-server/main.ts +++ b/packages/realm-server/main.ts @@ -587,7 +587,7 @@ const getIndexHTML = async () => { dbAdapter, definitionLookup, }); - moduleCacheInvalidationListener.start(); + await moduleCacheInvalidationListener.start(); let actualPort = (httpServer.address() as import('net').AddressInfo | null)?.port ?? port; diff --git a/packages/realm-server/tests/module-cache-invalidation-listener-test.ts b/packages/realm-server/tests/module-cache-invalidation-listener-test.ts index eb8ec4a498..5072c24cd6 100644 --- a/packages/realm-server/tests/module-cache-invalidation-listener-test.ts +++ b/packages/realm-server/tests/module-cache-invalidation-listener-test.ts @@ -382,15 +382,8 @@ module(basename(__filename), function () { dbAdapter, definitionLookup: instanceB, }); - listenerB.start(); + await listenerB.start(); try { - // Give B's LISTEN connection a moment to subscribe before A - // emits the NOTIFY. Otherwise B will miss the notification - // entirely (and have to fall back to the 60s health poll, which - // doesn't resync state because there's nothing to poll — the - // payload IS the dispatch). - await new Promise((r) => setTimeout(r, 100)); - const instanceA = new CachingDefinitionLookup( dbAdapter, stubPrerenderer, @@ -443,10 +436,8 @@ module(basename(__filename), function () { dbAdapter, definitionLookup: instanceB, }); - listenerB.start(); + await listenerB.start(); try { - await new Promise((r) => setTimeout(r, 100)); - const instanceA = new CachingDefinitionLookup( dbAdapter, stubPrerenderer, @@ -478,10 +469,8 @@ module(basename(__filename), function () { dbAdapter, definitionLookup: instanceB, }); - listenerB.start(); + await listenerB.start(); try { - await new Promise((r) => setTimeout(r, 100)); - const instanceA = new CachingDefinitionLookup( dbAdapter, stubPrerenderer, @@ -525,10 +514,8 @@ module(basename(__filename), function () { dbAdapter, definitionLookup: instanceA, }); - listenerA.start(); + await listenerA.start(); try { - await new Promise((r) => setTimeout(r, 100)); - // Synchronously, invalidate() bumps the module generation // before awaiting the DELETE. Then the DELETE commits, the // pg_notify fires, and the listener (this same instance) bumps @@ -565,10 +552,8 @@ module(basename(__filename), function () { dbAdapter, definitionLookup: makeStubLookup(recorderA), }); - listenerA.start(); + await listenerA.start(); try { - await new Promise((r) => setTimeout(r, 100)); - await query(dbAdapter, [ `SELECT pg_notify(`, param(MODULE_CACHE_INVALIDATED_CHANNEL),