Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/gold-readers-taste.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

Fixed critical bug where optimistic mutations were lost when their async handlers completed during a truncate operation. The fix captures a snapshot of optimistic state when `truncate()` is called and restores it during commit, then overlays any still-active transactions to handle late-arriving mutations. This ensures client-side optimistic state is preserved through server-initiated must-refetch scenarios.
95 changes: 51 additions & 44 deletions packages/db/src/collection/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,18 @@ import type { CollectionLifecycleManager } from "./lifecycle"
import type { CollectionChangesManager } from "./changes"
import type { CollectionIndexesManager } from "./indexes"

interface PendingSyncedTransaction<T extends object = Record<string, unknown>> {
interface PendingSyncedTransaction<
T extends object = Record<string, unknown>,
TKey extends string | number = string | number,
> {
committed: boolean
operations: Array<OptimisticChangeMessage<T>>
truncate?: boolean
deletedKeys: Set<string | number>
optimisticSnapshot?: {
upserts: Map<TKey, T>
deletes: Set<TKey>
}
}

export class CollectionStateManager<
Expand All @@ -33,8 +40,9 @@ export class CollectionStateManager<

// Core state - make public for testing
public transactions: SortedMap<string, Transaction<any>>
public pendingSyncedTransactions: Array<PendingSyncedTransaction<TOutput>> =
[]
public pendingSyncedTransactions: Array<
PendingSyncedTransaction<TOutput, TKey>
> = []
public syncedData: Map<TKey, TOutput> | SortedMap<TKey, TOutput>
public syncedMetadata = new Map<TKey, unknown>()

Expand Down Expand Up @@ -442,10 +450,10 @@ export class CollectionStateManager<
},
{
committedSyncedTransactions: [] as Array<
PendingSyncedTransaction<TOutput>
PendingSyncedTransaction<TOutput, TKey>
>,
uncommittedSyncedTransactions: [] as Array<
PendingSyncedTransaction<TOutput>
PendingSyncedTransaction<TOutput, TKey>
>,
hasTruncateSync: false,
}
Expand All @@ -455,6 +463,12 @@ export class CollectionStateManager<
// Set flag to prevent redundant optimistic state recalculations
this.isCommittingSyncTransactions = true

// Get the optimistic snapshot from the truncate transaction (captured when truncate() was called)
const truncateOptimisticSnapshot = hasTruncateSync
? committedSyncedTransactions.find((t) => t.truncate)
?.optimisticSnapshot
: null

