Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 17 additions & 26 deletions packages/realm-server/handlers/create-realm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { ensureDirSync, writeJSONSync } from 'fs-extra';
import * as Sentry from '@sentry/node';
import type {
DBAdapter,
QueuePublisher,
Realm,
RealmInfo,
VirtualNetwork,
Expand All @@ -19,7 +18,6 @@ import {
SupportedMimeType,
userInitiatedPriority,
} from '@cardstack/runtime-common';
import { enqueueReindexRealmJob } from '@cardstack/runtime-common/jobs/reindex-realm';
import { getMatrixUsername } from '@cardstack/runtime-common/matrix-client';
import { insertSourceRealmInRegistry } from '../lib/realm-registry-writes';
import type { RealmRegistryReconciler } from '../lib/realm-registry-reconciler';
Expand All @@ -37,7 +35,6 @@ export type CreateRealmDeps = {
serverURL: URL;
realms: Realm[];
dbAdapter: DBAdapter;
queue: QueuePublisher;
virtualNetwork: VirtualNetwork;
realmsRootPath: string;
reconciler: RealmRegistryReconciler;
Expand Down Expand Up @@ -78,7 +75,6 @@ export async function createRealm(
serverURL,
realms,
dbAdapter,
queue,
virtualNetwork,
realmsRootPath,
reconciler,
Expand Down Expand Up @@ -219,30 +215,25 @@ export async function createRealm(
virtualNetwork.addURLMapping(new URL(url), actualRealmURL);
}

// Phase 3: enqueue the from-scratch-index job at userInitiatedPriority
// so the canonical (post-coalesce) job carries that priority — even
// if reconciler.lookupOrMount below also enqueues one at the default
// systemInitiatedPriority via realm.start(). The chooseFromScratch
// coalesce JOINs same-realm jobs and keeps maxPriority.
await enqueueReindexRealmJob(
url,
ownerUsername,
queue,
dbAdapter,
userInitiatedPriority,
);

// Synchronously mount + start the realm on the *handling* instance.
// Mount the realm on the *handling* instance and let the mount
// pipeline itself drive the one-and-only from-scratch-index, at
// userInitiatedPriority so a backed-up queue of system-priority
// jobs (e.g. a deploy-triggered reindex storm) does not stall realm
// creation. lookupOrMount → ensureMounted → realm.start → #startup
// sees `isNewIndex = true` for a freshly-registered realm and
// enqueues exactly one job via publishFullIndex, which also updates
// the realm's in-memory #stats / #ignoreData / #ignoreDataVersion
// when the job completes.
//
// The 202 response with status:'pending' is for sibling instances —
// they pick up the realm via NOTIFY realm_registry and lazy-mount
// on first request. On this instance the realm is fully ready by
// the time we return: ensureMounted publishes into realms[] /
// virtualNetwork via prepareRealmFromRow and awaits realm.start(),
// which awaits the from-scratch-index job. Mounting eagerly here
// also drains the queue locally so the test framework's teardown
// (close server → drain runner → close DB) doesn't race a worker
// mid-fetch on the now-closed HTTP listener.
let realm = await reconciler.lookupOrMount(url);
// on first request. Mounting eagerly here also drains the queue
// locally so the test framework's teardown (close server → drain
// runner → close DB) doesn't race a worker mid-fetch on the now-
// closed HTTP listener.
let realm = await reconciler.lookupOrMount(url, {
fromScratchIndexPriority: userInitiatedPriority,
});
if (!realm) {
throw new Error(
`expected realm ${url} to be mounted after createRealm — registry row missing or mount failed`,
Expand Down
14 changes: 10 additions & 4 deletions packages/realm-server/lib/realm-registry-reconciler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,10 @@ export class RealmRegistryReconciler {
// would receive a not-yet-started Realm; routing it through the
// in-flight promise instead lets the caller await start() like the
// original requester.
async lookupOrMount(url: string): Promise<Realm | undefined> {
async lookupOrMount(
url: string,
opts?: { fromScratchIndexPriority?: number },
): Promise<Realm | undefined> {
const inflight = this.pendingMounts.get(url);
if (inflight) {
return inflight;
Expand All @@ -308,7 +311,7 @@ export class RealmRegistryReconciler {
}
this.knownByUrl.set(url, row);
}
return this.ensureMounted(row);
return this.ensureMounted(row, opts);
}

async #lookupRow(url: string): Promise<RealmRegistryRow | undefined> {
Expand Down Expand Up @@ -347,7 +350,10 @@ export class RealmRegistryReconciler {
// rollout safety relies on this signal — Loki/Grafana extract cold-
// mount latency, mount failure rate, and pinned-vs-lazy ratios from
// these lines.
async ensureMounted(row: RealmRegistryRow): Promise<Realm> {
async ensureMounted(
row: RealmRegistryRow,
opts?: { fromScratchIndexPriority?: number },
): Promise<Realm> {
// pendingMounts checked before mounted: see lookupOrMount() above.
// The Realm is published into mounted synchronously before its
// start() promise resolves, so a caller hitting the mounted
Expand Down Expand Up @@ -386,7 +392,7 @@ export class RealmRegistryReconciler {
this.#reconcilerOwned.add(row.url);
const promise = (async () => {
try {
await realm.start();
await realm.start(opts);
log.info(
`mount ok url=%s kind=%s pinned=%s duration_ms=%d`,
row.url,
Expand Down
1 change: 0 additions & 1 deletion packages/realm-server/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ export function createRoutes(args: CreateRoutesArgs) {
serverURL: new URL(args.serverURL),
realms: args.realms,
dbAdapter: args.dbAdapter,
queue: args.queue,
virtualNetwork: args.virtualNetwork,
realmsRootPath: args.realmsRootPath,
reconciler: args.reconciler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,14 @@ module(`server-endpoints/${basename(__filename)}`, function () {
let jobs = (await context.dbAdapter.execute(
`SELECT priority FROM jobs WHERE job_type = 'from-scratch-index' AND args->>'realmURL' = '${json.data.id}'`,
)) as { priority: number }[];
assert.ok(
jobs.length > 0,
'found from-scratch index job for created realm',
);
assert.ok(
jobs.some((j) => j.priority === userInitiatedPriority),
'user initiated realm indexing uses high priority queue',
// Contract: realm creation enqueues exactly one
// from-scratch-index job, at userInitiatedPriority. A second
// job at default priority would block creation behind any
// backlog of lower-priority indexing work.
assert.deepEqual(
jobs.map((j) => j.priority),
[userInitiatedPriority],
'realm creation enqueues exactly one from-scratch index job at userInitiatedPriority',
);

let permissions = await fetchRealmPermissions(
Expand Down
18 changes: 12 additions & 6 deletions packages/runtime-common/realm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1296,8 +1296,14 @@ export class Realm {
});
}

async start() {
this.#startedUp.fulfill((() => this.#startup())());
// `fromScratchIndexPriority` overrides the realm's default priority
// for the from-scratch-index job that `#startup` enqueues when the
// realm has no prior index. Callers that mount-on-demand for a
// user-initiated flow (e.g. realm creation) pass
// `userInitiatedPriority` so the resulting job jumps ahead of any
// backlog of system-priority indexing work.
async start(opts?: { fromScratchIndexPriority?: number }) {
this.#startedUp.fulfill((() => this.#startup(opts))());

if (this.#adapter.fileWatcherEnabled) {
await this.startFileWatcher();
Expand Down Expand Up @@ -2174,7 +2180,7 @@ export class Realm {
await completed;
}

async #startup() {
async #startup(opts?: { fromScratchIndexPriority?: number }) {
await Promise.resolve();
let startTime = Date.now();
if (this.#copiedFromRealm) {
Expand All @@ -2188,9 +2194,9 @@ export class Realm {
} else {
let isNewIndex = await this.#realmIndexUpdater.isNewIndex();
if (isNewIndex || this.#fullIndexOnStartup) {
let promise = this.#realmIndexUpdater.fullIndex(
this.#fromScratchIndexPriority,
);
let priority =
opts?.fromScratchIndexPriority ?? this.#fromScratchIndexPriority;
let promise = this.#realmIndexUpdater.fullIndex(priority);
if (isNewIndex) {
// we only await the full indexing at boot if this is a brand new index
await promise;
Expand Down
Loading