Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/sharp-streets-repair.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

Fix bug with orderBy that caused queries to skip duplicate values and/or stall on duplicate values.
2 changes: 1 addition & 1 deletion packages/db/src/collection/changes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export class CollectionChangesManager<
})

if (options.includeInitialState) {
subscription.requestSnapshot()
subscription.requestSnapshot({ trackLoadSubsetPromise: false })
}

// Add to batched listeners
Expand Down
88 changes: 82 additions & 6 deletions packages/db/src/collection/subscription.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ensureIndexForExpression } from "../indexes/auto-index.js"
import { and, gt, lt } from "../query/builder/functions.js"
import { and, eq, gt, lt } from "../query/builder/functions.js"
import { Value } from "../query/ir.js"
import { EventEmitter } from "../event-emitter.js"
import {
Expand All @@ -20,6 +20,7 @@ import type { CollectionImpl } from "./index.js"
type RequestSnapshotOptions = {
where?: BasicExpression<boolean>
optimizedOnly?: boolean
trackLoadSubsetPromise?: boolean
}

type RequestLimitedSnapshotOptions = {
Expand Down Expand Up @@ -197,7 +198,10 @@ export class CollectionSubscription
subscription: this,
})

this.trackLoadSubsetPromise(syncResult)
const trackLoadSubsetPromise = opts?.trackLoadSubsetPromise ?? true
if (trackLoadSubsetPromise) {
this.trackLoadSubsetPromise(syncResult)
}

// Also load data immediately from the collection
const snapshot = this.collection.currentStateAsChanges(stateOpts)
Expand All @@ -218,10 +222,12 @@ export class CollectionSubscription
}

/**
* Sends a snapshot that is limited to the first `limit` rows that fulfill the `where` clause and are bigger than `minValue`.
* Sends a snapshot that fulfills the `where` clause and all rows are bigger or equal to `minValue`.
* Requires a range index to be set with `setOrderByIndex` prior to calling this method.
* It uses that range index to load the items in the order of the index.
* Note: it does not send keys that have already been sent before.
* Note 1: it may load more rows than the provided LIMIT because it loads all values equal to `minValue` + limit values greater than `minValue`.
* This is needed to ensure that it does not accidentally skip duplicate values when the limit falls in the middle of some duplicated values.
* Note 2: it does not send keys that have already been sent before.
*/
requestLimitedSnapshot({
orderBy,
Expand Down Expand Up @@ -257,12 +263,49 @@ export class CollectionSubscription

let biggestObservedValue = minValue
const changes: Array<ChangeMessage<any, string | number>> = []
let keys: Array<string | number> = index.take(limit, minValue, filterFn)

// If we have a minValue we need to handle the case
// where there might be duplicate values equal to minValue that we need to include
// because we can have data like this: [1, 2, 3, 3, 3, 4, 5]
// so if minValue is 3 then the previous snapshot may not have included all 3s
// e.g. if it was offset 0 and limit 3 it would only have loaded the first 3
// so we load all rows equal to minValue first, to be sure we don't skip any duplicate values
let keys: Array<string | number> = []
if (minValue !== undefined) {
// First, get all items with the same value as minValue
const { expression } = orderBy[0]!
const allRowsWithMinValue = this.collection.currentStateAsChanges({
where: eq(expression, new Value(minValue)),
})

if (allRowsWithMinValue) {
const keysWithMinValue = allRowsWithMinValue
.map((change) => change.key)
.filter((key) => !this.sentKeys.has(key) && filterFn(key))

// Add items with the minValue first
keys.push(...keysWithMinValue)

// Then get items greater than minValue
const keysGreaterThanMin = index.take(
limit - keys.length,
minValue,
filterFn
)
keys.push(...keysGreaterThanMin)
} else {
keys = index.take(limit, minValue, filterFn)
}
} else {
keys = index.take(limit, minValue, filterFn)
}

const valuesNeeded = () => Math.max(limit - changes.length, 0)
const collectionExhausted = () => keys.length === 0

while (valuesNeeded() > 0 && !collectionExhausted()) {
const insertedKeys = new Set<string | number>() // Track keys we add to `changes` in this iteration

for (const key of keys) {
const value = this.collection.get(key)!
changes.push({
Expand All @@ -271,6 +314,7 @@ export class CollectionSubscription
value,
})
biggestObservedValue = value
insertedKeys.add(key) // Track this key
}

keys = index.take(valuesNeeded(), biggestObservedValue, filterFn)
Expand All @@ -296,9 +340,41 @@ export class CollectionSubscription
subscription: this,
})

this.trackLoadSubsetPromise(syncResult)
// Make parallel loadSubset calls for values equal to minValue and values greater than minValue
const promises: Array<Promise<void>> = []

// First promise: load all values equal to minValue
if (typeof minValue !== `undefined`) {
const { expression } = orderBy[0]!
const exactValueFilter = eq(expression, new Value(minValue))

const equalValueResult = this.collection._sync.loadSubset({
where: exactValueFilter,
subscription: this,
})

if (equalValueResult instanceof Promise) {
promises.push(equalValueResult)
}
}

// Second promise: load values greater than minValue
if (syncResult instanceof Promise) {
promises.push(syncResult)
}

// Track the combined promise
if (promises.length > 0) {
const combinedPromise = Promise.all(promises).then(() => {})
this.trackLoadSubsetPromise(combinedPromise)
} else {
this.trackLoadSubsetPromise(syncResult)
}
}

