diff --git a/packages/realm-server/lib/module-cache-invalidation-listener.ts b/packages/realm-server/lib/module-cache-invalidation-listener.ts new file mode 100644 index 0000000000..a13c012f99 --- /dev/null +++ b/packages/realm-server/lib/module-cache-invalidation-listener.ts @@ -0,0 +1,180 @@ +import { + logger, + MODULE_CACHE_INVALIDATED_CHANNEL, + type CachingDefinitionLookup, +} from '@cardstack/runtime-common'; +import type { PgAdapter, NotificationSubscription } from '@cardstack/postgres'; + +const log = logger('realm-server:module-cache-invalidation-listener'); + +// Cross-instance module-cache invalidation broadcast (CS-10952). Peer +// realm-server processes emit `NOTIFY module_cache_invalidated, ''` +// from CachingDefinitionLookup.invalidate / clearRealmCache / clearAllModules +// after their DELETE commits; this listener parses the payload and replays +// the appropriate generation bump on the locally-attached +// CachingDefinitionLookup so its in-flight prerenders observe the +// invalidation at persist time and discard stale results instead of +// re-inserting the row a peer just deleted. +// +// The LISTEN is backed by `PgAdapter.subscribe` (shared multiplexed +// notification client). There is no periodic work to run between +// notifications — the whole dispatch is in the payload — so we don't keep a +// WorkLoop here. Mirrors `RealmFileChangesListener`. +// +// Self-notify is harmless: the emitting process bumps its counter +// synchronously before its DELETE, so the listener's bump on receiving its +// own NOTIFY is a second bump on a counter that's only used for snapshot +// equality. Idempotent. +export interface ModuleCacheInvalidationListenerDeps { + dbAdapter: PgAdapter; + definitionLookup: CachingDefinitionLookup; +} + +export type ParsedModuleCacheInvalidation = + | { kind: 'module'; resolvedRealmURL: string; moduleURLs: string[] } + | { kind: 'realm'; resolvedRealmURL: string } + | { kind: 'global' }; + +export class ModuleCacheInvalidationListener { + #deps: ModuleCacheInvalidationListenerDeps; + #subscription?: NotificationSubscription; + #starting?: Promise; + + constructor(deps: ModuleCacheInvalidationListenerDeps) { + this.#deps = deps; + } + + async start(): Promise { + if (this.#subscription || this.#starting) { + await this.#starting; + return; + } + this.#starting = (async () => { + this.#subscription = await this.#deps.dbAdapter.subscribe( + MODULE_CACHE_INVALIDATED_CHANNEL, + (notification) => { + this.#handleNotification(notification.payload); + }, + ); + })(); + try { + await this.#starting; + } finally { + this.#starting = undefined; + } + } + + async shutDown(): Promise { + // Wait for any in-flight start() to finish wiring up #subscription before + // tearing down. Otherwise shutDown can run while subscribe() is still + // awaiting the LISTEN, return early with #subscription still undefined, + // and the racing start() then installs a live subscription after we + // thought we were shut down. Swallow start() errors here — if startup + // failed, there's nothing for us to unsubscribe. + try { + await this.#starting; + } catch { + // ignore + } + const sub = this.#subscription; + this.#subscription = undefined; + await sub?.unsubscribe(); + } + + // Exposed for tests; invoked internally by the LISTEN handler. + handleNotification(payload: string | undefined): void { + this.#handleNotification(payload); + } + + #handleNotification(payload: string | undefined): void { + if (!payload) { + return; + } + const parsed = parseModuleCacheInvalidationPayload(payload); + if (!parsed) { + log.warn( + `ignoring malformed ${MODULE_CACHE_INVALIDATED_CHANNEL} payload: ${payload}`, + ); + return; + } + try { + switch (parsed.kind) { + case 'module': + for (const moduleURL of parsed.moduleURLs) { + this.#deps.definitionLookup.bumpModuleGeneration( + parsed.resolvedRealmURL, + moduleURL, + ); + } + return; + case 'realm': + this.#deps.definitionLookup.bumpRealmGeneration( + parsed.resolvedRealmURL, + ); + return; + case 'global': + this.#deps.definitionLookup.bumpGlobalGeneration(); + return; + } + } catch (err: unknown) { + log.warn( + `bump failed for ${MODULE_CACHE_INVALIDATED_CHANNEL} payload "${payload}": ${String(err)}`, + ); + } + } +} + +// Payload formats emitted by CachingDefinitionLookup invalidation paths +// (JSON-encoded): +// {"k":"module","r":,"m":[,...]} +// {"k":"realm","r":} +// {"k":"global"} +// +// Module fan-out is batched into a single payload (chunked at the emitter +// to stay under Postgres's 8000-byte NOTIFY payload cap) so one invalidate() +// produces one notify per chunk instead of one per URL. +export function parseModuleCacheInvalidationPayload( + payload: string, +): ParsedModuleCacheInvalidation | undefined { + let parsed: unknown; + try { + parsed = JSON.parse(payload); + } catch { + return undefined; + } + if (!parsed || typeof parsed !== 'object') { + return undefined; + } + const obj = parsed as Record; + switch (obj.k) { + case 'module': { + const resolvedRealmURL = obj.r; + const moduleURLs = obj.m; + if (typeof resolvedRealmURL !== 'string' || !resolvedRealmURL) { + return undefined; + } + if (!Array.isArray(moduleURLs) || moduleURLs.length === 0) { + return undefined; + } + const urls: string[] = []; + for (const url of moduleURLs) { + if (typeof url !== 'string' || !url) { + return undefined; + } + urls.push(url); + } + return { kind: 'module', resolvedRealmURL, moduleURLs: urls }; + } + case 'realm': { + const resolvedRealmURL = obj.r; + if (typeof resolvedRealmURL !== 'string' || !resolvedRealmURL) { + return undefined; + } + return { kind: 'realm', resolvedRealmURL }; + } + case 'global': + return { kind: 'global' }; + default: + return undefined; + } +} diff --git a/packages/realm-server/main.ts b/packages/realm-server/main.ts index eac6fca992..b153367741 100644 --- a/packages/realm-server/main.ts +++ b/packages/realm-server/main.ts @@ -36,6 +36,7 @@ import { type RealmRegistryRow, } from './lib/realm-registry-reconciler'; import { RealmFileChangesListener } from './lib/realm-file-changes-listener'; +import { ModuleCacheInvalidationListener } from './lib/module-cache-invalidation-listener'; import { PUBLISHED_DIRECTORY_NAME } from '@cardstack/runtime-common'; let log = logger('main'); @@ -285,6 +286,9 @@ const getIndexHTML = async () => { let queue = new PgQueuePublisher(dbAdapter); let reconciler: RealmRegistryReconciler | undefined; let fileChangesListener: RealmFileChangesListener | undefined; + let moduleCacheInvalidationListener: + | ModuleCacheInvalidationListener + | undefined; if (workerManagerUrl) { await waitForWorkerManager(workerManagerUrl); @@ -496,6 +500,7 @@ const getIndexHTML = async () => { await Promise.all([ reconciler?.shutDown(), fileChangesListener?.shutDown(), + moduleCacheInvalidationListener?.shutDown(), ]); queue.destroy(); // warning this is async dbAdapter.close(); // warning this is async @@ -572,6 +577,18 @@ const getIndexHTML = async () => { }); await fileChangesListener.start(); + // Cross-instance module-cache invalidation (CS-10952). When a peer + // realm-server emits NOTIFY module_cache_invalidated, replay the bump on + // this instance's CachingDefinitionLookup so its in-flight prerenders + // observe the invalidation at persist time and discard stale results. + // Self-notify is harmless — the emitter already bumped synchronously + // before the DELETE; a second bump from the listener loop is idempotent. + moduleCacheInvalidationListener = new ModuleCacheInvalidationListener({ + dbAdapter, + definitionLookup, + }); + await moduleCacheInvalidationListener.start(); + let actualPort = (httpServer.address() as import('net').AddressInfo | null)?.port ?? port; log.info(`Realm server listening on port ${actualPort} is serving realms:`); diff --git a/packages/realm-server/tests/index.ts b/packages/realm-server/tests/index.ts index b23b5df45b..d3609773b4 100644 --- a/packages/realm-server/tests/index.ts +++ b/packages/realm-server/tests/index.ts @@ -194,6 +194,7 @@ import './realm-registry-backfill-test'; import './realm-registry-reconciler-test'; import './realm-registry-writes-test'; import './realm-file-changes-listener-test'; +import './module-cache-invalidation-listener-test'; import './pg-adapter-subscribe-test'; import './realm-endpoints/directory-test'; import './realm-endpoints/info-test'; diff --git a/packages/realm-server/tests/module-cache-invalidation-listener-test.ts b/packages/realm-server/tests/module-cache-invalidation-listener-test.ts new file mode 100644 index 0000000000..5072c24cd6 --- /dev/null +++ b/packages/realm-server/tests/module-cache-invalidation-listener-test.ts @@ -0,0 +1,573 @@ +import { module, test } from 'qunit'; +import { basename } from 'path'; +import type { PgAdapter } from '@cardstack/postgres'; +import { + CachingDefinitionLookup, + MODULE_CACHE_INVALIDATED_CHANNEL, + param, + query, + type Prerenderer, + type VirtualNetwork, + type RealmPermissions, +} from '@cardstack/runtime-common'; +import { setupDB } from './helpers'; +import { + ModuleCacheInvalidationListener, + parseModuleCacheInvalidationPayload, +} from '../lib/module-cache-invalidation-listener'; + +// Records bump calls on a stub-or-real CachingDefinitionLookup. Used by +// dispatch tests (stub form) and end-to-end tests (wrapping a real lookup +// to keep its bump methods, but recording the calls for assertion). +interface BumpRecorder { + module: Array<{ resolvedRealmURL: string; moduleURL: string }>; + realm: string[]; + global: number; +} +function newRecorder(): BumpRecorder { + return { module: [], realm: [], global: 0 }; +} + +// Minimal stand-in shaped like the bump surface the listener uses. Avoids +// constructing a full CachingDefinitionLookup for the unit-dispatch tests +// where we don't need the prerender / virtual-network plumbing. +function makeStubLookup(recorder: BumpRecorder): CachingDefinitionLookup { + const stub = { + bumpModuleGeneration(resolvedRealmURL: string, moduleURL: string) { + recorder.module.push({ resolvedRealmURL, moduleURL }); + }, + bumpRealmGeneration(resolvedRealmURL: string) { + recorder.realm.push(resolvedRealmURL); + }, + bumpGlobalGeneration() { + recorder.global += 1; + }, + }; + return stub as unknown as CachingDefinitionLookup; +} + +// Real CachingDefinitionLookup wrapped so the originals still run AND each +// bump is recorded. End-to-end tests use this so we can prove the listener +// hit the lookup attached to *this* instance, with the right args. +function recordBumpsOn( + lookup: CachingDefinitionLookup, + recorder: BumpRecorder, +): void { + const originalModule = lookup.bumpModuleGeneration.bind(lookup); + const originalRealm = lookup.bumpRealmGeneration.bind(lookup); + const originalGlobal = lookup.bumpGlobalGeneration.bind(lookup); + lookup.bumpModuleGeneration = (resolvedRealmURL, moduleURL) => { + recorder.module.push({ resolvedRealmURL, moduleURL }); + originalModule(resolvedRealmURL, moduleURL); + }; + lookup.bumpRealmGeneration = (resolvedRealmURL) => { + recorder.realm.push(resolvedRealmURL); + originalRealm(resolvedRealmURL); + }; + lookup.bumpGlobalGeneration = () => { + recorder.global += 1; + originalGlobal(); + }; +} + +// Minimal Prerenderer / VirtualNetwork stubs — these tests never trigger +// lookupDefinition, so the methods only need to typecheck. +const stubPrerenderer: Prerenderer = { + async prerenderModule() { + throw new Error('prerenderModule not used in this test'); + }, + async prerenderVisit() { + throw new Error('prerenderVisit not used in this test'); + }, + async runCommand() { + throw new Error('runCommand not used in this test'); + }, +}; +const stubVirtualNetwork = { + fetch: (async () => { + throw new Error('fetch not used in this test'); + }) as typeof fetch, +} as unknown as VirtualNetwork; +const stubCreatePrerenderAuth = ( + _userId: string, + _permissions: RealmPermissions, +) => 'stub-auth'; + +function waitFor( + getValue: () => T | undefined, + timeoutMs = 3000, + pollMs = 20, +): Promise { + return new Promise((resolve, reject) => { + const started = Date.now(); + const tick = () => { + const value = getValue(); + if (value !== undefined) { + resolve(value); + return; + } + if (Date.now() - started > timeoutMs) { + reject(new Error(`timeout after ${timeoutMs}ms`)); + return; + } + setTimeout(tick, pollMs); + }; + tick(); + }); +} + +module(basename(__filename), function () { + module('parseModuleCacheInvalidationPayload', function () { + test('parses a module payload carrying a single URL', function (assert) { + assert.deepEqual( + parseModuleCacheInvalidationPayload( + JSON.stringify({ + k: 'module', + r: 'http://localhost:4201/luke/', + m: ['http://localhost:4201/luke/cards/person.gts'], + }), + ), + { + kind: 'module', + resolvedRealmURL: 'http://localhost:4201/luke/', + moduleURLs: ['http://localhost:4201/luke/cards/person.gts'], + }, + ); + }); + + test('parses a module payload carrying many URLs', function (assert) { + const urls = [ + 'https://cardstack.com/base/card-api.gts', + 'https://cardstack.com/base/string.gts', + 'https://cardstack.com/base/number.gts', + ]; + assert.deepEqual( + parseModuleCacheInvalidationPayload( + JSON.stringify({ + k: 'module', + r: 'https://cardstack.com/base/', + m: urls, + }), + ), + { + kind: 'module', + resolvedRealmURL: 'https://cardstack.com/base/', + moduleURLs: urls, + }, + ); + }); + + test('parses a realm payload', function (assert) { + assert.deepEqual( + parseModuleCacheInvalidationPayload( + JSON.stringify({ k: 'realm', r: 'http://localhost:4201/luke/' }), + ), + { kind: 'realm', resolvedRealmURL: 'http://localhost:4201/luke/' }, + ); + }); + + test('parses a global payload', function (assert) { + assert.deepEqual( + parseModuleCacheInvalidationPayload(JSON.stringify({ k: 'global' })), + { kind: 'global' }, + ); + }); + + test('returns undefined for non-JSON', function (assert) { + assert.strictEqual( + parseModuleCacheInvalidationPayload('not-json'), + undefined, + ); + }); + + test('returns undefined for an unknown kind', function (assert) { + assert.strictEqual( + parseModuleCacheInvalidationPayload( + JSON.stringify({ k: 'garbage', r: 'http://x/' }), + ), + undefined, + ); + }); + + test('returns undefined for module payload missing realm url', function (assert) { + assert.strictEqual( + parseModuleCacheInvalidationPayload( + JSON.stringify({ k: 'module', r: '', m: ['http://x/foo.gts'] }), + ), + undefined, + ); + }); + + test('returns undefined for module payload with empty url list', function (assert) { + assert.strictEqual( + parseModuleCacheInvalidationPayload( + JSON.stringify({ k: 'module', r: 'http://x/', m: [] }), + ), + undefined, + ); + }); + + test('returns undefined for module payload with non-string url entry', function (assert) { + assert.strictEqual( + parseModuleCacheInvalidationPayload( + JSON.stringify({ k: 'module', r: 'http://x/', m: [123] }), + ), + undefined, + ); + }); + + test('returns undefined for an empty realm payload', function (assert) { + assert.strictEqual( + parseModuleCacheInvalidationPayload(JSON.stringify({ k: 'realm' })), + undefined, + ); + }); + }); + + module('ModuleCacheInvalidationListener (dispatch)', function () { + test('handleNotification with single-URL module payload bumps once', function (assert) { + const recorder = newRecorder(); + const listener = new ModuleCacheInvalidationListener({ + dbAdapter: {} as unknown as PgAdapter, + definitionLookup: makeStubLookup(recorder), + }); + + listener.handleNotification( + JSON.stringify({ + k: 'module', + r: 'http://x.test/r/', + m: ['http://x.test/r/cards/foo.gts'], + }), + ); + + assert.deepEqual(recorder.module, [ + { + resolvedRealmURL: 'http://x.test/r/', + moduleURL: 'http://x.test/r/cards/foo.gts', + }, + ]); + assert.deepEqual(recorder.realm, []); + assert.strictEqual(recorder.global, 0); + }); + + test('handleNotification with multi-URL module payload bumps once per URL', function (assert) { + const recorder = newRecorder(); + const listener = new ModuleCacheInvalidationListener({ + dbAdapter: {} as unknown as PgAdapter, + definitionLookup: makeStubLookup(recorder), + }); + + const urls = [ + 'http://x.test/r/cards/foo.gts', + 'http://x.test/r/cards/bar.gts', + 'http://x.test/r/cards/baz.gts', + ]; + listener.handleNotification( + JSON.stringify({ k: 'module', r: 'http://x.test/r/', m: urls }), + ); + + assert.deepEqual( + recorder.module, + urls.map((moduleURL) => ({ + resolvedRealmURL: 'http://x.test/r/', + moduleURL, + })), + ); + assert.deepEqual(recorder.realm, []); + assert.strictEqual(recorder.global, 0); + }); + + test('handleNotification with realm payload bumps bumpRealmGeneration', function (assert) { + const recorder = newRecorder(); + const listener = new ModuleCacheInvalidationListener({ + dbAdapter: {} as unknown as PgAdapter, + definitionLookup: makeStubLookup(recorder), + }); + + listener.handleNotification( + JSON.stringify({ k: 'realm', r: 'http://x.test/r/' }), + ); + + assert.deepEqual(recorder.realm, ['http://x.test/r/']); + assert.deepEqual(recorder.module, []); + assert.strictEqual(recorder.global, 0); + }); + + test('handleNotification with global payload bumps bumpGlobalGeneration', function (assert) { + const recorder = newRecorder(); + const listener = new ModuleCacheInvalidationListener({ + dbAdapter: {} as unknown as PgAdapter, + definitionLookup: makeStubLookup(recorder), + }); + + listener.handleNotification(JSON.stringify({ k: 'global' })); + + assert.strictEqual(recorder.global, 1); + assert.deepEqual(recorder.module, []); + assert.deepEqual(recorder.realm, []); + }); + + test('handleNotification ignores a malformed payload without throwing', function (assert) { + const recorder = newRecorder(); + const listener = new ModuleCacheInvalidationListener({ + dbAdapter: {} as unknown as PgAdapter, + definitionLookup: makeStubLookup(recorder), + }); + + listener.handleNotification('not-json'); + listener.handleNotification( + JSON.stringify({ k: 'module', r: 'http://x.test/r/' }), + ); + listener.handleNotification(JSON.stringify({ k: 'realm' })); + + assert.deepEqual(recorder.module, []); + assert.deepEqual(recorder.realm, []); + assert.strictEqual(recorder.global, 0); + }); + + test('handleNotification ignores empty/undefined payloads', function (assert) { + const recorder = newRecorder(); + const listener = new ModuleCacheInvalidationListener({ + dbAdapter: {} as unknown as PgAdapter, + definitionLookup: makeStubLookup(recorder), + }); + + listener.handleNotification(undefined); + listener.handleNotification(''); + + assert.deepEqual(recorder.module, []); + assert.deepEqual(recorder.realm, []); + assert.strictEqual(recorder.global, 0); + }); + }); + + module( + 'ModuleCacheInvalidationListener (LISTEN end-to-end)', + function (hooks) { + let dbAdapter: PgAdapter; + + setupDB(hooks, { + beforeEach: async (adapter) => { + dbAdapter = adapter; + }, + }); + + // The end-to-end scenario: instance A and instance B share a DB. + // A calls invalidate(); A's invalidate() emits NOTIFY + // module_cache_invalidated. B's listener receives the notify and + // calls bumpModuleGeneration on B's CachingDefinitionLookup. Without + // CS-10952, B's counters would lag A's, and a B-side in-flight + // prerender could persist a row A just deleted. + test('A.invalidate(url) → B listener bumps B.bumpModuleGeneration', async function (assert) { + const realmURL = 'http://x.test/peer-invalidate/'; + const instanceB = new CachingDefinitionLookup( + dbAdapter, + stubPrerenderer, + stubVirtualNetwork, + stubCreatePrerenderAuth, + ); + instanceB.registerRealm({ + url: realmURL, + async getRealmOwnerUserId() { + return 'owner-b'; + }, + async visibility() { + return 'private'; + }, + }); + const recorderB = newRecorder(); + recordBumpsOn(instanceB, recorderB); + + const listenerB = new ModuleCacheInvalidationListener({ + dbAdapter, + definitionLookup: instanceB, + }); + await listenerB.start(); + try { + const instanceA = new CachingDefinitionLookup( + dbAdapter, + stubPrerenderer, + stubVirtualNetwork, + stubCreatePrerenderAuth, + ); + instanceA.registerRealm({ + url: realmURL, + async getRealmOwnerUserId() { + return 'owner-a'; + }, + async visibility() { + return 'private'; + }, + }); + + await instanceA.invalidate(`${realmURL}cards/foo.gts`); + + const seen = await waitFor(() => + recorderB.module.length > 0 ? recorderB.module : undefined, + ); + // The first bump corresponds to invalidate's fan-out. A's + // moduleURLVariants pass produces multiple URL forms (`.gts`, + // `.ts`, `.gjs`, `.js`, extensionless). The exact URL we + // invalidated is in the list; assert that. + const targetURL = `${realmURL}cards/foo.gts`; + const matched = seen.find((b) => b.moduleURL === targetURL); + assert.ok( + matched, + `peer received bump for ${targetURL}; got ${JSON.stringify(seen)}`, + ); + assert.strictEqual(matched?.resolvedRealmURL, realmURL); + } finally { + await listenerB.shutDown(); + } + }); + + test('A.clearRealmCache(url) → B listener bumps B.bumpRealmGeneration', async function (assert) { + const realmURL = 'http://x.test/peer-clear-realm/'; + const instanceB = new CachingDefinitionLookup( + dbAdapter, + stubPrerenderer, + stubVirtualNetwork, + stubCreatePrerenderAuth, + ); + const recorderB = newRecorder(); + recordBumpsOn(instanceB, recorderB); + + const listenerB = new ModuleCacheInvalidationListener({ + dbAdapter, + definitionLookup: instanceB, + }); + await listenerB.start(); + try { + const instanceA = new CachingDefinitionLookup( + dbAdapter, + stubPrerenderer, + stubVirtualNetwork, + stubCreatePrerenderAuth, + ); + await instanceA.clearRealmCache(realmURL); + + const seen = await waitFor(() => + recorderB.realm.length > 0 ? recorderB.realm : undefined, + ); + assert.deepEqual(seen, [realmURL]); + } finally { + await listenerB.shutDown(); + } + }); + + test('A.clearAllModules() → B listener bumps B.bumpGlobalGeneration', async function (assert) { + const instanceB = new CachingDefinitionLookup( + dbAdapter, + stubPrerenderer, + stubVirtualNetwork, + stubCreatePrerenderAuth, + ); + const recorderB = newRecorder(); + recordBumpsOn(instanceB, recorderB); + + const listenerB = new ModuleCacheInvalidationListener({ + dbAdapter, + definitionLookup: instanceB, + }); + await listenerB.start(); + try { + const instanceA = new CachingDefinitionLookup( + dbAdapter, + stubPrerenderer, + stubVirtualNetwork, + stubCreatePrerenderAuth, + ); + await instanceA.clearAllModules(); + + await waitFor(() => (recorderB.global > 0 ? true : undefined)); + assert.strictEqual( + recorderB.global, + 1, + `peer's global counter bumped exactly once`, + ); + } finally { + await listenerB.shutDown(); + } + }); + + test('self-NOTIFY is harmless: emitter receives its own bump as an idempotent second bump', async function (assert) { + const realmURL = 'http://x.test/self-echo/'; + const instanceA = new CachingDefinitionLookup( + dbAdapter, + stubPrerenderer, + stubVirtualNetwork, + stubCreatePrerenderAuth, + ); + instanceA.registerRealm({ + url: realmURL, + async getRealmOwnerUserId() { + return 'owner-a'; + }, + async visibility() { + return 'private'; + }, + }); + const recorderA = newRecorder(); + recordBumpsOn(instanceA, recorderA); + + const listenerA = new ModuleCacheInvalidationListener({ + dbAdapter, + definitionLookup: instanceA, + }); + await listenerA.start(); + try { + // Synchronously, invalidate() bumps the module generation + // before awaiting the DELETE. Then the DELETE commits, the + // pg_notify fires, and the listener (this same instance) bumps + // again. So we expect to see the local-bump entries plus a + // listener-replay bump for the same URL. + const targetURL = `${realmURL}cards/self.gts`; + await instanceA.invalidate(targetURL); + + // Wait until the listener has had a chance to replay (i.e. + // there are MORE entries than the synchronous invalidate path + // produced). Synchronous invalidate fans out across module + // variants; the listener replay produces one entry per URL + // notified — same set as the synchronous fan-out. So the total + // count after replay is roughly 2x the fan-out, with ≥1 echo + // visible for the original URL. + const targetMatches = await waitFor(() => { + const matches = recorderA.module.filter( + (b) => b.moduleURL === targetURL, + ); + return matches.length >= 2 ? matches : undefined; + }); + assert.ok( + targetMatches.length >= 2, + `expected ≥2 bumps for ${targetURL} (1 synchronous + 1 listener echo); got ${targetMatches.length}`, + ); + } finally { + await listenerA.shutDown(); + } + }); + + test('listener receives a manually-emitted NOTIFY with the channel constant', async function (assert) { + const recorderA = newRecorder(); + const listenerA = new ModuleCacheInvalidationListener({ + dbAdapter, + definitionLookup: makeStubLookup(recorderA), + }); + await listenerA.start(); + try { + await query(dbAdapter, [ + `SELECT pg_notify(`, + param(MODULE_CACHE_INVALIDATED_CHANNEL), + `,`, + param(JSON.stringify({ k: 'global' })), + `)`, + ]); + + await waitFor(() => (recorderA.global > 0 ? true : undefined)); + assert.strictEqual(recorderA.global, 1); + } finally { + await listenerA.shutDown(); + } + }); + }, + ); +}); diff --git a/packages/runtime-common/definition-lookup.ts b/packages/runtime-common/definition-lookup.ts index e07cd38e37..2de6923020 100644 --- a/packages/runtime-common/definition-lookup.ts +++ b/packages/runtime-common/definition-lookup.ts @@ -38,6 +38,21 @@ import type { VirtualNetwork } from './virtual-network'; const MODULES_TABLE = 'modules'; const PREFERRED_EXECUTABLE_EXTENSIONS = ['.gts', '.ts', '.gjs', '.js']; +// Postgres NOTIFY channel for cross-instance module-cache invalidation +// (CS-10952). Each invalidation path emits one or more notifications so +// peer realm-server processes can bump their in-memory generation counters +// in lockstep with the DB. Payload is JSON; one of: +// {"k":"module","r":,"m":[,...]} — invalidate fan-out +// {"k":"realm","r":} — clearRealmCache +// {"k":"global"} — clearAllModules +// Self-notify is idempotent: the emitting process already bumped its +// counter synchronously before the DB delete, and a second bump on +// listener receive is observationally equivalent (counters are monotonic +// and only used for snapshot equality). +export const MODULE_CACHE_INVALIDATED_CHANNEL = 'module_cache_invalidated'; +// Postgres caps NOTIFY payloads at 8000 bytes; stay well under so JSON +// encoding overhead and pathological URL lengths don't blow the limit. +const NOTIFY_PAYLOAD_BUDGET = 7000; // Cached module errors expire after this interval. When a stale error entry // is encountered, the prerenderer is called again to get a fresh result. // This prevents transient prerender failures from being permanently cached. @@ -459,10 +474,14 @@ export class CachingDefinitionLookup implements DefinitionLookup { ); } - private bumpModuleGeneration( - resolvedRealmURL: string, - moduleURL: string, - ): void { + // Public so the cross-instance ModuleCacheInvalidationListener (CS-10952) + // can replay an invalidation broadcast from a peer realm-server into this + // process's counters. Internal callers in invalidate() / clearRealmCache() + // use the same methods. Bumping is idempotent w.r.t. correctness — a + // double-bump from the self-notify echo is observationally indistinguishable + // from a single bump because in-flight prerenders only test for snapshot + // equality, not absolute value. + bumpModuleGeneration(resolvedRealmURL: string, moduleURL: string): void { let key = moduleGenerationKey(resolvedRealmURL, moduleURL); this.#moduleGenerations.set( key, @@ -470,13 +489,17 @@ export class CachingDefinitionLookup implements DefinitionLookup { ); } - private bumpRealmGeneration(resolvedRealmURL: string): void { + bumpRealmGeneration(resolvedRealmURL: string): void { this.#realmGenerations.set( resolvedRealmURL, (this.#realmGenerations.get(resolvedRealmURL) ?? 0) + 1, ); } + bumpGlobalGeneration(): void { + this.#globalGeneration += 1; + } + // Returns true if the cached entry has a top-level error and has exceeded // the error TTL. This causes the entry to be treated as a cache miss so // the prerenderer is called again to get a fresh result. This prevents @@ -627,6 +650,10 @@ export class CachingDefinitionLookup implements DefinitionLookup { } this.dropInFlightForRealm(resolvedRealmURL, uniqueInvalidations); await this.deleteModuleAliases(resolvedRealmURL, uniqueInvalidations); + await this.notifyModuleCacheInvalidations( + resolvedRealmURL, + uniqueInvalidations, + ); return uniqueInvalidations; } @@ -643,12 +670,104 @@ export class CachingDefinitionLookup implements DefinitionLookup { ['resolved_realm_url =', param(resolvedRealmURL)], ]) as Expression), ]); + await this.notifyRealmCacheInvalidation(resolvedRealmURL); } async clearAllModules(): Promise { - this.#globalGeneration += 1; + this.bumpGlobalGeneration(); this.#inFlight.clear(); await this.query(['DELETE FROM', MODULES_TABLE]); + await this.notifyGlobalCacheInvalidation(); + } + + // pg_notify emission helpers. Mirror Realm.#notifyFileChange's best-effort + // pattern: the local instance's counters and DB row are already updated + // synchronously before this runs, so a notify failure is a bounded + // cross-instance staleness window — peers self-heal on their next + // prerender of the same key. Sequenced after the DELETE rather than + // wrapped in BEGIN/COMMIT because each invalidation path is a single + // autocommit DELETE; sequential pg_notify after the DELETE has the same + // observable effect (peer sees notify only after delete commits). + // Suppressed for sqlite/in-memory DBAdapters where NOTIFY isn't + // available; the cross-instance scenario only applies to pg. + // + // Payloads are JSON-encoded so a single invalidate() fan-out (which can + // include the source URL plus its extension variants plus transitive + // consumers from calculateInvalidations()) emits one pg_notify carrying + // the full URL list instead of one per URL. With M peer processes that + // turns M*N listener wakeups into M; the listener does the same N bumps + // either way. Postgres caps NOTIFY payloads at 8000 bytes, so the module + // emitter chunks the URL list to stay under a 7000-byte budget — common + // case is one notify; pathological fan-out becomes a handful. + private async notifyModuleCacheInvalidations( + resolvedRealmURL: string, + moduleURLs: string[], + ): Promise { + if (this.#dbAdapter.kind !== 'pg' || moduleURLs.length === 0) { + return; + } + const wrapperBytes = JSON.stringify({ + k: 'module', + r: resolvedRealmURL, + m: [], + }).length; + const budget = NOTIFY_PAYLOAD_BUDGET - wrapperBytes; + let chunk: string[] = []; + let chunkBytes = 0; + const flush = async () => { + if (chunk.length === 0) return; + await this.bestEffortNotify( + JSON.stringify({ k: 'module', r: resolvedRealmURL, m: chunk }), + ); + chunk = []; + chunkBytes = 0; + }; + for (let moduleURL of moduleURLs) { + const encodedLen = JSON.stringify(moduleURL).length; + const addedCost = encodedLen + (chunk.length === 0 ? 0 : 1); + if (chunkBytes + addedCost > budget && chunk.length > 0) { + await flush(); + } + chunk.push(moduleURL); + chunkBytes += chunk.length === 1 ? encodedLen : addedCost; + } + await flush(); + } + + private async notifyRealmCacheInvalidation( + resolvedRealmURL: string, + ): Promise { + if (this.#dbAdapter.kind !== 'pg') { + return; + } + await this.bestEffortNotify( + JSON.stringify({ k: 'realm', r: resolvedRealmURL }), + ); + } + + private async notifyGlobalCacheInvalidation(): Promise { + if (this.#dbAdapter.kind !== 'pg') { + return; + } + await this.bestEffortNotify(JSON.stringify({ k: 'global' })); + } + + private async bestEffortNotify(payload: string): Promise { + try { + await this.query([ + 'SELECT pg_notify(', + param(MODULE_CACHE_INVALIDATED_CHANNEL), + ',', + param(payload), + ')', + ]); + } catch (err: unknown) { + // Local state is already consistent; cross-instance staleness is + // bounded and self-healing. Don't fail the invalidation. + console.warn( + `pg_notify ${MODULE_CACHE_INVALIDATED_CHANNEL} failed for "${payload}": ${String(err)}`, + ); + } } // Drops in-flight entries whose pending prerender result would no longer