diff --git a/.changeset/gold-readers-taste.md b/.changeset/gold-readers-taste.md new file mode 100644 index 000000000..44ade3a2b --- /dev/null +++ b/.changeset/gold-readers-taste.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +Fixed critical bug where optimistic mutations were lost when their async handlers completed during a truncate operation. The fix captures a snapshot of optimistic state when `truncate()` is called and restores it during commit, then overlays any still-active transactions to handle late-arriving mutations. This ensures client-side optimistic state is preserved through server-initiated must-refetch scenarios. diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index fa621f83b..cd0b19beb 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -12,11 +12,18 @@ import type { CollectionLifecycleManager } from "./lifecycle" import type { CollectionChangesManager } from "./changes" import type { CollectionIndexesManager } from "./indexes" -interface PendingSyncedTransaction> { +interface PendingSyncedTransaction< + T extends object = Record, + TKey extends string | number = string | number, +> { committed: boolean operations: Array> truncate?: boolean deletedKeys: Set + optimisticSnapshot?: { + upserts: Map + deletes: Set + } } export class CollectionStateManager< @@ -33,8 +40,9 @@ export class CollectionStateManager< // Core state - make public for testing public transactions: SortedMap> - public pendingSyncedTransactions: Array> = - [] + public pendingSyncedTransactions: Array< + PendingSyncedTransaction + > = [] public syncedData: Map | SortedMap public syncedMetadata = new Map() @@ -442,10 +450,10 @@ export class CollectionStateManager< }, { committedSyncedTransactions: [] as Array< - PendingSyncedTransaction + PendingSyncedTransaction >, uncommittedSyncedTransactions: [] as Array< - PendingSyncedTransaction + PendingSyncedTransaction >, hasTruncateSync: false, } @@ -455,6 +463,12 @@ export class CollectionStateManager< // Set flag to prevent redundant optimistic state recalculations this.isCommittingSyncTransactions = true + // Get the optimistic snapshot from the truncate transaction (captured when truncate() was called) + const truncateOptimisticSnapshot = hasTruncateSync + ? committedSyncedTransactions.find((t) => t.truncate) + ?.optimisticSnapshot + : null + // First collect all keys that will be affected by sync operations const changedKeys = new Set() for (const transaction of committedSyncedTransactions) { @@ -484,13 +498,19 @@ export class CollectionStateManager< // Handle truncate operations first if (transaction.truncate) { // TRUNCATE PHASE - // 1) Emit a delete for every currently-synced key so downstream listeners/indexes + // 1) Emit a delete for every visible key (synced + optimistic) so downstream listeners/indexes // observe a clear-before-rebuild. We intentionally skip keys already in // optimisticDeletes because their delete was previously emitted by the user. - for (const key of this.syncedData.keys()) { - if (this.optimisticDeletes.has(key)) continue + // Use the snapshot to ensure we emit deletes for all items that existed at truncate start. + const visibleKeys = new Set([ + ...this.syncedData.keys(), + ...(truncateOptimisticSnapshot?.upserts.keys() || []), + ]) + for (const key of visibleKeys) { + if (truncateOptimisticSnapshot?.deletes.has(key)) continue const previousValue = - this.optimisticUpserts.get(key) || this.syncedData.get(key) + truncateOptimisticSnapshot?.upserts.get(key) || + this.syncedData.get(key) if (previousValue !== undefined) { events.push({ type: `delete`, key, value: previousValue }) } @@ -574,41 +594,14 @@ export class CollectionStateManager< } } - // Build re-apply sets from ACTIVE optimistic transactions against the new synced base - // We do not copy maps; we compute intent directly from transactions to avoid drift. - const reapplyUpserts = new Map() - const reapplyDeletes = new Set() - - for (const tx of this.transactions.values()) { - if ([`completed`, `failed`].includes(tx.state)) continue - for (const mutation of tx.mutations) { - if ( - !this.isThisCollection(mutation.collection) || - !mutation.optimistic - ) - continue - const key = mutation.key as TKey - switch (mutation.type) { - case `insert`: - reapplyUpserts.set(key, mutation.modified as TOutput) - reapplyDeletes.delete(key) - break - case `update`: { - const base = this.syncedData.get(key) - const next = base - ? (Object.assign({}, base, mutation.changes) as TOutput) - : (mutation.modified as TOutput) - reapplyUpserts.set(key, next) - reapplyDeletes.delete(key) - break - } - case `delete`: - reapplyUpserts.delete(key) - reapplyDeletes.add(key) - break - } - } - } + // Build re-apply sets from the snapshot taken at the start of this function. + // This prevents losing optimistic state if transactions complete during truncate processing. + const reapplyUpserts = new Map( + truncateOptimisticSnapshot!.upserts + ) + const reapplyDeletes = new Set( + truncateOptimisticSnapshot!.deletes + ) // Emit inserts for re-applied upserts, skipping any keys that have an optimistic delete. // If the server also inserted/updated the same key in this batch, override that value @@ -660,6 +653,20 @@ export class CollectionStateManager< // Reset flag and recompute optimistic state for any remaining active transactions this.isCommittingSyncTransactions = false + + // If we had a truncate, restore the preserved optimistic state from the snapshot + // This includes items from transactions that may have completed during processing + if (hasTruncateSync && truncateOptimisticSnapshot) { + for (const [key, value] of truncateOptimisticSnapshot.upserts) { + this.optimisticUpserts.set(key, value) + } + for (const key of truncateOptimisticSnapshot.deletes) { + this.optimisticDeletes.add(key) + } + } + + // Always overlay any still-active optimistic transactions so mutations that started + // after the truncate snapshot are preserved. for (const transaction of this.transactions.values()) { if (![`completed`, `failed`].includes(transaction.state)) { for (const mutation of transaction.mutations) { diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index 1a26038dc..124b54fe1 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -181,6 +181,13 @@ export class CollectionSyncManager< // - Subsequent synced ops applied on the fresh base // - Finally, optimistic mutations re-applied on top (single batch) pendingTransaction.truncate = true + + // Capture optimistic state NOW to preserve it even if transactions complete + // before this truncate transaction is committed + pendingTransaction.optimisticSnapshot = { + upserts: new Map(this.state.optimisticUpserts), + deletes: new Set(this.state.optimisticDeletes), + } }, }) ) diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 31bf2737a..1ca729f8f 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -964,11 +964,11 @@ describe(`Collection.subscribeChanges`, () => { value: `optimistic insert`, }) - // Verify events are a single batch: deletes for synced keys (1,2), then inserts for preserved optimistic (1,3) - expect(changeEvents.length).toBe(4) + // Verify events are a single batch: deletes for ALL visible keys (1,2,3), then inserts for preserved optimistic (1,3) + expect(changeEvents.length).toBe(5) const deletes = changeEvents.filter((e) => e.type === `delete`) const inserts = changeEvents.filter((e) => e.type === `insert`) - expect(deletes.length).toBe(2) + expect(deletes.length).toBe(3) expect(inserts.length).toBe(2) const deleteByKey = new Map(deletes.map((e) => [e.key, e])) @@ -984,6 +984,11 @@ describe(`Collection.subscribeChanges`, () => { key: 2, value: { id: 2, value: `initial value 2` }, }) + expect(deleteByKey.get(3)).toEqual({ + type: `delete`, + key: 3, + value: { id: 3, value: `optimistic insert` }, + }) // Insert events for preserved optimistic entries (1 and 3) expect(insertByKey.get(1)).toEqual({ diff --git a/packages/db/tests/collection-truncate.test.ts b/packages/db/tests/collection-truncate.test.ts new file mode 100644 index 000000000..f3630d0aa --- /dev/null +++ b/packages/db/tests/collection-truncate.test.ts @@ -0,0 +1,709 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest" +import { createCollection } from "../src/collection/index.js" +import type { SyncConfig } from "../src/types" + +describe(`Collection truncate operations`, () => { + beforeEach(() => { + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + it(`should preserve optimistic inserts when truncate completes before async mutation handler`, async () => { + const changeEvents: Array = [] + let syncOps: + | Parameters[`sync`]>[0] + | undefined + + const collection = createCollection<{ id: number; value: string }, number>({ + id: `truncate-with-optimistic`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.begin() + cfg.write({ type: `insert`, value: { id: 1, value: `initial-1` } }) + cfg.write({ type: `insert`, value: { id: 2, value: `initial-2` } }) + cfg.commit() + cfg.markReady() + }, + }, + onInsert: async ({ transaction }) => { + await vi.advanceTimersByTimeAsync(50) + syncOps!.begin() + for (const mutation of transaction.mutations) { + syncOps!.write({ + type: `insert`, + value: mutation.modified, + }) + } + syncOps!.commit() + }, + }) + + collection.subscribeChanges((changes) => changeEvents.push(...changes), { + includeInitialState: true, + }) + + await collection.stateWhenReady() + expect(collection.state.size).toBe(2) + changeEvents.length = 0 + + // Start optimistic insert + const tx = collection.insert({ id: 3, value: `new-item` }) + + expect(changeEvents.length).toBe(1) + expect(changeEvents[0]).toEqual({ + type: `insert`, + key: 3, + value: { id: 3, value: `new-item` }, + }) + + changeEvents.length = 0 + + // Truncate before onInsert completes + syncOps!.begin() + syncOps!.truncate() + syncOps!.write({ type: `insert`, value: { id: 1, value: `initial-1` } }) + syncOps!.write({ type: `insert`, value: { id: 2, value: `initial-2` } }) + syncOps!.commit() + + await tx.isPersisted.promise + + // Verify final state includes all items + expect(collection.state.size).toBe(3) + expect(collection.state.get(1)).toEqual({ id: 1, value: `initial-1` }) + expect(collection.state.get(2)).toEqual({ id: 2, value: `initial-2` }) + expect(collection.state.get(3)).toEqual({ id: 3, value: `new-item` }) + + // Verify only one insert event for the optimistic item + const key3Inserts = changeEvents.filter( + (e) => e.type === `insert` && e.key === 3 + ) + expect(key3Inserts.length).toBe(1) + }) + + it(`should preserve optimistic inserts when mutation handler completes during truncate processing`, async () => { + const changeEvents: Array = [] + let syncOps: + | Parameters[`sync`]>[0] + | undefined + + const collection = createCollection<{ id: number; value: string }, number>({ + id: `truncate-during-mutation`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.begin() + cfg.write({ type: `insert`, value: { id: 1, value: `initial-1` } }) + cfg.commit() + cfg.markReady() + }, + }, + onInsert: async ({ transaction }) => { + await vi.advanceTimersByTimeAsync(10) + syncOps!.begin() + for (const mutation of transaction.mutations) { + syncOps!.write({ + type: `insert`, + value: mutation.modified, + }) + } + syncOps!.commit() + }, + }) + + collection.subscribeChanges((changes) => changeEvents.push(...changes)) + await collection.stateWhenReady() + changeEvents.length = 0 + + const tx = collection.insert({ id: 2, value: `new-item` }) + + changeEvents.length = 0 + + syncOps!.begin() + syncOps!.truncate() + syncOps!.write({ type: `insert`, value: { id: 1, value: `initial-1` } }) + + await tx.isPersisted.promise + + syncOps!.commit() + + // Both items should be present in final state + expect(collection.state.size).toBe(2) + expect(collection.state.has(1)).toBe(true) + expect(collection.state.has(2)).toBe(true) + expect(collection.state.get(2)).toEqual({ id: 2, value: `new-item` }) + }) + + it(`should handle truncate on empty collection followed by mutation sync`, async () => { + const changeEvents: Array = [] + let syncOps: + | Parameters[`sync`]>[0] + | undefined + + const collection = createCollection<{ id: number; value: string }, number>({ + id: `truncate-empty-collection`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.begin() + cfg.commit() + cfg.markReady() + }, + }, + onInsert: async ({ transaction }) => { + await vi.advanceTimersByTimeAsync(100) + syncOps!.begin() + for (const mutation of transaction.mutations) { + syncOps!.write({ + type: `insert`, + value: mutation.modified, + }) + } + syncOps!.commit() + }, + }) + + collection.subscribeChanges((changes) => changeEvents.push(...changes)) + await collection.stateWhenReady() + + const tx = collection.insert({ id: 1, value: `user-item` }) + + expect(changeEvents.length).toBe(1) + expect(changeEvents[0]).toEqual({ + type: `insert`, + key: 1, + value: { id: 1, value: `user-item` }, + }) + + changeEvents.length = 0 + + // Truncate before mutation handler completes + syncOps!.begin() + syncOps!.truncate() + syncOps!.commit() + + await tx.isPersisted.promise + + // Item should be present in final state + expect(collection.state.get(1)).toEqual({ id: 1, value: `user-item` }) + + // Should not have duplicate insert events + const insertCount = changeEvents.filter( + (e) => e.type === `insert` && e.key === 1 + ).length + expect(insertCount).toBeLessThanOrEqual(1) + }) + + it(`should emit delete events for optimistic-only data during truncate`, async () => { + const changeEvents: Array = [] + let syncOps: + | Parameters[`sync`]>[0] + | undefined + let resolveOnInsert: (() => void) | undefined + + const collection = createCollection<{ id: number; value: string }, number>({ + id: `truncate-optimistic-only`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.begin() + cfg.commit() + cfg.markReady() + }, + }, + onInsert: async () => { + // Keep promise pending until we explicitly resolve it + await new Promise((resolve) => { + resolveOnInsert = resolve + }) + // Don't sync - keep it optimistic only + }, + }) + + collection.subscribeChanges((changes) => changeEvents.push(...changes)) + await collection.stateWhenReady() + + const tx = collection.insert({ id: 1, value: `optimistic-only` }) + + expect(collection.state.size).toBe(1) + expect(changeEvents.length).toBe(1) + expect(changeEvents[0]).toEqual({ + type: `insert`, + key: 1, + value: { id: 1, value: `optimistic-only` }, + }) + + changeEvents.length = 0 + + // Truncate while onInsert is still pending + syncOps!.begin() + syncOps!.truncate() + syncOps!.commit() + + const deleteEvents = changeEvents.filter((e) => e.type === `delete`) + const insertEvents = changeEvents.filter((e) => e.type === `insert`) + + // Should emit delete event for the optimistic item + expect(deleteEvents.length).toBe(1) + expect(deleteEvents[0]).toEqual({ + type: `delete`, + key: 1, + value: { id: 1, value: `optimistic-only` }, + }) + + // Then re-insert the preserved optimistic item + expect(insertEvents.length).toBe(1) + expect(insertEvents[0]).toEqual({ + type: `insert`, + key: 1, + value: { id: 1, value: `optimistic-only` }, + }) + + // Now explicitly complete the onInsert + resolveOnInsert!() + await tx.isPersisted.promise + }) + + it(`should preserve optimistic inserts started after truncate begins`, async () => { + const changeEvents: Array = [] + let syncOps: + | Parameters[`sync`]>[0] + | undefined + const onInsertResolvers: Array<() => void> = [] + + const collection = createCollection<{ id: number; value: string }, number>({ + id: `truncate-late-optimistic`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.begin() + cfg.write({ type: `insert`, value: { id: 1, value: `server-item` } }) + cfg.commit() + cfg.markReady() + }, + }, + onInsert: async ({ transaction }) => { + await new Promise((resolve) => { + onInsertResolvers.push(resolve) + }) + syncOps!.begin() + for (const mutation of transaction.mutations) { + syncOps!.write({ + type: `insert`, + value: mutation.modified, + }) + } + syncOps!.commit() + }, + }) + + collection.subscribeChanges((changes) => changeEvents.push(...changes)) + await collection.stateWhenReady() + changeEvents.length = 0 + + syncOps!.begin() + syncOps!.truncate() + syncOps!.write({ type: `insert`, value: { id: 1, value: `server-item` } }) + + const lateTx = collection.insert({ + id: 2, + value: `late-optimistic`, + }) + + expect(collection.state.has(2)).toBe(true) + + syncOps!.commit() + + expect(collection.state.size).toBe(2) + expect(collection.state.has(1)).toBe(true) + expect(collection.state.get(1)).toEqual({ id: 1, value: `server-item` }) + expect(collection.state.has(2)).toBe(true) + expect(collection.state.get(2)).toEqual({ + id: 2, + value: `late-optimistic`, + }) + + // Clean up the pending optimistic transactions + while (onInsertResolvers.length > 0) { + onInsertResolvers.pop()!() + } + await lateTx.isPersisted.promise + }) + + it(`should preserve all optimistic inserts when truncate occurs during async mutation handler`, async () => { + const changeEvents: Array = [] + let syncOps: + | Parameters[`sync`]>[0] + | undefined + + const collection = createCollection<{ id: number; value: string }, number>({ + id: `truncate-preserve-optimistic`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.begin() + cfg.write({ type: `insert`, value: { id: 1, value: `server-item` } }) + cfg.commit() + cfg.markReady() + }, + }, + onInsert: async ({ transaction }) => { + await vi.advanceTimersByTimeAsync(50) + syncOps!.begin() + for (const mutation of transaction.mutations) { + syncOps!.write({ + type: `insert`, + value: mutation.modified, + }) + } + syncOps!.commit() + }, + }) + + collection.subscribeChanges((changes) => changeEvents.push(...changes)) + await collection.stateWhenReady() + + expect(collection.state.size).toBe(1) + changeEvents.length = 0 + + const tx = collection.insert({ id: 2, value: `optimistic-item` }) + + expect(collection.state.size).toBe(2) + expect(changeEvents.length).toBe(1) + changeEvents.length = 0 + + // Truncate before mutation handler completes + syncOps!.begin() + syncOps!.truncate() + // Server re-inserts item 1 but not item 2 + syncOps!.write({ type: `insert`, value: { id: 1, value: `server-item` } }) + syncOps!.commit() + + await tx.isPersisted.promise + + // Both items should be present in final state + expect(collection.state.size).toBe(2) + expect(collection.state.has(1)).toBe(true) + expect(collection.state.has(2)).toBe(true) + expect(collection.state.get(2)).toEqual({ id: 2, value: `optimistic-item` }) + }) + + it(`should preserve optimistic delete when transaction still active during truncate`, async () => { + let syncOps: + | Parameters[`sync`]>[0] + | undefined + const onDeleteResolvers: Array<() => void> = [] + + const collection = createCollection<{ id: number; value: string }, number>({ + id: `truncate-optimistic-delete-active`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.begin() + cfg.write({ type: `insert`, value: { id: 1, value: `item-1` } }) + cfg.write({ type: `insert`, value: { id: 2, value: `item-2` } }) + cfg.commit() + cfg.markReady() + }, + }, + onDelete: async () => { + await new Promise((resolve) => onDeleteResolvers.push(resolve)) + }, + }) + + await collection.stateWhenReady() + + expect(collection.state.size).toBe(2) + + // Optimistically delete item 1 (handler stays pending) + const deleteTx = collection.delete(1) + + // Verify optimistic delete is applied + expect(collection.state.size).toBe(1) + expect(collection.state.has(1)).toBe(false) + expect(collection._state.optimisticDeletes.has(1)).toBe(true) + + // Truncate while delete transaction is still active + syncOps!.begin() + syncOps!.truncate() + // Server re-inserts both items (doesn't know about the delete yet) + syncOps!.write({ type: `insert`, value: { id: 1, value: `item-1` } }) + syncOps!.write({ type: `insert`, value: { id: 2, value: `item-2` } }) + syncOps!.commit() + + // Item 1 should still be deleted (optimistic delete preserved from snapshot) + expect(collection.state.size).toBe(1) + expect(collection.state.has(1)).toBe(false) + expect(collection.state.has(2)).toBe(true) + expect(collection._state.optimisticDeletes.has(1)).toBe(true) + + // Clean up + onDeleteResolvers.forEach((r) => r()) + await deleteTx.isPersisted.promise + }) + + it(`should preserve optimistic value over server value when transaction active during truncate`, async () => { + const changeEvents: Array = [] + let syncOps: + | Parameters[`sync`]>[0] + | undefined + const onUpdateResolvers: Array<() => void> = [] + + const collection = createCollection<{ id: number; value: string }, number>({ + id: `truncate-optimistic-vs-server`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.begin() + cfg.write({ + type: `insert`, + value: { id: 1, value: `server-value-1` }, + }) + cfg.commit() + cfg.markReady() + }, + }, + onUpdate: async () => { + await new Promise((resolve) => onUpdateResolvers.push(resolve)) + }, + }) + + collection.subscribeChanges((changes) => changeEvents.push(...changes)) + await collection.stateWhenReady() + + expect(collection.state.get(1)).toEqual({ id: 1, value: `server-value-1` }) + changeEvents.length = 0 + + // Optimistically update item 1 (handler stays pending) + const updateTx = collection.update(1, (draft) => { + draft.value = `optimistic-value` + }) + + expect(collection.state.get(1)).toEqual({ + id: 1, + value: `optimistic-value`, + }) + changeEvents.length = 0 + + // Truncate while update transaction is still active + syncOps!.begin() + syncOps!.truncate() + // Server re-inserts with a DIFFERENT value + syncOps!.write({ + type: `insert`, + value: { id: 1, value: `server-value-2` }, + }) + syncOps!.commit() + + // Optimistic value should win (client intent preserved) + expect(collection.state.get(1)).toEqual({ + id: 1, + value: `optimistic-value`, + }) + + // Clean up + onUpdateResolvers.forEach((r) => r()) + await updateTx.isPersisted.promise + }) + + it(`should handle multiple consecutive truncate operations`, async () => { + const changeEvents: Array = [] + let syncOps: + | Parameters[`sync`]>[0] + | undefined + const onInsertResolvers: Array<() => void> = [] + + const collection = createCollection<{ id: number; value: string }, number>({ + id: `truncate-consecutive`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.begin() + cfg.write({ type: `insert`, value: { id: 1, value: `initial` } }) + cfg.commit() + cfg.markReady() + }, + }, + onInsert: async () => { + await new Promise((resolve) => onInsertResolvers.push(resolve)) + }, + }) + + collection.subscribeChanges((changes) => changeEvents.push(...changes)) + await collection.stateWhenReady() + + expect(collection.state.size).toBe(1) + changeEvents.length = 0 + + // First optimistic insert (stays pending) + const tx1 = collection.insert({ id: 2, value: `optimistic-A` }) + expect(collection.state.size).toBe(2) + + // First truncate (item 2 should be preserved because tx1 is still active) + syncOps!.begin() + syncOps!.truncate() + syncOps!.write({ type: `insert`, value: { id: 1, value: `initial` } }) + syncOps!.commit() + + expect(collection.state.size).toBe(2) + expect(collection.state.has(2)).toBe(true) + + changeEvents.length = 0 + + // Second optimistic insert (stays pending) + const tx2 = collection.insert({ id: 3, value: `optimistic-B` }) + expect(collection.state.size).toBe(3) + + // Second truncate (both items 2 and 3 should be preserved) + syncOps!.begin() + syncOps!.truncate() + syncOps!.write({ type: `insert`, value: { id: 1, value: `initial` } }) + syncOps!.commit() + + // Both optimistic items should be preserved + expect(collection.state.size).toBe(3) + expect(collection.state.has(2)).toBe(true) + expect(collection.state.has(3)).toBe(true) + + // Clean up + onInsertResolvers.forEach((r) => r()) + await Promise.all([tx1.isPersisted.promise, tx2.isPersisted.promise]) + }) + + it(`should handle new mutation on same key after truncate snapshot`, async () => { + const changeEvents: Array = [] + let syncOps: + | Parameters[`sync`]>[0] + | undefined + const onUpdateResolvers: Array<() => void> = [] + + const collection = createCollection<{ id: number; value: string }, number>({ + id: `truncate-same-key-mutation`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.begin() + cfg.write({ type: `insert`, value: { id: 1, value: `initial` } }) + cfg.commit() + cfg.markReady() + }, + }, + onUpdate: async () => { + await new Promise((resolve) => onUpdateResolvers.push(resolve)) + }, + }) + + collection.subscribeChanges((changes) => changeEvents.push(...changes)) + await collection.stateWhenReady() + + changeEvents.length = 0 + + // First optimistic update + const tx1 = collection.update(1, (draft) => { + draft.value = `value-1` + }) + + expect(collection.state.get(1)).toEqual({ id: 1, value: `value-1` }) + + // Truncate is called (snapshot captures value-1) + syncOps!.begin() + syncOps!.truncate() + + // BEFORE commit, user makes another update to the same key + const tx2 = collection.update(1, (draft) => { + draft.value = `value-2` + }) + + expect(collection.state.get(1)).toEqual({ id: 1, value: `value-2` }) + + // Now commit the truncate + syncOps!.write({ type: `insert`, value: { id: 1, value: `initial` } }) + syncOps!.commit() + + // Should show value-2 (newest intent wins) + expect(collection.state.get(1)).toEqual({ id: 1, value: `value-2` }) + + // Clean up + onUpdateResolvers.forEach((r) => r()) + await Promise.all([tx1.isPersisted.promise, tx2.isPersisted.promise]) + }) + + it(`should handle transaction completing between truncate and commit`, async () => { + const changeEvents: Array = [] + let syncOps: + | Parameters[`sync`]>[0] + | undefined + let onInsertResolver: (() => void) | null = null + + const collection = createCollection<{ id: number; value: string }, number>({ + id: `truncate-transaction-completes`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.begin() + cfg.write({ type: `insert`, value: { id: 1, value: `initial` } }) + cfg.commit() + cfg.markReady() + }, + }, + onInsert: async () => { + await new Promise((resolve) => { + onInsertResolver = resolve + }) + }, + }) + + collection.subscribeChanges((changes) => changeEvents.push(...changes)) + await collection.stateWhenReady() + + changeEvents.length = 0 + + // Optimistic insert + const tx = collection.insert({ id: 2, value: `optimistic` }) + + expect(collection.state.size).toBe(2) + expect(collection.state.has(2)).toBe(true) + + // Truncate is called (snapshot captures item 2) + syncOps!.begin() + syncOps!.truncate() + + // Transaction completes BEFORE commit + onInsertResolver!() + await tx.isPersisted.promise + + // Now commit the truncate + syncOps!.write({ type: `insert`, value: { id: 1, value: `initial` } }) + syncOps!.commit() + + // Item 2 should still be present (preserved from snapshot) + expect(collection.state.size).toBe(2) + expect(collection.state.has(2)).toBe(true) + expect(collection.state.get(2)).toEqual({ id: 2, value: `optimistic` }) + }) +})