// TODO: also add similar test but that checks that it can also load it from the collection's loadSubset function
// and that that also works properly (i.e. does not skip duplicate values)

/**
* Filters and flips changes for keys that have not been sent yet.
* Deletes are filtered out for keys that have not been sent yet.
Expand Down
8 changes: 3 additions & 5 deletions packages/db/src/query/compiler/order-by.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,11 @@ export function processOrderBy(
orderByOptimizationInfo

setSizeCallback = (getSize: () => number) => {
optimizableOrderByCollections[followRefCollection.id] = {
...optimizableOrderByCollections[followRefCollection.id]!,
dataNeeded: () => {
optimizableOrderByCollections[followRefCollection.id]![`dataNeeded`] =
() => {
const size = getSize()
return Math.max(0, orderByOptimizationInfo!.limit - size)
},
}
}
}
}
}
Expand Down
98 changes: 26 additions & 72 deletions packages/db/src/query/live/collection-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,29 +73,33 @@ export class CollectionSubscriber<
)
}

const trackLoadPromise = () => {
// Guard against duplicate transitions
if (!this.subscriptionLoadingPromises.has(subscription)) {
let resolve: () => void
const promise = new Promise<void>((res) => {
resolve = res
})

this.subscriptionLoadingPromises.set(subscription, {
resolve: resolve!,
})
this.collectionConfigBuilder.liveQueryCollection!._sync.trackLoadPromise(
promise
)
}
}

// It can be that we are not yet subscribed when the first `loadSubset` call happens (i.e. the initial query).
// So we also check the status here and if it's `loadingSubset` then we track the load promise
if (subscription.status === `loadingSubset`) {
trackLoadPromise()
}

// Subscribe to subscription status changes to propagate loading state
const statusUnsubscribe = subscription.on(`status:change`, (event) => {
// TODO: For now we are setting this loading state whenever the subscription
// status changes to 'loadingSubset'. But we have discussed it only happening
// when the the live query has it's offset/limit changed, and that triggers the
// subscription to request a snapshot. This will require more work to implement,
// and builds on https://github.com/TanStack/db/pull/663 which this PR
// does not yet depend on.
if (event.status === `loadingSubset`) {
// Guard against duplicate transitions
if (!this.subscriptionLoadingPromises.has(subscription)) {
let resolve: () => void
const promise = new Promise<void>((res) => {
resolve = res
})

this.subscriptionLoadingPromises.set(subscription, {
resolve: resolve!,
})
this.collectionConfigBuilder.liveQueryCollection!._sync.trackLoadPromise(
promise
)
}
trackLoadPromise()
} else {
// status is 'ready'
const deferred = this.subscriptionLoadingPromises.get(subscription)
Expand Down Expand Up @@ -176,30 +180,14 @@ export class CollectionSubscriber<
whereExpression: BasicExpression<boolean> | undefined,
orderByInfo: OrderByOptimizationInfo
) {
const { orderBy, offset, limit, comparator, dataNeeded, index } =
orderByInfo
const { orderBy, offset, limit, index } = orderByInfo

const sendChangesInRange = (
changes: Iterable<ChangeMessage<any, string | number>>
) => {
// Split live updates into a delete of the old value and an insert of the new value
// and filter out changes that are bigger than the biggest value we've sent so far
// because they can't affect the topK (and if later we need more data, we will dynamically load more data)
const splittedChanges = splitUpdates(changes)
let filteredChanges = splittedChanges
if (dataNeeded && dataNeeded() === 0) {
// If the topK is full [..., maxSentValue] then we do not need to send changes > maxSentValue
// because they can never make it into the topK.
// However, if the topK isn't full yet, we need to also send changes > maxSentValue
// because they will make it into the topK
filteredChanges = filterChangesSmallerOrEqualToMax(
splittedChanges,
comparator,
this.biggest
)
}

this.sendChangesToPipelineWithTracking(filteredChanges, subscription)
this.sendChangesToPipelineWithTracking(splittedChanges, subscription)
}

// Subscribe to changes and only send changes that are smaller than the biggest value we've sent so far
Expand Down Expand Up @@ -395,37 +383,3 @@ function* splitUpdates<
}
}
}

function* filterChanges<
T extends object = Record<string, unknown>,
TKey extends string | number = string | number,
>(
changes: Iterable<ChangeMessage<T, TKey>>,
f: (change: ChangeMessage<T, TKey>) => boolean
): Generator<ChangeMessage<T, TKey>> {
for (const change of changes) {
if (f(change)) {
yield change
}
}
}

/**
* Filters changes to only include those that are smaller or equal to the provided max value
* @param changes - Iterable of changes to filter
* @param comparator - Comparator function to use for filtering
* @param maxValue - Range to filter changes within (range boundaries are exclusive)
* @returns Iterable of changes that fall within the range
*/
function* filterChangesSmallerOrEqualToMax<
T extends object = Record<string, unknown>,
TKey extends string | number = string | number,
>(
changes: Iterable<ChangeMessage<T, TKey>>,
comparator: (a: any, b: any) => number,
maxValue: any
): Generator<ChangeMessage<T, TKey>> {
yield* filterChanges(changes, (change) => {
return !maxValue || comparator(change.value, maxValue) <= 0
})
}
Loading
Loading