Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/// <cts-enable />
import { type Cell, cell, Default, handler, lift, recipe } from "commontools";

interface Item {
id?: string;
}

const action = handler(
(_event, context: { items: Cell<Item[]>; sequence: Cell<number> }) => {
// Minimal repro: Removing either of these removes the conflict
context.items.set([]);
context.sequence.set(context.sequence.get() + 1);
},
);

export const conflictRepro = recipe<{ items: Default<Item[], []> }>(
({ items }) => {
const sequence = cell(0);

// Minimal repro: Removing the lift and the map removes the conflict
lift((item: Item[]) => item.map((_) => ({})))(items);

return {
action: action({
items,
sequence,
}),
};
},
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { describe, it } from "@std/testing/bdd";
import { runPatternScenario } from "../pattern-harness.ts";
import type { PatternIntegrationScenario } from "../pattern-harness.ts";

interface Item {
id?: string;
}

interface MinimalConflictReproArgument {
items?: Item[];
}

export const minimalLeadScoringScenario: PatternIntegrationScenario<
MinimalConflictReproArgument
> = {
name: "minimal conflict repro",
module: new URL("./minimal-conflict-repro.pattern.ts", import.meta.url),
exportName: "conflictRepro",
argument: {
items: [
{
id: "acme",
},
],
},
steps: [
{
events: [{
stream: "action",
payload: {},
}],
expect: [],
},
],
};

export const scenarios = [minimalLeadScoringScenario];

describe("minimal conflict repro", () => {
for (const scenario of scenarios) {
it(scenario.name, async () => {
await runPatternScenario(scenario);
});
}
});
13 changes: 11 additions & 2 deletions packages/runner/src/storage/transaction/chronicle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ export class Chronicle {

// Validate against current state (replica + any overlapping novelty)
const rebase = this.rebase(loaded);

if (rebase.error) {
return rebase;
}
Expand Down Expand Up @@ -298,10 +299,18 @@ export class Chronicle {
// If the merged value is neither `undefined` nor the existing value,
// create an assertion referring to the loaded fact in a causal
// reference.
const factToRefer = loaded.cause ? normalizeFact(loaded) : loaded;
const causeRef = refer(factToRefer);

// Normalize the value to handle NaN and other non-JSON values
// NaN gets serialized to null in JSON, so we normalize it here to ensure
// consistent hashing between client and server
const normalizedValue = JSON.parse(JSON.stringify(merged.value));

edit.assert({
...loaded,
is: merged.value,
cause: refer(loaded.cause ? normalizeFact(loaded) : loaded),
is: normalizedValue,
cause: causeRef,
});
}
}
Expand Down
205 changes: 205 additions & 0 deletions packages/runner/test/conflict-repro.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
import { afterEach, beforeEach, describe, it } from "@std/testing/bdd";
import { expect } from "@std/expect";

import { Identity } from "@commontools/identity";
import { StorageManager } from "@commontools/runner/storage/cache.deno";
import { type Cell } from "../src/builder/types.ts";
import { createBuilder } from "../src/builder/factory.ts";
import { Runtime } from "../src/runtime.ts";
import {
type IExtendedStorageTransaction,
type StorageNotification,
} from "../src/storage/interface.ts";

const signer = await Identity.fromPassphrase("test conflict");
const space = signer.did();

interface Item {
id?: string;
}

describe("Conflict Reproduction", () => {
let storageManager: ReturnType<typeof StorageManager.emulate>;
let runtime: Runtime;
let tx: IExtendedStorageTransaction;
let lift: ReturnType<typeof createBuilder>["commontools"]["lift"];
let recipe: ReturnType<typeof createBuilder>["commontools"]["recipe"];
let cell: ReturnType<typeof createBuilder>["commontools"]["cell"];
let handler: ReturnType<typeof createBuilder>["commontools"]["handler"];
let conflictErrors: Error[];

beforeEach(() => {
conflictErrors = [];
storageManager = StorageManager.emulate({ as: signer });

// Subscribe to storage notifications to capture conflicts
storageManager.subscribe({
next: (notification: StorageNotification) => {
if (
notification.type === "revert" &&
notification.reason.name === "ConflictError"
) {
conflictErrors.push(notification.reason);
}
return undefined;
},
});

runtime = new Runtime({
apiUrl: new URL(import.meta.url),
storageManager,
});

tx = runtime.edit();

const { commontools } = createBuilder(runtime);
({ lift, recipe, cell, handler } = commontools);
});

afterEach(async () => {
await tx.commit();
await runtime?.dispose();
await storageManager?.close();
});

it("should NOT have conflicts with lift (fixed)", async () => {
const action = handler<
undefined,
{ items: Cell<Item[]>; sequence: Cell<number> }
>({}, {
type: "object",
properties: {
items: {
type: "array",
items: { type: "object", properties: { id: { type: "string" } } },
asCell: true,
},
sequence: { type: "number", asCell: true },
},
required: ["items", "sequence"],
}, (_event, context) => {
// Minimal repro: Removing either of these removes the conflict
context.items.set([]);
context.sequence.set(context.sequence.get() + 1);
});

const conflictRepro = recipe<{ items: Item[] }>(
"Conflict Repro",
({ items }) => {
const sequence = cell(0);

// Minimal repro: Removing the lift and the map removes the conflict
lift((item: Item[]) => item.map((_) => ({})))(items);

return {
action: action({
items,
sequence,
}),
sequence,
};
},
);

const resultCell = runtime.getCell<{ action: any }>(
space,
"should reproduce conflict with minimal handler",
undefined,
tx,
);
const result = runtime.run(tx, conflictRepro, {
items: [{ id: "test" }],
}, resultCell);
await tx.commit();

await runtime.idle();

// This tests the condition that caused the conflict
expect(result.get().sequence).toBe(undefined);

// Trigger the handler
result.key("action").send({});
await runtime.idle();

// This tests the condition that caused the conflict
expect(result.get().sequence).toBe(null);

// Give time for async conflict notifications to be processed
// The conflict happens during the optimistic transaction retry,
// which completes asynchronously after runtime.idle()
await new Promise((resolve) => setTimeout(resolve, 100));

// Verify that conflicts were NOT captured (fixed by normalizing NaN to null)
expect(conflictErrors.length).toBe(0);
});

it("should NOT have conflicts without lift", async () => {
conflictErrors = []; // Reset

const action = handler<
undefined,
{ items: Cell<Item[]>; sequence: Cell<number> }
>({}, {
type: "object",
properties: {
items: {
type: "array",
items: { type: "object", properties: { id: { type: "string" } } },
asCell: true,
},
sequence: { type: "number", asCell: true },
},
required: ["items", "sequence"],
}, (_event, context) => {
// Same handler logic
context.items.set([]);
context.sequence.set(context.sequence.get() + 1);
});

const conflictReproNoLift = recipe<{ items: Item[] }>(
"Conflict Repro No Lift",
({ items }) => {
const sequence = cell(0);

// NO lift - this should eliminate conflicts

return {
action: action({
items,
sequence,
}),
sequence,
};
},
);

const resultCell = runtime.getCell<{ action: any }>(
space,
"should NOT have conflicts without lift",
undefined,
tx,
);
const result = runtime.run(tx, conflictReproNoLift, {
items: [{ id: "test" }],
}, resultCell);
await tx.commit();

await runtime.idle();

// This tests the condition that caused the conflict
expect(result.get().sequence).toBe(undefined);

// Trigger the handler
result.key("action").send({});
await runtime.idle();

// This tests the condition that caused the conflict
expect(result.get().sequence).toBe(null);

// Give time for async conflict notifications
await new Promise((resolve) => setTimeout(resolve, 100));

// Verify that NO conflicts were captured
expect(conflictErrors.length).toBe(0);
});
});