diff --git a/.changeset/sharp-streets-repair.md b/.changeset/sharp-streets-repair.md new file mode 100644 index 000000000..f94e5e46c --- /dev/null +++ b/.changeset/sharp-streets-repair.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +Fix bug with orderBy that caused queries to skip duplicate values and/or stall on duplicate values. diff --git a/packages/db/src/collection/changes.ts b/packages/db/src/collection/changes.ts index 97e42c53b..2c9cccba5 100644 --- a/packages/db/src/collection/changes.ts +++ b/packages/db/src/collection/changes.ts @@ -108,7 +108,7 @@ export class CollectionChangesManager< }) if (options.includeInitialState) { - subscription.requestSnapshot() + subscription.requestSnapshot({ trackLoadSubsetPromise: false }) } // Add to batched listeners diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index d6a43a256..33cedb946 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -1,5 +1,5 @@ import { ensureIndexForExpression } from "../indexes/auto-index.js" -import { and, gt, lt } from "../query/builder/functions.js" +import { and, eq, gt, lt } from "../query/builder/functions.js" import { Value } from "../query/ir.js" import { EventEmitter } from "../event-emitter.js" import { @@ -20,6 +20,7 @@ import type { CollectionImpl } from "./index.js" type RequestSnapshotOptions = { where?: BasicExpression optimizedOnly?: boolean + trackLoadSubsetPromise?: boolean } type RequestLimitedSnapshotOptions = { @@ -197,7 +198,10 @@ export class CollectionSubscription subscription: this, }) - this.trackLoadSubsetPromise(syncResult) + const trackLoadSubsetPromise = opts?.trackLoadSubsetPromise ?? 
true + if (trackLoadSubsetPromise) { + this.trackLoadSubsetPromise(syncResult) + } // Also load data immediately from the collection const snapshot = this.collection.currentStateAsChanges(stateOpts) @@ -218,10 +222,12 @@ export class CollectionSubscription } /** - * Sends a snapshot that is limited to the first `limit` rows that fulfill the `where` clause and are bigger than `minValue`. + * Sends a snapshot that fulfills the `where` clause and all rows are bigger or equal to `minValue`. * Requires a range index to be set with `setOrderByIndex` prior to calling this method. * It uses that range index to load the items in the order of the index. - * Note: it does not send keys that have already been sent before. + * Note 1: it may load more rows than the provided LIMIT because it loads all values equal to `minValue` + limit values greater than `minValue`. + * This is needed to ensure that it does not accidentally skip duplicate values when the limit falls in the middle of some duplicated values. + * Note 2: it does not send keys that have already been sent before. */ requestLimitedSnapshot({ orderBy, @@ -257,12 +263,49 @@ export class CollectionSubscription let biggestObservedValue = minValue const changes: Array> = [] - let keys: Array = index.take(limit, minValue, filterFn) + + // If we have a minValue we need to handle the case + // where there might be duplicate values equal to minValue that we need to include + // because we can have data like this: [1, 2, 3, 3, 3, 4, 5] + // so if minValue is 3 then the previous snapshot may not have included all 3s + // e.g. if it was offset 0 and limit 3 it would only have loaded the first 3 + // so we load all rows equal to minValue first, to be sure we don't skip any duplicate values + let keys: Array = [] + if (minValue !== undefined) { + // First, get all items with the same value as minValue + const { expression } = orderBy[0]! 
+ const allRowsWithMinValue = this.collection.currentStateAsChanges({ + where: eq(expression, new Value(minValue)), + }) + + if (allRowsWithMinValue) { + const keysWithMinValue = allRowsWithMinValue + .map((change) => change.key) + .filter((key) => !this.sentKeys.has(key) && filterFn(key)) + + // Add items with the minValue first + keys.push(...keysWithMinValue) + + // Then get items greater than minValue + const keysGreaterThanMin = index.take( + limit - keys.length, + minValue, + filterFn + ) + keys.push(...keysGreaterThanMin) + } else { + keys = index.take(limit, minValue, filterFn) + } + } else { + keys = index.take(limit, minValue, filterFn) + } const valuesNeeded = () => Math.max(limit - changes.length, 0) const collectionExhausted = () => keys.length === 0 while (valuesNeeded() > 0 && !collectionExhausted()) { + const insertedKeys = new Set() // Track keys we add to `changes` in this iteration + for (const key of keys) { const value = this.collection.get(key)! changes.push({ @@ -271,6 +314,7 @@ export class CollectionSubscription value, }) biggestObservedValue = value + insertedKeys.add(key) // Track this key } keys = index.take(valuesNeeded(), biggestObservedValue, filterFn) @@ -296,9 +340,41 @@ export class CollectionSubscription subscription: this, }) - this.trackLoadSubsetPromise(syncResult) + // Make parallel loadSubset calls for values equal to minValue and values greater than minValue + const promises: Array> = [] + + // First promise: load all values equal to minValue + if (typeof minValue !== `undefined`) { + const { expression } = orderBy[0]! 
+ const exactValueFilter = eq(expression, new Value(minValue)) + + const equalValueResult = this.collection._sync.loadSubset({ + where: exactValueFilter, + subscription: this, + }) + + if (equalValueResult instanceof Promise) { + promises.push(equalValueResult) + } + } + + // Second promise: load values greater than minValue + if (syncResult instanceof Promise) { + promises.push(syncResult) + } + + // Track the combined promise + if (promises.length > 0) { + const combinedPromise = Promise.all(promises).then(() => {}) + this.trackLoadSubsetPromise(combinedPromise) + } else { + this.trackLoadSubsetPromise(syncResult) + } } + // TODO: also add similar test but that checks that it can also load it from the collection's loadSubset function + // and that that also works properly (i.e. does not skip duplicate values) + /** * Filters and flips changes for keys that have not been sent yet. * Deletes are filtered out for keys that have not been sent yet. diff --git a/packages/db/src/query/compiler/order-by.ts b/packages/db/src/query/compiler/order-by.ts index 679dceeda..6156ae2ce 100644 --- a/packages/db/src/query/compiler/order-by.ts +++ b/packages/db/src/query/compiler/order-by.ts @@ -179,13 +179,11 @@ export function processOrderBy( orderByOptimizationInfo setSizeCallback = (getSize: () => number) => { - optimizableOrderByCollections[followRefCollection.id] = { - ...optimizableOrderByCollections[followRefCollection.id]!, - dataNeeded: () => { + optimizableOrderByCollections[followRefCollection.id]![`dataNeeded`] = + () => { const size = getSize() return Math.max(0, orderByOptimizationInfo!.limit - size) - }, - } + } } } } diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index 5835a2f0e..0eea04a60 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -73,29 +73,33 @@ export class CollectionSubscriber< ) } + const trackLoadPromise = 
() => { + // Guard against duplicate transitions + if (!this.subscriptionLoadingPromises.has(subscription)) { + let resolve: () => void + const promise = new Promise((res) => { + resolve = res + }) + + this.subscriptionLoadingPromises.set(subscription, { + resolve: resolve!, + }) + this.collectionConfigBuilder.liveQueryCollection!._sync.trackLoadPromise( + promise + ) + } + } + + // It can be that we are not yet subscribed when the first `loadSubset` call happens (i.e. the initial query). + // So we also check the status here and if it's `loadingSubset` then we track the load promise + if (subscription.status === `loadingSubset`) { + trackLoadPromise() + } + // Subscribe to subscription status changes to propagate loading state const statusUnsubscribe = subscription.on(`status:change`, (event) => { - // TODO: For now we are setting this loading state whenever the subscription - // status changes to 'loadingSubset'. But we have discussed it only happening - // when the the live query has it's offset/limit changed, and that triggers the - // subscription to request a snapshot. This will require more work to implement, - // and builds on https://github.com/TanStack/db/pull/663 which this PR - // does not yet depend on. 
if (event.status === `loadingSubset`) { - // Guard against duplicate transitions - if (!this.subscriptionLoadingPromises.has(subscription)) { - let resolve: () => void - const promise = new Promise((res) => { - resolve = res - }) - - this.subscriptionLoadingPromises.set(subscription, { - resolve: resolve!, - }) - this.collectionConfigBuilder.liveQueryCollection!._sync.trackLoadPromise( - promise - ) - } + trackLoadPromise() } else { // status is 'ready' const deferred = this.subscriptionLoadingPromises.get(subscription) @@ -176,30 +180,14 @@ export class CollectionSubscriber< whereExpression: BasicExpression | undefined, orderByInfo: OrderByOptimizationInfo ) { - const { orderBy, offset, limit, comparator, dataNeeded, index } = - orderByInfo + const { orderBy, offset, limit, index } = orderByInfo const sendChangesInRange = ( changes: Iterable> ) => { // Split live updates into a delete of the old value and an insert of the new value - // and filter out changes that are bigger than the biggest value we've sent so far - // because they can't affect the topK (and if later we need more data, we will dynamically load more data) const splittedChanges = splitUpdates(changes) - let filteredChanges = splittedChanges - if (dataNeeded && dataNeeded() === 0) { - // If the topK is full [..., maxSentValue] then we do not need to send changes > maxSentValue - // because they can never make it into the topK. 
- // However, if the topK isn't full yet, we need to also send changes > maxSentValue - // because they will make it into the topK - filteredChanges = filterChangesSmallerOrEqualToMax( - splittedChanges, - comparator, - this.biggest - ) - } - - this.sendChangesToPipelineWithTracking(filteredChanges, subscription) + this.sendChangesToPipelineWithTracking(splittedChanges, subscription) } // Subscribe to changes and only send changes that are smaller than the biggest value we've sent so far @@ -395,37 +383,3 @@ function* splitUpdates< } } } - -function* filterChanges< - T extends object = Record, - TKey extends string | number = string | number, ->( - changes: Iterable>, - f: (change: ChangeMessage) => boolean -): Generator> { - for (const change of changes) { - if (f(change)) { - yield change - } - } -} - -/** - * Filters changes to only include those that are smaller or equal to the provided max value - * @param changes - Iterable of changes to filter - * @param comparator - Comparator function to use for filtering - * @param maxValue - Range to filter changes within (range boundaries are exclusive) - * @returns Iterable of changes that fall within the range - */ -function* filterChangesSmallerOrEqualToMax< - T extends object = Record, - TKey extends string | number = string | number, ->( - changes: Iterable>, - comparator: (a: any, b: any) => number, - maxValue: any -): Generator> { - yield* filterChanges(changes, (change) => { - return !maxValue || comparator(change.value, maxValue) <= 0 - }) -} diff --git a/packages/db/tests/query/order-by.test.ts b/packages/db/tests/query/order-by.test.ts index d0ec8c6f4..f0cdf8307 100644 --- a/packages/db/tests/query/order-by.test.ts +++ b/packages/db/tests/query/order-by.test.ts @@ -10,6 +10,7 @@ import { max, not, } from "../../src/query/builder/functions.js" +import { createFilterFunctionFromExpression } from "../../src/collection/change-events.js" type Person = { id: string @@ -2301,3 +2302,469 @@ describe(`OrderBy with 
collection alias conflicts`, () => { expect(result[2]?.email).toBe(`third@test.com`) }) }) + +describe(`OrderBy with duplicate values`, () => { + type TestItem = { + id: number + a: number + keep: boolean + } + + function createOrderByBugTests(autoIndex: `off` | `eager`): void { + describe(`with autoIndex ${autoIndex}`, () => { + it(`should correctly advance window when there are duplicate values`, async () => { + // Create test data that reproduces the specific bug described: + // Items with many duplicates at value 5, then normal progression + const duplicateTestData: Array = [ + { id: 1, a: 1, keep: true }, + { id: 2, a: 2, keep: true }, + { id: 3, a: 3, keep: true }, + { id: 4, a: 4, keep: true }, + { id: 5, a: 5, keep: true }, + { id: 6, a: 5, keep: true }, + { id: 7, a: 5, keep: true }, + { id: 8, a: 5, keep: true }, + { id: 9, a: 5, keep: true }, + { id: 10, a: 5, keep: true }, + { id: 11, a: 11, keep: true }, + { id: 12, a: 12, keep: true }, + { id: 13, a: 13, keep: true }, + { id: 14, a: 14, keep: true }, + { id: 15, a: 15, keep: true }, + { id: 16, a: 16, keep: true }, + ] + + const duplicateCollection = createCollection( + mockSyncCollectionOptions({ + id: `test-duplicate-window-bug`, + getKey: (item) => item.id, + initialData: duplicateTestData, + autoIndex, + }) + ) + + // Create a live query with offset 0, limit 5 (first page) + const collection = createLiveQueryCollection((q) => + q + .from({ items: duplicateCollection }) + .where(({ items }) => eq(items.keep, true)) + .orderBy(({ items }) => items.a, `asc`) + .offset(0) + .limit(5) + .select(({ items }) => ({ + id: items.id, + a: items.a, + keep: items.keep, + })) + ) + await collection.preload() + + // First page should return items 1-5 + let results = Array.from(collection.values()).sort( + (a, b) => a.id - b.id + ) + expect(results).toEqual([ + { id: 1, a: 1, keep: true }, + { id: 2, a: 2, keep: true }, + { id: 3, a: 3, keep: true }, + { id: 4, a: 4, keep: true }, + { id: 5, a: 5, keep: true }, + 
])
+
+      // Now move to next page (offset 5, limit 5)
+      collection.utils.setWindow({ offset: 5, limit: 5 })
+      await collection.stateWhenReady()
+
+      // Second page should return items 6-10 (all with value 5)
+      results = Array.from(collection.values()).sort((a, b) => a.id - b.id)
+      expect(results).toEqual([
+        { id: 6, a: 5, keep: true },
+        { id: 7, a: 5, keep: true },
+        { id: 8, a: 5, keep: true },
+        { id: 9, a: 5, keep: true },
+        { id: 10, a: 5, keep: true },
+      ])
+
+      // Now move to third page (offset 10, limit 5)
+      // It should advance past the duplicate 5s
+      collection.utils.setWindow({ offset: 10, limit: 5 })
+      await collection.stateWhenReady()
+
+      // Third page should return items 11-15 (the items after the duplicate 5s)
+      // The bug would cause this to stall and return empty or get stuck
+      results = Array.from(collection.values()).sort((a, b) => a.id - b.id)
+      expect(results).toEqual([
+        { id: 11, a: 11, keep: true },
+        { id: 12, a: 12, keep: true },
+        { id: 13, a: 13, keep: true },
+        { id: 14, a: 14, keep: true },
+        { id: 15, a: 15, keep: true },
+      ])
+
+      // Verify we can continue to next page
+      collection.utils.setWindow({ offset: 15, limit: 5 })
+      await collection.stateWhenReady()
+
+      // Only item 16 remains past offset 15, so the final page holds a single item
+      results = Array.from(collection.values())
+      expect(results).toEqual([{ id: 16, a: 16, keep: true }])
+    })
+
+    it(`should correctly advance window when there are duplicate values loaded from sync layer`, async () => {
+      // Create test data that reproduces the specific bug described:
+      // Items with many duplicates at value 5, then normal progression
+      const allTestData: Array<TestItem> = [
+        { id: 1, a: 1, keep: true },
+        { id: 2, a: 2, keep: true },
+        { id: 3, a: 3, keep: true },
+        { id: 4, a: 4, keep: true },
+        { id: 5, a: 5, keep: true },
+        { id: 6, a: 5, keep: true },
+        { id: 7, a: 5, keep: true },
+        { id: 8, a: 5, keep: true },
+        { id: 9, a: 5, keep: true },
+        { id: 10, a: 5, keep: true },
+        { id: 11, a: 11, keep: true },
+        { id: 12,
a: 12, keep: true }, + { id: 13, a: 13, keep: true }, + { id: 14, a: 14, keep: true }, + { id: 15, a: 15, keep: true }, + { id: 16, a: 16, keep: true }, + ] + + // Start with only the first 5 items in the local collection + const initialData = allTestData.slice(0, 5) + let loadSubsetCallCount = 0 + + const duplicateCollection = createCollection( + mockSyncCollectionOptions({ + id: `test-duplicate-sync-layer-bug`, + getKey: (item) => item.id, + initialData, + autoIndex, + syncMode: `on-demand`, + sync: { + sync: ({ begin, write, commit, markReady }) => { + // Load initial data + begin() + initialData.forEach((item) => { + write({ + type: `insert`, + value: item, + }) + }) + commit() + markReady() + + return { + loadSubset: (options) => { + loadSubsetCallCount++ + + // Simulate async loading from remote source + return new Promise((resolve) => { + setTimeout(() => { + begin() + + // Order all test data by field 'a' in ascending order + const sortedData = [...allTestData].sort( + (a, b) => a.a - b.a + ) + + // Apply where clause filter if present + let filteredData = sortedData + if (options.where) { + try { + const filterFn = createFilterFunctionFromExpression( + options.where + ) + filteredData = sortedData.filter(filterFn) + } catch (error) { + console.log(`Error compiling where clause:`, error) + // If compilation fails, use all data + filteredData = sortedData + } + } + + // Return a slice from 0 to limit + const { limit } = options + const dataToLoad = limit + ? 
filteredData.slice(0, limit) + : filteredData + + dataToLoad.forEach((item) => { + write({ + type: `insert`, + value: item, + }) + }) + + commit() + resolve() + }, 10) // Small delay to simulate network + }) + }, + } + }, + }, + }) + ) + + // Create a live query with offset 0, limit 5 (first page) + const collection = createLiveQueryCollection((q) => + q + .from({ items: duplicateCollection }) + .where(({ items }) => eq(items.keep, true)) + .orderBy(({ items }) => items.a, `asc`) + .offset(0) + .limit(5) + .select(({ items }) => ({ + id: items.id, + a: items.a, + keep: items.keep, + })) + ) + await collection.preload() + + // First page should return items 1-5 (all local data) + let results = Array.from(collection.values()).sort( + (a, b) => a.id - b.id + ) + expect(results).toEqual([ + { id: 1, a: 1, keep: true }, + { id: 2, a: 2, keep: true }, + { id: 3, a: 3, keep: true }, + { id: 4, a: 4, keep: true }, + { id: 5, a: 5, keep: true }, + ]) + expect(loadSubsetCallCount).toBe(1) + + // Now move to next page (offset 5, limit 5) - this should trigger loadSubset + const moveToSecondPage = collection.utils.setWindow({ + offset: 5, + limit: 5, + }) + expect(moveToSecondPage).toBeInstanceOf(Promise) + await moveToSecondPage + + // Second page should return items 6-10 (all with value 5, loaded from sync layer) + results = Array.from(collection.values()).sort((a, b) => a.id - b.id) + expect(results).toEqual([ + { id: 6, a: 5, keep: true }, + { id: 7, a: 5, keep: true }, + { id: 8, a: 5, keep: true }, + { id: 9, a: 5, keep: true }, + { id: 10, a: 5, keep: true }, + ]) + // we expect 2 new loadSubset calls (1 for data equal to max value and one for data greater than max value) + expect(loadSubsetCallCount).toBe(3) + + // Now move to third page (offset 10, limit 5) + // It should advance past the duplicate 5s + const moveToThirdPage = collection.utils.setWindow({ + offset: 10, + limit: 5, + }) + + // Now it is `true` because we already have that page + // because when we 
loaded the 2nd page we loaded all the duplicate 5s and then we loaded
+      // values > 5 with limit 5 but since the entire 2nd page is filled with the duplicate 5s
+      // we in fact already loaded the third page so it is immediately available here
+      expect(moveToThirdPage).toBe(true)
+
+      // Third page should return items 11-15 (the items after the duplicate 5s)
+      // The bug would cause this to stall and return empty or get stuck
+      results = Array.from(collection.values()).sort((a, b) => a.id - b.id)
+      expect(results).toEqual([
+        { id: 11, a: 11, keep: true },
+        { id: 12, a: 12, keep: true },
+        { id: 13, a: 13, keep: true },
+        { id: 14, a: 14, keep: true },
+        { id: 15, a: 15, keep: true },
+      ])
+      // We expect no more loadSubset calls because when we loaded the previous page
+      // we asked for all data equal to max value and LIMIT values greater than max value
+      // and the LIMIT values greater than max value already loaded the next page
+      expect(loadSubsetCallCount).toBe(3)
+    })
+
+    it(`should correctly advance window when there are duplicate values loaded from both local collection and sync layer`, async () => {
+      // Create test data that reproduces the specific bug described:
+      // Items with many duplicates at value 5, then normal progression
+      const allTestData: Array<TestItem> = [
+        { id: 1, a: 1, keep: true },
+        { id: 2, a: 2, keep: true },
+        { id: 3, a: 3, keep: true },
+        { id: 4, a: 4, keep: true },
+        { id: 5, a: 5, keep: true },
+        { id: 6, a: 5, keep: true },
+        { id: 7, a: 5, keep: true },
+        { id: 8, a: 5, keep: true },
+        { id: 9, a: 5, keep: true },
+        { id: 10, a: 5, keep: true },
+        { id: 11, a: 11, keep: true },
+        { id: 12, a: 12, keep: true },
+        { id: 13, a: 13, keep: true },
+        { id: 14, a: 14, keep: true },
+        { id: 15, a: 15, keep: true },
+        { id: 16, a: 16, keep: true },
+      ]
+
+      // Start with only the first 10 items in the local collection
+      const initialData = allTestData.slice(0, 10)
+      let loadSubsetCallCount = 0
+
+      const duplicateCollection = createCollection(
mockSyncCollectionOptions({ + id: `test-duplicate-sync-layer-bug`, + getKey: (item) => item.id, + initialData, + autoIndex, + syncMode: `on-demand`, + sync: { + sync: ({ begin, write, commit, markReady }) => { + // Load initial data + begin() + initialData.forEach((item) => { + write({ + type: `insert`, + value: item, + }) + }) + commit() + markReady() + + return { + loadSubset: (options) => { + loadSubsetCallCount++ + + // Simulate async loading from remote source + return new Promise((resolve) => { + setTimeout(() => { + begin() + + // Order all test data by field 'a' in ascending order + const sortedData = [...allTestData].sort( + (a, b) => a.a - b.a + ) + + // Apply where clause filter if present + let filteredData = sortedData + if (options.where) { + try { + const filterFn = createFilterFunctionFromExpression( + options.where + ) + filteredData = sortedData.filter(filterFn) + } catch (error) { + console.log(`Error compiling where clause:`, error) + // If compilation fails, use all data + filteredData = sortedData + } + } + + // Return a slice from 0 to limit + const { limit } = options + const dataToLoad = limit + ? 
filteredData.slice(0, limit) + : filteredData + + dataToLoad.forEach((item) => { + write({ + type: `insert`, + value: item, + }) + }) + + commit() + resolve() + }, 10) // Small delay to simulate network + }) + }, + } + }, + }, + }) + ) + + // Create a live query with offset 0, limit 5 (first page) + const collection = createLiveQueryCollection((q) => + q + .from({ items: duplicateCollection }) + .where(({ items }) => eq(items.keep, true)) + .orderBy(({ items }) => items.a, `asc`) + .offset(0) + .limit(5) + .select(({ items }) => ({ + id: items.id, + a: items.a, + keep: items.keep, + })) + ) + await collection.preload() + + // First page should return items 1-5 (all local data) + let results = Array.from(collection.values()).sort( + (a, b) => a.id - b.id + ) + expect(results).toEqual([ + { id: 1, a: 1, keep: true }, + { id: 2, a: 2, keep: true }, + { id: 3, a: 3, keep: true }, + { id: 4, a: 4, keep: true }, + { id: 5, a: 5, keep: true }, + ]) + expect(loadSubsetCallCount).toBe(1) + + // Now move to next page (offset 5, limit 5) - this should trigger loadSubset + const moveToSecondPage = collection.utils.setWindow({ + offset: 5, + limit: 5, + }) + expect(moveToSecondPage).toBeInstanceOf(Promise) + await moveToSecondPage + + // Second page should return items 6-10 (all with value 5, loaded from sync layer) + results = Array.from(collection.values()).sort((a, b) => a.id - b.id) + expect(results).toEqual([ + { id: 6, a: 5, keep: true }, + { id: 7, a: 5, keep: true }, + { id: 8, a: 5, keep: true }, + { id: 9, a: 5, keep: true }, + { id: 10, a: 5, keep: true }, + ]) + // we expect 2 new loadSubset calls (1 for data equal to max value and one for data greater than max value) + expect(loadSubsetCallCount).toBe(3) + + // Now move to third page (offset 10, limit 5) + // It should advance past the duplicate 5s + const moveToThirdPage = collection.utils.setWindow({ + offset: 10, + limit: 5, + }) + + // Now it is `true` because we already have that page + // because when we 
loaded the 2nd page we loaded all the duplicate 5s and then we loaded + // values > 5 with limit 5 but since the entire 2nd page is filled with the duplicate 5s + // we in fact already loaded the third page so it is immediately available here + expect(moveToThirdPage).toBe(true) + + // Third page should return items 11-13 (the items after the duplicate 5s) + // The bug would cause this to stall and return empty or get stuck + results = Array.from(collection.values()).sort((a, b) => a.id - b.id) + expect(results).toEqual([ + { id: 11, a: 11, keep: true }, + { id: 12, a: 12, keep: true }, + { id: 13, a: 13, keep: true }, + { id: 14, a: 14, keep: true }, + { id: 15, a: 15, keep: true }, + ]) + // We expect no more loadSubset calls because when we loaded the previous page + // we asked for all data equal to max value and LIMIT values greater than max value + // and the LIMIT values greater than max value already loaded the next page + expect(loadSubsetCallCount).toBe(3) + }) + }) + } + + createOrderByBugTests(`eager`) +}) diff --git a/packages/db/tests/utils.ts b/packages/db/tests/utils.ts index 66c301464..98909b22f 100644 --- a/packages/db/tests/utils.ts +++ b/packages/db/tests/utils.ts @@ -172,11 +172,13 @@ export function withIndexTracking( } } -type MockSyncCollectionConfig = { +type MockSyncCollectionConfig> = { id: string initialData: Array getKey: (item: T) => string | number autoIndex?: `off` | `eager` + sync?: SyncConfig + syncMode?: `eager` | `on-demand` } export function mockSyncCollectionOptions< @@ -218,27 +220,30 @@ export function mockSyncCollectionOptions< }, } + const sync = config.sync ?? 
{ + sync: (params: Parameters[`sync`]>[0]) => { + begin = params.begin + write = params.write + commit = params.commit + const markReady = params.markReady + + begin() + config.initialData.forEach((item) => { + write({ + type: `insert`, + value: item, + }) + }) + commit() + markReady() + }, + } + const options: CollectionConfig & { utils: typeof utils } = { - sync: { - sync: (params: Parameters[`sync`]>[0]) => { - begin = params.begin - write = params.write - commit = params.commit - const markReady = params.markReady - - begin() - config.initialData.forEach((item) => { - write({ - type: `insert`, - value: item, - }) - }) - commit() - markReady() - }, - }, + sync, + ...(config.syncMode ? { syncMode: config.syncMode } : {}), startSync: true, onInsert: async (_params: MutationFnParams) => { // TODO diff --git a/packages/react-db/tests/useLiveInfiniteQuery.test.tsx b/packages/react-db/tests/useLiveInfiniteQuery.test.tsx index 1496a27f1..5dbb69c1b 100644 --- a/packages/react-db/tests/useLiveInfiniteQuery.test.tsx +++ b/packages/react-db/tests/useLiveInfiniteQuery.test.tsx @@ -3,6 +3,8 @@ import { act, renderHook, waitFor } from "@testing-library/react" import { createCollection, createLiveQueryCollection, eq } from "@tanstack/db" import { useLiveInfiniteQuery } from "../src/useLiveInfiniteQuery" import { mockSyncCollectionOptions } from "../../db/tests/utils" +import { createFilterFunctionFromExpression } from "../../db/src/collection/change-events" +import type { LoadSubsetOptions } from "@tanstack/db" type Post = { id: string @@ -855,7 +857,8 @@ describe(`useLiveInfiniteQuery`, () => { }) it(`should track isFetchingNextPage when async loading is triggered`, async () => { - let loadSubsetCallCount = 0 + // Define all data upfront + const allPosts = createMockPosts(30) const collection = createCollection({ id: `async-loading-test`, @@ -864,49 +867,55 @@ describe(`useLiveInfiniteQuery`, () => { startSync: true, sync: { sync: ({ markReady, begin, write, commit }) => { - 
// Provide initial data + // Provide initial data by slicing the first 15 elements begin() - for (let i = 1; i <= 15; i++) { + const initialPosts = allPosts.slice(0, 15) + for (const post of initialPosts) { write({ type: `insert`, - value: { - id: `${i}`, - title: `Post ${i}`, - content: `Content ${i}`, - createdAt: 1000000 - i * 1000, - category: i % 2 === 0 ? `tech` : `life`, - }, + value: post, }) } commit() markReady() return { - loadSubset: () => { - loadSubsetCallCount++ + loadSubset: (opts: LoadSubsetOptions) => { + // Filter the data array based on opts + let filtered = allPosts + + // Apply where clause if provided + if (opts.where) { + const filterFn = createFilterFunctionFromExpression(opts.where) + filtered = filtered.filter(filterFn) + } + + // Sort by createdAt descending if orderBy is provided + if (opts.orderBy && opts.orderBy.length > 0) { + filtered = filtered.sort((a, b) => { + // We know ordering is always by createdAt descending + return b.createdAt - a.createdAt + }) + } - // First few calls return true (initial load + window setup) - if (loadSubsetCallCount <= 2) { - return true + // Apply limit if provided + if (opts.limit !== undefined) { + filtered = filtered.slice(0, opts.limit) } // Subsequent calls simulate async loading with a real timeout const loadPromise = new Promise((resolve) => { setTimeout(() => { begin() - // Load more data - for (let i = 16; i <= 30; i++) { + + // Insert the requested posts + for (const post of filtered) { write({ type: `insert`, - value: { - id: `${i}`, - title: `Post ${i}`, - content: `Content ${i}`, - createdAt: 1000000 - i * 1000, - category: i % 2 === 0 ? `tech` : `life`, - }, + value: post, }) } + commit() resolve() }, 50)