Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/busy-olives-cover.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@tanstack/db": patch
---

Fix live queries getting stuck during long-running sync commits by always
clearing the batching flag on forced emits, tolerating duplicate insert echoes,
and allowing optimistic recomputes to run while commits are still applying. Adds
regression coverage for concurrent optimistic inserts, queued updates, and the
offline-transactions example to ensure everything stays in sync.
14 changes: 10 additions & 4 deletions packages/db/src/collection/changes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,20 @@ export class CollectionChangesManager<
// Either we're not batching, or we're forcing emission (user action or ending batch cycle)
let eventsToEmit = changes

// If we have batched events and this is a forced emit, combine them
if (this.batchedEvents.length > 0 && forceEmit) {
eventsToEmit = [...this.batchedEvents, ...changes]
if (forceEmit) {
// Force emit is used to end a batch (e.g. after a sync commit). Combine any
// buffered optimistic events with the final changes so subscribers see the
// whole picture, even if the sync diff is empty.
if (this.batchedEvents.length > 0) {
eventsToEmit = [...this.batchedEvents, ...changes]
}
this.batchedEvents = []
this.shouldBatchEvents = false
}

if (eventsToEmit.length === 0) return
if (eventsToEmit.length === 0) {
return
}

// Emit to all listeners
for (const subscription of this.changeSubscriptions) {
Expand Down
6 changes: 5 additions & 1 deletion packages/db/src/collection/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,11 @@ export class CollectionStateManager<
triggeredByUserAction: boolean = false
): void {
// Skip redundant recalculations when we're in the middle of committing sync transactions
if (this.isCommittingSyncTransactions) {
// While the sync pipeline is replaying a large batch we still want to honour
// fresh optimistic mutations from the UI. Only skip recompute for the
// internal sync-driven redraws; user-triggered work (triggeredByUserAction)
// must run so live queries stay responsive during long commits.
if (this.isCommittingSyncTransactions && !triggeredByUserAction) {
return
}

Expand Down
19 changes: 17 additions & 2 deletions packages/db/src/collection/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
SyncTransactionAlreadyCommittedError,
SyncTransactionAlreadyCommittedWriteError,
} from "../errors"
import { deepEquals } from "../utils"
import type { StandardSchemaV1 } from "@standard-schema/spec"
import type { ChangeMessage, CollectionConfig } from "../types"
import type { CollectionImpl } from "./index.js"
Expand Down Expand Up @@ -84,6 +85,8 @@ export class CollectionSyncManager<
}
const key = this.config.getKey(messageWithoutKey.value)

let messageType = messageWithoutKey.type

// Check if an item with this key already exists when inserting
if (messageWithoutKey.type === `insert`) {
const insertingIntoExistingSynced = state.syncedData.has(key)
Expand All @@ -96,17 +99,29 @@ export class CollectionSyncManager<
!hasPendingDeleteForKey &&
!isTruncateTransaction
) {
throw new DuplicateKeySyncError(key, this.id)
const existingValue = state.syncedData.get(key)
if (
existingValue !== undefined &&
deepEquals(existingValue, messageWithoutKey.value)
) {
// The "insert" is an echo of a value we already have locally.
// Treat it as an update so we preserve optimistic intent without
// throwing a duplicate-key error during reconciliation.
messageType = `update`
} else {
throw new DuplicateKeySyncError(key, this.id)
}
}
}

const message: ChangeMessage<TOutput> = {
...messageWithoutKey,
type: messageType,
key,
}
pendingTransaction.operations.push(message)

if (messageWithoutKey.type === `delete`) {
if (messageType === `delete`) {
pendingTransaction.deletedKeys.add(key)
}
},
Expand Down
Loading
Loading