diff --git a/packages/realm-server/tests/data-plane-write-lock-test.ts b/packages/realm-server/tests/data-plane-write-lock-test.ts new file mode 100644 index 0000000000..14b2bc95ec --- /dev/null +++ b/packages/realm-server/tests/data-plane-write-lock-test.ts @@ -0,0 +1,214 @@ +import { module, test } from 'qunit'; +import type { Test, SuperTest } from 'supertest'; +import { basename } from 'path'; +import type { Server } from 'http'; +import type { DirResult } from 'tmp'; +import type { Realm } from '@cardstack/runtime-common'; +import { + SupportedMimeType, + type LooseSingleCardDocument, +} from '@cardstack/runtime-common'; +import { + setupPermissionedRealmCached, + createJWT, + type RealmRequest, + withRealmPath, +} from './helpers'; +import '@cardstack/runtime-common/helpers/code-equality-assertion'; + +// CS-11125 regression coverage: per-realm advisory lock on the data-plane +// write paths. These tests reach the realm-server HTTP API the same way a +// real client would, and rely on Node's event-loop interleaving + the +// Postgres advisory lock to serialize critical sections that pre-CS-11125 +// raced. Without the lock applied to PATCH and /_atomic, the assertions +// below detect the lost update / TOCTOU directly. +module(basename(__filename), function () { + module('CS-11125: data-plane write serialization', function () { + let realmURL = new URL('http://127.0.0.1:4444/test/'); + let testRealm: Realm; + let request: RealmRequest; + + function onRealmSetup(args: { + testRealm: Realm; + testRealmHttpServer: Server; + request: SuperTest; + dir: DirResult; + }) { + testRealm = args.testRealm; + request = withRealmPath(args.request, realmURL); + } + + module('concurrent PATCH on same card', function (hooks) { + setupPermissionedRealmCached(hooks, { + fixture: 'simple', + realmURL, + permissions: { + '*': ['read', 'write'], + '@node-test_realm:localhost': ['read', 'realm-owner'], + }, + onRealmSetup, + }); + + test('two concurrent PATCHes preserve both non-overlapping field changes', async function (assert) { + // Replica A patches `firstName`; Replica B patches a different + // attribute path (`cardInfo.summary`). Pre-CS-11125, both + // handlers would read `indexEntry.instance` from the same + // pre-state, compute independent merges, and the second writer + // would clobber the first's field — last-writer-wins on disk + // with the loser's change silently dropped. With the per-realm + // advisory lock around the indexEntry read + writeMany, the + // second PATCH waits until the first commits, then sees the + // updated `original` and merges on top of it. + let auth = `Bearer ${createJWT(testRealm, 'user', ['read', 'write'])}`; + let firstNamePatch = request + .patch('/person-1') + .set('Accept', SupportedMimeType.CardJson) + .set('Authorization', auth) + .send({ + data: { + type: 'card', + attributes: { + firstName: 'ConcurrentA', + }, + meta: { + adoptsFrom: { module: './person.gts', name: 'Person' }, + }, + }, + }); + let summaryPatch = request + .patch('/person-1') + .set('Accept', SupportedMimeType.CardJson) + .set('Authorization', auth) + .send({ + data: { + type: 'card', + attributes: { + cardInfo: { + summary: 'ConcurrentB summary', + }, + }, + meta: { + adoptsFrom: { module: './person.gts', name: 'Person' }, + }, + }, + }); + + let [firstNameResponse, summaryResponse] = await Promise.all([ + firstNamePatch, + summaryPatch, + ]); + + assert.strictEqual( + firstNameResponse.status, + 200, + 'firstName PATCH succeeds', + ); + assert.strictEqual( + summaryResponse.status, + 200, + 'summary PATCH succeeds', + ); + + // Final state on disk must reflect BOTH patches. Without the + // lock the second writer would have written a merge based on + // the pre-first-writer state and dropped the other field. + let finalResponse = await request + .get('/person-1') + .set('Accept', SupportedMimeType.CardJson) + .set('Authorization', auth); + assert.strictEqual(finalResponse.status, 200, 'final GET succeeds'); + let finalDoc = finalResponse.body as LooseSingleCardDocument; + assert.strictEqual( + finalDoc.data.attributes?.firstName, + 'ConcurrentA', + 'firstName from PATCH A persisted', + ); + let cardInfo = finalDoc.data.attributes?.cardInfo as + | { summary?: string } + | undefined; + assert.strictEqual( + cardInfo?.summary, + 'ConcurrentB summary', + 'cardInfo.summary from PATCH B persisted', + ); + }); + }); + + module('concurrent /_atomic add on same href', function (hooks) { + setupPermissionedRealmCached(hooks, { + fixture: 'simple', + realmURL, + permissions: { + '*': ['read', 'write'], + '@node-test_realm:localhost': ['read', 'realm-owner'], + }, + onRealmSetup, + }); + + test('exactly one of two concurrent `add` ops succeeds; the other gets 409', async function (assert) { + // Pre-CS-11125, two replicas calling POST /_atomic with op=add + // for the same href could both pass `checkBeforeAtomicWrite` + // (file doesn't exist yet on either) and proceed to write — + // last writer silently wins on disk and `boxel_index` ends up + // interleaved. With the per-realm advisory lock taken at the + // handler entry, the second request blocks until the first + // commits, then re-runs the exists check inside the lock and + // correctly returns 409. + let auth = `Bearer ${createJWT(testRealm, 'user', ['read', 'write'])}`; + let makeAddBody = (firstName: string) => ({ + 'atomic:operations': [ + { + op: 'add', + href: 'concurrent-add.json', + data: { + type: 'card', + attributes: { firstName }, + meta: { + adoptsFrom: { module: './person.gts', name: 'Person' }, + }, + }, + }, + ], + }); + // `?waitForIndex=true` makes the POST return only once indexing + // has caught up to the write. Without it, `/_atomic` resolves on + // durability and the GET below races the indexer (404 or stale). + let postA = request + .post('/_atomic?waitForIndex=true') + .set('Accept', SupportedMimeType.JSONAPI) + .set('Authorization', auth) + .send(JSON.stringify(makeAddBody('AddA'))); + let postB = request + .post('/_atomic?waitForIndex=true') + .set('Accept', SupportedMimeType.JSONAPI) + .set('Authorization', auth) + .send(JSON.stringify(makeAddBody('AddB'))); + + let [respA, respB] = await Promise.all([postA, postB]); + + let statuses = [respA.status, respB.status].sort(); + assert.deepEqual( + statuses, + [201, 409], + 'exactly one add succeeds (201), the other is rejected (409); without the lock both would 201 and silently last-writer-win', + ); + + // The card that did land must come from the winning request, + // not a torn write. Both POSTs above used waitForIndex=true so + // the read here is deterministic. + let winning = respA.status === 201 ? 'AddA' : 'AddB'; + let finalResponse = await request + .get('/concurrent-add') + .set('Accept', SupportedMimeType.CardJson) + .set('Authorization', auth); + assert.strictEqual(finalResponse.status, 200, 'card readable'); + let finalDoc = finalResponse.body as LooseSingleCardDocument; + assert.strictEqual( + finalDoc.data.attributes?.firstName, + winning, + 'final card matches the winning concurrent add', + ); + }); + }); + }); +}); diff --git a/packages/realm-server/tests/index.ts b/packages/realm-server/tests/index.ts index 79c33626f8..7992f21533 100644 --- a/packages/realm-server/tests/index.ts +++ b/packages/realm-server/tests/index.ts @@ -196,6 +196,7 @@ const ALL_TEST_FILES: string[] = [ './realm-endpoints/dependencies-test', './realm-advisory-locks-test', './realm-cleanup-transaction-test', + './data-plane-write-lock-test', './realm-registry-backfill-test', './realm-registry-reconciler-test', './realm-registry-writes-test', diff --git a/packages/runtime-common/realm.ts b/packages/runtime-common/realm.ts index d04568faaf..b29a7938db 100644 --- a/packages/runtime-common/realm.ts +++ b/packages/runtime-common/realm.ts @@ -1351,6 +1351,9 @@ export class Realm { // handler's vantage point. Different from `__testOnlyClearCaches` // in that it does NOT reset the transpile counter (which is // test-only diagnostic state, unrelated to byte-correctness). + // CS-11156 will replace the publish handler's local call here with + // a cross-replica NOTIFY broadcast; this method stays as the + // bulk-invalidate primitive the receiver invokes. clearLocalCaches(): void { this.#sourceCache.clear(); this.#dropAllModuleCacheEntries(); @@ -1512,12 +1515,26 @@ export class Realm { return this.#adapter.createJWT(claims, expiration, this.#realmSecretSeed); } + // Public mutation entry points (`write`, `writeMany`, `delete`, `deleteAll`) + // serialize concurrent same-URL writers across replicas via the per-realm + // advisory lock. The lock spans the FS write + index update so two + // replicas can't both commit on top of the same pre-state. + // + // HTTP route handlers in this file that need their READ to be inside the + // same critical section as the write (the `/_atomic` precheck and + // `patchCardInstance`'s indexEntry read) take the lock themselves at the + // handler boundary and invoke `_batchWriteUnlocked` directly — re-entering + // through the public methods would deadlock (a second + // `pg_advisory_xact_lock` on the same key would block on its own pinned + // pool connection). async write( path: LocalPath, contents: string | Uint8Array, options?: WriteOptions, ): Promise { - let results = await this._batchWrite(new Map([[path, contents]]), options); + let results = await this.#dbAdapter.withWriteLock(this.url, () => + this._batchWriteUnlocked(new Map([[path, contents]]), options), + ); return results[0]; } @@ -1525,10 +1542,12 @@ export class Realm { files: Map, options?: WriteOptions, ): Promise { - return this._batchWrite(files, options); + return this.#dbAdapter.withWriteLock(this.url, () => + this._batchWriteUnlocked(files, options), + ); } - private async _batchWrite( + private async _batchWriteUnlocked( files: Map, options?: WriteOptions, ): Promise { @@ -1906,158 +1925,172 @@ export class Realm { }); } let atomicOperations = json['atomic:operations'] as AtomicOperation[]; - let atomicCheckErrors = await this.checkBeforeAtomicWrite(atomicOperations); - if (atomicCheckErrors.length > 0) { - return createResponse({ - body: JSON.stringify({ errors: atomicCheckErrors }), - init: { - status: this.lowestStatusCode(atomicCheckErrors), - headers: { 'content-type': SupportedMimeType.JSONAPI }, - }, - requestContext, - }); - } - let operations = filterAtomicOperations(atomicOperations); - let files = new Map(); - let writeResults: FileWriteResult[] = []; - - for (let operation of operations) { - let resource = operation.data; - let href = operation.href; - let localPath = this.paths.local(new URL(href, this.paths.url)); - let exists = await this.#adapter.exists(localPath); - if (operation.op === 'add' && exists) { + // Take the per-realm advisory lock from the precheck through the + // write. Without this, two replicas could both pass + // `checkBeforeAtomicWrite` for the same `add` operation (file does + // not exist), then both proceed to write — last writer wins on disk + // but indexer state is incoherent. Inside the lock we invoke + // `_batchWriteUnlocked` directly to avoid re-acquiring the same + // advisory lock through `writeMany` (which would deadlock on a + // different pinned pool connection). + return await this.#dbAdapter.withWriteLock(this.url, async () => { + let atomicCheckErrors = + await this.checkBeforeAtomicWrite(atomicOperations); + if (atomicCheckErrors.length > 0) { return createResponse({ - body: JSON.stringify({ - errors: [ - { - title: 'Resource already exists', - detail: `Resource ${href} already exists`, - status: 409, - }, - ], - }), + body: JSON.stringify({ errors: atomicCheckErrors }), init: { - status: 409, - headers: { 'content-type': SupportedMimeType.JSONAPI }, - }, - requestContext, - }); - } - if (operation.op === 'update' && !exists) { - return createResponse({ - body: JSON.stringify({ - errors: [ - { - title: 'Resource does not exist', - detail: `Resource ${href} does not exist`, - status: 404, - }, - ], - }), - init: { - status: 404, + status: this.lowestStatusCode(atomicCheckErrors), headers: { 'content-type': SupportedMimeType.JSONAPI }, }, requestContext, }); } - if (isModuleResource(resource)) { - let content = resource.attributes?.content ?? ''; - this.assertWriteSize(content, 'file'); - files.set(localPath, content); - } else if (isCardResource(resource)) { - let doc = { - data: resource, - }; - let jsonString = JSON.stringify(doc, null, 2); - this.assertWriteSize(jsonString, 'card'); - files.set(localPath, jsonString); - } else { - return createResponse({ - body: JSON.stringify({ - errors: [ - { - status: 400, - title: 'Invalid resource', - detail: `Operation data is not a valid card resource or module resource`, - }, - ], - }), - init: { - status: 400, - headers: { 'content-type': SupportedMimeType.JSONAPI }, - }, - requestContext, - }); + + let operations = filterAtomicOperations(atomicOperations); + let files = new Map(); + let writeResults: FileWriteResult[] = []; + + for (let operation of operations) { + let resource = operation.data; + let href = operation.href; + let localPath = this.paths.local(new URL(href, this.paths.url)); + let exists = await this.#adapter.exists(localPath); + if (operation.op === 'add' && exists) { + return createResponse({ + body: JSON.stringify({ + errors: [ + { + title: 'Resource already exists', + detail: `Resource ${href} already exists`, + status: 409, + }, + ], + }), + init: { + status: 409, + headers: { 'content-type': SupportedMimeType.JSONAPI }, + }, + requestContext, + }); + } + if (operation.op === 'update' && !exists) { + return createResponse({ + body: JSON.stringify({ + errors: [ + { + title: 'Resource does not exist', + detail: `Resource ${href} does not exist`, + status: 404, + }, + ], + }), + init: { + status: 404, + headers: { 'content-type': SupportedMimeType.JSONAPI }, + }, + requestContext, + }); + } + if (isModuleResource(resource)) { + let content = resource.attributes?.content ?? ''; + this.assertWriteSize(content, 'file'); + files.set(localPath, content); + } else if (isCardResource(resource)) { + let doc = { + data: resource, + }; + let jsonString = JSON.stringify(doc, null, 2); + this.assertWriteSize(jsonString, 'card'); + files.set(localPath, jsonString); + } else { + return createResponse({ + body: JSON.stringify({ + errors: [ + { + status: 400, + title: 'Invalid resource', + detail: `Operation data is not a valid card resource or module resource`, + }, + ], + }), + init: { + status: 400, + headers: { 'content-type': SupportedMimeType.JSONAPI }, + }, + requestContext, + }); + } } - } - if (files.size > 0) { - try { - // /_atomic returns once writes are durable, not once they are - // indexed. Callers that need indexed state must drain via - // realm.incrementalIndexing() (server-side), wait on the matrix - // 'index' incremental event (client-side), or opt-in to a - // synchronous response by passing `?waitForIndex=true` on the - // POST URL. The query-param path is intended for one-shot CLI / - // agent flows where Matrix subscription is impractical and a - // search poll-loop would race indexing latency. Mixed - // module+instance batches are still serialized correctly: the - // in-loop intermediate flush in _batchWrite at the - // lastWriteType === 'module' && currentWriteType === 'instance' - // gate is always awaited, so an instance's fileSerialization - // sees its module already indexed. - let waitForIndex = - new URL(request.url).searchParams.get('waitForIndex') === 'true'; - writeResults = await this.writeMany(files, { - clientRequestId: request.headers.get('X-Boxel-Client-Request-Id'), - serializeFile: true, - waitForIndex, - }); - } catch (e: any) { - if (e instanceof CardError) { - return responseWithError(e, requestContext); + if (files.size > 0) { + try { + // /_atomic returns once writes are durable, not once they are + // indexed. Callers that need indexed state must drain via + // realm.incrementalIndexing() (server-side), wait on the + // matrix 'index' incremental event (client-side), or opt-in + // to a synchronous response by passing `?waitForIndex=true` + // on the POST URL. The query-param path is intended for + // one-shot CLI / agent flows where Matrix subscription is + // impractical and a search poll-loop would race indexing + // latency. Mixed module+instance batches are still + // serialized correctly: the in-loop intermediate flush in + // _batchWriteUnlocked at the `lastWriteType === 'module' && + // currentWriteType === 'instance'` gate is always awaited, + // so an instance's fileSerialization sees its module already + // indexed. + let waitForIndex = + new URL(request.url).searchParams.get('waitForIndex') === 'true'; + writeResults = await this._batchWriteUnlocked(files, { + clientRequestId: request.headers.get('X-Boxel-Client-Request-Id'), + serializeFile: true, + waitForIndex, + }); + } catch (e: any) { + if (e instanceof CardError) { + return responseWithError(e, requestContext); + } + // Log the underlying exception before returning 500 — + // otherwise callers only see "Write Error" and the original + // stack trace is lost, making atomic-batch failures + // effectively undebuggable. + this.#log.error( + `Atomic write failed: ${e.message}\n${e.stack ?? '(no stack)'}`, + ); + return createResponse({ + body: JSON.stringify({ + errors: [{ title: 'Write Error', detail: e.message }], + }), + init: { + status: 500, + headers: { 'content-type': SupportedMimeType.JSONAPI }, + }, + requestContext, + }); } - // Log the underlying exception before returning 500 — otherwise - // callers only see "Write Error" and the original stack trace is - // lost, making atomic-batch failures effectively undebuggable. - this.#log.error( - `Atomic write failed: ${e.message}\n${e.stack ?? '(no stack)'}`, - ); - return createResponse({ - body: JSON.stringify({ - errors: [{ title: 'Write Error', detail: e.message }], - }), - init: { - status: 500, - headers: { 'content-type': SupportedMimeType.JSONAPI }, - }, - requestContext, - }); } - } - let results: AtomicOperationResult[] = writeResults.map( - ({ path, created }) => ({ - data: { - id: this.paths.fileURL(path).href, - }, - meta: { - created, - }, - }), - ); - return createResponse({ - body: JSON.stringify({ 'atomic:results': results }, null, 2), - init: { - status: 201, - headers: { - 'content-type': SupportedMimeType.JSONAPI, + let results: AtomicOperationResult[] = writeResults.map( + ({ path, created }) => ({ + data: { + id: this.paths.fileURL(path).href, + }, + meta: { + created, + }, + }), + ); + return createResponse({ + body: JSON.stringify({ 'atomic:results': results }, null, 2), + init: { + status: 201, + headers: { + 'content-type': SupportedMimeType.JSONAPI, + }, }, - }, - requestContext, + requestContext, + }); }); } @@ -2112,6 +2145,12 @@ export class Realm { } async delete(path: LocalPath): Promise { + await this.#dbAdapter.withWriteLock(this.url, () => + this._deleteUnlocked(path), + ); + } + + private async _deleteUnlocked(path: LocalPath): Promise { let url = this.paths.fileURL(path); this.sendIndexInitiationEvent(url.href); await this.trackOwnWrite(path, { isDelete: true }); @@ -2132,6 +2171,12 @@ export class Realm { } async deleteAll(paths: LocalPath[]): Promise { + await this.#dbAdapter.withWriteLock(this.url, () => + this._deleteAllUnlocked(paths), + ); + } + + private async _deleteAllUnlocked(paths: LocalPath[]): Promise { let urls: URL[] = []; let trackPromises: Promise[] = []; let removePromises: Promise[] = []; @@ -3926,7 +3971,6 @@ export class Realm { if (await this.nonJsonFileExists(localPath)) { return unsupportedMediaType(request, requestContext); } - let primarySerialization: LooseSingleCardDocument | undefined; if (localPath.startsWith('_')) { return methodNotAllowed(request, requestContext); } @@ -3936,12 +3980,6 @@ export class Realm { let url = this.paths.fileURL(localPath); let instanceURL = url.href.replace(/\.json$/, ''); - let indexEntry = await this.#realmIndexQueryEngine.instance(url, { - includeErrors: true, - }); - if (!indexEntry) { - return notFound(request, requestContext); - } let { data: patch, included: maybeIncluded } = await request.json(); if (!isCardResource(patch)) { @@ -3966,242 +4004,267 @@ export class Realm { } } } - let original = cloneDeep( - indexEntry.instance ?? { - type: 'card', - meta: { adoptsFrom: patch.meta.adoptsFrom }, - }, - ) as CardResource; - original.meta ??= { adoptsFrom: patch.meta.adoptsFrom }; - original.meta.adoptsFrom = - original.meta.adoptsFrom ?? patch.meta.adoptsFrom; - delete original.meta.lastModified; - let originalClone = cloneDeep(original); - if ( - originalClone.meta?.adoptsFrom && - internalKeyFor(patch.meta.adoptsFrom, url) !== - internalKeyFor(originalClone.meta.adoptsFrom, url) - ) { - return badRequest({ - message: `Cannot change card instance type to ${JSON.stringify( - patch.meta.adoptsFrom, - )}`, - requestContext, - id: instanceURL, + // CS-11125: serialize concurrent PATCHes against the same realm so the + // indexEntry read, merge, and write are all inside one critical + // section. Without the lock, two replicas could both read the same + // `original` from boxel_index, compute independent merges, and the + // second writer's merge would silently lose the first's changes. + // writeMany below uses the default `waitForIndex: true`, so once the + // lock releases the index reflects the just-committed state and the + // next waiter's `indexEntry` read sees it. + // + // Inside the lock we invoke `_batchWriteUnlocked` rather than the + // public `writeMany` — re-entering the lock through the public method + // would block on a different pinned pool connection. + return await this.#dbAdapter.withWriteLock(this.url, async () => { + let primarySerialization: LooseSingleCardDocument | undefined; + let indexEntry = await this.#realmIndexQueryEngine.instance(url, { + includeErrors: true, }); - } - let included = (maybeIncluded ?? []) as CardResource[]; + if (!indexEntry) { + return notFound(request, requestContext); + } + let original = cloneDeep( + indexEntry.instance ?? { + type: 'card', + meta: { adoptsFrom: patch.meta.adoptsFrom }, + }, + ) as CardResource; + original.meta ??= { adoptsFrom: patch.meta.adoptsFrom }; + original.meta.adoptsFrom = + original.meta.adoptsFrom ?? patch.meta.adoptsFrom; + delete original.meta.lastModified; + let originalClone = cloneDeep(original); - delete (patch as any).type; - delete (patch as any).meta.realmInfo; - delete (patch as any).meta.realmURL; + if ( + originalClone.meta?.adoptsFrom && + internalKeyFor(patch.meta.adoptsFrom, url) !== + internalKeyFor(originalClone.meta.adoptsFrom, url) + ) { + return badRequest({ + message: `Cannot change card instance type to ${JSON.stringify( + patch.meta.adoptsFrom, + )}`, + requestContext, + id: instanceURL, + }); + } + let included = (maybeIncluded ?? []) as CardResource[]; - promoteLocalIdsToRemoteIds({ - resource: patch, - included, - realmURL: new URL(this.url), - }); + delete (patch as any).type; + delete (patch as any).meta.realmInfo; + delete (patch as any).meta.realmURL; - let primaryResource = mergeWith( - originalClone, - patch, - (_objectValue: any, sourceValue: any) => { - // a patched array should overwrite the original array instead of merging - // into an original array, otherwise we won't be able to remove items in - // the original array - return Array.isArray(sourceValue) ? sourceValue : undefined; - }, - ); + promoteLocalIdsToRemoteIds({ + resource: patch, + included, + realmURL: new URL(this.url), + }); - if (primaryResource.relationships || patch.relationships) { - let merged = mergeRelationships( - primaryResource.relationships, - patch.relationships, + let primaryResource = mergeWith( + originalClone, + patch, + (_objectValue: any, sourceValue: any) => { + // a patched array should overwrite the original array instead of merging + // into an original array, otherwise we won't be able to remove items in + // the original array + return Array.isArray(sourceValue) ? sourceValue : undefined; + }, ); - if (merged && Object.keys(merged).length !== 0) { - primaryResource.relationships = merged; + if (primaryResource.relationships || patch.relationships) { + let merged = mergeRelationships( + primaryResource.relationships, + patch.relationships, + ); + + if (merged && Object.keys(merged).length !== 0) { + primaryResource.relationships = merged; + } } - } - // If the patch makes no semantic changes and doesn't include side-loaded - // resources, short-circuit to avoid touching the file (and changing mtime). - if (included.length === 0 && isEqual(primaryResource, original)) { - let entry = await this.#realmIndexQueryEngine.cardDocument( - new URL(instanceURL), - { loadLinks: true }, - ); - if (entry && entry.type !== 'error') { - let existingDoc = merge({}, entry.doc, { - data: { - links: { self: instanceURL }, - meta: { lastModified: entry.doc.data.meta.lastModified }, - }, - }); - let createdAt = await this.getCreatedTime( - this.paths.local(url) + '.json', + // If the patch makes no semantic changes and doesn't include side-loaded + // resources, short-circuit to avoid touching the file (and changing mtime). + if (included.length === 0 && isEqual(primaryResource, original)) { + let entry = await this.#realmIndexQueryEngine.cardDocument( + new URL(instanceURL), + { loadLinks: true }, ); - // entry.doc came from cardDocument(), which already called - // attachRealmInfo() and (re)populated the realm-info cache — - // so the cached hash is current as of this response. - await this.getRealmInfo(); - let foreignDeps = this.hasForeignRealmDeps(entry.deps); - let etag = foreignDeps - ? undefined - : buildCardJsonEtag(entry.indexedAt, this.getCachedRealmInfoHash()); - return createResponse({ - body: JSON.stringify(existingDoc, null, 2), - init: { - headers: { - 'content-type': SupportedMimeType.CardJson, - 'cache-control': this.cardJsonCacheControl(requestContext), - ...(etag ? { etag } : {}), - ...etagSuppressedHeader(foreignDeps), - ...lastModifiedHeader(existingDoc), - ...(createdAt != null - ? { 'x-created': formatRFC7231(createdAt * 1000) } - : {}), + if (entry && entry.type !== 'error') { + let existingDoc = merge({}, entry.doc, { + data: { + links: { self: instanceURL }, + meta: { lastModified: entry.doc.data.meta.lastModified }, }, - }, - requestContext, - }); + }); + let createdAt = await this.getCreatedTime( + this.paths.local(url) + '.json', + ); + // entry.doc came from cardDocument(), which already called + // attachRealmInfo() and (re)populated the realm-info cache — + // so the cached hash is current as of this response. + await this.getRealmInfo(); + let foreignDeps = this.hasForeignRealmDeps(entry.deps); + let etag = foreignDeps + ? undefined + : buildCardJsonEtag(entry.indexedAt, this.getCachedRealmInfoHash()); + return createResponse({ + body: JSON.stringify(existingDoc, null, 2), + init: { + headers: { + 'content-type': SupportedMimeType.CardJson, + 'cache-control': this.cardJsonCacheControl(requestContext), + ...(etag ? { etag } : {}), + ...etagSuppressedHeader(foreignDeps), + ...lastModifiedHeader(existingDoc), + ...(createdAt != null + ? { 'x-created': formatRFC7231(createdAt * 1000) } + : {}), + }, + }, + requestContext, + }); + } } - } - delete (primaryResource as any).id; // don't write the ID to the file - let files = new Map(); - let resources = [primaryResource, ...included]; - for (let [i, resource] of resources.entries()) { - if ( - (i > 0 && typeof resource.lid !== 'string') || - (resource.meta.realmURL && resource.meta.realmURL !== this.url) - ) { - continue; - } - let name = getCardDirectoryName(resource.meta?.adoptsFrom, this.paths); - let fileURL = - i === 0 - ? new URL(`${url}.json`) - : this.paths.fileURL( - `/${join(new URL(this.url).pathname, name, (resource.lid ?? uuidV4()) + '.json')}`, + delete (primaryResource as any).id; // don't write the ID to the file + let files = new Map(); + let resources = [primaryResource, ...included]; + for (let [i, resource] of resources.entries()) { + if ( + (i > 0 && typeof resource.lid !== 'string') || + (resource.meta.realmURL && resource.meta.realmURL !== this.url) + ) { + continue; + } + let name = getCardDirectoryName(resource.meta?.adoptsFrom, this.paths); + let fileURL = + i === 0 + ? new URL(`${url}.json`) + : this.paths.fileURL( + `/${join(new URL(this.url).pathname, name, (resource.lid ?? uuidV4()) + '.json')}`, + ); + // we already did this one + if (i !== 0) { + promoteLocalIdsToRemoteIds({ + resource, + included, + realmURL: new URL(this.url), + }); + visitModuleDeps(resource, (moduleURL, setModuleURL) => { + setModuleURL( + resolveCardReference( + moduleURL, + instanceURL, + ) as RealmResourceIdentifier, ); - // we already did this one - if (i !== 0) { - promoteLocalIdsToRemoteIds({ - resource, - included, - realmURL: new URL(this.url), - }); - visitModuleDeps(resource, (moduleURL, setModuleURL) => { - setModuleURL( - resolveCardReference( - moduleURL, - instanceURL, - ) as RealmResourceIdentifier, + }); + } + let fileSerialization: LooseSingleCardDocument | undefined; + try { + fileSerialization = await this.fileSerialization( + { + data: merge(resource, { meta: { realmURL: this.url } }), + }, + fileURL, ); - }); + } catch (err: any) { + if (err.message.startsWith('field validation error')) { + return badRequest({ + message: err.message, + requestContext, + id: instanceURL, + }); + } else { + return systemError({ + requestContext, + message: err.message, + additionalError: err, + id: instanceURL, + }); + } + } + let path = this.paths.local(fileURL); + files.set(path, JSON.stringify(fileSerialization, null, 2)); + if (i === 0) { + primarySerialization = fileSerialization; + } } - let fileSerialization: LooseSingleCardDocument | undefined; - try { - fileSerialization = await this.fileSerialization( - { - data: merge(resource, { meta: { realmURL: this.url } }), - }, - fileURL, - ); - } catch (err: any) { - if (err.message.startsWith('field validation error')) { - return badRequest({ - message: err.message, - requestContext, - id: instanceURL, - }); + // Use the unlocked inner write so we don't re-enter + // withWriteLock (which would block on a different pinned pool + // connection). + let [{ lastModified, created }] = await this._batchWriteUnlocked(files, { + clientRequestId: request.headers.get('X-Boxel-Client-Request-Id'), + }); + let entry = await this.#realmIndexQueryEngine.cardDocument( + new URL(instanceURL), + { + loadLinks: true, + }, + ); + let doc: SingleCardDocument; + if (!entry || entry?.type === 'error') { + if ( + primarySerialization && + isBrowserTestEnv() && + !(globalThis as any).__emulateServerPatchFailure + ) { + doc = merge({}, primarySerialization, { + data: { + id: instanceURL, + links: { self: instanceURL }, + meta: { + ...(primarySerialization.data.meta ?? {}), + lastModified, + }, + }, + }) as SingleCardDocument; } else { return systemError({ requestContext, - message: err.message, - additionalError: err, + message: `Unable to index card: can't find patched instance, ${instanceURL} in index`, id: instanceURL, + additionalError: entry + ? CardError.fromSerializableError(entry.error) + : undefined, }); } - } - let path = this.paths.local(fileURL); - files.set(path, JSON.stringify(fileSerialization, null, 2)); - if (i === 0) { - primarySerialization = fileSerialization; - } - } - let [{ lastModified, created }] = await this.writeMany(files, { - clientRequestId: request.headers.get('X-Boxel-Client-Request-Id'), - }); - let entry = await this.#realmIndexQueryEngine.cardDocument( - new URL(instanceURL), - { - loadLinks: true, - }, - ); - let doc: SingleCardDocument; - if (!entry || entry?.type === 'error') { - if ( - primarySerialization && - isBrowserTestEnv() && - !(globalThis as any).__emulateServerPatchFailure - ) { - doc = merge({}, primarySerialization, { + } else { + doc = merge({}, entry.doc, { data: { - id: instanceURL, links: { self: instanceURL }, - meta: { - ...(primarySerialization.data.meta ?? {}), - lastModified, - }, + meta: { lastModified }, }, - }) as SingleCardDocument; - } else { - return systemError({ - requestContext, - message: `Unable to index card: can't find patched instance, ${instanceURL} in index`, - id: instanceURL, - additionalError: entry - ? CardError.fromSerializableError(entry.error) - : undefined, }); } - } else { - doc = merge({}, entry.doc, { - data: { - links: { self: instanceURL }, - meta: { lastModified }, + // Same rationale as the no-op short-circuit branch above: + // cardDocument() above primed the realm-info cache via + // attachRealmInfo(), but only when entry was a non-error doc. + // On the error fallback we may still need to populate it. + await this.getRealmInfo(); + let foreignDeps = + entry && entry.type !== 'error' + ? this.hasForeignRealmDeps(entry.deps) + : false; + let etag = + entry && entry.type !== 'error' && !foreignDeps + ? buildCardJsonEtag(entry.indexedAt, this.getCachedRealmInfoHash()) + : undefined; + return createResponse({ + body: JSON.stringify(doc, null, 2), + init: { + headers: { + 'content-type': SupportedMimeType.CardJson, + 'cache-control': this.cardJsonCacheControl(requestContext), + ...(etag ? { etag } : {}), + ...etagSuppressedHeader(foreignDeps), + ...lastModifiedHeader(doc), + ...(created ? { 'x-created': formatRFC7231(created * 1000) } : {}), + }, }, + requestContext, }); - } - // Same rationale as the no-op short-circuit branch above: - // cardDocument() above primed the realm-info cache via - // attachRealmInfo(), but only when entry was a non-error doc. - // On the error fallback we may still need to populate it. - await this.getRealmInfo(); - let foreignDeps = - entry && entry.type !== 'error' - ? this.hasForeignRealmDeps(entry.deps) - : false; - let etag = - entry && entry.type !== 'error' && !foreignDeps - ? buildCardJsonEtag(entry.indexedAt, this.getCachedRealmInfoHash()) - : undefined; - return createResponse({ - body: JSON.stringify(doc, null, 2), - init: { - headers: { - 'content-type': SupportedMimeType.CardJson, - 'cache-control': this.cardJsonCacheControl(requestContext), - ...(etag ? { etag } : {}), - ...etagSuppressedHeader(foreignDeps), - ...lastModifiedHeader(doc), - ...(created ? { 'x-created': formatRFC7231(created * 1000) } : {}), - }, - }, - requestContext, }); }