diff --git a/.changeset/nasty-stars-take.md b/.changeset/nasty-stars-take.md new file mode 100644 index 000000000..abd5a120b --- /dev/null +++ b/.changeset/nasty-stars-take.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +Fixed race condition which could result in a live query throwing and becoming stuck after multiple mutations complete asynchronously. diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index d7f5e19e3..fa621f83b 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -825,9 +825,6 @@ export class CollectionStateManager< public capturePreSyncVisibleState(): void { if (this.pendingSyncedTransactions.length === 0) return - // Clear any previous capture - this.preSyncVisibleState.clear() - // Get all keys that will be affected by sync operations const syncedKeys = new Set() for (const transaction of this.pendingSyncedTransactions) { @@ -843,10 +840,13 @@ export class CollectionStateManager< // Only capture current visible state for keys that will be affected by sync operations // This is much more efficient than capturing the entire collection state + // Only capture keys that haven't been captured yet to preserve earlier captures for (const key of syncedKeys) { - const currentValue = this.get(key) - if (currentValue !== undefined) { - this.preSyncVisibleState.set(key, currentValue) + if (!this.preSyncVisibleState.has(key)) { + const currentValue = this.get(key) + if (currentValue !== undefined) { + this.preSyncVisibleState.set(key, currentValue) + } } } } diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 3311e4ab2..31bf2737a 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -9,6 +9,7 @@ import type { ChangesPayload, MutationFn, PendingMutation, + SyncConfig, } from "../src/types" // Helper function to wait for changes to be processed @@ -1516,4 +1517,128 @@ describe(`Collection.subscribeChanges`, () => { subscription.unsubscribe() }) + + it(`should not emit duplicate insert events when onInsert delays sync write`, async () => { + vi.useFakeTimers() + + try { + const changeEvents: Array = [] + let syncOps: + | Parameters< + SyncConfig<{ id: string; n: number; foo?: string }, string>[`sync`] + >[0] + | undefined + + const collection = createCollection< + { id: string; n: number; foo?: string }, + string + >({ + id: `async-oninsert-race-test`, + getKey: (item) => item.id, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.markReady() + }, + }, + onInsert: async ({ transaction }) => { + // Simulate async operation (e.g., server round-trip) + await vi.advanceTimersByTimeAsync(100) + + // Write modified data back via sync + const modifiedValues = transaction.mutations.map((m) => m.modified) + syncOps!.begin() + for (const value of modifiedValues) { + const existing = collection._state.syncedData.get(value.id) + syncOps!.write({ + type: existing ? `update` : `insert`, + value: { ...value, foo: `abc` }, + }) + } + syncOps!.commit() + }, + startSync: true, + }) + + collection.subscribeChanges((changes) => changeEvents.push(...changes)) + + // Insert two items rapidly - this triggers the race condition + collection.insert({ id: `0`, n: 1 }) + collection.insert({ id: `1`, n: 1 }) + + await vi.runAllTimersAsync() + + // Filter events by type + const insertEvents = changeEvents.filter((e) => e.type === `insert`) + const updateEvents = changeEvents.filter((e) => e.type === `update`) + + // Expected: 2 optimistic inserts + 2 sync updates = 4 events + expect(insertEvents.length).toBe(2) + expect(updateEvents.length).toBe(2) + } finally { + vi.restoreAllMocks() + } + }) + + it(`should handle single insert with delayed sync correctly`, async () => { + vi.useFakeTimers() + + try { + const changeEvents: Array = [] + let syncOps: + | Parameters< + SyncConfig<{ id: string; n: number; foo?: string }, string>[`sync`] + >[0] + | undefined + + const collection = createCollection< + { id: string; n: number; foo?: string }, + string + >({ + id: `single-insert-delayed-sync-test`, + getKey: (item) => item.id, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.markReady() + }, + }, + onInsert: async ({ transaction }) => { + await vi.advanceTimersByTimeAsync(50) + + const modifiedValues = transaction.mutations.map((m) => m.modified) + syncOps!.begin() + for (const value of modifiedValues) { + const existing = collection._state.syncedData.get(value.id) + syncOps!.write({ + type: existing ? `update` : `insert`, + value: { ...value, foo: `abc` }, + }) + } + syncOps!.commit() + }, + startSync: true, + }) + + collection.subscribeChanges((changes) => changeEvents.push(...changes)) + + collection.insert({ id: `x`, n: 1 }) + await vi.runAllTimersAsync() + + // Should have optimistic insert + sync update + expect(changeEvents).toHaveLength(2) + expect(changeEvents[0]).toMatchObject({ + type: `insert`, + key: `x`, + value: { id: `x`, n: 1 }, + }) + expect(changeEvents[1]).toMatchObject({ + type: `update`, + key: `x`, + value: { id: `x`, n: 1, foo: `abc` }, + }) + } finally { + vi.restoreAllMocks() + } + }) })