Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/bot-runner/tests/bot-runner-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ module('timeline handler', () => {
},
close: async () => {},
getColumnNames: async () => [],
withWriteLock: async (_url, fn) => fn(undefined),
} as DBAdapter;

queuePublisher = {
Expand Down
7 changes: 7 additions & 0 deletions packages/bot-runner/tests/command-runner-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ module('command runner', () => {
},
close: async () => {},
getColumnNames: async () => [],
withWriteLock: async (_url, fn) => fn(undefined),
} as DBAdapter;

let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient);
Expand Down Expand Up @@ -243,6 +244,7 @@ module('command runner', () => {
},
close: async () => {},
getColumnNames: async () => [],
withWriteLock: async (_url, fn) => fn(undefined),
} as DBAdapter;

let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient);
Expand Down Expand Up @@ -464,6 +466,7 @@ module('command runner', () => {
},
close: async () => {},
getColumnNames: async () => [],
withWriteLock: async (_url, fn) => fn(undefined),
} as DBAdapter;

let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient);
Expand Down Expand Up @@ -559,6 +562,7 @@ module('command runner', () => {
},
close: async () => {},
getColumnNames: async () => [],
withWriteLock: async (_url, fn) => fn(undefined),
} as DBAdapter;

let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient);
Expand Down Expand Up @@ -697,6 +701,7 @@ module('command runner', () => {
},
close: async () => {},
getColumnNames: async () => [],
withWriteLock: async (_url, fn) => fn(undefined),
} as DBAdapter;

let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient);
Expand Down Expand Up @@ -910,6 +915,7 @@ module('command runner', () => {
},
close: async () => {},
getColumnNames: async () => [],
withWriteLock: async (_url, fn) => fn(undefined),
} as DBAdapter;

let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient);
Expand Down Expand Up @@ -1030,6 +1036,7 @@ module('command runner', () => {
},
close: async () => {},
getColumnNames: async () => [],
withWriteLock: async (_url, fn) => fn(undefined),
} as DBAdapter;

let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient);
Expand Down
15 changes: 15 additions & 0 deletions packages/host/app/lib/sqlite-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
type DBAdapter,
type PgPrimitive,
type ExecuteOptions,
type Querier,
Deferred,
} from '@cardstack/runtime-common';

