Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/quiet-row-origins.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@tanstack/db': patch
---

Avoid full row origin snapshots during incremental collection updates and make bulk mutation merging linear.
33 changes: 28 additions & 5 deletions packages/db/src/collection/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,21 @@ export class CollectionStateManager<
})
}

private snapshotRowOriginsForKeys(
keys: Iterable<TKey>,
): Map<TKey, VirtualOrigin> {
const rowOrigins = new Map<TKey, VirtualOrigin>()

for (const key of keys) {
const origin = this.rowOrigins.get(key)
if (origin !== undefined) {
rowOrigins.set(key, origin)
}
}

return rowOrigins
}

private enrichWithVirtualPropsSnapshot(
row: TOutput,
virtualProps: VirtualRowProps<TKey>,
Expand Down Expand Up @@ -476,7 +491,7 @@ export class CollectionStateManager<

const previousState = new Map(this.optimisticUpserts)
const previousDeletes = new Set(this.optimisticDeletes)
const previousRowOrigins = new Map(this.rowOrigins)
const previousRowOrigins = this.rowOrigins

// Update pending optimistic state for completed/failed transactions
for (const transaction of this.transactions.values()) {
Expand Down Expand Up @@ -857,10 +872,6 @@ export class CollectionStateManager<
// Set flag to prevent redundant optimistic state recalculations
this.isCommittingSyncTransactions = true

const previousRowOrigins = new Map(this.rowOrigins)
const previousOptimisticUpserts = new Map(this.optimisticUpserts)
const previousOptimisticDeletes = new Set(this.optimisticDeletes)

// Get the optimistic snapshot from the truncate transaction (captured when truncate() was called)
const truncateOptimisticSnapshot = hasTruncateSync
? committedSyncedTransactions.find((t) => t.truncate)
Expand All @@ -880,6 +891,18 @@ export class CollectionStateManager<
}
}

const virtualSnapshotKeys = new Set(changedKeys)
for (const key of this.pendingOptimisticDirectUpserts) {
virtualSnapshotKeys.add(key)
}
for (const key of this.pendingOptimisticDirectDeletes) {
virtualSnapshotKeys.add(key)
}
const previousRowOrigins =
this.snapshotRowOriginsForKeys(virtualSnapshotKeys)
const previousOptimisticUpserts = new Map(this.optimisticUpserts)
const previousOptimisticDeletes = new Set(this.optimisticDeletes)

// Use pre-captured state if available (from optimistic scenarios),
// otherwise capture current state (for pure sync scenarios)
let currentVisibleState = this.preSyncVisibleState
Expand Down
28 changes: 20 additions & 8 deletions packages/db/src/transactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,27 +334,39 @@ class Transaction<T extends object = Record<string, unknown>> {
* @param mutations - Array of new mutations to apply
*/
applyMutations(mutations: Array<PendingMutation<any>>): void {
// Merge via a globalKey-keyed map rather than a findIndex scan per
// mutation, which is O(n²) for bulk operations (e.g. inserting many rows
// in one call). Map preserves insertion order, matching the previous
// replace-in-place / remove / append semantics.
const merged = new Map<string, PendingMutation<any>>()
for (const mutation of this.mutations) {
merged.set(mutation.globalKey, mutation)
}

for (const newMutation of mutations) {
const existingIndex = this.mutations.findIndex(
(m) => m.globalKey === newMutation.globalKey,
)
const existingMutation = merged.get(newMutation.globalKey)

if (existingIndex >= 0) {
const existingMutation = this.mutations[existingIndex]!
if (existingMutation) {
const mergeResult = mergePendingMutations(existingMutation, newMutation)

if (mergeResult === null) {
// Remove the mutation (e.g., delete after insert cancels both)
this.mutations.splice(existingIndex, 1)
merged.delete(newMutation.globalKey)
} else {
// Replace with merged mutation
this.mutations[existingIndex] = mergeResult
merged.set(newMutation.globalKey, mergeResult)
}
} else {
// Insert new mutation
this.mutations.push(newMutation)
merged.set(newMutation.globalKey, newMutation)
}
}

// Rebuild in place to preserve the array's identity for external holders
this.mutations.length = 0
for (const mutation of merged.values()) {
this.mutations.push(mutation)
}
}

/**
Expand Down
Loading