diff --git a/packages/bot-runner/tests/bot-runner-test.ts b/packages/bot-runner/tests/bot-runner-test.ts index bf9be8ad138..a9804e8608e 100644 --- a/packages/bot-runner/tests/bot-runner-test.ts +++ b/packages/bot-runner/tests/bot-runner-test.ts @@ -137,6 +137,7 @@ module('timeline handler', () => { }, close: async () => {}, getColumnNames: async () => [], + withWriteLock: async (_url, fn) => fn(undefined), } as DBAdapter; queuePublisher = { diff --git a/packages/bot-runner/tests/command-runner-test.ts b/packages/bot-runner/tests/command-runner-test.ts index 7ae81c8c2e6..b99178be42b 100644 --- a/packages/bot-runner/tests/command-runner-test.ts +++ b/packages/bot-runner/tests/command-runner-test.ts @@ -98,6 +98,7 @@ module('command runner', () => { }, close: async () => {}, getColumnNames: async () => [], + withWriteLock: async (_url, fn) => fn(undefined), } as DBAdapter; let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient); @@ -243,6 +244,7 @@ module('command runner', () => { }, close: async () => {}, getColumnNames: async () => [], + withWriteLock: async (_url, fn) => fn(undefined), } as DBAdapter; let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient); @@ -464,6 +466,7 @@ module('command runner', () => { }, close: async () => {}, getColumnNames: async () => [], + withWriteLock: async (_url, fn) => fn(undefined), } as DBAdapter; let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient); @@ -559,6 +562,7 @@ module('command runner', () => { }, close: async () => {}, getColumnNames: async () => [], + withWriteLock: async (_url, fn) => fn(undefined), } as DBAdapter; let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient); @@ -697,6 +701,7 @@ module('command runner', () => { }, close: async () => {}, getColumnNames: async () => [], + withWriteLock: async (_url, fn) => fn(undefined), } as DBAdapter; let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient); @@ -910,6 +915,7 @@ module('command runner', () => { }, close: async () => {}, getColumnNames: async () => [], + withWriteLock: async (_url, fn) => fn(undefined), } as DBAdapter; let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient); @@ -1030,6 +1036,7 @@ module('command runner', () => { }, close: async () => {}, getColumnNames: async () => [], + withWriteLock: async (_url, fn) => fn(undefined), } as DBAdapter; let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient); diff --git a/packages/host/app/lib/sqlite-adapter.ts b/packages/host/app/lib/sqlite-adapter.ts index df8e599d1aa..26857a2106d 100644 --- a/packages/host/app/lib/sqlite-adapter.ts +++ b/packages/host/app/lib/sqlite-adapter.ts @@ -9,6 +9,7 @@ import { type DBAdapter, type PgPrimitive, type ExecuteOptions, + type Querier, Deferred, } from '@cardstack/runtime-common'; @@ -57,6 +58,20 @@ export default class SQLiteAdapter implements DBAdapter { // realm with no peers to notify, so this is intentionally a no-op. async notify(_channel: string, _payload: string): Promise {} + // SQLite has no cross-connection concurrency to coordinate, so this is + // a passthrough — `txQuerier` is always undefined; callers shouldn't + // depend on any transactional grouping here. The PG implementation + // provides the real serialization across replicas. We match the + // `DBAdapter` signature (`Querier | undefined`) rather than narrowing + // to `undefined` so callers of this concrete class get the same + // contextually-typed callback parameter as they would for any DBAdapter. + async withWriteLock( + _realmUrl: string, + fn: (txQuerier: Querier | undefined) => Promise, + ): Promise { + return await fn(undefined); + } + private async internalExecute(sql: string, opts?: ExecuteOptions) { sql = this.adjustSQL(sql); return await this.query(sql, opts); diff --git a/packages/postgres/pg-adapter.ts b/packages/postgres/pg-adapter.ts index 88a0f1c7113..aae6abc4943 100644 --- a/packages/postgres/pg-adapter.ts +++ b/packages/postgres/pg-adapter.ts @@ -3,9 +3,12 @@ import { type PgPrimitive, type ExecuteOptions, type Expression, + type Querier, expressionToSql, logger, + param, } from '@cardstack/runtime-common'; +import { createHash } from 'crypto'; import migrate from 'node-pg-migrate'; import { join } from 'path'; import { Pool, Client, type Notification } from 'pg'; @@ -13,6 +16,21 @@ import { Pool, Client, type Notification } from 'pg'; import { postgresConfig } from './pg-config'; import migrationNameFixes from './scripts/migration-name-fixes.js'; +// Hash a realm URL to a stable signed int64 (as a string, because JS numbers +// can't represent the full int64 range). Used as a pg advisory lock key: +// two writers for the same realm URL hash to the same key and serialize; +// writers for different URLs use different keys and run in parallel. +// +// sha256 is overkill crypto-wise but the cost is negligible, and it gives +// excellent collision resistance at the 10,000+ realm scale the project +// plans for. Returning a string (rather than BigInt) keeps the value +// parameter-compatible with our existing `query` / `param` helpers, which +// don't accept BigInt directly. +export function hashRealmUrlForAdvisoryLock(url: string): string { + const digest = createHash('sha256').update(url).digest(); + return digest.readBigInt64BE(0).toString(); +} + const log = logger('pg-adapter'); type MigrationNameFixes = { @@ -346,6 +364,92 @@ export class PgAdapter implements DBAdapter { } } + // Run `fn` while holding a per-realm transaction-scoped Postgres advisory + // lock. The lock is taken with `pg_advisory_xact_lock` inside an explicit + // BEGIN / COMMIT on a pinned pool connection (via withConnection), so the + // lock is automatically released by the transaction's commit or rollback + // — there is no "unlock failed → stale lock on pooled connection" failure + // mode that a session-scoped `pg_advisory_lock` + `pg_advisory_unlock` + // pattern would expose. + // + // Concurrent callers for the same realm URL serialize (the second call + // blocks on the xact-lock until the first's transaction commits/rolls + // back). Callers for different URLs run in parallel — the hash key space + // ensures that. + // + // Reads are NOT gated by this lock; reads hit the DB directly (or go + // through in-memory caches on the realm-server) without acquiring it. + // Read-heavy paths should never call this helper. + // + // Note on the enclosing transaction: `fn` runs inside BEGIN/COMMIT here so + // the advisory lock is correctly scoped, AND so any DELETEs `fn` runs via + // the pinned `txQuerier` argument share that transaction's atomicity. + // CS-10898 plumbed the pinned querier through the realm-destruction + // helpers (removeRealmDatabaseArtifacts, removeRealmPermissions, + // deleteRegistryRowByUrl, deletePublishedRowsBySourceUrl, + // cancelRunningJobsInConcurrencyGroup); when callers pass `txQuerier` to + // those helpers, all their writes commit or roll back together with the + // advisory lock's own transaction. Queries `fn` issues through the shared + // dbAdapter still go via separate pool connections and are NOT part of + // this transaction. The data-plane mutation paths in runtime-common + // (CS-11125) intentionally do not consume `txQuerier`: their inner work + // (writing files via the FS adapter, enqueuing indexing jobs, broadcasting + // NOTIFY events) is not transactional with the lock-holder's connection. + // The lock there serves only to serialize concurrent same-URL writers + // across replicas, not to group DB statements into a single tx. + // + // Pool-exhaustion caveat: when the callback opts into the pinned querier + // for all of its DB work, only one client is checked out for the entire + // critical section. If the callback also issues queries through the + // shared dbAdapter (e.g. existence-check SELECTs), each of those checks + // out an additional pool client briefly. Under N concurrent same-URL + // writers, N-1 block on the advisory lock before doing anything — so + // this method does not itself amplify pool pressure. Under N concurrent + // different-URL writers, each pins one client; if the pool ceiling is + // less than realistic write concurrency, callbacks that need additional + // pool clients could deadlock waiting on the pool. For current scope + // (low realistic write concurrency, pool size >= concurrent writers + + // headroom) this is acceptable. + // + // Re-entrancy: callers MUST NOT re-enter the lock for the same URL while + // already holding it — a second `pg_advisory_xact_lock` on the same key + // would pin a different pool connection and block forever on its own + // transaction. Code that wraps a wider critical section around a method + // that also takes the lock must invoke the unlocked inner variant (e.g. + // realm.ts uses `_batchWriteUnlocked` inside its own withWriteLock). + async withWriteLock( + realmUrl: string, + fn: (txQuerier: Querier | undefined) => Promise, + ): Promise { + const lockKey = hashRealmUrlForAdvisoryLock(realmUrl); + return await this.withConnection(async (queryFn) => { + await queryFn(['BEGIN']); + try { + await queryFn([ + `SELECT pg_advisory_xact_lock(`, + param(lockKey), + `::bigint)`, + ]); + const result = await fn(queryFn); + await queryFn(['COMMIT']); + return result; + } catch (err: unknown) { + try { + await queryFn(['ROLLBACK']); + } catch (rollbackErr: unknown) { + // Rollback failed — the xact-lock is still released when the + // connection's transaction is aborted (pg will auto-rollback on + // client release), so we don't have a stale-lock problem. Log + // for visibility and rethrow the original error. + log.warn( + `ROLLBACK after withWriteLock error for ${realmUrl} failed: ${String(rollbackErr)}`, + ); + } + throw err; + } + }); + } + async withConnection( fn: ( query: (e: Expression) => Promise[]>, diff --git a/packages/realm-server/handlers/handle-delete-realm.ts b/packages/realm-server/handlers/handle-delete-realm.ts index f7bdbba3b38..fc117f61d40 100644 --- a/packages/realm-server/handlers/handle-delete-realm.ts +++ b/packages/realm-server/handlers/handle-delete-realm.ts @@ -33,7 +33,6 @@ import { deletePublishedRowsBySourceUrl, deleteRegistryRowByUrl, } from '../lib/realm-registry-writes'; -import { withRealmWriteLock } from '../lib/realm-advisory-locks'; interface DeleteRealmJSON { data: { @@ -161,7 +160,7 @@ export default function handleDeleteRealm({ // soft-delete, source permissions + DB artifacts) commits atomically // or rolls back atomically. A failure halfway through no longer // leaves the realm half-deleted. - await withRealmWriteLock(dbAdapter, realmURL, async (txQuerier) => { + await dbAdapter.withWriteLock(realmURL, async (txQuerier) => { // Delete the published rows first so the RETURNING set is the // authoritative list of rows the tx will commit. Driving the // per-published cleanup off this set (instead of a pre-lock diff --git a/packages/realm-server/handlers/handle-publish-realm.ts b/packages/realm-server/handlers/handle-publish-realm.ts index a7d2d9b68e3..953a9fc3472 100644 --- a/packages/realm-server/handlers/handle-publish-realm.ts +++ b/packages/realm-server/handlers/handle-publish-realm.ts @@ -42,7 +42,6 @@ import { registerUser } from '../synapse'; import { passwordFromSeed } from '@cardstack/runtime-common/matrix-client'; import { enqueueReindexRealmJob } from '@cardstack/runtime-common/jobs/reindex-realm'; import { upsertPublishedRealmInRegistry } from '../lib/realm-registry-writes'; -import { withRealmWriteLock } from '../lib/realm-advisory-locks'; const log = logger('handle-publish'); @@ -398,8 +397,7 @@ export default function handlePublishRealm({ // mounts the (re-)published realm on its first request. The // response is 202 Accepted with status:'pending'; the client polls // //_readiness-check to learn when it's ready. - let { lastPublishedAt, publishedRealmId } = await withRealmWriteLock( - dbAdapter, + let { lastPublishedAt, publishedRealmId } = await dbAdapter.withWriteLock( publishedRealmURL, async () => { let existingRows = (await query(dbAdapter, [ diff --git a/packages/realm-server/handlers/handle-unpublish-realm.ts b/packages/realm-server/handlers/handle-unpublish-realm.ts index 88a9323ac3c..9a96e698166 100644 --- a/packages/realm-server/handlers/handle-unpublish-realm.ts +++ b/packages/realm-server/handlers/handle-unpublish-realm.ts @@ -27,7 +27,6 @@ import { removeRealmFiles, } from './realm-destruction-utils'; import { deleteRegistryRowByUrl } from '../lib/realm-registry-writes'; -import { withRealmWriteLock } from '../lib/realm-advisory-locks'; const log = logger('handle-unpublish'); @@ -137,18 +136,14 @@ export default function handleUnpublishRealm({ // mid-cleanup rolls back both DELETEs together. realms[] / // virtualNetwork mutation stays in the reconciler's hands — it // reacts to the registry DELETE + NOTIFY emitted below. - await withRealmWriteLock( - dbAdapter, - publishedRealmURL, - async (txQuerier) => { - await deleteRegistryRowByUrl(dbAdapter, publishedRealmURL, txQuerier); - await removeRealmPermissions( - dbAdapter, - new URL(publishedRealmURL), - txQuerier, - ); - }, - ); + await dbAdapter.withWriteLock(publishedRealmURL, async (txQuerier) => { + await deleteRegistryRowByUrl(dbAdapter, publishedRealmURL, txQuerier); + await removeRealmPermissions( + dbAdapter, + new URL(publishedRealmURL), + txQuerier, + ); + }); // FS removal happens after the DB transaction commits. If the rm // fails, the realm's row + permissions are already gone and the diff --git a/packages/realm-server/lib/realm-advisory-locks.ts b/packages/realm-server/lib/realm-advisory-locks.ts deleted file mode 100644 index 1e45dbdb2c0..00000000000 --- a/packages/realm-server/lib/realm-advisory-locks.ts +++ /dev/null @@ -1,119 +0,0 @@ -import { createHash } from 'crypto'; -import { - logger, - param, - type DBAdapter, - type Querier, -} from '@cardstack/runtime-common'; -import type { PgAdapter } from '@cardstack/postgres'; - -const log = logger('realm-server:advisory-locks'); - -// Hash a realm URL to a stable signed int64 (as a string, because JS numbers -// can't represent the full int64 range). Used as a pg advisory lock key: -// two writers for the same realm URL hash to the same key and serialize; -// writers for different URLs use different keys and run in parallel. -// -// sha256 is overkill crypto-wise but the cost is negligible, and it gives -// excellent collision resistance at the 10,000+ realm scale the project -// plans for. Returning a string (rather than BigInt) keeps the value -// parameter-compatible with our existing `query` / `param` helpers, which -// don't accept BigInt directly. -export function hashRealmUrlForAdvisoryLock(url: string): string { - const digest = createHash('sha256').update(url).digest(); - return digest.readBigInt64BE(0).toString(); -} - -// Run `fn` while holding a per-realm transaction-scoped Postgres advisory -// lock. The lock is taken with `pg_advisory_xact_lock` inside an explicit -// BEGIN / COMMIT on a pinned pool connection (via PgAdapter.withConnection), -// so the lock is automatically released by the transaction's commit or -// rollback — there is no "unlock failed → stale lock on pooled connection" -// failure mode that a session-scoped `pg_advisory_lock` + `pg_advisory_unlock` -// pattern would expose. -// -// Concurrent callers for the same realm URL serialize (the second call -// blocks on the xact-lock until the first's transaction commits/rolls -// back). Callers for different URLs run in parallel — the hash key space -// ensures that. -// -// Reads are NOT gated by this lock; reads hit the DB directly (or go -// through in-memory caches on the realm-server) without acquiring it. -// Read-heavy paths should never call this helper. -// -// Note on the enclosing transaction: `fn` runs inside BEGIN/COMMIT here so -// the advisory lock is correctly scoped, AND so any DELETEs `fn` runs via -// the pinned `txQuerier` argument share that transaction's atomicity. CS-10898 -// (this PR) plumbed the pinned querier through the realm-destruction helpers -// (`removeRealmDatabaseArtifacts`, `removeRealmPermissions`, -// `deleteRegistryRowByUrl`, `deletePublishedRowsBySourceUrl`, -// `cancelRunningJobsInConcurrencyGroup`); when callers pass `txQuerier` to -// those helpers, all their writes commit or roll back together with the -// advisory lock's own transaction. Queries `fn` issues through the shared -// `dbAdapter` still go via separate pool connections and are NOT part of -// this transaction — that's the pre-CS-10898 behavior preserved as the -// default for callers that don't opt in. -// -// In the SQLite branch (test environments only) `txQuerier` is `undefined`, -// so helpers fall back to the shared `dbAdapter`. SQLite has no cross- -// connection concurrency, so neither the lock nor the tx semantics matter -// there. -// -// Pool-exhaustion caveat: when the callback opts into the pinned querier -// for all of its DB work, only one client is checked out for the entire -// critical section — both the lock and the destruction queries share that -// connection. If the callback also issues queries through the shared -// `dbAdapter` (e.g. existence-check SELECTs that don't need to be inside -// the tx), each of those checks out an additional pool client briefly. -// Under N concurrent same-URL writers, N-1 block on the advisory lock -// before doing anything — so this helper does not itself amplify pool -// pressure. Under N concurrent different-URL writers, each pins one -// client; if the pool ceiling is less than realistic write concurrency, -// callbacks that need additional pool clients could deadlock waiting on -// the pool. For current scope (low realistic write concurrency, pool size -// >= concurrent writers + headroom) this is acceptable. -// -// Deadlock note: callers should never acquire a second write lock for a -// different URL while holding one — that's the only way a cycle could -// form. Mutation handlers lock exactly one URL each, so no cycles. -export async function withRealmWriteLock( - dbAdapter: DBAdapter, - realmUrl: string, - fn: (txQuerier: Querier | undefined) => Promise, -): Promise { - // Advisory locks are Postgres-specific. In test environments backed by - // SQLite (no cross-connection concurrency to worry about anyway) we - // short-circuit and run fn directly. The PgAdapter branch is the one - // that does real work. - if (dbAdapter.kind !== 'pg') { - return await fn(undefined); - } - const pg = dbAdapter as unknown as PgAdapter; - const lockKey = hashRealmUrlForAdvisoryLock(realmUrl); - return await pg.withConnection(async (queryFn) => { - await queryFn(['BEGIN']); - try { - await queryFn([ - `SELECT pg_advisory_xact_lock(`, - param(lockKey), - `::bigint)`, - ]); - const result = await fn(queryFn); - await queryFn(['COMMIT']); - return result; - } catch (err: unknown) { - try { - await queryFn(['ROLLBACK']); - } catch (rollbackErr: unknown) { - // Rollback failed — the xact-lock is still released when the - // connection's transaction is aborted (pg will auto-rollback on - // client release), so we don't have a stale-lock problem. Log for - // visibility and rethrow the original error. - log.warn( - `ROLLBACK after withRealmWriteLock error for ${realmUrl} failed: ${String(rollbackErr)}`, - ); - } - throw err; - } - }); -} diff --git a/packages/realm-server/server.ts b/packages/realm-server/server.ts index 7d89dec8b2f..206ca2a1486 100644 --- a/packages/realm-server/server.ts +++ b/packages/realm-server/server.ts @@ -50,7 +50,6 @@ import { APP_BOXEL_REALM_SERVER_EVENT_MSGTYPE } from '@cardstack/runtime-common/ import type { Prerenderer } from '@cardstack/runtime-common'; import { retrieveScopedCSS } from './lib/retrieve-scoped-css'; import { insertSourceRealmInRegistry } from './lib/realm-registry-writes'; -import { withRealmWriteLock } from './lib/realm-advisory-locks'; import type { RealmRegistryReconciler } from './lib/realm-registry-reconciler'; import { indexURLCandidates, @@ -1042,12 +1041,12 @@ export class RealmServer { publishable: true, }; - // Serialize against any other caller of withRealmWriteLock for this + // Serialize against any other caller of withWriteLock for this // same URL (concurrent createRealm for the same endpoint, or a // concurrent publish/unpublish/delete). This is almost never a real // concurrency concern — the endpoint was already checked above for // collision. - await withRealmWriteLock(this.dbAdapter, url, async () => { + await this.dbAdapter.withWriteLock(url, async () => { await insertPermissions(this.dbAdapter, new URL(url), { [ownerUserId]: DEFAULT_PERMISSIONS, }); diff --git a/packages/realm-server/tests/indexing-event-sink-test.ts b/packages/realm-server/tests/indexing-event-sink-test.ts index 65735881ee3..c9b6895d30d 100644 --- a/packages/realm-server/tests/indexing-event-sink-test.ts +++ b/packages/realm-server/tests/indexing-event-sink-test.ts @@ -30,6 +30,9 @@ function makeRecordingAdapter(): { return []; }, async notify() {}, + async withWriteLock(_url, fn) { + return await fn(undefined); + }, }, }; } @@ -376,6 +379,9 @@ module(basename(__filename), function () { return []; }, async notify() {}, + async withWriteLock(_url, fn) { + return await fn(undefined); + }, }; let sink = new IndexingEventSink({ flushIntervalMs: 10 }); sink.setAdapter(slowAdapter); diff --git a/packages/realm-server/tests/prerender-proxy-test.ts b/packages/realm-server/tests/prerender-proxy-test.ts index a739a359b1e..482a75392cc 100644 --- a/packages/realm-server/tests/prerender-proxy-test.ts +++ b/packages/realm-server/tests/prerender-proxy-test.ts @@ -29,6 +29,9 @@ module(basename(__filename), function () { async getColumnNames() { return []; }, + async withWriteLock(_url, fn) { + return await fn(undefined); + }, }; } diff --git a/packages/realm-server/tests/realm-advisory-locks-test.ts b/packages/realm-server/tests/realm-advisory-locks-test.ts index ecc73419072..e9035d4ce16 100644 --- a/packages/realm-server/tests/realm-advisory-locks-test.ts +++ b/packages/realm-server/tests/realm-advisory-locks-test.ts @@ -1,11 +1,10 @@ import { module, test } from 'qunit'; import { basename } from 'path'; -import type { PgAdapter } from '@cardstack/postgres'; -import { setupDB } from './helpers'; import { hashRealmUrlForAdvisoryLock, - withRealmWriteLock, -} from '../lib/realm-advisory-locks'; + type PgAdapter, +} from '@cardstack/postgres'; +import { setupDB } from './helpers'; module(basename(__filename), function () { module('hashRealmUrlForAdvisoryLock', function () { @@ -42,7 +41,7 @@ module(basename(__filename), function () { }); }); - module('withRealmWriteLock', function (hooks) { + module('PgAdapter.withWriteLock', function (hooks) { let dbAdapter: PgAdapter; setupDB(hooks, { beforeEach: async (adapter) => { @@ -51,8 +50,7 @@ module(basename(__filename), function () { }); test('runs the callback and returns its value', async function (assert) { - const result = await withRealmWriteLock( - dbAdapter, + const result = await dbAdapter.withWriteLock( 'http://localhost:4201/x/', async () => 42, ); @@ -67,14 +65,14 @@ module(basename(__filename), function () { // tries to acquire concurrently and should only run after the first // releases. We verify by appending to a shared array in a specific // order and checking the final ordering. - const p1 = withRealmWriteLock(dbAdapter, url, async () => { + const p1 = dbAdapter.withWriteLock(url, async () => { events.push('1-start'); await new Promise((r) => setTimeout(r, 150)); events.push('1-end'); }); // Give p1 a head start so it actually holds the lock first. await new Promise((r) => setTimeout(r, 20)); - const p2 = withRealmWriteLock(dbAdapter, url, async () => { + const p2 = dbAdapter.withWriteLock(url, async () => { events.push('2-start'); events.push('2-end'); }); @@ -91,8 +89,7 @@ module(basename(__filename), function () { test('runs concurrent callers for different URLs in parallel', async function (assert) { const events: string[] = []; - const p1 = withRealmWriteLock( - dbAdapter, + const p1 = dbAdapter.withWriteLock( 'http://localhost:4201/a/', async () => { events.push('a-start'); @@ -100,8 +97,7 @@ module(basename(__filename), function () { events.push('a-end'); }, ); - const p2 = withRealmWriteLock( - dbAdapter, + const p2 = dbAdapter.withWriteLock( 'http://localhost:4201/b/', async () => { events.push('b-start'); @@ -126,14 +122,14 @@ module(basename(__filename), function () { test('releases the lock when the callback throws', async function (assert) { const url = 'http://localhost:4201/throw/'; await assert.rejects( - withRealmWriteLock(dbAdapter, url, async () => { + dbAdapter.withWriteLock(url, async () => { throw new Error('deliberate failure'); }), /deliberate failure/, ); // A second acquisition should succeed immediately — if the lock leaked, // this would block forever (test timeout would fire). - const result = await withRealmWriteLock(dbAdapter, url, async () => 'ok'); + const result = await dbAdapter.withWriteLock(url, async () => 'ok'); assert.strictEqual(result, 'ok', 'lock released after prior failure'); }); }); diff --git a/packages/realm-server/tests/realm-cleanup-transaction-test.ts b/packages/realm-server/tests/realm-cleanup-transaction-test.ts index 3cf44f6e9e4..be006f8d80f 100644 --- a/packages/realm-server/tests/realm-cleanup-transaction-test.ts +++ b/packages/realm-server/tests/realm-cleanup-transaction-test.ts @@ -13,7 +13,6 @@ import { deleteRegistryRowByUrl, insertSourceRealmInRegistry, } from '../lib/realm-registry-writes'; -import { withRealmWriteLock } from '../lib/realm-advisory-locks'; // CS-10898 regression test: an injected error after a registry-row delete // (but before the lock-holder transaction commits) must roll back BOTH the @@ -64,7 +63,7 @@ module(basename(__filename), function () { ); await assert.rejects( - withRealmWriteLock(dbAdapter, realmURL, async (txQuerier) => { + dbAdapter.withWriteLock(realmURL, async (txQuerier) => { // First DELETE — runs on the lock-holder's pinned querier so it // joins the BEGIN/COMMIT tx instead of auto-committing. await deleteRegistryRowByUrl(dbAdapter, realmURL, txQuerier); @@ -91,7 +90,7 @@ module(basename(__filename), function () { await seedSourceRealm(realmURL, '@cs10898both:localhost', 'disk-both'); await assert.rejects( - withRealmWriteLock(dbAdapter, realmURL, async (txQuerier) => { + dbAdapter.withWriteLock(realmURL, async (txQuerier) => { await deleteRegistryRowByUrl(dbAdapter, realmURL, txQuerier); await removeRealmPermissions(dbAdapter, new URL(realmURL), txQuerier); // Throw after both DELETEs — both should roll back atomically. @@ -114,7 +113,7 @@ module(basename(__filename), function () { const realmURL = 'http://localhost:4201/cs10898/commit/'; await seedSourceRealm(realmURL, '@cs10898ok:localhost', 'disk-commit'); - await withRealmWriteLock(dbAdapter, realmURL, async (txQuerier) => { + await dbAdapter.withWriteLock(realmURL, async (txQuerier) => { await deleteRegistryRowByUrl(dbAdapter, realmURL, txQuerier); await removeRealmPermissions(dbAdapter, new URL(realmURL), txQuerier); }); diff --git a/packages/realm-server/tests/screenshot-card-test.ts b/packages/realm-server/tests/screenshot-card-test.ts index fd29f312f1f..605a92ab245 100644 --- a/packages/realm-server/tests/screenshot-card-test.ts +++ b/packages/realm-server/tests/screenshot-card-test.ts @@ -33,6 +33,9 @@ module(basename(__filename), function () { async getColumnNames() { return []; }, + async withWriteLock(_url, fn) { + return await fn(undefined); + }, }; } diff --git a/packages/runtime-common/db.ts b/packages/runtime-common/db.ts index 9f327683a72..f1de9901a56 100644 --- a/packages/runtime-common/db.ts +++ b/packages/runtime-common/db.ts @@ -1,4 +1,5 @@ import type { PgPrimitive } from './index'; +import type { Querier } from './expression'; export interface TypeCoercion { [column: string]: 'BOOLEAN' | 'JSON' | 'VARCHAR'; @@ -25,4 +26,15 @@ export interface DBAdapter { // the caller must treat it as fire-and-forget cache-coherency, never as // delivery-guaranteed messaging. notify: (channel: string, payload: string) => Promise; + // Per-realm write-lock primitive. PgAdapter implements with + // `pg_advisory_xact_lock(hash64(realmUrl))` on a pinned-connection + // transaction so concurrent same-URL callers across replicas serialize; + // different-URL callers run in parallel. SQLite has no cross-connection + // concurrency to coordinate, so it's a passthrough (`txQuerier` is + // undefined). See PgAdapter.withWriteLock for the full design notes + // (re-entrancy, pool exhaustion, rollback semantics). + withWriteLock: ( + realmUrl: string, + fn: (txQuerier: Querier | undefined) => Promise, + ) => Promise; } diff --git a/packages/runtime-common/tests/run-command-task-shared-tests.ts b/packages/runtime-common/tests/run-command-task-shared-tests.ts index 3784a2f5f5b..11a36873a83 100644 --- a/packages/runtime-common/tests/run-command-task-shared-tests.ts +++ b/packages/runtime-common/tests/run-command-task-shared-tests.ts @@ -23,6 +23,7 @@ function makeDBAdapter( }, close: async () => {}, getColumnNames: async () => [], + withWriteLock: async (_url, fn) => fn(undefined), }; }