From ce91599e827a4f85694105f292419b589f8a4edb Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 3 Dec 2025 14:24:15 +0000 Subject: [PATCH 1/4] feat(db,electric,query): separate cursor expressions from where clause in loadSubset - Add CursorExpressions type with whereFrom, whereCurrent, and lastKey - LoadSubsetOptions.where no longer includes cursor - passed separately via cursor property - Add offset to LoadSubsetOptions for offset-based pagination support - Electric sync layer makes two parallel requestSnapshot calls when cursor present - Query collection serialization includes offset for query key generation This allows sync layers to choose between cursor-based or offset-based pagination, and Electric can efficiently handle tie-breaking with targeted requests. test(react-db): update useLiveInfiniteQuery test mock to handle cursor expressions The test mock's loadSubset handler now handles the new cursor property in LoadSubsetOptions by combining whereCurrent (ties) and whereFrom (next page) data, deduplicating by id, and re-sorting. fix(electric): make cursor requestSnapshot calls sequential Changed parallel requestSnapshot calls to sequential to avoid potential issues with concurrent snapshot requests that may cause timeouts in CI. fix(electric): combine cursor expressions into single requestSnapshot Instead of making two separate requestSnapshot calls (one for whereFrom, one for whereCurrent), combine them using OR into a single request. This avoids potential issues with multiple sequential snapshot requests that were causing timeouts in CI. The combined expression (whereFrom OR whereCurrent) matches the original behavior where cursor was combined with the where clause. wip working? update changeset fix query test --- .changeset/cursor-pagination-loadsubset.md | 38 ++++ .../src/suites/pagination.suite.ts | 10 + packages/db/src/collection/events.ts | 16 +- packages/db/src/collection/index.ts | 1 + packages/db/src/collection/state.ts | 20 +- packages/db/src/collection/subscription.ts | 121 +++++----- packages/db/src/query/index.ts | 1 + .../src/query/live/collection-subscriber.ts | 26 ++- packages/db/src/query/predicate-utils.ts | 67 +++++- packages/db/src/types.ts | 41 +++- packages/db/tests/query/order-by.test.ts | 189 +++++++++++++--- .../db/tests/query/predicate-utils.test.ts | 213 ++++++++++++++++++ .../electric-db-collection/src/electric.ts | 61 ++++- .../tests/electric-live-query.test.ts | 23 +- .../query-db-collection/e2e/query-filter.ts | 23 +- .../query-db-collection/src/serialization.ts | 9 +- .../tests/useLiveInfiniteQuery.test.tsx | 40 +++- 17 files changed, 763 insertions(+), 136 deletions(-) create mode 100644 .changeset/cursor-pagination-loadsubset.md diff --git a/.changeset/cursor-pagination-loadsubset.md b/.changeset/cursor-pagination-loadsubset.md new file mode 100644 index 000000000..3931ca3dc --- /dev/null +++ b/.changeset/cursor-pagination-loadsubset.md @@ -0,0 +1,38 @@ +--- +"@tanstack/db": patch +"@tanstack/electric-db-collection": patch +"@tanstack/query-db-collection": patch +--- + +Enhanced LoadSubsetOptions with separate cursor expressions and offset for flexible pagination. + +**⚠️ Breaking Change for Custom Sync Layers / Query Collections:** + +`LoadSubsetOptions.where` no longer includes cursor expressions for pagination. 
If you have a custom sync layer or query collection that implements `loadSubset`, you must now handle pagination separately: + +- **Cursor-based pagination:** Use the new `cursor` property (`cursor.whereFrom` and `cursor.whereCurrent`) and combine them with `where` yourself +- **Offset-based pagination:** Use the new `offset` property + +Previously, cursor expressions were baked into the `where` clause. Now they are passed separately so sync layers can choose their preferred pagination strategy. + +**Changes:** + +- Added `CursorExpressions` type with `whereFrom`, `whereCurrent`, and optional `lastKey` properties +- Added `cursor` to `LoadSubsetOptions` for cursor-based pagination (separate from `where`) +- Added `offset` to `LoadSubsetOptions` for offset-based pagination support +- Electric sync layer now makes two parallel `requestSnapshot` calls when cursor is present: + - One for `whereCurrent` (all ties at boundary, no limit) + - One for `whereFrom` (rows after cursor, with limit) +- Query collection serialization now includes `offset` for query key generation +- Added `truncate` event to collections, emitted when synced data is truncated (e.g., after `must-refetch`) +- Fixed `setWindow` pagination: cursor expressions are now correctly built when paging through results +- Fixed offset tracking: `loadNextItems` now passes the correct window offset to prevent incorrect deduplication +- `CollectionSubscriber` now listens for `truncate` events to reset cursor tracking state + +**Benefits:** + +- Sync layers can choose between cursor-based or offset-based pagination strategies +- Electric can efficiently handle tie-breaking with two targeted requests +- Better separation of concerns between filtering (`where`) and pagination (`cursor`/`offset`) +- `setWindow` correctly triggers backend loading for subsequent pages in multi-column orderBy queries +- Cursor state is properly reset after truncation, preventing stale cursor data from being used diff --git a/packages/db-collection-e2e/src/suites/pagination.suite.ts b/packages/db-collection-e2e/src/suites/pagination.suite.ts index 7ccf2c60b..642dc13d6 100644 --- a/packages/db-collection-e2e/src/suites/pagination.suite.ts +++ b/packages/db-collection-e2e/src/suites/pagination.suite.ts @@ -415,6 +415,8 @@ export function createPaginationTestSuite( .limit(10), ) + console.log(`[QUERY DEBUG] query:`, query) + await query.preload() await waitForQueryData(query, { minSize: 10 }) @@ -433,11 +435,15 @@ export function createPaginationTestSuite( } } + console.log(`[QUERY DEBUG] setting window`) + // Move to second page using setWindow // IMPORTANT: setWindow returns a Promise when loading is required, // or `true` if data is already available. We verify loading occurs. const setWindowResult = query.utils.setWindow({ offset: 10, limit: 10 }) + console.log(`[QUERY DEBUG] setWindowResult:`, setWindowResult) + // In on-demand mode, moving to offset 10 should trigger loading // since only the first 10 records were initially loaded if (setWindowResult !== true) { @@ -446,10 +452,14 @@ export function createPaginationTestSuite( } await waitForQueryData(query, { minSize: 10 }) + console.log(`[QUERY DEBUG] waited for data`, setWindowResult) + // Get second page const secondPage = Array.from(query.state.values()) expect(secondPage).toHaveLength(10) + console.log(`[QUERY DEBUG] second page:`, secondPage) + // Verify second page ordering for (let i = 1; i < secondPage.length; i++) { const prev = secondPage[i - 1]! 
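
For custom sync layers affected by the breaking change described in the changeset above ("combine them with `where` yourself"), the shape of the new `loadSubset` handler can be sketched as follows. This is a minimal illustration only: `fetchRows` and the `{ id: number }` row shape are hypothetical stand-ins for your own backend access, and a real implementation would write rows into the collection via the sync `write` API rather than returning them. The `where ? and(where, cursor.whereFrom) : cursor.whereFrom` combination mirrors what the Electric sync layer does later in this patch; offset-based backends can skip the cursor branch entirely.

```typescript
import { and } from '@tanstack/db'
import type { LoadSubsetOptions } from '@tanstack/db'

// Hypothetical backend helper — not part of @tanstack/db.
declare function fetchRows(query: {
  where?: LoadSubsetOptions['where']
  orderBy?: LoadSubsetOptions['orderBy']
  limit?: number
  offset?: number
}): Promise<Array<{ id: number }>>

async function loadSubset(
  options: LoadSubsetOptions,
): Promise<Array<{ id: number }>> {
  const { where, orderBy, limit, cursor, offset } = options

  if (cursor) {
    // Cursor-based pagination: the cursor expressions no longer include the
    // main where clause, so combine them yourself.
    // 1) All ties at the boundary (whereCurrent, no limit)
    const ties = await fetchRows({
      where: where ? and(where, cursor.whereCurrent) : cursor.whereCurrent,
      orderBy,
    })
    // 2) Rows after the cursor (whereFrom, with the requested limit)
    const nextPage = await fetchRows({
      where: where ? and(where, cursor.whereFrom) : cursor.whereFrom,
      orderBy,
      limit,
    })
    // Deduplicate by primary key before handing rows to the collection.
    const seen = new Set<number>()
    const combined: Array<{ id: number }> = []
    for (const row of [...ties, ...nextPage]) {
      if (!seen.has(row.id)) {
        seen.add(row.id)
        combined.push(row)
      }
    }
    return combined
  }

  // Offset-based pagination: ignore `cursor` and page with `offset` instead.
  return fetchRows({ where, orderBy, limit, offset })
}
```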
diff --git a/packages/db/src/collection/events.ts b/packages/db/src/collection/events.ts index 19c28d5ea..194943dc3 100644 --- a/packages/db/src/collection/events.ts +++ b/packages/db/src/collection/events.ts @@ -43,10 +43,19 @@ export interface CollectionLoadingSubsetChangeEvent { loadingSubsetTransition: `start` | `end` } +/** + * Event emitted when the collection is truncated (all data cleared) + */ +export interface CollectionTruncateEvent { + type: `truncate` + collection: Collection +} + export type AllCollectionEvents = { - 'status:change': CollectionStatusChangeEvent - 'subscribers:change': CollectionSubscribersChangeEvent - 'loadingSubset:change': CollectionLoadingSubsetChangeEvent + "status:change": CollectionStatusChangeEvent + "subscribers:change": CollectionSubscribersChangeEvent + "loadingSubset:change": CollectionLoadingSubsetChangeEvent + truncate: CollectionTruncateEvent } & { [K in CollectionStatus as `status:${K}`]: CollectionStatusEvent } @@ -56,6 +65,7 @@ export type CollectionEvent = | CollectionStatusChangeEvent | CollectionSubscribersChangeEvent | CollectionLoadingSubsetChangeEvent + | CollectionTruncateEvent export type CollectionEventHandler = ( event: AllCollectionEvents[T], diff --git a/packages/db/src/collection/index.ts b/packages/db/src/collection/index.ts index ce112024f..ecd8d70bf 100644 --- a/packages/db/src/collection/index.ts +++ b/packages/db/src/collection/index.ts @@ -365,6 +365,7 @@ export class CollectionImpl< lifecycle: this._lifecycle, changes: this._changes, indexes: this._indexes, + events: this._events, }) this._sync.setDeps({ collection: this, // Required for passing to config.sync callback diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index f183aa84e..ae4361bdd 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -6,11 +6,12 @@ import type { ChangeMessage, CollectionConfig, OptimisticChangeMessage, -} from '../types' -import type { CollectionImpl } from './index.js' -import type { CollectionLifecycleManager } from './lifecycle' -import type { CollectionChangesManager } from './changes' -import type { CollectionIndexesManager } from './indexes' +} from "../types" +import type { CollectionImpl } from "./index.js" +import type { CollectionLifecycleManager } from "./lifecycle" +import type { CollectionChangesManager } from "./changes" +import type { CollectionIndexesManager } from "./indexes" +import type { CollectionEventsManager } from "./events" interface PendingSyncedTransaction< T extends object = Record, @@ -37,6 +38,7 @@ export class CollectionStateManager< public lifecycle!: CollectionLifecycleManager public changes!: CollectionChangesManager public indexes!: CollectionIndexesManager + private _events!: CollectionEventsManager // Core state - make public for testing public transactions: SortedMap> @@ -79,11 +81,13 @@ export class CollectionStateManager< lifecycle: CollectionLifecycleManager changes: CollectionChangesManager indexes: CollectionIndexesManager + events: CollectionEventsManager }) { this.collection = deps.collection this.lifecycle = deps.lifecycle this.changes = deps.changes this.indexes = deps.indexes + this._events = deps.events } /** @@ -525,6 +529,12 @@ export class CollectionStateManager< for (const key of changedKeys) { currentVisibleState.delete(key) } + + // 4) Emit truncate event so subscriptions can reset their cursor tracking state + this._events.emit(`truncate`, { + type: `truncate`, + collection: this.collection, + }) } for (const 
operation of transaction.operations) { diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 045eb5042..44981d460 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -34,6 +34,8 @@ type RequestLimitedSnapshotOptions = { limit: number /** All column values for cursor (first value used for local index, all values for sync layer) */ minValues?: Array + /** Row offset for offset-based pagination (passed to sync layer) */ + offset?: number } type CollectionSubscriptionOptions = { @@ -63,6 +65,12 @@ export class CollectionSubscription // Keep track of the keys we've sent (needed for join and orderBy optimizations) private sentKeys = new Set() + // Track the count of rows sent via requestLimitedSnapshot for offset-based pagination + private limitedSnapshotRowCount = 0 + + // Track the last key sent via requestLimitedSnapshot for cursor-based pagination + private lastSentKey: string | number | undefined + private filteredCallback: (changes: Array>) => void private orderByIndex: IndexInterface | undefined @@ -258,6 +266,7 @@ export class CollectionSubscription orderBy, limit, minValues, + offset, }: RequestLimitedSnapshotOptions) { if (!limit) throw new Error(`limit is required`) @@ -354,77 +363,75 @@ export class CollectionSubscription keys = index.take(valuesNeeded(), biggestObservedValue, filterFn) } + // Track row count for offset-based pagination (before sending to callback) + // Use the current count as the offset for this load + const currentOffset = this.limitedSnapshotRowCount + this.callback(changes) - // Build the WHERE filter for sync layer loadSubset - // buildCursor handles both single-column and multi-column cases - let whereWithValueFilter = where + // Update the row count and last key after sending (for next call's offset/cursor) + this.limitedSnapshotRowCount += changes.length + if (changes.length > 0) { + this.lastSentKey = changes[changes.length - 1]!.key + } + + // Build cursor expressions for sync layer loadSubset + // The cursor expressions are separate from the main where clause + // so the sync layer can choose cursor-based or offset-based pagination + let cursorExpressions: + | { + whereFrom: BasicExpression + whereCurrent: BasicExpression + lastKey?: string | number + } + | undefined + if (minValues !== undefined && minValues.length > 0) { - const cursor = buildCursor(orderBy, minValues) - if (cursor) { - whereWithValueFilter = where ? and(where, cursor) : cursor + const whereFromCursor = buildCursor(orderBy, minValues) + + if (whereFromCursor) { + const { expression } = orderBy[0]! 
+ const minValue = minValues[0] + + // Build the whereCurrent expression for the first orderBy column + // For Date values, we need to handle precision differences between JS (ms) and backends (μs) + // A JS Date represents a 1ms range, so we query for all values within that range + let whereCurrentCursor: BasicExpression + if (minValue instanceof Date) { + const minValuePlus1ms = new Date(minValue.getTime() + 1) + whereCurrentCursor = and( + gte(expression, new Value(minValue)), + lt(expression, new Value(minValuePlus1ms)), + ) + } else { + whereCurrentCursor = eq(expression, new Value(minValue)) + } + + cursorExpressions = { + whereFrom: whereFromCursor, + whereCurrent: whereCurrentCursor, + lastKey: this.lastSentKey, + } } } // Request the sync layer to load more data // don't await it, we will load the data into the collection when it comes in - const loadOptions1: LoadSubsetOptions = { - where: whereWithValueFilter, + // Note: `where` does NOT include cursor expressions - they are passed separately + // The sync layer can choose to use cursor-based or offset-based pagination + const loadOptions: LoadSubsetOptions = { + where, // Main filter only, no cursor limit, orderBy, + cursor: cursorExpressions, // Cursor expressions passed separately + offset: offset ?? currentOffset, // Use provided offset, or auto-tracked offset subscription: this, } - const syncResult = this.collection._sync.loadSubset(loadOptions1) + const syncResult = this.collection._sync.loadSubset(loadOptions) // Track this loadSubset call - this.loadedSubsets.push(loadOptions1) - - // Make parallel loadSubset calls for values equal to minValue and values greater than minValue - const promises: Array> = [] - - // First promise: load all values equal to minValue - if (typeof minValue !== `undefined`) { - const { expression } = orderBy[0]! 
- - // For Date values, we need to handle precision differences between JS (ms) and backends (μs) - // A JS Date represents a 1ms range, so we query for all values within that range - let exactValueFilter - if (minValue instanceof Date) { - const minValuePlus1ms = new Date(minValue.getTime() + 1) - exactValueFilter = and( - gte(expression, new Value(minValue)), - lt(expression, new Value(minValuePlus1ms)), - ) - } else { - exactValueFilter = eq(expression, new Value(minValue)) - } - - const loadOptions2: LoadSubsetOptions = { - where: exactValueFilter, - subscription: this, - } - const equalValueResult = this.collection._sync.loadSubset(loadOptions2) - - // Track this loadSubset call - this.loadedSubsets.push(loadOptions2) - - if (equalValueResult instanceof Promise) { - promises.push(equalValueResult) - } - } - - // Second promise: load values greater than minValue - if (syncResult instanceof Promise) { - promises.push(syncResult) - } - - // Track the combined promise - if (promises.length > 0) { - const combinedPromise = Promise.all(promises).then(() => {}) - this.trackLoadSubsetPromise(combinedPromise) - } else { - this.trackLoadSubsetPromise(syncResult) - } + this.loadedSubsets.push(loadOptions) + this.trackLoadSubsetPromise(syncResult) } // TODO: also add similar test but that checks that it can also load it from the collection's loadSubset function diff --git a/packages/db/src/query/index.ts b/packages/db/src/query/index.ts index 979e694a8..e798b283a 100644 --- a/packages/db/src/query/index.ts +++ b/packages/db/src/query/index.ts @@ -65,6 +65,7 @@ export { minusWherePredicates, isOrderBySubset, isLimitSubset, + isOffsetLimitSubset, isPredicateSubset, } from './predicate-utils.js' diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index 8c7a01d0f..f368562cf 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -190,6 +190,17 @@ export class CollectionSubscriber< whereExpression, }) + // Listen for truncate events to reset cursor tracking state + // This ensures that after a must-refetch/truncate, we don't use stale cursor data + const truncateUnsubscribe = this.collection.on(`truncate`, () => { + this.biggest = undefined + }) + + // Clean up truncate listener when subscription is unsubscribed + subscription.on(`unsubscribed`, () => { + truncateUnsubscribe() + }) + // Normalize the orderBy clauses such that the references are relative to the collection const normalizedOrderBy = normalizeOrderByPaths(orderBy, this.alias) @@ -285,7 +296,7 @@ export class CollectionSubscriber< if (!orderByInfo) { return } - const { orderBy, valueExtractorForRawRow } = orderByInfo + const { orderBy, valueExtractorForRawRow, offset } = orderByInfo const biggestSentRow = this.biggest // Extract all orderBy column values from the biggest sent row @@ -306,10 +317,12 @@ export class CollectionSubscriber< const normalizedOrderBy = normalizeOrderByPaths(orderBy, this.alias) // Take the `n` items after the biggest sent value + // Pass the current window offset to ensure proper deduplication subscription.requestLimitedSnapshot({ orderBy: normalizedOrderBy, limit: n, minValues, + offset, }) } @@ -338,10 +351,13 @@ export class CollectionSubscriber< comparator: (a: any, b: any) => number, ) { for (const change of changes) { - if (!this.biggest) { - this.biggest = change.value - } else if (comparator(this.biggest, change.value) < 0) { - this.biggest = change.value + // Only track 
inserts/updates for cursor positioning, not deletes + if (change.type !== `delete`) { + if (!this.biggest) { + this.biggest = change.value + } else if (comparator(this.biggest, change.value) < 0) { + this.biggest = change.value + } } yield change diff --git a/packages/db/src/query/predicate-utils.ts b/packages/db/src/query/predicate-utils.ts index 672078bfd..a94e77eb3 100644 --- a/packages/db/src/query/predicate-utils.ts +++ b/packages/db/src/query/predicate-utils.ts @@ -756,6 +756,9 @@ export function isOrderBySubset( * Check if one limit is a subset of another. * Returns true if the subset limit requirements are satisfied by the superset limit. * + * Note: This function does NOT consider offset. For offset-aware subset checking, + * use `isOffsetLimitSubset` instead. + * * @example * isLimitSubset(10, 20) // true (requesting 10 items when 20 are available) * isLimitSubset(20, 10) // false (requesting 20 items when only 10 are available) @@ -785,7 +788,57 @@ export function isLimitSubset( } /** - * Check if one predicate (where + orderBy + limit) is a subset of another. + * Check if one offset+limit range is a subset of another. + * Returns true if the subset range is fully contained within the superset range. + * + * A query with `{limit: 10, offset: 0}` loads rows [0, 10). + * A query with `{limit: 10, offset: 20}` loads rows [20, 30). + * + * For subset to be satisfied by superset: + * - Superset must start at or before subset (superset.offset <= subset.offset) + * - Superset must end at or after subset (superset.offset + superset.limit >= subset.offset + subset.limit) + * + * @example + * isOffsetLimitSubset({ offset: 0, limit: 5 }, { offset: 0, limit: 10 }) // true + * isOffsetLimitSubset({ offset: 5, limit: 5 }, { offset: 0, limit: 10 }) // true (rows 5-9 within 0-9) + * isOffsetLimitSubset({ offset: 5, limit: 10 }, { offset: 0, limit: 10 }) // false (rows 5-14 exceed 0-9) + * isOffsetLimitSubset({ offset: 20, limit: 10 }, { offset: 0, limit: 10 }) // false (rows 20-29 outside 0-9) + * + * @param subset - The offset+limit requirements to check + * @param superset - The offset+limit that might satisfy the requirements + * @returns true if subset range is fully contained within superset range + */ +export function isOffsetLimitSubset( + subset: { offset?: number; limit?: number }, + superset: { offset?: number; limit?: number } +): boolean { + const subsetOffset = subset.offset ?? 0 + const supersetOffset = superset.offset ?? 0 + + // Superset must start at or before subset + if (supersetOffset > subsetOffset) { + return false + } + + // If superset is unlimited, it covers everything from its offset onwards + if (superset.limit === undefined) { + return true + } + + // If subset is unlimited but superset has a limit, subset can't be satisfied + if (subset.limit === undefined) { + return false + } + + // Both have limits - check if subset range is within superset range + const subsetEnd = subsetOffset + subset.limit + const supersetEnd = supersetOffset + superset.limit + + return subsetEnd <= supersetEnd +} + +/** + * Check if one predicate (where + orderBy + limit + offset) is a subset of another. * Returns true if all aspects of the subset predicate are satisfied by the superset. * * @example @@ -813,9 +866,9 @@ export function isPredicateSubset( // The top 10 items matching 'search%' might include items outside the overall top 10. 
// // However, if the where clauses are equal, then the subset relationship can - // be determined by orderBy and limit alone: - // Example: superset = {where: status='active', limit: 10, orderBy: desc} - // subset = {where: status='active', limit: 5, orderBy: desc} + // be determined by orderBy, limit, and offset: + // Example: superset = {where: status='active', limit: 10, offset: 0, orderBy: desc} + // subset = {where: status='active', limit: 5, offset: 0, orderBy: desc} // The top 5 active items ARE contained in the top 10 active items. if (superset.limit !== undefined) { // For limited supersets, where clauses must be equal @@ -824,15 +877,17 @@ export function isPredicateSubset( } return ( isOrderBySubset(subset.orderBy, superset.orderBy) && - isLimitSubset(subset.limit, superset.limit) + isOffsetLimitSubset(subset, superset) ) } // For unlimited supersets, use the normal subset logic + // Still need to consider offset - an unlimited query with offset only covers + // rows from that offset onwards return ( isWhereSubset(subset.where, superset.where) && isOrderBySubset(subset.orderBy, superset.orderBy) && - isLimitSubset(subset.limit, superset.limit) + isOffsetLimitSubset(subset, superset) ) } diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index b6380e7e7..22f0fd75b 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -253,13 +253,52 @@ export interface Subscription extends EventEmitter { readonly status: SubscriptionStatus } +/** + * Cursor expressions for pagination, passed separately from the main `where` clause. + * The sync layer can choose to use cursor-based pagination (combining these with the where) + * or offset-based pagination (ignoring these and using the `offset` parameter). + * + * Neither expression includes the main `where` clause - they are cursor-specific only. + */ +export type CursorExpressions = { + /** + * Expression for rows greater than (after) the cursor value. + * For multi-column orderBy, this is a composite cursor using OR of conditions. + * Example for [col1 ASC, col2 DESC] with values [v1, v2]: + * or(gt(col1, v1), and(eq(col1, v1), lt(col2, v2))) + */ + whereFrom: BasicExpression + /** + * Expression for rows equal to the current cursor value (first orderBy column only). + * Used to handle tie-breaking/duplicates at the boundary. + * Example: eq(col1, v1) or for Dates: and(gte(col1, v1), lt(col1, v1+1ms)) + */ + whereCurrent: BasicExpression + /** + * The key of the last item that was loaded. + * Can be used by sync layers for tracking or deduplication. + */ + lastKey?: string | number +} + export type LoadSubsetOptions = { - /** The where expression to filter the data */ + /** The where expression to filter the data (does NOT include cursor expressions) */ where?: BasicExpression /** The order by clause to sort the data */ orderBy?: OrderBy /** The limit of the data to load */ limit?: number + /** + * Cursor expressions for cursor-based pagination. + * These are separate from `where` - the sync layer should combine them if using cursor-based pagination. + * Neither expression includes the main `where` clause. + */ + cursor?: CursorExpressions + /** + * Row offset for offset-based pagination. + * The sync layer can use this instead of `cursor` if it prefers offset-based pagination. + */ + offset?: number /** * The subscription that triggered the load. 
* Advanced sync implementations can use this for: diff --git a/packages/db/tests/query/order-by.test.ts b/packages/db/tests/query/order-by.test.ts index b9d28f84d..4ff1f4c63 100644 --- a/packages/db/tests/query/order-by.test.ts +++ b/packages/db/tests/query/order-by.test.ts @@ -2568,6 +2568,7 @@ describe(`OrderBy with duplicate values`, () => { it(`should correctly advance window when there are duplicate values loaded from sync layer`, async () => { // Create test data that reproduces the specific bug described: // Items with many duplicates at value 5, then normal progression + // Note: loadSubset now receives cursor expressions (whereFrom/whereCurrent) separately from where const allTestData: Array = [ { id: 1, a: 1, keep: true }, { id: 2, a: 2, keep: true }, @@ -2640,11 +2641,56 @@ describe(`OrderBy with duplicate values`, () => { } } - // Return a slice from 0 to limit + // Apply cursor expressions if present (cursor-based pagination) + // For proper cursor-based pagination: + // - whereCurrent should load ALL ties (no limit) + // - whereFrom should load with remaining limit + if (options.cursor) { + const { whereFrom, whereCurrent } = options.cursor + const { limit } = options + try { + // Get ALL rows matching whereCurrent (no limit for ties) + const whereCurrentFn = + createFilterFunctionFromExpression(whereCurrent) + const currentData = + filteredData.filter(whereCurrentFn) + + // Get rows matching whereFrom with limit (for next page data) + const whereFromFn = + createFilterFunctionFromExpression(whereFrom) + const fromData = filteredData.filter(whereFromFn) + const limitedFromData = limit + ? fromData.slice(0, limit) + : fromData + + // Combine: current rows + from rows (deduplicated) + const seenIds = new Set() + filteredData = [] + for (const item of currentData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + for (const item of limitedFromData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + // Re-sort after combining + filteredData.sort((a, b) => a.a - b.a) + } catch (error) { + console.log(`Error applying cursor:`, error) + } + } + + // Return data (limit already applied when cursor is present) const { limit } = options - const dataToLoad = limit - ? filteredData.slice(0, limit) - : filteredData + const dataToLoad = + limit && !options.cursor + ? 
filteredData.slice(0, limit) + : filteredData dataToLoad.forEach((item) => { write({ @@ -2710,8 +2756,8 @@ describe(`OrderBy with duplicate values`, () => { { id: 9, a: 5, keep: true }, { id: 10, a: 5, keep: true }, ]) - // we expect 2 new loadSubset calls (1 for data equal to max value and one for data greater than max value) - expect(loadSubsetCallCount).toBe(3) + // we expect 1 new loadSubset call (cursor expressions for whereFrom/whereCurrent are now combined in single call) + expect(loadSubsetCallCount).toBe(2) // Now move to third page (offset 10, limit 5) // It should advance past the duplicate 5s @@ -2739,12 +2785,13 @@ describe(`OrderBy with duplicate values`, () => { // We expect no more loadSubset calls because when we loaded the previous page // we asked for all data equal to max value and LIMIT values greater than max value // and the LIMIT values greater than max value already loaded the next page - expect(loadSubsetCallCount).toBe(3) + expect(loadSubsetCallCount).toBe(2) }) it(`should correctly advance window when there are duplicate values loaded from both local collection and sync layer`, async () => { // Create test data that reproduces the specific bug described: // Items with many duplicates at value 5, then normal progression + // Note: loadSubset now receives cursor expressions (whereFrom/whereCurrent) separately from where const allTestData: Array = [ { id: 1, a: 1, keep: true }, { id: 2, a: 2, keep: true }, @@ -2764,7 +2811,7 @@ describe(`OrderBy with duplicate values`, () => { { id: 16, a: 16, keep: true }, ] - // Start with only the first 5 items in the local collection + // Start with the first 10 items in the local collection (includes all duplicates) const initialData = allTestData.slice(0, 10) let loadSubsetCallCount = 0 @@ -2817,11 +2864,56 @@ describe(`OrderBy with duplicate values`, () => { } } - // Return a slice from 0 to limit + // Apply cursor expressions if present (cursor-based pagination) + // For proper cursor-based pagination: + // - whereCurrent should load ALL ties (no limit) + // - whereFrom should load with remaining limit + if (options.cursor) { + const { whereFrom, whereCurrent } = options.cursor + const { limit } = options + try { + // Get ALL rows matching whereCurrent (no limit for ties) + const whereCurrentFn = + createFilterFunctionFromExpression(whereCurrent) + const currentData = + filteredData.filter(whereCurrentFn) + + // Get rows matching whereFrom with limit (for next page data) + const whereFromFn = + createFilterFunctionFromExpression(whereFrom) + const fromData = filteredData.filter(whereFromFn) + const limitedFromData = limit + ? fromData.slice(0, limit) + : fromData + + // Combine: current rows + from rows (deduplicated) + const seenIds = new Set() + filteredData = [] + for (const item of currentData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + for (const item of limitedFromData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + // Re-sort after combining + filteredData.sort((a, b) => a.a - b.a) + } catch (error) { + console.log(`Error applying cursor:`, error) + } + } + + // Return data (limit already applied when cursor is present) const { limit } = options - const dataToLoad = limit - ? filteredData.slice(0, limit) - : filteredData + const dataToLoad = + limit && !options.cursor + ? 
filteredData.slice(0, limit) + : filteredData dataToLoad.forEach((item) => { write({ @@ -2887,8 +2979,8 @@ describe(`OrderBy with duplicate values`, () => { { id: 9, a: 5, keep: true }, { id: 10, a: 5, keep: true }, ]) - // we expect 2 new loadSubset calls (1 for data equal to max value and one for data greater than max value) - expect(loadSubsetCallCount).toBe(3) + // we expect 1 new loadSubset call (cursor expressions for whereFrom/whereCurrent are now combined in single call) + expect(loadSubsetCallCount).toBe(2) // Now move to third page (offset 10, limit 5) // It should advance past the duplicate 5s @@ -2916,7 +3008,7 @@ describe(`OrderBy with duplicate values`, () => { // We expect no more loadSubset calls because when we loaded the previous page // we asked for all data equal to max value and LIMIT values greater than max value // and the LIMIT values greater than max value already loaded the next page - expect(loadSubsetCallCount).toBe(3) + expect(loadSubsetCallCount).toBe(2) }) }) } @@ -2960,8 +3052,9 @@ describe(`OrderBy with Date values and precision differences`, () => { const initialData = testData.slice(0, 5) - // Track the WHERE clauses sent to loadSubset - const loadSubsetWhereClauses: Array = [] + // Track the cursor expressions sent to loadSubset + // Note: cursor expressions are now passed separately from where (whereFrom/whereCurrent/lastKey) + const loadSubsetCursors: Array = [] const sourceCollection = createCollection( mockSyncCollectionOptions({ @@ -2981,8 +3074,8 @@ describe(`OrderBy with Date values and precision differences`, () => { return { loadSubset: (options) => { - // Capture the WHERE clause for inspection - loadSubsetWhereClauses.push(options.where) + // Capture the cursor for inspection (now contains whereFrom/whereCurrent/lastKey) + loadSubsetCursors.push(options.cursor) return new Promise((resolve) => { setTimeout(() => { @@ -3003,6 +3096,42 @@ describe(`OrderBy with Date values and precision differences`, () => { } } + // Apply cursor expressions if present + if (options.cursor) { + const { whereFrom, whereCurrent } = options.cursor + try { + const whereFromFn = + createFilterFunctionFromExpression(whereFrom) + const fromData = filteredData.filter(whereFromFn) + + const whereCurrentFn = + createFilterFunctionFromExpression(whereCurrent) + const currentData = filteredData.filter(whereCurrentFn) + + // Combine and deduplicate + const seenIds = new Set() + filteredData = [] + for (const item of currentData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + for (const item of fromData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + filteredData.sort( + (a, b) => + a.createdAt.getTime() - b.createdAt.getTime() + ) + } catch (error) { + console.log(`Error applying cursor:`, error) + } + } + const { limit } = options const dataToLoad = limit ? 
filteredData.slice(0, limit) @@ -3042,21 +3171,22 @@ describe(`OrderBy with Date values and precision differences`, () => { const results = Array.from(collection.values()).sort((a, b) => a.id - b.id) expect(results.map((r) => r.id)).toEqual([1, 2, 3, 4, 5]) - // Clear tracked clauses before moving to next page - loadSubsetWhereClauses.length = 0 + // Clear tracked cursors before moving to next page + loadSubsetCursors.length = 0 // Move to next page - this should trigger the Date precision handling const moveToSecondPage = collection.utils.setWindow({ offset: 5, limit: 5 }) await moveToSecondPage - // Find the WHERE clause that queries for the "equal values" (the minValue query) - // With the fix, this should be: and(gte(createdAt, baseTime), lt(createdAt, baseTime+1ms)) + // Find the cursor that contains the "whereCurrent" expression (the minValue query) + // With the fix, whereCurrent should be: and(gte(createdAt, baseTime), lt(createdAt, baseTime+1ms)) // Without the fix, this would be: eq(createdAt, baseTime) - const equalValuesQuery = loadSubsetWhereClauses.find((clause) => { - if (!clause) return false - // Check if it's an 'and' with 'gte' and 'lt' (the fix) - if (clause.name === `and` && clause.args?.length === 2) { - const [first, second] = clause.args + const cursorWithDateRange = loadSubsetCursors.find((cursor) => { + if (!cursor?.whereCurrent) return false + const whereCurrent = cursor.whereCurrent + // Check if whereCurrent is an 'and' with 'gte' and 'lt' (the fix) + if (whereCurrent.name === `and` && whereCurrent.args?.length === 2) { + const [first, second] = whereCurrent.args return first?.name === `gte` && second?.name === `lt` } return false @@ -3064,7 +3194,8 @@ describe(`OrderBy with Date values and precision differences`, () => { // The fix should produce a range query (and(gte, lt)) for Date values // instead of an exact equality query (eq) - expect(equalValuesQuery).toBeDefined() + expect(cursorWithDateRange).toBeDefined() + const equalValuesQuery = cursorWithDateRange.whereCurrent expect(equalValuesQuery.name).toBe(`and`) expect(equalValuesQuery.args[0].name).toBe(`gte`) expect(equalValuesQuery.args[1].name).toBe(`lt`) diff --git a/packages/db/tests/query/predicate-utils.test.ts b/packages/db/tests/query/predicate-utils.test.ts index 907581fe6..d15096dd1 100644 --- a/packages/db/tests/query/predicate-utils.test.ts +++ b/packages/db/tests/query/predicate-utils.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it } from 'vitest' import { isLimitSubset, + isOffsetLimitSubset, isOrderBySubset, isPredicateSubset, isWhereSubset, @@ -647,6 +648,86 @@ describe(`isLimitSubset`, () => { }) }) +describe(`isOffsetLimitSubset`, () => { + it(`should return true when subset range is within superset range (same offset)`, () => { + expect( + isOffsetLimitSubset({ offset: 0, limit: 5 }, { offset: 0, limit: 10 }) + ).toBe(true) + expect( + isOffsetLimitSubset({ offset: 0, limit: 10 }, { offset: 0, limit: 10 }) + ).toBe(true) + }) + + it(`should return true when subset starts later but is still within superset range`, () => { + // superset loads rows [0, 10), subset loads rows [5, 10) - subset is within superset + expect( + isOffsetLimitSubset({ offset: 5, limit: 5 }, { offset: 0, limit: 10 }) + ).toBe(true) + }) + + it(`should return false when subset extends beyond superset range`, () => { + // superset loads rows [0, 10), subset loads rows [5, 15) - subset extends beyond + expect( + isOffsetLimitSubset({ offset: 5, limit: 10 }, { offset: 0, limit: 10 }) + ).toBe(false) + }) + + 
it(`should return false when subset is completely outside superset range`, () => { + // superset loads rows [0, 10), subset loads rows [20, 30) - no overlap + expect( + isOffsetLimitSubset({ offset: 20, limit: 10 }, { offset: 0, limit: 10 }) + ).toBe(false) + }) + + it(`should return false when superset starts after subset`, () => { + // superset loads rows [10, 20), subset loads rows [0, 10) - superset starts too late + expect( + isOffsetLimitSubset({ offset: 0, limit: 10 }, { offset: 10, limit: 10 }) + ).toBe(false) + }) + + it(`should return true when superset is unlimited`, () => { + expect(isOffsetLimitSubset({ offset: 0, limit: 10 }, { offset: 0 })).toBe( + true + ) + expect(isOffsetLimitSubset({ offset: 20, limit: 10 }, { offset: 0 })).toBe( + true + ) + }) + + it(`should return false when superset is unlimited but starts after subset`, () => { + // superset loads rows [10, ∞), subset loads rows [0, 10) - superset starts too late + expect(isOffsetLimitSubset({ offset: 0, limit: 10 }, { offset: 10 })).toBe( + false + ) + }) + + it(`should return false when subset is unlimited but superset has a limit`, () => { + expect(isOffsetLimitSubset({ offset: 0 }, { offset: 0, limit: 10 })).toBe( + false + ) + }) + + it(`should return true when both are unlimited and superset starts at or before subset`, () => { + expect(isOffsetLimitSubset({ offset: 10 }, { offset: 0 })).toBe(true) + expect(isOffsetLimitSubset({ offset: 10 }, { offset: 10 })).toBe(true) + }) + + it(`should return false when both are unlimited but superset starts after subset`, () => { + expect(isOffsetLimitSubset({ offset: 0 }, { offset: 10 })).toBe(false) + }) + + it(`should default offset to 0 when undefined`, () => { + expect(isOffsetLimitSubset({ limit: 5 }, { limit: 10 })).toBe(true) + expect(isOffsetLimitSubset({ offset: 0, limit: 5 }, { limit: 10 })).toBe( + true + ) + expect(isOffsetLimitSubset({ limit: 5 }, { offset: 0, limit: 10 })).toBe( + true + ) + }) +}) + describe(`isPredicateSubset`, () => { it(`should check all components for unlimited superset`, () => { // For unlimited supersets, where-subset logic applies @@ -756,6 +837,138 @@ describe(`isPredicateSubset`, () => { } expect(isPredicateSubset(subset, superset)).toBe(false) }) + + describe(`with offset`, () => { + it(`should return true when subset offset+limit is within superset range`, () => { + const sameWhere = gt(ref(`age`), val(10)) + const subset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 5, + limit: 5, + } + const superset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 0, + limit: 10, + } + // subset loads rows [5, 10), superset loads rows [0, 10) - subset is within + expect(isPredicateSubset(subset, superset)).toBe(true) + }) + + it(`should return false when subset is at different offset outside superset range`, () => { + const sameWhere = gt(ref(`age`), val(10)) + const subset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 20, + limit: 10, + } + const superset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 0, + limit: 10, + } + // subset loads rows [20, 30), superset loads rows [0, 10) - no overlap + expect(isPredicateSubset(subset, superset)).toBe(false) + }) + + it(`should return false when subset extends beyond superset even with same where`, () => { + const sameWhere = gt(ref(`age`), val(10)) + const subset: LoadSubsetOptions = { + 
where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 5, + limit: 10, + } + const superset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 0, + limit: 10, + } + // subset loads rows [5, 15), superset loads rows [0, 10) - subset extends beyond + expect(isPredicateSubset(subset, superset)).toBe(false) + }) + + it(`should return true for unlimited superset with any subset offset`, () => { + const sameWhere = gt(ref(`age`), val(10)) + const subset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 100, + limit: 10, + } + const superset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + // No limit - unlimited + } + expect(isPredicateSubset(subset, superset)).toBe(true) + }) + + it(`should return false when superset has offset that starts after subset needs`, () => { + const sameWhere = gt(ref(`age`), val(10)) + const subset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 0, + limit: 10, + } + const superset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 5, + limit: 10, + } + // subset needs rows [0, 10), superset only has rows [5, 15) + expect(isPredicateSubset(subset, superset)).toBe(false) + }) + + it(`should handle pagination correctly - page 2 not subset of page 1`, () => { + const sameWhere = gt(ref(`age`), val(10)) + // Page 1: offset 0, limit 10 + const page1: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 0, + limit: 10, + } + // Page 2: offset 10, limit 10 + const page2: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 10, + limit: 10, + } + // Page 2 is NOT a subset of page 1 (different rows) + expect(isPredicateSubset(page2, page1)).toBe(false) + // Page 1 is NOT a subset of page 2 (different rows) + expect(isPredicateSubset(page1, page2)).toBe(false) + }) + + it(`should return true when superset covers multiple pages`, () => { + const sameWhere = gt(ref(`age`), val(10)) + // Superset: offset 0, limit 30 (covers pages 1-3) + const superset: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 0, + limit: 30, + } + // Page 2: offset 10, limit 10 + const page2: LoadSubsetOptions = { + where: sameWhere, + orderBy: [orderByClause(ref(`age`), `asc`)], + offset: 10, + limit: 10, + } + // Page 2 IS a subset of superset (rows 10-19 within 0-29) + expect(isPredicateSubset(page2, superset)).toBe(true) + }) + }) }) describe(`minusWherePredicates`, () => { diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 73d1e6c05..d1ac7ca76 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -3,10 +3,10 @@ import { isChangeMessage, isControlMessage, isVisibleInSnapshot, -} from '@electric-sql/client' -import { Store } from '@tanstack/store' -import DebugModule from 'debug' -import { DeduplicatedLoadSubset } from '@tanstack/db' +} from "@electric-sql/client" +import { Store } from "@tanstack/store" +import DebugModule from "debug" +import { DeduplicatedLoadSubset, and } from "@tanstack/db" import { ExpectedNumberInAwaitTxIdError, StreamAbortedError, @@ -307,7 +307,12 @@ function hasTxids>( * Creates a deduplicated loadSubset handler for progressive/on-demand modes * 
Returns null for eager mode, or a DeduplicatedLoadSubset instance for other modes. * Handles fetching snapshots in progressive mode during buffering phase, - * and requesting snapshots in on-demand mode + * and requesting snapshots in on-demand mode. + * + * When cursor expressions are provided (whereFrom/whereCurrent), makes two + * requestSnapshot calls: + * - One for whereFrom (rows > cursor) with limit + * - One for whereCurrent (rows = cursor, for tie-breaking) without limit */ function createLoadSubsetDedupe>({ stream, @@ -382,8 +387,50 @@ function createLoadSubsetDedupe>({ return } else { // On-demand mode: use requestSnapshot - const snapshotParams = compileSQL(opts) - await stream.requestSnapshot(snapshotParams) + // When cursor is provided, make two calls: + // 1. whereCurrent (all ties, no limit) + // 2. whereFrom (rows > cursor, with limit) + const { cursor, where, orderBy, limit } = opts + + if (cursor) { + // Make parallel requests for cursor-based pagination + const promises: Array> = [] + + // Request 1: All rows matching whereCurrent (ties at boundary, no limit) + // Combine main where with cursor.whereCurrent + const whereCurrentOpts: LoadSubsetOptions = { + where: where ? and(where, cursor.whereCurrent) : cursor.whereCurrent, + orderBy, + // No limit - get all ties + } + const whereCurrentParams = compileSQL(whereCurrentOpts) + promises.push(stream.requestSnapshot(whereCurrentParams)) + + debug( + `${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereCurrent snapshot (all ties)` + ) + + // Request 2: Rows matching whereFrom (rows > cursor, with limit) + // Combine main where with cursor.whereFrom + const whereFromOpts: LoadSubsetOptions = { + where: where ? and(where, cursor.whereFrom) : cursor.whereFrom, + orderBy, + limit, + } + const whereFromParams = compileSQL(whereFromOpts) + promises.push(stream.requestSnapshot(whereFromParams)) + + debug( + `${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereFrom snapshot (with limit ${limit})` + ) + + // Wait for both requests to complete + await Promise.all(promises) + } else { + // No cursor - standard single request + const snapshotParams = compileSQL(opts) + await stream.requestSnapshot(snapshotParams) + } } } diff --git a/packages/electric-db-collection/tests/electric-live-query.test.ts b/packages/electric-db-collection/tests/electric-live-query.test.ts index ccaef84f3..bcd83d310 100644 --- a/packages/electric-db-collection/tests/electric-live-query.test.ts +++ b/packages/electric-db-collection/tests/electric-live-query.test.ts @@ -609,9 +609,8 @@ describe.each([ // Limited queries are only deduplicated when their where clauses are equal. // Both queries have the same where clause (active = true), but the second query // with limit 6 needs more data than the first query with limit 2 provided. - // The internal query system makes additional requests as it processes the data. - // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this. - expect(mockRequestSnapshot).toHaveBeenCalledTimes(6) + // With cursor-based pagination, initial loads (without cursor) make 1 requestSnapshot call each. + expect(mockRequestSnapshot).toHaveBeenCalledTimes(2) // Check that first it requested a limit of 2 users (from first query) expect(callArgs(0)).toMatchObject({ @@ -877,9 +876,8 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { ) // For limited queries, only requests with identical where clauses can be deduplicated. 
- // The internal query system may make additional requests as it processes the data. - // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this. - expect(mockRequestSnapshot).toHaveBeenCalledTimes(3) + // With cursor-based pagination, initial loads (without cursor) make 1 requestSnapshot call. + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) }) it(`should pass correct WHERE clause to requestSnapshot when live query has filters`, async () => { @@ -1189,9 +1187,8 @@ describe(`Electric Collection - loadSubset deduplication`, () => { await new Promise((resolve) => setTimeout(resolve, 0)) // For limited queries, only requests with identical where clauses can be deduplicated. - // The internal query system may make additional requests as it processes data. - // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this. - expect(mockRequestSnapshot).toHaveBeenCalledTimes(3) + // With cursor-based pagination, initial loads (without cursor) make 1 requestSnapshot call. + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) // Simulate a must-refetch (which triggers truncate and reset) subscriber([{ headers: { control: `must-refetch` } }]) @@ -1201,8 +1198,8 @@ describe(`Electric Collection - loadSubset deduplication`, () => { await new Promise((resolve) => setTimeout(resolve, 0)) // The existing live query re-requests its data after truncate - // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this. - expect(mockRequestSnapshot).toHaveBeenCalledTimes(5) + // After must-refetch, the query requests data again (1 initial + 1 after truncate) + expect(mockRequestSnapshot).toHaveBeenCalledTimes(2) // Create the same live query again after reset // This should NOT be deduped because the reset cleared the deduplication state, @@ -1221,8 +1218,8 @@ describe(`Electric Collection - loadSubset deduplication`, () => { await new Promise((resolve) => setTimeout(resolve, 0)) // Should have more calls - the different query triggered a new request - // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this. - expect(mockRequestSnapshot).toHaveBeenCalledTimes(6) + // 1 initial + 1 after must-refetch + 1 for new query = 3 + expect(mockRequestSnapshot).toHaveBeenCalledTimes(3) }) it(`should deduplicate unlimited queries regardless of orderBy`, async () => { diff --git a/packages/query-db-collection/e2e/query-filter.ts b/packages/query-db-collection/e2e/query-filter.ts index 4ead26397..43f4d0e87 100644 --- a/packages/query-db-collection/e2e/query-filter.ts +++ b/packages/query-db-collection/e2e/query-filter.ts @@ -69,6 +69,11 @@ export function serializeLoadSubsetOptions( result.limit = options.limit } + // Include offset for pagination support - different offsets need different query keys + if (options.offset !== undefined) { + result.offset = options.offset + } + return JSON.stringify(Object.keys(result).length === 0 ? null : result) } @@ -158,7 +163,7 @@ function isBasicExpression( } /** - * Apply LoadSubsetOptions to data (filter, sort, limit) + * Apply LoadSubsetOptions to data (filter, sort, limit, offset) */ export function applyPredicates( data: Array, @@ -172,6 +177,7 @@ export function applyPredicates( let filters: Array = [] let sorts: Array = [] let limit: number | undefined = undefined + const offset = options.offset ?? 
0 // Check if where clause is simple before trying to parse const hasComplexWhere = options.where && !isSimpleExpression(options.where) @@ -232,6 +238,7 @@ export function applyPredicates( expressionSummary: analysis, hasOrderBy: Boolean(orderBy), limit: rawLimit, + offset, filtersCount: filters.length, sortsCount: sorts.length, initialSize: data.length, @@ -261,14 +268,16 @@ export function applyPredicates( } } - // Apply LIMIT - // Note: offset is NOT applied here - it's handled by the live query windowing layer - // The limit passed here already accounts for offset (e.g., offset(20).limit(10) -> limit: 30) - if (limit !== undefined) { - result = result.slice(0, limit) + // Apply OFFSET and LIMIT + // For pagination: offset skips rows, limit caps the result + if (offset > 0 || limit !== undefined) { + const start = offset + const end = limit !== undefined ? offset + limit : undefined + result = result.slice(start, end) if (DEBUG_SUMMARY) { - console.log(`[query-filter] after limit`, { + console.log(`[query-filter] after offset/limit`, { size: result.length, + offset, limit, }) } diff --git a/packages/query-db-collection/src/serialization.ts b/packages/query-db-collection/src/serialization.ts index 0d353db76..9849c4bd3 100644 --- a/packages/query-db-collection/src/serialization.ts +++ b/packages/query-db-collection/src/serialization.ts @@ -1,7 +1,9 @@ import type { IR, LoadSubsetOptions } from '@tanstack/db' /** - * Serializes LoadSubsetOptions into a stable, hashable format for query keys + * Serializes LoadSubsetOptions into a stable, hashable format for query keys. + * Includes where, orderBy, limit, and offset for pagination support. + * Note: cursor expressions are not serialized as they are backend-specific. * @internal */ export function serializeLoadSubsetOptions( @@ -43,6 +45,11 @@ export function serializeLoadSubsetOptions( result.limit = options.limit } + // Include offset for pagination support + if (options.offset !== undefined) { + result.offset = options.offset + } + return Object.keys(result).length === 0 ? undefined : JSON.stringify(result) } diff --git a/packages/react-db/tests/useLiveInfiniteQuery.test.tsx b/packages/react-db/tests/useLiveInfiniteQuery.test.tsx index cd420ccfc..cd95029de 100644 --- a/packages/react-db/tests/useLiveInfiniteQuery.test.tsx +++ b/packages/react-db/tests/useLiveInfiniteQuery.test.tsx @@ -898,8 +898,44 @@ describe(`useLiveInfiniteQuery`, () => { }) } - // Apply limit if provided - if (opts.limit !== undefined) { + // Apply cursor expressions if present (new cursor-based pagination) + if (opts.cursor) { + const { whereFrom, whereCurrent } = opts.cursor + try { + const whereFromFn = + createFilterFunctionFromExpression(whereFrom) + const fromData = filtered.filter(whereFromFn) + + const whereCurrentFn = + createFilterFunctionFromExpression(whereCurrent) + const currentData = filtered.filter(whereCurrentFn) + + // Combine current (ties) with from (next page), deduplicate + const seenIds = new Set() + filtered = [] + for (const item of currentData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filtered.push(item) + } + } + // Apply limit only to fromData + const limitedFromData = opts.limit + ? 
fromData.slice(0, opts.limit) + : fromData + for (const item of limitedFromData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filtered.push(item) + } + } + // Re-sort after combining + filtered.sort((a, b) => b.createdAt - a.createdAt) + } catch { + // Fallback to original filtered if cursor parsing fails + } + } else if (opts.limit !== undefined) { + // Apply limit only if no cursor (cursor handles limit internally) filtered = filtered.slice(0, opts.limit) } From 091f21434469cc3e4ffddc9cec52b449c26c178b Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Fri, 5 Dec 2025 17:29:22 +0000 Subject: [PATCH 2/4] update docs --- docs/collections/query-collection.md | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/docs/collections/query-collection.md b/docs/collections/query-collection.md index 67331e126..cf13291a8 100644 --- a/docs/collections/query-collection.md +++ b/docs/collections/query-collection.md @@ -521,7 +521,7 @@ All direct write methods are available on `collection.utils`: ## QueryFn and Predicate Push-Down -When using `syncMode: 'on-demand'`, the collection automatically pushes down query predicates (where clauses, orderBy, and limit) to your `queryFn`. This allows you to fetch only the data needed for each specific query, rather than fetching the entire dataset. +When using `syncMode: 'on-demand'`, the collection automatically pushes down query predicates (where clauses, orderBy, limit, and offset) to your `queryFn`. This allows you to fetch only the data needed for each specific query, rather than fetching the entire dataset. ### How LoadSubsetOptions Are Passed @@ -530,9 +530,13 @@ LoadSubsetOptions are passed to your `queryFn` via the query context's `meta` pr ```typescript queryFn: async (ctx) => { // Extract LoadSubsetOptions from the context - const { limit, where, orderBy } = ctx.meta.loadSubsetOptions + const { limit, offset, where, orderBy } = ctx.meta.loadSubsetOptions // Use these to fetch only the data you need + // - where: filter expression (AST) + // - orderBy: sort expression (AST) + // - limit: maximum number of rows + // - offset: number of rows to skip (for pagination) // ... } ``` @@ -572,7 +576,7 @@ const productsCollection = createCollection( syncMode: 'on-demand', // Enable predicate push-down queryFn: async (ctx) => { - const { limit, where, orderBy } = ctx.meta.loadSubsetOptions + const { limit, offset, where, orderBy } = ctx.meta.loadSubsetOptions // Parse the expressions into simple format const parsed = parseLoadSubsetOptions({ where, orderBy, limit }) @@ -605,6 +609,11 @@ const productsCollection = createCollection( params.set('limit', String(parsed.limit)) } + // Add offset for pagination + if (offset) { + params.set('offset', String(offset)) + } + const response = await fetch(`/api/products?${params}`) return response.json() }, @@ -629,6 +638,7 @@ const affordableElectronics = createLiveQueryCollection({ // This triggers a queryFn call with: // GET /api/products?category=electronics&price_lt=100&sort=price:asc&limit=10 +// When paginating, offset is included: &offset=20 ``` ### Custom Handlers for Complex APIs @@ -731,10 +741,11 @@ queryFn: async (ctx) => { Convenience function that parses all LoadSubsetOptions at once. Good for simple use cases. 
```typescript -const { filters, sorts, limit } = parseLoadSubsetOptions(ctx.meta?.loadSubsetOptions) +const { filters, sorts, limit, offset } = parseLoadSubsetOptions(ctx.meta?.loadSubsetOptions) // filters: [{ field: ['category'], operator: 'eq', value: 'electronics' }] // sorts: [{ field: ['price'], direction: 'asc', nulls: 'last' }] // limit: 10 +// offset: 20 (for pagination) ``` #### `parseWhereExpression(expr, options)` From cc8fccbbe745efaae4472e5ab25fc5792bcfa3a0 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Fri, 5 Dec 2025 17:30:21 +0000 Subject: [PATCH 3/4] ci: apply automated fixes --- .changeset/cursor-pagination-loadsubset.md | 6 ++--- packages/db/src/collection/events.ts | 6 ++--- packages/db/src/collection/state.ts | 12 +++++----- packages/db/src/query/predicate-utils.ts | 2 +- packages/db/tests/query/order-by.test.ts | 2 +- .../db/tests/query/predicate-utils.test.ts | 24 +++++++++---------- .../electric-db-collection/src/electric.ts | 12 +++++----- 7 files changed, 32 insertions(+), 32 deletions(-) diff --git a/.changeset/cursor-pagination-loadsubset.md b/.changeset/cursor-pagination-loadsubset.md index 3931ca3dc..cba93c7ab 100644 --- a/.changeset/cursor-pagination-loadsubset.md +++ b/.changeset/cursor-pagination-loadsubset.md @@ -1,7 +1,7 @@ --- -"@tanstack/db": patch -"@tanstack/electric-db-collection": patch -"@tanstack/query-db-collection": patch +'@tanstack/db': patch +'@tanstack/electric-db-collection': patch +'@tanstack/query-db-collection': patch --- Enhanced LoadSubsetOptions with separate cursor expressions and offset for flexible pagination. diff --git a/packages/db/src/collection/events.ts b/packages/db/src/collection/events.ts index 194943dc3..f82a42f2c 100644 --- a/packages/db/src/collection/events.ts +++ b/packages/db/src/collection/events.ts @@ -52,9 +52,9 @@ export interface CollectionTruncateEvent { } export type AllCollectionEvents = { - "status:change": CollectionStatusChangeEvent - "subscribers:change": CollectionSubscribersChangeEvent - "loadingSubset:change": CollectionLoadingSubsetChangeEvent + 'status:change': CollectionStatusChangeEvent + 'subscribers:change': CollectionSubscribersChangeEvent + 'loadingSubset:change': CollectionLoadingSubsetChangeEvent truncate: CollectionTruncateEvent } & { [K in CollectionStatus as `status:${K}`]: CollectionStatusEvent diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index ae4361bdd..b76580c19 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -6,12 +6,12 @@ import type { ChangeMessage, CollectionConfig, OptimisticChangeMessage, -} from "../types" -import type { CollectionImpl } from "./index.js" -import type { CollectionLifecycleManager } from "./lifecycle" -import type { CollectionChangesManager } from "./changes" -import type { CollectionIndexesManager } from "./indexes" -import type { CollectionEventsManager } from "./events" +} from '../types' +import type { CollectionImpl } from './index.js' +import type { CollectionLifecycleManager } from './lifecycle' +import type { CollectionChangesManager } from './changes' +import type { CollectionIndexesManager } from './indexes' +import type { CollectionEventsManager } from './events' interface PendingSyncedTransaction< T extends object = Record, diff --git a/packages/db/src/query/predicate-utils.ts b/packages/db/src/query/predicate-utils.ts index a94e77eb3..96162e868 100644 --- a/packages/db/src/query/predicate-utils.ts +++ 
b/packages/db/src/query/predicate-utils.ts @@ -810,7 +810,7 @@ export function isLimitSubset( */ export function isOffsetLimitSubset( subset: { offset?: number; limit?: number }, - superset: { offset?: number; limit?: number } + superset: { offset?: number; limit?: number }, ): boolean { const subsetOffset = subset.offset ?? 0 const supersetOffset = superset.offset ?? 0 diff --git a/packages/db/tests/query/order-by.test.ts b/packages/db/tests/query/order-by.test.ts index 4ff1f4c63..e460c6080 100644 --- a/packages/db/tests/query/order-by.test.ts +++ b/packages/db/tests/query/order-by.test.ts @@ -3125,7 +3125,7 @@ describe(`OrderBy with Date values and precision differences`, () => { } filteredData.sort( (a, b) => - a.createdAt.getTime() - b.createdAt.getTime() + a.createdAt.getTime() - b.createdAt.getTime(), ) } catch (error) { console.log(`Error applying cursor:`, error) diff --git a/packages/db/tests/query/predicate-utils.test.ts b/packages/db/tests/query/predicate-utils.test.ts index d15096dd1..0cb14ec20 100644 --- a/packages/db/tests/query/predicate-utils.test.ts +++ b/packages/db/tests/query/predicate-utils.test.ts @@ -651,60 +651,60 @@ describe(`isLimitSubset`, () => { describe(`isOffsetLimitSubset`, () => { it(`should return true when subset range is within superset range (same offset)`, () => { expect( - isOffsetLimitSubset({ offset: 0, limit: 5 }, { offset: 0, limit: 10 }) + isOffsetLimitSubset({ offset: 0, limit: 5 }, { offset: 0, limit: 10 }), ).toBe(true) expect( - isOffsetLimitSubset({ offset: 0, limit: 10 }, { offset: 0, limit: 10 }) + isOffsetLimitSubset({ offset: 0, limit: 10 }, { offset: 0, limit: 10 }), ).toBe(true) }) it(`should return true when subset starts later but is still within superset range`, () => { // superset loads rows [0, 10), subset loads rows [5, 10) - subset is within superset expect( - isOffsetLimitSubset({ offset: 5, limit: 5 }, { offset: 0, limit: 10 }) + isOffsetLimitSubset({ offset: 5, limit: 5 }, { offset: 0, limit: 10 }), ).toBe(true) }) it(`should return false when subset extends beyond superset range`, () => { // superset loads rows [0, 10), subset loads rows [5, 15) - subset extends beyond expect( - isOffsetLimitSubset({ offset: 5, limit: 10 }, { offset: 0, limit: 10 }) + isOffsetLimitSubset({ offset: 5, limit: 10 }, { offset: 0, limit: 10 }), ).toBe(false) }) it(`should return false when subset is completely outside superset range`, () => { // superset loads rows [0, 10), subset loads rows [20, 30) - no overlap expect( - isOffsetLimitSubset({ offset: 20, limit: 10 }, { offset: 0, limit: 10 }) + isOffsetLimitSubset({ offset: 20, limit: 10 }, { offset: 0, limit: 10 }), ).toBe(false) }) it(`should return false when superset starts after subset`, () => { // superset loads rows [10, 20), subset loads rows [0, 10) - superset starts too late expect( - isOffsetLimitSubset({ offset: 0, limit: 10 }, { offset: 10, limit: 10 }) + isOffsetLimitSubset({ offset: 0, limit: 10 }, { offset: 10, limit: 10 }), ).toBe(false) }) it(`should return true when superset is unlimited`, () => { expect(isOffsetLimitSubset({ offset: 0, limit: 10 }, { offset: 0 })).toBe( - true + true, ) expect(isOffsetLimitSubset({ offset: 20, limit: 10 }, { offset: 0 })).toBe( - true + true, ) }) it(`should return false when superset is unlimited but starts after subset`, () => { // superset loads rows [10, ∞), subset loads rows [0, 10) - superset starts too late expect(isOffsetLimitSubset({ offset: 0, limit: 10 }, { offset: 10 })).toBe( - false + false, ) }) it(`should return false when 
subset is unlimited but superset has a limit`, () => { expect(isOffsetLimitSubset({ offset: 0 }, { offset: 0, limit: 10 })).toBe( - false + false, ) }) @@ -720,10 +720,10 @@ describe(`isOffsetLimitSubset`, () => { it(`should default offset to 0 when undefined`, () => { expect(isOffsetLimitSubset({ limit: 5 }, { limit: 10 })).toBe(true) expect(isOffsetLimitSubset({ offset: 0, limit: 5 }, { limit: 10 })).toBe( - true + true, ) expect(isOffsetLimitSubset({ limit: 5 }, { offset: 0, limit: 10 })).toBe( - true + true, ) }) }) diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index d1ac7ca76..d8a0e2bcb 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -3,10 +3,10 @@ import { isChangeMessage, isControlMessage, isVisibleInSnapshot, -} from "@electric-sql/client" -import { Store } from "@tanstack/store" -import DebugModule from "debug" -import { DeduplicatedLoadSubset, and } from "@tanstack/db" +} from '@electric-sql/client' +import { Store } from '@tanstack/store' +import DebugModule from 'debug' +import { DeduplicatedLoadSubset, and } from '@tanstack/db' import { ExpectedNumberInAwaitTxIdError, StreamAbortedError, @@ -407,7 +407,7 @@ function createLoadSubsetDedupe>({ promises.push(stream.requestSnapshot(whereCurrentParams)) debug( - `${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereCurrent snapshot (all ties)` + `${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereCurrent snapshot (all ties)`, ) // Request 2: Rows matching whereFrom (rows > cursor, with limit) @@ -421,7 +421,7 @@ function createLoadSubsetDedupe>({ promises.push(stream.requestSnapshot(whereFromParams)) debug( - `${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereFrom snapshot (with limit ${limit})` + `${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereFrom snapshot (with limit ${limit})`, ) // Wait for both requests to complete From 10ab3117e75c602cbf874c7a1f5c4413dd5a67ae Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 10 Dec 2025 10:11:07 +0000 Subject: [PATCH 4/4] fixups --- .../src/suites/pagination.suite.ts | 10 ------- packages/db/tests/query/order-by.test.ts | 26 ++++++++++++++++--- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/packages/db-collection-e2e/src/suites/pagination.suite.ts b/packages/db-collection-e2e/src/suites/pagination.suite.ts index 642dc13d6..7ccf2c60b 100644 --- a/packages/db-collection-e2e/src/suites/pagination.suite.ts +++ b/packages/db-collection-e2e/src/suites/pagination.suite.ts @@ -415,8 +415,6 @@ export function createPaginationTestSuite( .limit(10), ) - console.log(`[QUERY DEBUG] query:`, query) - await query.preload() await waitForQueryData(query, { minSize: 10 }) @@ -435,15 +433,11 @@ export function createPaginationTestSuite( } } - console.log(`[QUERY DEBUG] setting window`) - // Move to second page using setWindow // IMPORTANT: setWindow returns a Promise when loading is required, // or `true` if data is already available. We verify loading occurs. 
const setWindowResult = query.utils.setWindow({ offset: 10, limit: 10 }) - console.log(`[QUERY DEBUG] setWindowResult:`, setWindowResult) - // In on-demand mode, moving to offset 10 should trigger loading // since only the first 10 records were initially loaded if (setWindowResult !== true) { @@ -452,14 +446,10 @@ export function createPaginationTestSuite( } await waitForQueryData(query, { minSize: 10 }) - console.log(`[QUERY DEBUG] waited for data`, setWindowResult) - // Get second page const secondPage = Array.from(query.state.values()) expect(secondPage).toHaveLength(10) - console.log(`[QUERY DEBUG] second page:`, secondPage) - // Verify second page ordering for (let i = 1; i < secondPage.length; i++) { const prev = secondPage[i - 1]! diff --git a/packages/db/tests/query/order-by.test.ts b/packages/db/tests/query/order-by.test.ts index e460c6080..d5025c1e9 100644 --- a/packages/db/tests/query/order-by.test.ts +++ b/packages/db/tests/query/order-by.test.ts @@ -2591,6 +2591,7 @@ describe(`OrderBy with duplicate values`, () => { // Start with only the first 5 items in the local collection const initialData = allTestData.slice(0, 5) let loadSubsetCallCount = 0 + const loadSubsetCursors: Array = [] const duplicateCollection = createCollection( mockSyncCollectionOptions({ @@ -2615,6 +2616,7 @@ describe(`OrderBy with duplicate values`, () => { return { loadSubset: (options) => { loadSubsetCallCount++ + loadSubsetCursors.push(options.cursor) // Simulate async loading from remote source return new Promise((resolve) => { @@ -2685,7 +2687,8 @@ describe(`OrderBy with duplicate values`, () => { } } - // Return data (limit already applied when cursor is present) + // Apply limit for initial page load (no cursor). + // When cursor is present, limit was already applied in the cursor block above. 
const { limit } = options const dataToLoad = limit && !options.cursor @@ -2738,8 +2741,10 @@ describe(`OrderBy with duplicate values`, () => { { id: 5, a: 5, keep: true }, ]) expect(loadSubsetCallCount).toBe(1) + // First loadSubset call (initial page at offset 0) has no cursor + expect(loadSubsetCursors[0]).toBeUndefined() - // Now move to next page (offset 5, limit 5) - this should trigger loadSubset + // Now move to next page (offset 5, limit 5) - this should trigger loadSubset with a cursor const moveToSecondPage = collection.utils.setWindow({ offset: 5, limit: 5, @@ -2758,6 +2763,10 @@ describe(`OrderBy with duplicate values`, () => { ]) // we expect 1 new loadSubset call (cursor expressions for whereFrom/whereCurrent are now combined in single call) expect(loadSubsetCallCount).toBe(2) + // Second loadSubset call (pagination) has a cursor with whereFrom and whereCurrent + expect(loadSubsetCursors[1]).toBeDefined() + expect(loadSubsetCursors[1]).toHaveProperty(`whereFrom`) + expect(loadSubsetCursors[1]).toHaveProperty(`whereCurrent`) // Now move to third page (offset 10, limit 5) // It should advance past the duplicate 5s @@ -2814,6 +2823,7 @@ describe(`OrderBy with duplicate values`, () => { // Start with the first 10 items in the local collection (includes all duplicates) const initialData = allTestData.slice(0, 10) let loadSubsetCallCount = 0 + const loadSubsetCursors: Array = [] const duplicateCollection = createCollection( mockSyncCollectionOptions({ @@ -2838,6 +2848,7 @@ describe(`OrderBy with duplicate values`, () => { return { loadSubset: (options) => { loadSubsetCallCount++ + loadSubsetCursors.push(options.cursor) // Simulate async loading from remote source return new Promise((resolve) => { @@ -2908,7 +2919,8 @@ describe(`OrderBy with duplicate values`, () => { } } - // Return data (limit already applied when cursor is present) + // Apply limit for initial page load (no cursor). + // When cursor is present, limit was already applied in the cursor block above. const { limit } = options const dataToLoad = limit && !options.cursor @@ -2961,8 +2973,10 @@ describe(`OrderBy with duplicate values`, () => { { id: 5, a: 5, keep: true }, ]) expect(loadSubsetCallCount).toBe(1) + // First loadSubset call (initial page at offset 0) has no cursor + expect(loadSubsetCursors[0]).toBeUndefined() - // Now move to next page (offset 5, limit 5) - this should trigger loadSubset + // Now move to next page (offset 5, limit 5) - this should trigger loadSubset with a cursor const moveToSecondPage = collection.utils.setWindow({ offset: 5, limit: 5, @@ -2981,6 +2995,10 @@ describe(`OrderBy with duplicate values`, () => { ]) // we expect 1 new loadSubset call (cursor expressions for whereFrom/whereCurrent are now combined in single call) expect(loadSubsetCallCount).toBe(2) + // Second loadSubset call (pagination) has a cursor with whereFrom and whereCurrent + expect(loadSubsetCursors[1]).toBeDefined() + expect(loadSubsetCursors[1]).toHaveProperty(`whereFrom`) + expect(loadSubsetCursors[1]).toHaveProperty(`whereCurrent`) // Now move to third page (offset 10, limit 5) // It should advance past the duplicate 5s