diff --git a/.changeset/clever-parks-report.md b/.changeset/clever-parks-report.md new file mode 100644 index 000000000..63a2ec563 --- /dev/null +++ b/.changeset/clever-parks-report.md @@ -0,0 +1,20 @@ +--- +"@tanstack/db-ivm": patch +"@tanstack/db": patch +--- + +Add `utils.setWindow()` method to live query collections to dynamically change limit and offset on ordered queries. + +You can now change the pagination window of an ordered live query without recreating the collection: + +```ts +const users = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .orderBy(({ user }) => user.name, "asc") + .limit(10) + .offset(0) +) + +users.utils.setWindow({ offset: 10, limit: 10 }) +``` diff --git a/packages/db-ivm/src/operators/orderBy.ts b/packages/db-ivm/src/operators/orderBy.ts index 8f7a774da..ce399b3bc 100644 --- a/packages/db-ivm/src/operators/orderBy.ts +++ b/packages/db-ivm/src/operators/orderBy.ts @@ -13,6 +13,9 @@ export interface OrderByOptions { type OrderByWithFractionalIndexOptions = OrderByOptions & { setSizeCallback?: (getSize: () => number) => void + setWindowFn?: ( + windowFn: (options: { offset?: number; limit?: number }) => void + ) => void } /** @@ -147,6 +150,7 @@ export function orderByWithFractionalIndexBase< const limit = options?.limit ?? Infinity const offset = options?.offset ?? 0 const setSizeCallback = options?.setSizeCallback + const setWindowFn = options?.setWindowFn const comparator = options?.comparator ?? ((a, b) => { @@ -167,6 +171,7 @@ export function orderByWithFractionalIndexBase< limit, offset, setSizeCallback, + setWindowFn, } ), consolidate() diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts index 81e72b54a..3b75521e3 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts @@ -2,7 +2,12 @@ import { generateKeyBetween } from "fractional-indexing" import { DifferenceStreamWriter, UnaryOperator } from "../graph.js" import { StreamBuilder } from "../d2.js" import { MultiSet } from "../multiset.js" -import { binarySearch, globalObjectIdGenerator } from "../utils.js" +import { + binarySearch, + diffHalfOpen, + globalObjectIdGenerator, +} from "../utils.js" +import type { HRange } from "../utils.js" import type { DifferenceStreamReader } from "../graph.js" import type { IStreamBuilder, PipedOperator } from "../types.js" @@ -10,6 +15,9 @@ export interface TopKWithFractionalIndexOptions { limit?: number offset?: number setSizeCallback?: (getSize: () => number) => void + setWindowFn?: ( + windowFn: (options: { offset?: number; limit?: number }) => void + ) => void } export type TopKChanges = { @@ -19,6 +27,15 @@ export type TopKChanges = { moveOut: IndexedValue | null } +export type TopKMoveChanges = { + /** Flag that marks whether there were any changes to the topK */ + changes: boolean + /** Indicates which elements move into the topK (if any) */ + moveIns: Array> + /** Indicates which elements move out of the topK (if any) */ + moveOuts: Array> +} + /** * A topK data structure that supports insertions and deletions * and returns changes to the topK. @@ -58,6 +75,49 @@ class TopKArray implements TopK { return Math.max(0, Math.min(limit, available)) } + /** + * Moves the topK window + */ + move({ + offset, + limit, + }: { + offset?: number + limit?: number + }): TopKMoveChanges { + const oldOffset = this.#topKStart + const oldLimit = this.#topKEnd - this.#topKStart + const oldRange: HRange = [this.#topKStart, this.#topKEnd] + + this.#topKStart = offset ?? oldOffset + this.#topKEnd = this.#topKStart + (limit ?? oldLimit) + + const newRange: HRange = [this.#topKStart, this.#topKEnd] + const { onlyInA, onlyInB } = diffHalfOpen(oldRange, newRange) + + const moveIns: Array> = [] + onlyInB.forEach((index) => { + const value = this.#sortedValues[index] + if (value) { + moveIns.push(value) + } + }) + + const moveOuts: Array> = [] + onlyInA.forEach((index) => { + const value = this.#sortedValues[index] + if (value) { + moveOuts.push(value) + } + }) + + // It could be that there are changes (i.e. moveIns or moveOuts) + // but that the collection is lazy so we don't have the data yet that needs to move in/out + // so `moveIns` and `moveOuts` will be empty but `changes` will be true + // this will tell the caller that it needs to run the graph to load more data + return { moveIns, moveOuts, changes: onlyInA.length + onlyInB.length > 0 } + } + insert(value: V): TopKChanges { const result: TopKChanges = { moveIn: null, moveOut: null } @@ -178,8 +238,6 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< */ #topK: TopK> - #limit: number - constructor( id: number, inputA: DifferenceStreamReader<[K, T]>, @@ -188,7 +246,7 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< options: TopKWithFractionalIndexOptions ) { super(id, inputA, output) - this.#limit = options.limit ?? Infinity + const limit = options.limit ?? Infinity const offset = options.offset ?? 0 const compareTaggedValues = ( a: TaggedValue, @@ -204,8 +262,9 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< const tieBreakerB = getTag(b) return tieBreakerA - tieBreakerB } - this.#topK = this.createTopK(offset, this.#limit, compareTaggedValues) + this.#topK = this.createTopK(offset, limit, compareTaggedValues) options.setSizeCallback?.(() => this.#topK.size) + options.setWindowFn?.(this.moveTopK.bind(this)) } protected createTopK( @@ -216,6 +275,32 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< return new TopKArray(offset, limit, comparator) } + /** + * Moves the topK window based on the provided offset and limit. + * Any changes to the topK are sent to the output. + */ + moveTopK({ offset, limit }: { offset?: number; limit?: number }) { + if (!(this.#topK instanceof TopKArray)) { + throw new Error( + `Cannot move B+-tree implementation of TopK with fractional index` + ) + } + + const result: Array<[[K, IndexedValue], number]> = [] + + const diff = this.#topK.move({ offset, limit }) + + diff.moveIns.forEach((moveIn) => this.handleMoveIn(moveIn, result)) + diff.moveOuts.forEach((moveOut) => this.handleMoveOut(moveOut, result)) + + if (diff.changes) { + // There are changes to the topK + // it could be that moveIns and moveOuts are empty + // because the collection is lazy, so we will run the graph again to load the data + this.output.sendData(new MultiSet(result)) + } + } + run(): void { const result: Array<[[K, IndexedValue], number]> = [] for (const message of this.inputMessages()) { @@ -258,23 +343,36 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< // so it doesn't affect the topK } - if (res.moveIn) { - const index = getIndex(res.moveIn) - const taggedValue = getValue(res.moveIn) + this.handleMoveIn(res.moveIn, result) + this.handleMoveOut(res.moveOut, result) + + return + } + + private handleMoveIn( + moveIn: IndexedValue> | null, + result: Array<[[K, IndexedValue], number]> + ) { + if (moveIn) { + const index = getIndex(moveIn) + const taggedValue = getValue(moveIn) const k = getKey(taggedValue) const val = getVal(taggedValue) result.push([[k, [val, index]], 1]) } + } - if (res.moveOut) { - const index = getIndex(res.moveOut) - const taggedValue = getValue(res.moveOut) + private handleMoveOut( + moveOut: IndexedValue> | null, + result: Array<[[K, IndexedValue], number]> + ) { + if (moveOut) { + const index = getIndex(moveOut) + const taggedValue = getValue(moveOut) const k = getKey(taggedValue) const val = getVal(taggedValue) result.push([[k, [val, index]], -1]) } - - return } private getMultiplicity(key: K): number { diff --git a/packages/db-ivm/src/utils.ts b/packages/db-ivm/src/utils.ts index 902222584..8ba01bb52 100644 --- a/packages/db-ivm/src/utils.ts +++ b/packages/db-ivm/src/utils.ts @@ -144,3 +144,36 @@ export function* mapIterable( yield fn(t) } } + +export type HRange = [number, number] // half-open [start, end[ i.e. end is exclusive + +/** + * Computes the difference between two half-open ranges. + * @param a - The first half-open range + * @param b - The second half-open range + * @returns The difference between the two ranges + */ +export function diffHalfOpen(a: HRange, b: HRange) { + const [a1, a2] = a + const [b1, b2] = b + + // A \ B can be up to two segments (left and right of the overlap) + const onlyInA: Array = [ + ...range(a1, Math.min(a2, b1)), // left side of A outside B + ...range(Math.max(a1, b2), a2), // right side of A outside B + ] + + // B \ A similarly + const onlyInB: Array = [ + ...range(b1, Math.min(b2, a1)), + ...range(Math.max(b1, a2), b2), + ] + + return { onlyInA, onlyInB } +} + +function range(start: number, end: number): Array { + const out: Array = [] + for (let i = start; i < end; i++) out.push(i) + return out +} diff --git a/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts index 169992ed1..5cf574757 100644 --- a/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts @@ -7,18 +7,9 @@ import { } from "../../src/operators/index.js" import { orderByWithFractionalIndexBTree } from "../../src/operators/orderByBTree.js" import { loadBTree } from "../../src/operators/topKWithFractionalIndexBTree.js" -import { MessageTracker } from "../test-utils.js" +import { MessageTracker, compareFractionalIndex } from "../test-utils.js" import type { KeyValue } from "../../src/types.js" -const compareFractionalIndex = ( - r1: [string, [{ id: number; value: string }, string]], - r2: [string, [{ id: number; value: string }, string]] -) => { - const [_key1, [_value1, index1]] = r1 - const [_key2, [_value2, index2]] = r2 - return index1 < index2 ? -1 : index1 > index2 ? 1 : 0 -} - const stripFractionalIndex = ([[key, [value, _index]], multiplicity]: any) => [ key, value, @@ -488,6 +479,422 @@ describe(`Operators`, () => { ]) }) }) + + describe(`OrderByWithFractionalIndex operator with array`, () => { + test(`should support moving orderBy window past current window using setWindowFn callback`, () => { + const graph = new D2() + const input = graph.newInput< + KeyValue< + string, + { + id: number + value: string + } + > + >() + const tracker = new MessageTracker< + [string, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | undefined + + input.pipe( + orderByWithFractionalIndex((item) => item.value, { + limit: 3, + offset: 0, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[`key1`, { id: 1, value: `a` }], 1], + [[`key2`, { id: 2, value: `b` }], 1], + [[`key3`, { id: 3, value: `c` }], 1], + [[`key4`, { id: 4, value: `d` }], 1], + [[`key5`, { id: 5, value: `e` }], 1], + [[`key6`, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have first 3 elements (a, b, c) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(3) + expect(initialResult.messageCount).toBeLessThanOrEqual(6) + + // Verify initial order + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`a`, `b`, `c`]) + + // Verify windowFn was set + expect(windowFn).toBeDefined() + + // Move the window to show elements d, e, f (offset: 3, limit: 3) + windowFn!({ offset: 3, limit: 3 }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + // Should now show d, e, f + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`d`, `e`, `f`]) + }) + + test(`should support moving orderBy window before current window using setWindowFn callback`, () => { + const graph = new D2() + const input = graph.newInput< + KeyValue< + string, + { + id: number + value: string + } + > + >() + const tracker = new MessageTracker< + [string, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | undefined + + input.pipe( + orderByWithFractionalIndex((item) => item.value, { + limit: 3, + offset: 3, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[`key1`, { id: 1, value: `a` }], 1], + [[`key2`, { id: 2, value: `b` }], 1], + [[`key3`, { id: 3, value: `c` }], 1], + [[`key4`, { id: 4, value: `d` }], 1], + [[`key5`, { id: 5, value: `e` }], 1], + [[`key6`, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have elements d, e, f + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(3) + expect(initialResult.messageCount).toBeLessThanOrEqual(6) + + // Verify initial order + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`d`, `e`, `f`]) + + // Verify windowFn was set + expect(windowFn).toBeDefined() + + // Move the window to show elements a, b, c (offset: 0, limit: 3) + windowFn!({ offset: 0, limit: 3 }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + // Should now show a, b, c + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`a`, `b`, `c`]) + }) + + test(`should support moving offset while keeping limit constant`, () => { + const graph = new D2() + const input = graph.newInput< + KeyValue< + string, + { + id: number + value: string + } + > + >() + const tracker = new MessageTracker< + [string, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null + + input.pipe( + orderByWithFractionalIndex((item) => item.value, { + limit: 2, + offset: 0, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e + input.sendData( + new MultiSet([ + [[`key1`, { id: 1, value: `a` }], 1], + [[`key2`, { id: 2, value: `b` }], 1], + [[`key3`, { id: 3, value: `c` }], 1], + [[`key4`, { id: 4, value: `d` }], 1], + [[`key5`, { id: 5, value: `e` }], 1], + ]) + ) + graph.run() + + // Initial result should have first 2 elements (a, b) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(2) + + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`a`, `b`]) + + // Move offset to 1, keeping limit at 2 (should show b, c) + windowFn!({ offset: 1, limit: 2 }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`b`, `c`]) + + // Move offset to 2, keeping limit at 2 (should show c, d) + windowFn!({ offset: 2, limit: 2 }) + graph.run() + + const moveResult2 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues2 = moveResult2.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues2).toEqual([`c`, `d`]) + + // Move offset back to 0, keeping limit at 2 (should show a, b) + windowFn!({ offset: 0, limit: 2 }) + graph.run() + + const moveResult3 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues3 = moveResult3.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues3).toEqual([`a`, `b`]) + }) + + test(`should support moving limit while keeping offset constant`, () => { + const graph = new D2() + const input = graph.newInput< + KeyValue< + string, + { + id: number + value: string + } + > + >() + const tracker = new MessageTracker< + [string, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null + + input.pipe( + orderByWithFractionalIndex((item) => item.value, { + limit: 2, + offset: 1, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[`key1`, { id: 1, value: `a` }], 1], + [[`key2`, { id: 2, value: `b` }], 1], + [[`key3`, { id: 3, value: `c` }], 1], + [[`key4`, { id: 4, value: `d` }], 1], + [[`key5`, { id: 5, value: `e` }], 1], + [[`key6`, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have 2 elements starting from offset 1 (b, c) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(2) + + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`b`, `c`]) + + // Increase limit to 3, keeping offset at 1 (should show b, c, d) + windowFn!({ offset: 1, limit: 3 }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`b`, `c`, `d`]) + + // Decrease limit to 1, keeping offset at 1 (should show just b) + windowFn!({ offset: 1, limit: 1 }) + graph.run() + + const moveResult2 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues2 = moveResult2.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues2).toEqual([`b`]) + }) + + test(`should handle edge cases when moving beyond available data`, () => { + const graph = new D2() + const input = graph.newInput< + KeyValue< + string, + { + id: number + value: string + } + > + >() + const tracker = new MessageTracker< + [string, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null + + input.pipe( + orderByWithFractionalIndex((item) => item.value, { + limit: 2, + offset: 0, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - only 3 elements: a, b, c + input.sendData( + new MultiSet([ + [[`key1`, { id: 1, value: `a` }], 1], + [[`key2`, { id: 2, value: `b` }], 1], + [[`key3`, { id: 3, value: `c` }], 1], + ]) + ) + graph.run() + + // Initial result should have first 2 elements (a, b) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(2) + + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`a`, `b`]) + + // Move to offset 2, limit 2 (should show only c, since we only have 3 total elements) + windowFn!({ offset: 2, limit: 2 }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`c`]) // Only 1 element available at offset 2 + + // Move to offset 5, limit 2 (should show no elements, beyond available data) + windowFn!({ offset: 5, limit: 2 }) + graph.run() + + const moveResult2 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues2 = moveResult2.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues2).toEqual([]) // No elements available at offset 5 + + // Move to a negative offset and limit (should show no elements) + windowFn!({ offset: -5, limit: 2 }) + graph.run() + + const moveResult3 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues3 = moveResult3.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues3).toEqual([]) + + // Move back to a valid window + windowFn!({ offset: 0, limit: 2 }) + graph.run() + + const moveResult4 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues4 = moveResult4.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues4).toEqual([`a`, `b`]) + }) + }) }) /** diff --git a/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts index e7c86c9c0..5739cfb36 100644 --- a/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts @@ -7,7 +7,11 @@ import { topKWithFractionalIndexBTree, } from "../../src/operators/topKWithFractionalIndexBTree.js" import { output } from "../../src/operators/index.js" -import { MessageTracker, assertOnlyKeysAffected } from "../test-utils.js" +import { + MessageTracker, + assertOnlyKeysAffected, + compareFractionalIndex, +} from "../test-utils.js" // Helper function to check if indices are in lexicographic order function checkLexicographicOrder(results: Array) { @@ -722,3 +726,399 @@ describe(`Operators`, () => { }) }) }) + +describe(`Operators`, () => { + describe(`TopKWithFractionalIndex operator with array`, () => { + it(`should support moving topK window past current window using setWindowFn callback`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | undefined + + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + limit: 3, + offset: 0, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + [[4, { id: 4, value: `d` }], 1], + [[5, { id: 5, value: `e` }], 1], + [[6, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have first 3 elements (a, b, c) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(3) + expect(initialResult.messageCount).toBeLessThanOrEqual(6) + + // Verify initial order + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`a`, `b`, `c`]) + + // Verify windowFn was set + expect(windowFn).toBeDefined() + + const numberOfMessages = tracker.getResult().messageCount + + // Move the window to show elements d, e, f (offset: 3, limit: 3) + windowFn!({ offset: 3, limit: 3 }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + // Should now show d, e, f + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`d`, `e`, `f`]) + + // Check that only the affected keys produce messages + const affectedKeys = moveResult.messages.map(([key, _]) => key[0]) + expect(affectedKeys.length).toBe(numberOfMessages + 6) + expect(affectedKeys).toEqual(expect.arrayContaining([1, 2, 3, 4, 5, 6])) + }) + + it(`should support moving topK window before current window using setWindowFn callback`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | undefined + + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + limit: 3, + offset: 3, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + [[4, { id: 4, value: `d` }], 1], + [[5, { id: 5, value: `e` }], 1], + [[6, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have first 3 elements (a, b, c) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(3) + expect(initialResult.messageCount).toBeLessThanOrEqual(6) + + // Verify initial order + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`d`, `e`, `f`]) + + // Verify windowFn was set + expect(windowFn).toBeDefined() + + const numberOfMessages = tracker.getResult().messageCount + + // Move the window to show elements d, e, f (offset: 3, limit: 3) + windowFn!({ offset: 0, limit: 3 }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + // Should now show d, e, f + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`a`, `b`, `c`]) + + // Check that only the affected keys produce messages + const affectedKeys = moveResult.messages.map(([key, _]) => key[0]) + expect(affectedKeys.length).toBe(numberOfMessages + 6) + expect(affectedKeys).toEqual(expect.arrayContaining([1, 2, 3, 4, 5, 6])) + }) + + it(`should support moving offset while keeping limit constant`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null + + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + limit: 2, + offset: 0, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + [[4, { id: 4, value: `d` }], 1], + [[5, { id: 5, value: `e` }], 1], + ]) + ) + graph.run() + + // Initial result should have first 2 elements (a, b) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(2) + + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`a`, `b`]) + + // tracker.reset() + + // Move offset to 1, keeping limit at 2 (should show b, c) + windowFn!({ offset: 1, limit: 2 }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`b`, `c`]) + + // tracker.reset() + + // Move offset to 2, keeping limit at 2 (should show c, d) + windowFn!({ offset: 2, limit: 2 }) + graph.run() + + const moveResult2 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues2 = moveResult2.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues2).toEqual([`c`, `d`]) + + // Move offset back to 0, keeping limit at 2 (should show a, b) + windowFn!({ offset: 0, limit: 2 }) + graph.run() + + const moveResult3 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues3 = moveResult3.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues3).toEqual([`a`, `b`]) + }) + + it(`should support moving limit while keeping offset constant`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null + + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + limit: 2, + offset: 1, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + [[4, { id: 4, value: `d` }], 1], + [[5, { id: 5, value: `e` }], 1], + [[6, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have 2 elements starting from offset 1 (b, c) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(2) + + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`b`, `c`]) + + // Increase limit to 3, keeping offset at 1 (should show b, c, d) + windowFn!({ offset: 1, limit: 3 }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`b`, `c`, `d`]) + + // Decrease limit to 1, keeping offset at 1 (should show just b) + windowFn!({ offset: 1, limit: 1 }) + graph.run() + + const moveResult2 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues2 = moveResult2.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues2).toEqual([`b`]) + }) + + it(`should handle edge cases when moving beyond available data`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null + + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + limit: 2, + offset: 0, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - only 3 elements: a, b, c + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + ]) + ) + graph.run() + + // Initial result should have first 2 elements (a, b) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(2) + + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`a`, `b`]) + + // Move to offset 2, limit 2 (should show only c, since we only have 3 total elements) + windowFn!({ offset: 2, limit: 2 }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`c`]) // Only 1 element available at offset 2 + + // Move to offset 5, limit 2 (should show no elements, beyond available data) + windowFn!({ offset: 5, limit: 2 }) + graph.run() + + const moveResult2 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues2 = moveResult2.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues2).toEqual([]) // No elements available at offset 5 + + // Move to a negative offset and limit (should show no elements) + windowFn!({ offset: -5, limit: 2 }) + graph.run() + + const moveResult3 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues3 = moveResult3.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues3).toEqual([]) + + // Move back to a valid window + windowFn!({ offset: 0, limit: 2 }) + graph.run() + + const moveResult4 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues4 = moveResult4.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues4).toEqual([`a`, `b`]) + }) + }) +}) diff --git a/packages/db-ivm/tests/test-utils.ts b/packages/db-ivm/tests/test-utils.ts index 49a8684d7..a7ee0f0f5 100644 --- a/packages/db-ivm/tests/test-utils.ts +++ b/packages/db-ivm/tests/test-utils.ts @@ -299,3 +299,12 @@ export function assertOnlyKeysAffected( ) } } + +export const compareFractionalIndex = ( + r1: [unknown, [unknown, string]], + r2: [unknown, [unknown, string]] +) => { + const [_key1, [_value1, index1]] = r1 + const [_key2, [_value2, index2]] = r2 + return index1 < index2 ? -1 : index1 > index2 ? 1 : 0 +} diff --git a/packages/db/src/collection/index.ts b/packages/db/src/collection/index.ts index 965d02592..e638ec62c 100644 --- a/packages/db/src/collection/index.ts +++ b/packages/db/src/collection/index.ts @@ -48,7 +48,7 @@ import type { IndexProxy } from "../indexes/lazy-index.js" export interface Collection< T extends object = Record, TKey extends string | number = string | number, - TUtils extends UtilsRecord = {}, + TUtils extends UtilsRecord = UtilsRecord, TSchema extends StandardSchemaV1 = StandardSchemaV1, TInsertInput extends object = T, > extends CollectionImpl { @@ -131,7 +131,7 @@ export interface Collection< export function createCollection< T extends StandardSchemaV1, TKey extends string | number = string | number, - TUtils extends UtilsRecord = {}, + TUtils extends UtilsRecord = UtilsRecord, >( options: CollectionConfig, TKey, T> & { schema: T @@ -144,7 +144,7 @@ export function createCollection< export function createCollection< T extends StandardSchemaV1, TKey extends string | number = string | number, - TUtils extends UtilsRecord = {}, + TUtils extends UtilsRecord = UtilsRecord, >( options: CollectionConfig, TKey, T> & { schema: T @@ -158,7 +158,7 @@ export function createCollection< export function createCollection< T extends object, TKey extends string | number = string | number, - TUtils extends UtilsRecord = {}, + TUtils extends UtilsRecord = UtilsRecord, >( options: CollectionConfig & { schema?: never // prohibit schema if an explicit type is provided @@ -171,7 +171,7 @@ export function createCollection< export function createCollection< T extends object, TKey extends string | number = string | number, - TUtils extends UtilsRecord = {}, + TUtils extends UtilsRecord = UtilsRecord, >( options: CollectionConfig & { schema?: never // prohibit schema if an explicit type is provided diff --git a/packages/db/src/errors.ts b/packages/db/src/errors.ts index 209e7b7a6..3858df178 100644 --- a/packages/db/src/errors.ts +++ b/packages/db/src/errors.ts @@ -629,3 +629,15 @@ export class MissingAliasInputsError extends QueryCompilationError { ) } } + +/** + * Error thrown when setWindow is called on a collection without an ORDER BY clause. + */ +export class SetWindowRequiresOrderByError extends QueryCompilationError { + constructor() { + super( + `setWindow() can only be called on collections with an ORDER BY clause. ` + + `Add .orderBy() to your query to enable window movement.` + ) + } +} diff --git a/packages/db/src/local-only.ts b/packages/db/src/local-only.ts index 756dab21a..11f19a43c 100644 --- a/packages/db/src/local-only.ts +++ b/packages/db/src/local-only.ts @@ -1,15 +1,12 @@ import type { BaseCollectionConfig, CollectionConfig, - DeleteMutationFn, DeleteMutationFnParams, InferSchemaOutput, - InsertMutationFn, InsertMutationFnParams, OperationType, PendingMutation, SyncConfig, - UpdateMutationFn, UpdateMutationFnParams, UtilsRecord, } from "./types" @@ -67,13 +64,7 @@ type LocalOnlyCollectionOptionsResult< T extends object, TKey extends string | number, TSchema extends StandardSchemaV1 | never = never, -> = Omit< - CollectionConfig, - `onInsert` | `onUpdate` | `onDelete` -> & { - onInsert?: InsertMutationFn - onUpdate?: UpdateMutationFn - onDelete?: DeleteMutationFn +> = CollectionConfig & { utils: LocalOnlyCollectionUtils } @@ -191,7 +182,7 @@ export function localOnlyCollectionOptions< const { initialData, onInsert, onUpdate, onDelete, ...restConfig } = config // Create the sync configuration with transaction confirmation capability - const syncResult = createLocalOnlySync(initialData) + const syncResult = createLocalOnlySync(initialData) /** * Create wrapper handlers that call user handlers first, then confirm transactions @@ -279,9 +270,11 @@ export function localOnlyCollectionOptions< onDelete: wrappedOnDelete, utils: { acceptMutations, - } as LocalOnlyCollectionUtils, + }, startSync: true, gcTime: 0, + } as LocalOnlyCollectionOptionsResult & { + schema?: StandardSchemaV1 } } diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index d143cf3bb..1cd033453 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -28,7 +28,9 @@ import type { NamespacedAndKeyedStream, ResultStream, } from "../../types.js" -import type { QueryCache, QueryMapping } from "./types.js" +import type { QueryCache, QueryMapping, WindowOptions } from "./types.js" + +export type { WindowOptions } from "./types.js" /** * Result of query compilation including both the pipeline and source-specific WHERE clauses @@ -87,6 +89,7 @@ export function compileQuery( callbacks: Record, lazySources: Set, optimizableOrderByCollections: Record, + setWindowFn: (windowFn: (options: WindowOptions) => void) => void, cache: QueryCache = new WeakMap(), queryMapping: QueryMapping = new WeakMap() ): CompilationResult { @@ -134,6 +137,7 @@ export function compileQuery( callbacks, lazySources, optimizableOrderByCollections, + setWindowFn, cache, queryMapping, aliasToCollectionId, @@ -169,6 +173,7 @@ export function compileQuery( callbacks, lazySources, optimizableOrderByCollections, + setWindowFn, rawQuery, compileQuery, aliasToCollectionId, @@ -311,6 +316,7 @@ export function compileQuery( query.select || {}, collections[mainCollectionId]!, optimizableOrderByCollections, + setWindowFn, query.limit, query.offset ) @@ -381,6 +387,7 @@ function processFrom( callbacks: Record, lazySources: Set, optimizableOrderByCollections: Record, + setWindowFn: (windowFn: (options: WindowOptions) => void) => void, cache: QueryCache, queryMapping: QueryMapping, aliasToCollectionId: Record, @@ -412,6 +419,7 @@ function processFrom( callbacks, lazySources, optimizableOrderByCollections, + setWindowFn, cache, queryMapping ) diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index dfa826fa5..505f90bf3 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -31,7 +31,7 @@ import type { NamespacedAndKeyedStream, NamespacedRow, } from "../../types.js" -import type { QueryCache, QueryMapping } from "./types.js" +import type { QueryCache, QueryMapping, WindowOptions } from "./types.js" import type { CollectionSubscription } from "../../collection/subscription.js" /** Function type for loading specific keys into a lazy collection */ @@ -61,6 +61,7 @@ export function processJoins( callbacks: Record, lazySources: Set, optimizableOrderByCollections: Record, + setWindowFn: (windowFn: (options: WindowOptions) => void) => void, rawQuery: QueryIR, onCompileSubquery: CompileQueryFn, aliasToCollectionId: Record, @@ -83,6 +84,7 @@ export function processJoins( callbacks, lazySources, optimizableOrderByCollections, + setWindowFn, rawQuery, onCompileSubquery, aliasToCollectionId, @@ -111,6 +113,7 @@ function processJoin( callbacks: Record, lazySources: Set, optimizableOrderByCollections: Record, + setWindowFn: (windowFn: (options: WindowOptions) => void) => void, rawQuery: QueryIR, onCompileSubquery: CompileQueryFn, aliasToCollectionId: Record, @@ -131,6 +134,7 @@ function processJoin( callbacks, lazySources, optimizableOrderByCollections, + setWindowFn, cache, queryMapping, onCompileSubquery, @@ -421,6 +425,7 @@ function processJoinSource( callbacks: Record, lazySources: Set, optimizableOrderByCollections: Record, + setWindowFn: (windowFn: (options: WindowOptions) => void) => void, cache: QueryCache, queryMapping: QueryMapping, onCompileSubquery: CompileQueryFn, @@ -453,6 +458,7 @@ function processJoinSource( callbacks, lazySources, optimizableOrderByCollections, + setWindowFn, cache, queryMapping ) diff --git a/packages/db/src/query/compiler/order-by.ts b/packages/db/src/query/compiler/order-by.ts index affbb23fc..679dceeda 100644 --- a/packages/db/src/query/compiler/order-by.ts +++ b/packages/db/src/query/compiler/order-by.ts @@ -5,6 +5,7 @@ import { ensureIndexForField } from "../../indexes/auto-index.js" import { findIndexForField } from "../../utils/index-optimization.js" import { compileExpression } from "./evaluators.js" import { replaceAggregatesByRefs } from "./group-by.js" +import type { WindowOptions } from "./types.js" import type { CompiledSingleRowExpression } from "./evaluators.js" import type { OrderBy, OrderByClause, QueryIR, Select } from "../ir.js" import type { NamespacedAndKeyedStream, NamespacedRow } from "../../types.js" @@ -38,6 +39,7 @@ export function processOrderBy( selectClause: Select, collection: Collection, optimizableOrderByCollections: Record, + setWindowFn: (windowFn: (options: WindowOptions) => void) => void, limit?: number, offset?: number ): IStreamBuilder> { @@ -107,6 +109,8 @@ export function processOrderBy( let setSizeCallback: ((getSize: () => number) => void) | undefined + let orderByOptimizationInfo: OrderByOptimizationInfo | undefined + // Optimize the orderBy operator to lazily load elements // by using the range index of the collection. // Only for orderBy clause on a single column for now (no composite ordering) @@ -161,7 +165,7 @@ export function processOrderBy( ? String(orderByExpression.path[0]) : rawQuery.from.alias - const orderByOptimizationInfo = { + orderByOptimizationInfo = { alias: orderByAlias, offset: offset ?? 0, limit, @@ -179,7 +183,7 @@ export function processOrderBy( ...optimizableOrderByCollections[followRefCollection.id]!, dataNeeded: () => { const size = getSize() - return Math.max(0, limit - size) + return Math.max(0, orderByOptimizationInfo!.limit - size) }, } } @@ -194,6 +198,23 @@ export function processOrderBy( offset, comparator: compare, setSizeCallback, + setWindowFn: ( + windowFn: (options: { offset?: number; limit?: number }) => void + ) => { + setWindowFn( + // We wrap the move function such that we update the orderByOptimizationInfo + // because that is used by the `dataNeeded` callback to determine if we need to load more data + (options) => { + windowFn(options) + if (orderByOptimizationInfo) { + orderByOptimizationInfo.offset = + options.offset ?? orderByOptimizationInfo.offset + orderByOptimizationInfo.limit = + options.limit ?? orderByOptimizationInfo.limit + } + } + ) + }, }) // orderByWithFractionalIndex returns [key, [value, index]] - we keep this format ) diff --git a/packages/db/src/query/compiler/types.ts b/packages/db/src/query/compiler/types.ts index 8e6080e72..f10f953c9 100644 --- a/packages/db/src/query/compiler/types.ts +++ b/packages/db/src/query/compiler/types.ts @@ -10,3 +10,8 @@ export type QueryCache = WeakMap * Mapping from optimized queries back to their original queries for caching */ export type QueryMapping = WeakMap + +export type WindowOptions = { + offset?: number + limit?: number +} diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index dac5ee8c1..5e4355a48 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -20,16 +20,20 @@ import type { Context, GetResult } from "./builder/types.js" type CollectionConfigForContext< TContext extends Context, TResult extends object, + TUtils extends UtilsRecord = {}, > = TContext extends SingleResult - ? CollectionConfigSingleRowOption & SingleResult - : CollectionConfigSingleRowOption & NonSingleResult + ? CollectionConfigSingleRowOption & + SingleResult + : CollectionConfigSingleRowOption & + NonSingleResult type CollectionForContext< TContext extends Context, TResult extends object, + TUtils extends UtilsRecord = {}, > = TContext extends SingleResult - ? Collection & SingleResult - : Collection & NonSingleResult + ? Collection & SingleResult + : Collection & NonSingleResult /** * Creates live query collection options for use with createCollection diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 7bf699268..a7952e998 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -1,11 +1,15 @@ import { D2, output } from "@tanstack/db-ivm" import { compileQuery } from "../compiler/index.js" import { buildQuery, getQueryIR } from "../builder/index.js" -import { MissingAliasInputsError } from "../../errors.js" +import { + MissingAliasInputsError, + SetWindowRequiresOrderByError, +} from "../../errors.js" import { transactionScopedScheduler } from "../../scheduler.js" import { getActiveTransaction } from "../../transactions.js" import { CollectionSubscriber } from "./collection-subscriber.js" import { getCollectionBuilder } from "./collection-registry.js" +import type { WindowOptions } from "../compiler/index.js" import type { SchedulerContextId } from "../../scheduler.js" import type { CollectionSubscription } from "../../collection/subscription.js" import type { RootStreamBuilder } from "@tanstack/db-ivm" @@ -32,6 +36,11 @@ import type { AllCollectionEvents } from "../../collection/events.js" export type LiveQueryCollectionUtils = UtilsRecord & { getRunCount: () => number getBuilder: () => CollectionConfigBuilder + /** + * Sets the offset and limit of an ordered query. + * Is a no-op if the query is not ordered. + */ + setWindow: (options: WindowOptions) => void } type PendingGraphRun = { @@ -81,6 +90,10 @@ export class CollectionConfigBuilder< // Reference to the live query collection for error state transitions private liveQueryCollection?: Collection + private windowFn: ((options: WindowOptions) => void) | undefined + + private maybeRunGraphFn: (() => void) | undefined + private readonly aliasDependencies: Record< string, Array> @@ -171,10 +184,20 @@ export class CollectionConfigBuilder< utils: { getRunCount: this.getRunCount.bind(this), getBuilder: () => this, + setWindow: this.setWindow.bind(this), }, } } + setWindow(options: WindowOptions) { + if (!this.windowFn) { + throw new SetWindowRequiresOrderByError() + } + + this.windowFn(options) + this.maybeRunGraphFn?.() + } + /** * Resolves a collection alias to its collection ID. * @@ -457,6 +480,8 @@ export class CollectionConfigBuilder< fullSyncState ) + this.maybeRunGraphFn = () => this.scheduleGraphRun(loadMoreDataCallbacks) + // Initial run with callback to load more data if needed this.scheduleGraphRun(loadMoreDataCallbacks) @@ -517,7 +542,10 @@ export class CollectionConfigBuilder< this.subscriptions, this.lazySourcesCallbacks, this.lazySources, - this.optimizableOrderByCollections + this.optimizableOrderByCollections, + (windowFn: (options: WindowOptions) => void) => { + this.windowFn = windowFn + } ) this.pipelineCache = compilation.pipeline diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index b48ab2f5f..5ce991a83 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -242,7 +242,7 @@ export interface InsertConfig { export type UpdateMutationFnParams< T extends object = Record, TKey extends string | number = string | number, - TUtils extends UtilsRecord = Record, + TUtils extends UtilsRecord = UtilsRecord, > = { transaction: TransactionWithMutations collection: Collection @@ -251,7 +251,7 @@ export type UpdateMutationFnParams< export type InsertMutationFnParams< T extends object = Record, TKey extends string | number = string | number, - TUtils extends UtilsRecord = Record, + TUtils extends UtilsRecord = UtilsRecord, > = { transaction: TransactionWithMutations collection: Collection @@ -259,7 +259,7 @@ export type InsertMutationFnParams< export type DeleteMutationFnParams< T extends object = Record, TKey extends string | number = string | number, - TUtils extends UtilsRecord = Record, + TUtils extends UtilsRecord = UtilsRecord, > = { transaction: TransactionWithMutations collection: Collection @@ -268,21 +268,21 @@ export type DeleteMutationFnParams< export type InsertMutationFn< T extends object = Record, TKey extends string | number = string | number, - TUtils extends UtilsRecord = Record, + TUtils extends UtilsRecord = UtilsRecord, TReturn = any, > = (params: InsertMutationFnParams) => Promise export type UpdateMutationFn< T extends object = Record, TKey extends string | number = string | number, - TUtils extends UtilsRecord = Record, + TUtils extends UtilsRecord = UtilsRecord, TReturn = any, > = (params: UpdateMutationFnParams) => Promise export type DeleteMutationFn< T extends object = Record, TKey extends string | number = string | number, - TUtils extends UtilsRecord = Record, + TUtils extends UtilsRecord = UtilsRecord, TReturn = any, > = (params: DeleteMutationFnParams) => Promise @@ -321,7 +321,7 @@ export interface BaseCollectionConfig< // then it would conflict with the overloads of createCollection which // requires either T to be provided or a schema to be provided but not both! TSchema extends StandardSchemaV1 = never, - TUtils extends UtilsRecord = Record, + TUtils extends UtilsRecord = UtilsRecord, TReturn = any, > { // If an id isn't passed in, a UUID will be @@ -503,13 +503,16 @@ export interface BaseCollectionConfig< * } */ onDelete?: DeleteMutationFn + + utils?: TUtils } export interface CollectionConfig< T extends object = Record, TKey extends string | number = string | number, TSchema extends StandardSchemaV1 = never, -> extends BaseCollectionConfig { + TUtils extends UtilsRecord = UtilsRecord, +> extends BaseCollectionConfig { sync: SyncConfig } @@ -533,7 +536,8 @@ export type CollectionConfigSingleRowOption< T extends object = Record, TKey extends string | number = string | number, TSchema extends StandardSchemaV1 = never, -> = CollectionConfig & MaybeSingleResult + TUtils extends UtilsRecord = {}, +> = CollectionConfig & MaybeSingleResult export type ChangesPayload> = Array< ChangeMessage diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 36ee2416a..710f2122a 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -1647,7 +1647,7 @@ describe(`Collection.subscribeChanges`, () => { } }) - it(`should emit change events for multiple sync transactions before marking ready`, async () => { + it(`should emit change events for multiple sync transactions before marking ready`, () => { const changeEvents: Array = [] let testSyncFunctions: any = null @@ -1750,7 +1750,7 @@ describe(`Collection.subscribeChanges`, () => { expect(collection.state.get(3)).toEqual({ id: 3, value: `third item` }) }) - it(`should emit change events while collection is loading for filtered subscriptions`, async () => { + it(`should emit change events while collection is loading for filtered subscriptions`, () => { const changeEvents: Array = [] let testSyncFunctions: any = null diff --git a/packages/db/tests/query/compiler/basic.test.ts b/packages/db/tests/query/compiler/basic.test.ts index d4e8afcd0..32c548d8e 100644 --- a/packages/db/tests/query/compiler/basic.test.ts +++ b/packages/db/tests/query/compiler/basic.test.ts @@ -50,7 +50,8 @@ describe(`Query2 Compiler`, () => { {}, {}, new Set(), - {} + {}, + () => {} ) const messages: Array> = [] @@ -105,7 +106,8 @@ describe(`Query2 Compiler`, () => { {}, {}, new Set(), - {} + {}, + () => {} ) const messages: Array> = [] @@ -182,7 +184,8 @@ describe(`Query2 Compiler`, () => { {}, {}, new Set(), - {} + {}, + () => {} ) const messages: Array> = [] @@ -247,7 +250,8 @@ describe(`Query2 Compiler`, () => { {}, {}, new Set(), - {} + {}, + () => {} ) const messages: Array> = [] diff --git a/packages/db/tests/query/compiler/subqueries.test.ts b/packages/db/tests/query/compiler/subqueries.test.ts index ae291ecfb..f06e7b7f0 100644 --- a/packages/db/tests/query/compiler/subqueries.test.ts +++ b/packages/db/tests/query/compiler/subqueries.test.ts @@ -177,7 +177,8 @@ describe(`Query2 Subqueries`, () => { {}, {}, new Set(), - {} + {}, + () => {} ) const messages: Array> = [] @@ -295,7 +296,8 @@ describe(`Query2 Subqueries`, () => { subscriptions, { issue: dummyCallbacks, user: dummyCallbacks }, lazySources, - {} + {}, + () => {} ) const { pipeline } = compilation @@ -395,7 +397,8 @@ describe(`Query2 Subqueries`, () => { subscriptions, { issue: dummyCallbacks, user: dummyCallbacks }, lazyCollections, - {} + {}, + () => {} ) // Verify that alias metadata includes aliases from the query @@ -439,7 +442,8 @@ describe(`Query2 Subqueries`, () => { {}, {}, new Set(), - {} + {}, + () => {} ) const messages: Array> = [] diff --git a/packages/db/tests/query/compiler/subquery-caching.test.ts b/packages/db/tests/query/compiler/subquery-caching.test.ts index 3df84cbee..5360ce55c 100644 --- a/packages/db/tests/query/compiler/subquery-caching.test.ts +++ b/packages/db/tests/query/compiler/subquery-caching.test.ts @@ -68,6 +68,7 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, + () => {}, cache1, queryMapping1 ) @@ -87,6 +88,7 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, + () => {}, cache2, queryMapping2 ) @@ -107,6 +109,7 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, + () => {}, cache2, new WeakMap() ) @@ -127,6 +130,7 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, + () => {}, cache2, new WeakMap() ) @@ -138,6 +142,7 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, + () => {}, cache2, new WeakMap() ) @@ -173,7 +178,9 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, - sharedCache + () => {}, + sharedCache, + new WeakMap() ) expect(sharedCache.has(subquery)).toBe(true) @@ -186,7 +193,9 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, - sharedCache + () => {}, + sharedCache, + new WeakMap() ) expect(result1).toBe(result2) // Should be the exact same object reference }) @@ -229,7 +238,9 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, - sharedCache + () => {}, + sharedCache, + new WeakMap() ) const result2 = compileQuery( subquery, @@ -239,7 +250,9 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, - sharedCache + () => {}, + sharedCache, + new WeakMap() ) // Should have different results since they are different objects @@ -300,7 +313,9 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, - sharedCache + () => {}, + sharedCache, + new WeakMap() ) expect(result).toBeDefined() diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index 14c84d7e7..7e7a31b20 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest" import { Temporal } from "temporal-polyfill" import { createCollection } from "../../src/collection/index.js" import { + and, createLiveQueryCollection, eq, liveQueryCollectionOptions, @@ -937,4 +938,529 @@ describe(`createLiveQueryCollection`, () => { }) }) }) + + describe(`move functionality`, () => { + it(`should support moving orderBy window past current window using move function`, async () => { + // Create a collection with more users for testing window movement + const extendedUsers = createCollection( + mockSyncCollectionOptions({ + id: `extended-users`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `David`, active: true }, + { id: 5, name: `Eve`, active: true }, + { id: 6, name: `Frank`, active: true }, + ], + }) + ) + + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: extendedUsers }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `desc`) + .limit(3) + .offset(0) + ) + + await activeUsers.preload() + + // Initial result should have first 3 users (Alice, Bob, Charlie) + expect(activeUsers.size).toBe(3) + const initialResults = activeUsers.toArray + expect(initialResults.map((r) => r.name)).toEqual([ + `Frank`, + `Eve`, + `David`, + ]) + + // Move the window to show users David, Eve, Frank (offset: 3, limit: 3) + activeUsers.utils.setWindow({ offset: 3, limit: 3 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults = activeUsers.toArray + expect(moveResults.map((r) => r.name)).toEqual([ + `Charlie`, + `Bob`, + `Alice`, + ]) + }) + + it(`should support moving orderBy window before current window using move function`, async () => { + const extendedUsers = createCollection( + mockSyncCollectionOptions({ + id: `extended-users-before`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `David`, active: true }, + { id: 5, name: `Eve`, active: true }, + { id: 6, name: `Frank`, active: true }, + ], + }) + ) + + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: extendedUsers }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `asc`) + .limit(3) + .offset(3) + ) + + await activeUsers.preload() + + // Initial result should have users David, Eve, Frank + expect(activeUsers.size).toBe(3) + const initialResults = activeUsers.toArray + expect(initialResults.map((r) => r.name)).toEqual([ + `David`, + `Eve`, + `Frank`, + ]) + + // Move the window to show users Alice, Bob, Charlie (offset: 0, limit: 3) + activeUsers.utils.setWindow({ offset: 0, limit: 3 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults = activeUsers.toArray + expect(moveResults.map((r) => r.name)).toEqual([ + `Alice`, + `Bob`, + `Charlie`, + ]) + }) + + it(`should support moving offset while keeping limit constant`, async () => { + const extendedUsers = createCollection( + mockSyncCollectionOptions({ + id: `extended-users-offset`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `David`, active: true }, + { id: 5, name: `Eve`, active: true }, + ], + }) + ) + + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: extendedUsers }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `asc`) + .limit(2) + .offset(0) + ) + + await activeUsers.preload() + + // Initial result should have first 2 users (Alice, Bob) + expect(activeUsers.size).toBe(2) + const initialResults = activeUsers.toArray + expect(initialResults.map((r) => r.name)).toEqual([`Alice`, `Bob`]) + + // Move offset to 1, keeping limit at 2 (should show Bob, Charlie) + activeUsers.utils.setWindow({ offset: 1, limit: 2 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults1 = activeUsers.toArray + expect(moveResults1.map((r) => r.name)).toEqual([`Bob`, `Charlie`]) + + // Move offset to 2, keeping limit at 2 (should show Charlie, David) + activeUsers.utils.setWindow({ offset: 2, limit: 2 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults2 = activeUsers.toArray + expect(moveResults2.map((r) => r.name)).toEqual([`Charlie`, `David`]) + + // Move offset back to 0, keeping limit at 2 (should show Alice, Bob) + activeUsers.utils.setWindow({ offset: 0, limit: 2 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults3 = activeUsers.toArray + expect(moveResults3.map((r) => r.name)).toEqual([`Alice`, `Bob`]) + }) + + it(`should support moving limit while keeping offset constant`, async () => { + const extendedUsers = createCollection( + mockSyncCollectionOptions({ + id: `extended-users-limit`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `David`, active: true }, + { id: 5, name: `Eve`, active: true }, + { id: 6, name: `Frank`, active: true }, + ], + }) + ) + + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: extendedUsers }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `asc`) + .limit(2) + .offset(1) + ) + + await activeUsers.preload() + + // Initial result should have 2 users starting from offset 1 (Bob, Charlie) + expect(activeUsers.size).toBe(2) + const initialResults = activeUsers.toArray + expect(initialResults.map((r) => r.name)).toEqual([`Bob`, `Charlie`]) + + // Increase limit to 3, keeping offset at 1 (should show Bob, Charlie, David) + activeUsers.utils.setWindow({ offset: 1, limit: 3 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults1 = activeUsers.toArray + expect(moveResults1.map((r) => r.name)).toEqual([ + `Bob`, + `Charlie`, + `David`, + ]) + + // Decrease limit to 1, keeping offset at 1 (should show just Bob) + activeUsers.utils.setWindow({ offset: 1, limit: 1 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults2 = activeUsers.toArray + expect(moveResults2.map((r) => r.name)).toEqual([`Bob`]) + }) + + it(`should support changing only offset (keeping limit the same)`, async () => { + const extendedUsers = createCollection( + mockSyncCollectionOptions({ + id: `extended-users-offset-only`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `David`, active: true }, + { id: 5, name: `Eve`, active: true }, + ], + }) + ) + + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: extendedUsers }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `asc`) + .limit(2) + .offset(0) + ) + + await activeUsers.preload() + + // Initial result should have first 2 users (Alice, Bob) + expect(activeUsers.size).toBe(2) + const initialResults = activeUsers.toArray + expect(initialResults.map((r) => r.name)).toEqual([`Alice`, `Bob`]) + + // Change only offset to 2, limit should remain 2 + activeUsers.utils.setWindow({ offset: 2 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults1 = activeUsers.toArray + expect(moveResults1.map((r) => r.name)).toEqual([`Charlie`, `David`]) + + // Change only offset to 1, limit should still be 2 + activeUsers.utils.setWindow({ offset: 1 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults2 = activeUsers.toArray + expect(moveResults2.map((r) => r.name)).toEqual([`Bob`, `Charlie`]) + }) + + it(`should support changing only limit (keeping offset the same)`, async () => { + const extendedUsers = createCollection( + mockSyncCollectionOptions({ + id: `extended-users-limit-only`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `David`, active: true }, + { id: 5, name: `Eve`, active: true }, + { id: 6, name: `Frank`, active: true }, + ], + }) + ) + + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: extendedUsers }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `asc`) + .limit(2) + .offset(1) + ) + + await activeUsers.preload() + + // Initial result should have 2 users starting from offset 1 (Bob, Charlie) + expect(activeUsers.size).toBe(2) + const initialResults = activeUsers.toArray + expect(initialResults.map((r) => r.name)).toEqual([`Bob`, `Charlie`]) + + // Change only limit to 4, offset should remain 1 + activeUsers.utils.setWindow({ limit: 4 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults1 = activeUsers.toArray + expect(moveResults1.map((r) => r.name)).toEqual([ + `Bob`, + `Charlie`, + `David`, + `Eve`, + ]) + + // Change only limit to 1, offset should still be 1 + activeUsers.utils.setWindow({ limit: 1 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults2 = activeUsers.toArray + expect(moveResults2.map((r) => r.name)).toEqual([`Bob`]) + }) + + it(`should handle edge cases when moving beyond available data`, async () => { + const limitedUsers = createCollection( + mockSyncCollectionOptions({ + id: `limited-users`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + ], + }) + ) + + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: limitedUsers }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `asc`) + .limit(2) + .offset(0) + ) + + await activeUsers.preload() + + // Initial result should have first 2 users (Alice, Bob) + expect(activeUsers.size).toBe(2) + const initialResults = activeUsers.toArray + expect(initialResults.map((r) => r.name)).toEqual([`Alice`, `Bob`]) + + // Move to offset 2, limit 2 (should show only Charlie, since we only have 3 total users) + activeUsers.utils.setWindow({ offset: 2, limit: 2 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults1 = activeUsers.toArray + expect(moveResults1.map((r) => r.name)).toEqual([`Charlie`]) // Only 1 user available at offset 2 + + // Move to offset 5, limit 2 (should show no users, beyond available data) + activeUsers.utils.setWindow({ offset: 5, limit: 2 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults2 = activeUsers.toArray + expect(moveResults2).toEqual([]) // No users available at offset 5 + + // Move to a negative offset and limit (should show no users) + activeUsers.utils.setWindow({ offset: -5, limit: 2 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults3 = activeUsers.toArray + expect(moveResults3).toEqual([]) + + // Move back to a valid window + activeUsers.utils.setWindow({ offset: 0, limit: 2 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults4 = activeUsers.toArray + expect(moveResults4.map((r) => r.name)).toEqual([`Alice`, `Bob`]) + }) + + it(`should work with descending order`, async () => { + const extendedUsers = createCollection( + mockSyncCollectionOptions({ + id: `extended-users-desc`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `David`, active: true }, + { id: 5, name: `Eve`, active: true }, + { id: 6, name: `Frank`, active: true }, + ], + }) + ) + + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: extendedUsers }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `desc`) + .limit(3) + .offset(0) + ) + + await activeUsers.preload() + + // Initial result should have first 3 users in descending order (Frank, Eve, David) + expect(activeUsers.size).toBe(3) + const initialResults = activeUsers.toArray + expect(initialResults.map((r) => r.name)).toEqual([ + `Frank`, + `Eve`, + `David`, + ]) + + // Move the window to show next 3 users (Charlie, Bob, Alice) + activeUsers.utils.setWindow({ offset: 3, limit: 3 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults = activeUsers.toArray + expect(moveResults.map((r) => r.name)).toEqual([ + `Charlie`, + `Bob`, + `Alice`, + ]) + }) + + it(`should throw an error when used on non-ordered queries`, async () => { + const activeUsers = createLiveQueryCollection( + (q) => + q + .from({ user: usersCollection }) + .where(({ user }) => eq(user.active, true)) + // No orderBy clause + ) + + await activeUsers.preload() + + // Initial result should have all active users + expect(activeUsers.size).toBe(2) + + // setWindow should throw an error for non-ordered queries + expect(() => { + activeUsers.utils.setWindow({ offset: 1, limit: 1 }) + }).toThrow( + /setWindow\(\) can only be called on collections with an ORDER BY clause/ + ) + }) + + it(`should work with complex queries including joins`, async () => { + type Post = { + id: number + title: string + authorId: number + published: boolean + } + + const posts = createCollection( + mockSyncCollectionOptions({ + id: `posts-for-move-test`, + getKey: (post) => post.id, + initialData: [ + { id: 1, title: `Post A`, authorId: 1, published: true }, + { id: 2, title: `Post B`, authorId: 2, published: true }, + { id: 3, title: `Post C`, authorId: 1, published: true }, + { id: 4, title: `Post D`, authorId: 2, published: true }, + { id: 5, title: `Post E`, authorId: 1, published: true }, + { id: 6, title: `Post F`, authorId: 2, published: true }, + ], + }) + ) + + const userPosts = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .join( + { post: posts }, + ({ user, post }) => eq(user.id, post.authorId), + `inner` + ) + .where(({ user, post }) => + and(eq(user.active, true), eq(post.published, true)) + ) + .orderBy(({ post }) => post.title, `asc`) + .limit(3) + .offset(0) + ) + + await userPosts.preload() + + // Initial result should have first 3 posts (Post A, Post B, Post C) + expect(userPosts.size).toBe(3) + const initialResults = userPosts.toArray + expect(initialResults.map((r) => r.post.title)).toEqual([ + `Post A`, + `Post B`, + `Post C`, + ]) + + // Move the window to show next 3 posts (Post D, Post E, Post F) + userPosts.utils.setWindow({ offset: 3, limit: 3 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults = userPosts.toArray + expect(moveResults.map((r) => r.post.title)).toEqual([ + `Post D`, + `Post E`, + `Post F`, + ]) + }) + }) })