// First collect all keys that will be affected by sync operations
const changedKeys = new Set<TKey>()
for (const transaction of committedSyncedTransactions) {
Expand Down Expand Up @@ -484,13 +498,19 @@ export class CollectionStateManager<
// Handle truncate operations first
if (transaction.truncate) {
// TRUNCATE PHASE
// 1) Emit a delete for every currently-synced key so downstream listeners/indexes
// 1) Emit a delete for every visible key (synced + optimistic) so downstream listeners/indexes
// observe a clear-before-rebuild. We intentionally skip keys already in
// optimisticDeletes because their delete was previously emitted by the user.
for (const key of this.syncedData.keys()) {
if (this.optimisticDeletes.has(key)) continue
// Use the snapshot to ensure we emit deletes for all items that existed at truncate start.
const visibleKeys = new Set([
...this.syncedData.keys(),
...(truncateOptimisticSnapshot?.upserts.keys() || []),
])
for (const key of visibleKeys) {
if (truncateOptimisticSnapshot?.deletes.has(key)) continue
const previousValue =
this.optimisticUpserts.get(key) || this.syncedData.get(key)
truncateOptimisticSnapshot?.upserts.get(key) ||
this.syncedData.get(key)
if (previousValue !== undefined) {
events.push({ type: `delete`, key, value: previousValue })
}
Expand Down Expand Up @@ -574,41 +594,14 @@ export class CollectionStateManager<
}
}

// Build re-apply sets from ACTIVE optimistic transactions against the new synced base
// We do not copy maps; we compute intent directly from transactions to avoid drift.
const reapplyUpserts = new Map<TKey, TOutput>()
const reapplyDeletes = new Set<TKey>()

for (const tx of this.transactions.values()) {
if ([`completed`, `failed`].includes(tx.state)) continue
for (const mutation of tx.mutations) {
if (
!this.isThisCollection(mutation.collection) ||
!mutation.optimistic
)
continue
const key = mutation.key as TKey
switch (mutation.type) {
case `insert`:
reapplyUpserts.set(key, mutation.modified as TOutput)
reapplyDeletes.delete(key)
break
case `update`: {
const base = this.syncedData.get(key)
const next = base
? (Object.assign({}, base, mutation.changes) as TOutput)
: (mutation.modified as TOutput)
reapplyUpserts.set(key, next)
reapplyDeletes.delete(key)
break
}
case `delete`:
reapplyUpserts.delete(key)
reapplyDeletes.add(key)
break
}
}
}
// Build re-apply sets from the snapshot taken at the start of this function.
// This prevents losing optimistic state if transactions complete during truncate processing.
const reapplyUpserts = new Map<TKey, TOutput>(
truncateOptimisticSnapshot!.upserts
)
const reapplyDeletes = new Set<TKey>(
truncateOptimisticSnapshot!.deletes
)

// Emit inserts for re-applied upserts, skipping any keys that have an optimistic delete.
// If the server also inserted/updated the same key in this batch, override that value
Expand Down Expand Up @@ -660,6 +653,20 @@ export class CollectionStateManager<

// Reset flag and recompute optimistic state for any remaining active transactions
this.isCommittingSyncTransactions = false

// If we had a truncate, restore the preserved optimistic state from the snapshot
// This includes items from transactions that may have completed during processing
if (hasTruncateSync && truncateOptimisticSnapshot) {
for (const [key, value] of truncateOptimisticSnapshot.upserts) {
this.optimisticUpserts.set(key, value)
}
for (const key of truncateOptimisticSnapshot.deletes) {
this.optimisticDeletes.add(key)
}
}

// Always overlay any still-active optimistic transactions so mutations that started
// after the truncate snapshot are preserved.
for (const transaction of this.transactions.values()) {
if (![`completed`, `failed`].includes(transaction.state)) {
for (const mutation of transaction.mutations) {
Expand Down
7 changes: 7 additions & 0 deletions packages/db/src/collection/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ export class CollectionSyncManager<
// - Subsequent synced ops applied on the fresh base
// - Finally, optimistic mutations re-applied on top (single batch)
pendingTransaction.truncate = true

// Capture optimistic state NOW to preserve it even if transactions complete
// before this truncate transaction is committed
pendingTransaction.optimisticSnapshot = {
upserts: new Map(this.state.optimisticUpserts),
deletes: new Set(this.state.optimisticDeletes),
}
},
})
)
Expand Down
11 changes: 8 additions & 3 deletions packages/db/tests/collection-subscribe-changes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -964,11 +964,11 @@ describe(`Collection.subscribeChanges`, () => {
value: `optimistic insert`,
})

// Verify events are a single batch: deletes for synced keys (1,2), then inserts for preserved optimistic (1,3)
expect(changeEvents.length).toBe(4)
// Verify events are a single batch: deletes for ALL visible keys (1,2,3), then inserts for preserved optimistic (1,3)
expect(changeEvents.length).toBe(5)
const deletes = changeEvents.filter((e) => e.type === `delete`)
const inserts = changeEvents.filter((e) => e.type === `insert`)
expect(deletes.length).toBe(2)
expect(deletes.length).toBe(3)
expect(inserts.length).toBe(2)

const deleteByKey = new Map(deletes.map((e) => [e.key, e]))
Expand All @@ -984,6 +984,11 @@ describe(`Collection.subscribeChanges`, () => {
key: 2,
value: { id: 2, value: `initial value 2` },
})
expect(deleteByKey.get(3)).toEqual({
type: `delete`,
key: 3,
value: { id: 3, value: `optimistic insert` },
})

// Insert events for preserved optimistic entries (1 and 3)
expect(insertByKey.get(1)).toEqual({
Expand Down
Loading
Loading