Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/nasty-stars-take.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

Fixed race condition which could result in a live query throwing and becoming stuck after multiple mutations complete asynchronously.
12 changes: 6 additions & 6 deletions packages/db/src/collection/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -825,9 +825,6 @@ export class CollectionStateManager<
public capturePreSyncVisibleState(): void {
if (this.pendingSyncedTransactions.length === 0) return

// Clear any previous capture
this.preSyncVisibleState.clear()

// Get all keys that will be affected by sync operations
const syncedKeys = new Set<TKey>()
for (const transaction of this.pendingSyncedTransactions) {
Expand All @@ -843,10 +840,13 @@ export class CollectionStateManager<

// Only capture current visible state for keys that will be affected by sync operations
// This is much more efficient than capturing the entire collection state
// Only capture keys that haven't been captured yet to preserve earlier captures
for (const key of syncedKeys) {
const currentValue = this.get(key)
if (currentValue !== undefined) {
this.preSyncVisibleState.set(key, currentValue)
if (!this.preSyncVisibleState.has(key)) {
const currentValue = this.get(key)
if (currentValue !== undefined) {
this.preSyncVisibleState.set(key, currentValue)
}
}
}
}
Expand Down
125 changes: 125 additions & 0 deletions packages/db/tests/collection-subscribe-changes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {
ChangesPayload,
MutationFn,
PendingMutation,
SyncConfig,
} from "../src/types"

// Helper function to wait for changes to be processed
Expand Down Expand Up @@ -1516,4 +1517,128 @@ describe(`Collection.subscribeChanges`, () => {

subscription.unsubscribe()
})

it(`should not emit duplicate insert events when onInsert delays sync write`, async () => {
vi.useFakeTimers()

try {
const changeEvents: Array<any> = []
let syncOps:
| Parameters<
SyncConfig<{ id: string; n: number; foo?: string }, string>[`sync`]
>[0]
| undefined

const collection = createCollection<
{ id: string; n: number; foo?: string },
string
>({
id: `async-oninsert-race-test`,
getKey: (item) => item.id,
sync: {
sync: (cfg) => {
syncOps = cfg
cfg.markReady()
},
},
onInsert: async ({ transaction }) => {
// Simulate async operation (e.g., server round-trip)
await vi.advanceTimersByTimeAsync(100)

// Write modified data back via sync
const modifiedValues = transaction.mutations.map((m) => m.modified)
syncOps!.begin()
for (const value of modifiedValues) {
const existing = collection._state.syncedData.get(value.id)
syncOps!.write({
type: existing ? `update` : `insert`,
value: { ...value, foo: `abc` },
})
}
syncOps!.commit()
},
startSync: true,
})

collection.subscribeChanges((changes) => changeEvents.push(...changes))

// Insert two items rapidly - this triggers the race condition
collection.insert({ id: `0`, n: 1 })
collection.insert({ id: `1`, n: 1 })

await vi.runAllTimersAsync()

// Filter events by type
const insertEvents = changeEvents.filter((e) => e.type === `insert`)
const updateEvents = changeEvents.filter((e) => e.type === `update`)

// Expected: 2 optimistic inserts + 2 sync updates = 4 events
expect(insertEvents.length).toBe(2)
expect(updateEvents.length).toBe(2)
} finally {
vi.restoreAllMocks()
}
})

it(`should handle single insert with delayed sync correctly`, async () => {
vi.useFakeTimers()

try {
const changeEvents: Array<any> = []
let syncOps:
| Parameters<
SyncConfig<{ id: string; n: number; foo?: string }, string>[`sync`]
>[0]
| undefined

const collection = createCollection<
{ id: string; n: number; foo?: string },
string
>({
id: `single-insert-delayed-sync-test`,
getKey: (item) => item.id,
sync: {
sync: (cfg) => {
syncOps = cfg
cfg.markReady()
},
},
onInsert: async ({ transaction }) => {
await vi.advanceTimersByTimeAsync(50)

const modifiedValues = transaction.mutations.map((m) => m.modified)
syncOps!.begin()
for (const value of modifiedValues) {
const existing = collection._state.syncedData.get(value.id)
syncOps!.write({
type: existing ? `update` : `insert`,
value: { ...value, foo: `abc` },
})
}
syncOps!.commit()
},
startSync: true,
})

collection.subscribeChanges((changes) => changeEvents.push(...changes))

collection.insert({ id: `x`, n: 1 })
await vi.runAllTimersAsync()

// Should have optimistic insert + sync update
expect(changeEvents).toHaveLength(2)
expect(changeEvents[0]).toMatchObject({
type: `insert`,
key: `x`,
value: { id: `x`, n: 1 },
})
expect(changeEvents[1]).toMatchObject({
type: `update`,
key: `x`,
value: { id: `x`, n: 1, foo: `abc` },
})
} finally {
vi.restoreAllMocks()
}
})
})
Loading