Expand Down Expand Up @@ -57,6 +58,20 @@ export default class SQLiteAdapter implements DBAdapter {
// realm with no peers to notify, so this is intentionally a no-op.
async notify(_channel: string, _payload: string): Promise<void> {}

// No-op write lock for SQLite: with a single in-process connection there
// is no cross-connection concurrency to coordinate, so we simply run the
// callback. `txQuerier` is always undefined here — callers shouldn't rely
// on any transactional grouping; only the Postgres implementation provides
// real serialization across replicas. The parameter keeps the DBAdapter
// signature (`Querier | undefined`) rather than narrowing to `undefined`
// so callbacks written against the interface type-check identically
// against this concrete class.
async withWriteLock<T>(
  _realmUrl: string,
  fn: (txQuerier: Querier | undefined) => Promise<T>,
): Promise<T> {
  let result = await fn(undefined);
  return result;
}

private async internalExecute(sql: string, opts?: ExecuteOptions) {
sql = this.adjustSQL(sql);
return await this.query(sql, opts);
Expand Down
104 changes: 104 additions & 0 deletions packages/postgres/pg-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,34 @@ import {
type PgPrimitive,
type ExecuteOptions,
type Expression,
type Querier,
expressionToSql,
logger,
param,
} from '@cardstack/runtime-common';
import { createHash } from 'crypto';
import migrate from 'node-pg-migrate';
import { join } from 'path';
import { Pool, Client, type Notification } from 'pg';

import { postgresConfig } from './pg-config';
import migrationNameFixes from './scripts/migration-name-fixes.js';

// Map a realm URL to a stable signed 64-bit key (returned as a decimal
// string) for use as a Postgres advisory-lock key: identical URLs always
// produce identical keys, so same-realm writers serialize, while distinct
// URLs overwhelmingly land on distinct keys and proceed in parallel.
//
// sha256 is more hash than strictly needed, but its cost is negligible and
// it gives excellent collision resistance at the 10,000+ realm scale the
// project plans for. The key is returned as a string rather than a BigInt
// because our existing `query` / `param` helpers don't accept BigInt
// parameters directly.
export function hashRealmUrlForAdvisoryLock(url: string): string {
  let digest = createHash('sha256').update(url).digest();
  // Interpret the first 8 digest bytes (big-endian) as a signed int64.
  let unsigned = BigInt('0x' + digest.subarray(0, 8).toString('hex'));
  return BigInt.asIntN(64, unsigned).toString();
}

const log = logger('pg-adapter');

type MigrationNameFixes = {
Expand Down Expand Up @@ -346,6 +364,92 @@ export class PgAdapter implements DBAdapter {
}
}

// Run `fn` while holding a per-realm transaction-scoped Postgres advisory
// lock. The lock is taken with `pg_advisory_xact_lock` inside an explicit
// BEGIN / COMMIT on a pinned pool connection (via withConnection), so the
// lock is automatically released by the transaction's commit or rollback
// — there is no "unlock failed → stale lock on pooled connection" failure
// mode that a session-scoped `pg_advisory_lock` + `pg_advisory_unlock`
// pattern would expose.
//
// Concurrent callers for the same realm URL serialize (the second call
// blocks on the xact-lock until the first's transaction commits/rolls
// back). Callers for different URLs run in parallel — the hash key space
// ensures that.
//
// Reads are NOT gated by this lock; reads hit the DB directly (or go
// through in-memory caches on the realm-server) without acquiring it.
// Read-heavy paths should never call this helper.
//
// Note on the enclosing transaction: `fn` runs inside BEGIN/COMMIT here so
// the advisory lock is correctly scoped, AND so any DELETEs `fn` runs via
// the pinned `txQuerier` argument share that transaction's atomicity.
// CS-10898 plumbed the pinned querier through the realm-destruction
// helpers (removeRealmDatabaseArtifacts, removeRealmPermissions,
// deleteRegistryRowByUrl, deletePublishedRowsBySourceUrl,
// cancelRunningJobsInConcurrencyGroup); when callers pass `txQuerier` to
// those helpers, all their writes commit or roll back together with the
// advisory lock's own transaction. Queries `fn` issues through the shared
// dbAdapter still go via separate pool connections and are NOT part of
// this transaction. The data-plane mutation paths in runtime-common
// (CS-11125) intentionally do not consume `txQuerier`: their inner work
// (writing files via the FS adapter, enqueuing indexing jobs, broadcasting
// NOTIFY events) is not transactional with the lock-holder's connection.
// The lock there serves only to serialize concurrent same-URL writers
// across replicas, not to group DB statements into a single tx.
//
// Pool-exhaustion caveat: when the callback opts into the pinned querier
// for all of its DB work, only one client is checked out for the entire
// critical section. If the callback also issues queries through the
// shared dbAdapter (e.g. existence-check SELECTs), each of those checks
// out an additional pool client briefly. Under N concurrent same-URL
// writers, N-1 block on the advisory lock before doing anything — so
// this method does not itself amplify pool pressure. Under N concurrent
// different-URL writers, each pins one client; if the pool ceiling is
// less than realistic write concurrency, callbacks that need additional
// pool clients could deadlock waiting on the pool. For current scope
// (low realistic write concurrency, pool size >= concurrent writers +
// headroom) this is acceptable.
//
// Re-entrancy: callers MUST NOT re-enter the lock for the same URL while
// already holding it — a second `pg_advisory_xact_lock` on the same key
// would pin a different pool connection and block forever on its own
// transaction. Code that wraps a wider critical section around a method
// that also takes the lock must invoke the unlocked inner variant (e.g.
// realm.ts uses `_batchWriteUnlocked` inside its own withWriteLock).
async withWriteLock<T>(
  realmUrl: string,
  fn: (txQuerier: Querier | undefined) => Promise<T>,
): Promise<T> {
  let advisoryKey = hashRealmUrlForAdvisoryLock(realmUrl);
  return await this.withConnection(async (query) => {
    // Explicit transaction on the pinned connection: the xact-scoped
    // advisory lock below is released by this transaction's commit or
    // rollback, never by a separate unlock call.
    await query(['BEGIN']);
    try {
      // Blocks until any other transaction holding the same key commits
      // or rolls back; different realm URLs hash to different keys and
      // run in parallel.
      await query([
        `SELECT pg_advisory_xact_lock(`,
        param(advisoryKey),
        `::bigint)`,
      ]);
      let result = await fn(query);
      await query(['COMMIT']);
      return result;
    } catch (err: unknown) {
      try {
        await query(['ROLLBACK']);
      } catch (rollbackErr: unknown) {
        // Even if ROLLBACK itself fails, the xact-lock is still freed
        // when the aborted transaction is cleaned up on connection
        // release (pg auto-rolls-back), so no stale lock remains. Log
        // for visibility, then surface the original error.
        log.warn(
          `ROLLBACK after withWriteLock error for ${realmUrl} failed: ${String(rollbackErr)}`,
        );
      }
      throw err;
    }
  });
}

async withConnection<T>(
fn: (
query: (e: Expression) => Promise<Record<string, PgPrimitive>[]>,
Expand Down
3 changes: 1 addition & 2 deletions packages/realm-server/handlers/handle-delete-realm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import {
deletePublishedRowsBySourceUrl,
deleteRegistryRowByUrl,
} from '../lib/realm-registry-writes';
import { withRealmWriteLock } from '../lib/realm-advisory-locks';

interface DeleteRealmJSON {
data: {
Expand Down Expand Up @@ -161,7 +160,7 @@ export default function handleDeleteRealm({
// soft-delete, source permissions + DB artifacts) commits atomically
// or rolls back atomically. A failure halfway through no longer
// leaves the realm half-deleted.
await withRealmWriteLock(dbAdapter, realmURL, async (txQuerier) => {
await dbAdapter.withWriteLock(realmURL, async (txQuerier) => {
// Delete the published rows first so the RETURNING set is the
// authoritative list of rows the tx will commit. Driving the
// per-published cleanup off this set (instead of a pre-lock
Expand Down
4 changes: 1 addition & 3 deletions packages/realm-server/handlers/handle-publish-realm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import { registerUser } from '../synapse';
import { passwordFromSeed } from '@cardstack/runtime-common/matrix-client';
import { enqueueReindexRealmJob } from '@cardstack/runtime-common/jobs/reindex-realm';
import { upsertPublishedRealmInRegistry } from '../lib/realm-registry-writes';
import { withRealmWriteLock } from '../lib/realm-advisory-locks';

const log = logger('handle-publish');

Expand Down Expand Up @@ -398,8 +397,7 @@ export default function handlePublishRealm({
// mounts the (re-)published realm on its first request. The
// response is 202 Accepted with status:'pending'; the client polls
// /<publishedRealmURL>/_readiness-check to learn when it's ready.
let { lastPublishedAt, publishedRealmId } = await withRealmWriteLock(
dbAdapter,
let { lastPublishedAt, publishedRealmId } = await dbAdapter.withWriteLock(
publishedRealmURL,
async () => {
let existingRows = (await query(dbAdapter, [
Expand Down
21 changes: 8 additions & 13 deletions packages/realm-server/handlers/handle-unpublish-realm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import {
removeRealmFiles,
} from './realm-destruction-utils';
import { deleteRegistryRowByUrl } from '../lib/realm-registry-writes';
import { withRealmWriteLock } from '../lib/realm-advisory-locks';

const log = logger('handle-unpublish');

Expand Down Expand Up @@ -137,18 +136,14 @@ export default function handleUnpublishRealm({
// mid-cleanup rolls back both DELETEs together. realms[] /
// virtualNetwork mutation stays in the reconciler's hands — it
// reacts to the registry DELETE + NOTIFY emitted below.
await withRealmWriteLock(
dbAdapter,
publishedRealmURL,
async (txQuerier) => {
await deleteRegistryRowByUrl(dbAdapter, publishedRealmURL, txQuerier);
await removeRealmPermissions(
dbAdapter,
new URL(publishedRealmURL),
txQuerier,
);
},
);
await dbAdapter.withWriteLock(publishedRealmURL, async (txQuerier) => {
await deleteRegistryRowByUrl(dbAdapter, publishedRealmURL, txQuerier);
await removeRealmPermissions(
dbAdapter,
new URL(publishedRealmURL),
txQuerier,
);
});

// FS removal happens after the DB transaction commits. If the rm
// fails, the realm's row + permissions are already gone and the
Expand Down
Loading
Loading