70 changes: 55 additions & 15 deletions packages/runner/src/storage.ts
@@ -1,6 +1,7 @@
import { refer } from "merkle-reference";
import { Immutable, isRecord } from "@commontools/utils/types";
import type {
FactAddress,
JSONValue,
MemorySpace,
SchemaContext,
@@ -494,10 +495,21 @@ export class Storage implements IStorage {
return;
}

// If we're already dirty, we don't need to add a promise
const docKey = `${doc.space}/${uri}`;
if (this.dirtyDocs.has(docKey)) {
return;
}
this.dirtyDocs.add(docKey);

// Track these promises for our synced call.
// We may have linked docs that storage doesn't know about
const storageProvider = this._getStorageProviderForSpace(doc.space);
const { missing } = this._queryLocal(doc.space, uri, storageProvider);
const { missing, loaded } = this._queryLocal(
@ubik2 (Contributor, Author) commented on Jul 30, 2025:
Because queryLocal runs here, before the await runtime.idle() in _sendDocValue, the set of linked documents can change between this check and the value capture in _sendDocValue, so we may end up linking to different docs by the time we get there.
This PR is mostly a stopgap, though, since a transaction should carry better information about linked docs.
As mentioned below, there's also an issue with the timing of pulling cells we are linked to but don't already have.
This can happen if you change the schema of a cell to include docs that were previously excluded. It can also occur if you have a well-known cause that you used to create a cell (so you have a local unsynced copy and there's also a copy on the server).
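A minimal, self-contained sketch of the timing gap described above (not code from this PR; SketchDoc, idle, and sendDocValueSketch are illustrative stand-ins for DocImpl, runtime.idle(), and _sendDocValue):

```ts
// Sketch only (assumed names; not code from this PR). The link set is
// snapshotted before an async boundary, so the value captured afterwards may
// reference different docs than that snapshot.
type SketchDoc = { links: Set<string>; value: unknown };

// Stand-in for runtime.idle(); other tasks may mutate the doc while we wait.
const idle = () => new Promise<void>((resolve) => setTimeout(resolve, 0));

async function sendDocValueSketch(doc: SketchDoc) {
  const linkedAtCheckTime = [...doc.links]; // snapshot taken before the await
  await idle(); // doc.links and doc.value can both change during this gap
  const value = doc.value; // captured after the gap
  // linkedAtCheckTime may now disagree with the links reachable from `value`
  return { linkedAtCheckTime, value };
}
```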

doc.space,
uri,
storageProvider,
);
// Any missing docs need to be linked up
for (const factAddress of missing) {
// missing docs have been created in our doc map, but storage doesn't
@@ -507,25 +519,33 @@
doc.space,
factAddress.of,
);
// we don't need to await this, since by the time we've resolved our
logger.info(() => ["calling sync cell on missing doc", factAddress.of]);
// We do need to call syncCell, because we may have pushed a new cell
// without calling syncCell (e.g. map will create these cells).
// I can modify the DocImpl class to automatically relay the doc updates
// to storage, but I also need to have storage update the docs.
// We don't need to await this, since by the time we've resolved our
// docToStoragePromise, we'll have added the loadingPromise.
// Not awaiting means that the cache may need to pull before push,
// and that can re-order writes.
this.syncCell(linkedDoc.asCell());
}

// If we're already dirty, we don't need to add a promise
const docKey = `${doc.space}/${uri}`;
if (this.dirtyDocs.has(docKey)) {
return;
}
this.dirtyDocs.add(docKey);
const docToStoragePromise = this._sendDocValue(doc, labels);
// Our queryLocal call has determined which docs are linked, so make sure
// we're sending those to storage as well. If they're redundant, they will
// be skipped by storage.
const docAddrs = [...missing, ...loaded.values().map((sv) => sv.source)];
const docToStoragePromise = this._sendDocValue(doc, docAddrs, labels);
this.docToStoragePromises.add(docToStoragePromise);
docToStoragePromise.finally(() =>
this.docToStoragePromises.delete(docToStoragePromise)
);
}

private async _sendDocValue(doc: DocImpl<unknown>, labels?: Labels) {
private async _sendDocValue(
doc: DocImpl<unknown>,
docAddrs: FactAddress[],
labels?: Labels,
) {
await this.runtime.idle();

// Wait for all _updateDoc operations to complete, then wait for runtime to
@@ -539,14 +559,34 @@

const uri = Storage.toURI(doc.entityId);
const docKey = `${doc.space}/${uri}`;
// We remove the main doc from the dirty list, but not the others
// They can handle that in their own sync
this.dirtyDocs.delete(docKey);

const storageProvider = this._getStorageProviderForSpace(doc.space);

// Create storage value using the helper to ensure consistency
const storageValue = Storage._cellLinkToJSON(doc, labels);

await storageProvider.send([{ uri: uri, value: storageValue }]);
const docsToSend = [];
for (const docAddr of docAddrs) {
const linkedDoc = this.runtime.documentMap.getDocByEntityId(
doc.space,
docAddr.of,
false,
);
// if we don't have a local doc, the server's version should be fine
if (linkedDoc === undefined) {
continue;
}
// The main doc can tell us about the labels applied based on the schema,
// but we aren't carrying that through queryLocal, so we don't know for
// other docs.
// Also, the doc itself could be associated with multiple schemas.
// This is just the one that we used when we wrote this value to the doc.
const linkedLabels = linkedDoc === doc ? labels : undefined;
// Create storage value using the helper to ensure consistency
const linkedValue = Storage._cellLinkToJSON(linkedDoc, linkedLabels);
docsToSend.push({ uri: docAddr.of, value: linkedValue });
}
await storageProvider.send(docsToSend);
}

// Update the doc with the new value we got in storage.