From 2ddccf612b7fced0e44732c266ddf2fd868c92d2 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 9 Oct 2025 10:26:30 +0200 Subject: [PATCH 01/15] Mutable window for topK operator + unit tests --- .../src/operators/topKWithFractionalIndex.ts | 92 +++- .../operators/topKWithFractionalIndex.test.ts | 485 ++++++++++++++++++ 2 files changed, 564 insertions(+), 13 deletions(-) diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts index 81e72b54a..07aa05917 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts @@ -2,7 +2,12 @@ import { generateKeyBetween } from "fractional-indexing" import { DifferenceStreamWriter, UnaryOperator } from "../graph.js" import { StreamBuilder } from "../d2.js" import { MultiSet } from "../multiset.js" -import { binarySearch, globalObjectIdGenerator } from "../utils.js" +import { + binarySearch, + diffHalfOpen, + globalObjectIdGenerator, +} from "../utils.js" +import type { HRange } from "../utils.js" import type { DifferenceStreamReader } from "../graph.js" import type { IStreamBuilder, PipedOperator } from "../types.js" @@ -10,6 +15,7 @@ export interface TopKWithFractionalIndexOptions { limit?: number offset?: number setSizeCallback?: (getSize: () => number) => void + setMoveFn?: (moveFn: (offset: number, limit: number) => void) => void } export type TopKChanges = { @@ -19,6 +25,13 @@ export type TopKChanges = { moveOut: IndexedValue | null } +export type TopKMoveChanges = { + /** Indicates which elements move into the topK (if any) */ + moveIns: Array> + /** Indicates which elements move out of the topK (if any) */ + moveOuts: Array> +} + /** * A topK data structure that supports insertions and deletions * and returns changes to the topK. @@ -58,6 +71,24 @@ class TopKArray implements TopK { return Math.max(0, Math.min(limit, available)) } + /** + * Moves the topK window + */ + move(offset: number, limit: number): TopKMoveChanges { + const oldRange: HRange = [this.#topKStart, this.#topKEnd] + + this.#topKStart = offset + this.#topKEnd = offset + limit + + const newRange: HRange = [this.#topKStart, this.#topKEnd] + const { onlyInA, onlyInB } = diffHalfOpen(oldRange, newRange) + + const moveIns = onlyInB.map((index) => this.#sortedValues[index]!) + const moveOuts = onlyInA.map((index) => this.#sortedValues[index]!) + + return { moveIns, moveOuts } + } + insert(value: V): TopKChanges { const result: TopKChanges = { moveIn: null, moveOut: null } @@ -178,8 +209,6 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< */ #topK: TopK> - #limit: number - constructor( id: number, inputA: DifferenceStreamReader<[K, T]>, @@ -188,7 +217,7 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< options: TopKWithFractionalIndexOptions ) { super(id, inputA, output) - this.#limit = options.limit ?? Infinity + const limit = options.limit ?? Infinity const offset = options.offset ?? 0 const compareTaggedValues = ( a: TaggedValue, @@ -204,8 +233,9 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< const tieBreakerB = getTag(b) return tieBreakerA - tieBreakerB } - this.#topK = this.createTopK(offset, this.#limit, compareTaggedValues) + this.#topK = this.createTopK(offset, limit, compareTaggedValues) options.setSizeCallback?.(() => this.#topK.size) + options.setMoveFn?.(this.moveTopK.bind(this)) } protected createTopK( @@ -216,6 +246,29 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< return new TopKArray(offset, limit, comparator) } + /** + * Moves the topK window based on the provided offset and limit. + * Any changes to the topK are sent to the output. + */ + moveTopK(offset: number, limit: number) { + if (!(this.#topK instanceof TopKArray)) { + throw new Error( + `Cannot move B+-tree implementation of TopK with fractional index` + ) + } + + const result: Array<[[K, IndexedValue], number]> = [] + + const changes = this.#topK.move(offset, limit) + + changes.moveIns.forEach((moveIn) => this.handleMoveIn(moveIn, result)) + changes.moveOuts.forEach((moveOut) => this.handleMoveOut(moveOut, result)) + + if (result.length > 0) { + this.output.sendData(new MultiSet(result)) + } + } + run(): void { const result: Array<[[K, IndexedValue], number]> = [] for (const message of this.inputMessages()) { @@ -258,23 +311,36 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< // so it doesn't affect the topK } - if (res.moveIn) { - const index = getIndex(res.moveIn) - const taggedValue = getValue(res.moveIn) + this.handleMoveIn(res.moveIn, result) + this.handleMoveOut(res.moveOut, result) + + return + } + + private handleMoveIn( + moveIn: IndexedValue> | null, + result: Array<[[K, IndexedValue], number]> + ) { + if (moveIn) { + const index = getIndex(moveIn) + const taggedValue = getValue(moveIn) const k = getKey(taggedValue) const val = getVal(taggedValue) result.push([[k, [val, index]], 1]) } + } - if (res.moveOut) { - const index = getIndex(res.moveOut) - const taggedValue = getValue(res.moveOut) + private handleMoveOut( + moveOut: IndexedValue> | null, + result: Array<[[K, IndexedValue], number]> + ) { + if (moveOut) { + const index = getIndex(moveOut) + const taggedValue = getValue(moveOut) const k = getKey(taggedValue) const val = getVal(taggedValue) result.push([[k, [val, index]], -1]) } - - return } private getMultiplicity(key: K): number { diff --git a/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts index e7c86c9c0..6a38524ef 100644 --- a/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts @@ -722,3 +722,488 @@ describe(`Operators`, () => { }) }) }) + +describe(`Operators`, () => { + describe(`TopKWithFractionalIndex operator with array`, () => { + it(`should support moving topK window past current window using setMoveFn callback`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let moveFn: ((offset: number, limit: number) => void) | undefined + + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + limit: 3, + offset: 0, + setMoveFn: (fn) => { + moveFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + [[4, { id: 4, value: `d` }], 1], + [[5, { id: 5, value: `e` }], 1], + [[6, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have first 3 elements (a, b, c) + const initialResult = tracker.getResult() + expect(initialResult.sortedResults.length).toBe(3) + expect(initialResult.messageCount).toBeLessThanOrEqual(6) + + // Verify initial order + const initialSortedByIndex = initialResult.sortedResults.sort((a, b) => { + const aIndex = a[1][1] // fractional index + const bIndex = b[1][1] // fractional index + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 + }) + + const initialSortedValues = initialSortedByIndex.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`a`, `b`, `c`]) + + // Verify moveFn was set + expect(moveFn).toBeDefined() + + const numberOfMessages = tracker.getResult().messageCount + + // Move the window to show elements d, e, f (offset: 3, limit: 3) + moveFn!(3, 3) + graph.run() + + const moveResult = tracker.getResult() + + // Should now show d, e, f + const moveSortedByIndex = moveResult.sortedResults.sort((a, b) => { + const aIndex = a[1][1] + const bIndex = b[1][1] + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 + }) + + const moveSortedValues = moveSortedByIndex.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`d`, `e`, `f`]) + + // Check that only the affected keys produce messages + const affectedKeys = moveResult.messages.map(([key, _]) => key[0]) + expect(affectedKeys.length).toBe(numberOfMessages + 6) + expect(affectedKeys).toEqual(expect.arrayContaining([1, 2, 3, 4, 5, 6])) + }) + + it(`should support moving topK window before current window using setMoveFn callback`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let moveFn: ((offset: number, limit: number) => void) | undefined + + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + limit: 3, + offset: 3, + setMoveFn: (fn) => { + moveFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + [[4, { id: 4, value: `d` }], 1], + [[5, { id: 5, value: `e` }], 1], + [[6, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have first 3 elements (a, b, c) + const initialResult = tracker.getResult() + expect(initialResult.sortedResults.length).toBe(3) + expect(initialResult.messageCount).toBeLessThanOrEqual(6) + + // Verify initial order + const initialSortedByIndex = initialResult.sortedResults.sort((a, b) => { + const aIndex = a[1][1] // fractional index + const bIndex = b[1][1] // fractional index + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 + }) + + const initialSortedValues = initialSortedByIndex.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`d`, `e`, `f`]) + + // Verify moveFn was set + expect(moveFn).toBeDefined() + + const numberOfMessages = tracker.getResult().messageCount + + // Move the window to show elements d, e, f (offset: 3, limit: 3) + moveFn!(0, 3) + graph.run() + + const moveResult = tracker.getResult() + + // Should now show d, e, f + const moveSortedByIndex = moveResult.sortedResults.sort((a, b) => { + const aIndex = a[1][1] + const bIndex = b[1][1] + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 + }) + + const moveSortedValues = moveSortedByIndex.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`a`, `b`, `c`]) + + // Check that only the affected keys produce messages + const affectedKeys = moveResult.messages.map(([key, _]) => key[0]) + expect(affectedKeys.length).toBe(numberOfMessages + 6) + expect(affectedKeys).toEqual(expect.arrayContaining([1, 2, 3, 4, 5, 6])) + }) + + it(`should support moving offset while keeping limit constant`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let moveFn: ((offset: number, limit: number) => void) | null = null + + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + limit: 2, + offset: 0, + setMoveFn: (fn) => { + moveFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + [[4, { id: 4, value: `d` }], 1], + [[5, { id: 5, value: `e` }], 1], + ]) + ) + graph.run() + + // Initial result should have first 2 elements (a, b) + const initialResult = tracker.getResult() + expect(initialResult.sortedResults.length).toBe(2) + + const initialSortedByIndex = initialResult.sortedResults.sort((a, b) => { + const aIndex = a[1][1] + const bIndex = b[1][1] + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 + }) + + const initialSortedValues = initialSortedByIndex.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`a`, `b`]) + + // tracker.reset() + + // Move offset to 1, keeping limit at 2 (should show b, c) + moveFn!(1, 2) + graph.run() + + const moveResult = tracker.getResult() + // expect(moveResult.messageCount).toBeGreaterThan(0) + + const moveSortedByIndex = moveResult.sortedResults.sort((a, b) => { + const aIndex = a[1][1] + const bIndex = b[1][1] + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 + }) + + const moveSortedValues = moveSortedByIndex.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`b`, `c`]) + + // tracker.reset() + + // Move offset to 2, keeping limit at 2 (should show c, d) + moveFn!(2, 2) + graph.run() + + const moveResult2 = tracker.getResult() + // expect(moveResult2.messageCount).toBeGreaterThan(0) + + const moveSortedByIndex2 = moveResult2.sortedResults.sort((a, b) => { + const aIndex = a[1][1] + const bIndex = b[1][1] + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 + }) + + const moveSortedValues2 = moveSortedByIndex2.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues2).toEqual([`c`, `d`]) + + // Move offset back to 0, keeping limit at 2 (should show a, b) + moveFn!(0, 2) + graph.run() + + const moveResult3 = tracker.getResult() + // expect(moveResult3.messageCount).toBeGreaterThan(0) + + const moveSortedByIndex3 = moveResult3.sortedResults.sort((a, b) => { + const aIndex = a[1][1] + const bIndex = b[1][1] + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 + }) + + const moveSortedValues3 = moveSortedByIndex3.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues3).toEqual([`a`, `b`]) + }) + + it(`should support moving limit while keeping offset constant`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let moveFn: ((offset: number, limit: number) => void) | null = null + + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + limit: 2, + offset: 1, + setMoveFn: (fn) => { + moveFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + [[4, { id: 4, value: `d` }], 1], + [[5, { id: 5, value: `e` }], 1], + [[6, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have 2 elements starting from offset 1 (b, c) + const initialResult = tracker.getResult() + expect(initialResult.sortedResults.length).toBe(2) + + const initialSortedByIndex = initialResult.sortedResults.sort((a, b) => { + const aIndex = a[1][1] + const bIndex = b[1][1] + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 + }) + + const initialSortedValues = initialSortedByIndex.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`b`, `c`]) + + // Increase limit to 3, keeping offset at 1 (should show b, c, d) + moveFn!(1, 3) + graph.run() + + const moveResult = tracker.getResult() + + const moveSortedByIndex = moveResult.sortedResults.sort((a, b) => { + const aIndex = a[1][1] + const bIndex = b[1][1] + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 + }) + + const moveSortedValues = moveSortedByIndex.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`b`, `c`, `d`]) + + // Decrease limit to 1, keeping offset at 1 (should show just b) + moveFn!(1, 1) + graph.run() + + const moveResult2 = tracker.getResult() + + const moveSortedByIndex2 = moveResult2.sortedResults.sort((a, b) => { + const aIndex = a[1][1] + const bIndex = b[1][1] + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 + }) + + const moveSortedValues2 = moveSortedByIndex2.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues2).toEqual([`b`]) + }) + + it(`should handle edge cases when moving beyond available data`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let moveFn: ((offset: number, limit: number) => void) | null = null + + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + limit: 2, + offset: 0, + setMoveFn: (fn) => { + moveFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - only 3 elements: a, b, c + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + ]) + ) + graph.run() + + // Initial result should have first 2 elements (a, b) + const initialResult = tracker.getResult() + expect(initialResult.sortedResults.length).toBe(2) + + const initialSortedByIndex = initialResult.sortedResults.sort((a, b) => { + const aIndex = a[1][1] + const bIndex = b[1][1] + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 + }) + + const initialSortedValues = initialSortedByIndex.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`a`, `b`]) + + // Move to offset 2, limit 2 (should show only c, since we only have 3 total elements) + moveFn!(2, 2) + graph.run() + + const moveResult = tracker.getResult() + + const moveSortedByIndex = moveResult.sortedResults.sort((a, b) => { + const aIndex = a[1][1] + const bIndex = b[1][1] + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 + }) + + const moveSortedValues = moveSortedByIndex.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`c`]) // Only 1 element available at offset 2 + + // Move to offset 5, limit 2 (should show no elements, beyond available data) + moveFn!(5, 2) + graph.run() + + const moveResult2 = tracker.getResult() + + const moveSortedByIndex2 = moveResult2.sortedResults.sort((a, b) => { + const aIndex = a[1][1] + const bIndex = b[1][1] + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 + }) + + const moveSortedValues2 = moveSortedByIndex2.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues2).toEqual([]) // No elements available at offset 5 + + // Move to a negative offset and limit (should show no elements) + moveFn!(-5, 2) + graph.run() + + const moveResult3 = tracker.getResult() + + const moveSortedByIndex3 = moveResult3.sortedResults.sort((a, b) => { + const aIndex = a[1][1] + const bIndex = b[1][1] + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 + }) + + const moveSortedValues3 = moveSortedByIndex3.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues3).toEqual([]) + + // Move back to a valid window + moveFn!(0, 2) + graph.run() + + const moveResult4 = tracker.getResult() + + const moveSortedByIndex4 = moveResult4.sortedResults.sort((a, b) => { + const aIndex = a[1][1] + const bIndex = b[1][1] + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 + }) + + const moveSortedValues4 = moveSortedByIndex4.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues4).toEqual([`a`, `b`]) + }) + }) +}) From d60d9e8239a9ad6a33091514b7fa45b0ab0b6500 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 9 Oct 2025 12:03:16 +0200 Subject: [PATCH 02/15] Make window mutable in orderBy operator + unit tests --- packages/db-ivm/src/operators/orderBy.ts | 3 + .../orderByWithFractionalIndex.test.ts | 406 ++++++++++++++++++ 2 files changed, 409 insertions(+) diff --git a/packages/db-ivm/src/operators/orderBy.ts b/packages/db-ivm/src/operators/orderBy.ts index 8f7a774da..f13d03fc1 100644 --- a/packages/db-ivm/src/operators/orderBy.ts +++ b/packages/db-ivm/src/operators/orderBy.ts @@ -13,6 +13,7 @@ export interface OrderByOptions { type OrderByWithFractionalIndexOptions = OrderByOptions & { setSizeCallback?: (getSize: () => number) => void + setMoveFn?: (moveFn: (offset: number, limit: number) => void) => void } /** @@ -147,6 +148,7 @@ export function orderByWithFractionalIndexBase< const limit = options?.limit ?? Infinity const offset = options?.offset ?? 0 const setSizeCallback = options?.setSizeCallback + const setMoveFn = options?.setMoveFn const comparator = options?.comparator ?? ((a, b) => { @@ -167,6 +169,7 @@ export function orderByWithFractionalIndexBase< limit, offset, setSizeCallback, + setMoveFn, } ), consolidate() diff --git a/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts index 169992ed1..9f6b7c83b 100644 --- a/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts @@ -488,6 +488,412 @@ describe(`Operators`, () => { ]) }) }) + + describe(`OrderByWithFractionalIndex operator with array`, () => { + test(`should support moving orderBy window past current window using setMoveFn callback`, () => { + const graph = new D2() + const input = graph.newInput< + KeyValue< + string, + { + id: number + value: string + } + > + >() + const tracker = new MessageTracker< + [string, [{ id: number; value: string }, string]] + >() + + let moveFn: ((offset: number, limit: number) => void) | undefined + + input.pipe( + orderByWithFractionalIndex((item) => item.value, { + limit: 3, + offset: 0, + setMoveFn: (fn) => { + moveFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[`key1`, { id: 1, value: `a` }], 1], + [[`key2`, { id: 2, value: `b` }], 1], + [[`key3`, { id: 3, value: `c` }], 1], + [[`key4`, { id: 4, value: `d` }], 1], + [[`key5`, { id: 5, value: `e` }], 1], + [[`key6`, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have first 3 elements (a, b, c) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(3) + expect(initialResult.messageCount).toBeLessThanOrEqual(6) + + // Verify initial order + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`a`, `b`, `c`]) + + // Verify moveFn was set + expect(moveFn).toBeDefined() + + // Move the window to show elements d, e, f (offset: 3, limit: 3) + moveFn!(3, 3) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + // Should now show d, e, f + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`d`, `e`, `f`]) + }) + + test(`should support moving orderBy window before current window using setMoveFn callback`, () => { + const graph = new D2() + const input = graph.newInput< + KeyValue< + string, + { + id: number + value: string + } + > + >() + const tracker = new MessageTracker< + [string, [{ id: number; value: string }, string]] + >() + + let moveFn: ((offset: number, limit: number) => void) | undefined + + input.pipe( + orderByWithFractionalIndex((item) => item.value, { + limit: 3, + offset: 3, + setMoveFn: (fn) => { + moveFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[`key1`, { id: 1, value: `a` }], 1], + [[`key2`, { id: 2, value: `b` }], 1], + [[`key3`, { id: 3, value: `c` }], 1], + [[`key4`, { id: 4, value: `d` }], 1], + [[`key5`, { id: 5, value: `e` }], 1], + [[`key6`, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have elements d, e, f + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(3) + expect(initialResult.messageCount).toBeLessThanOrEqual(6) + + // Verify initial order + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`d`, `e`, `f`]) + + // Verify moveFn was set + expect(moveFn).toBeDefined() + + // Move the window to show elements a, b, c (offset: 0, limit: 3) + moveFn!(0, 3) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + // Should now show a, b, c + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`a`, `b`, `c`]) + }) + + test(`should support moving offset while keeping limit constant`, () => { + const graph = new D2() + const input = graph.newInput< + KeyValue< + string, + { + id: number + value: string + } + > + >() + const tracker = new MessageTracker< + [string, [{ id: number; value: string }, string]] + >() + + let moveFn: ((offset: number, limit: number) => void) | null = null + + input.pipe( + orderByWithFractionalIndex((item) => item.value, { + limit: 2, + offset: 0, + setMoveFn: (fn) => { + moveFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e + input.sendData( + new MultiSet([ + [[`key1`, { id: 1, value: `a` }], 1], + [[`key2`, { id: 2, value: `b` }], 1], + [[`key3`, { id: 3, value: `c` }], 1], + [[`key4`, { id: 4, value: `d` }], 1], + [[`key5`, { id: 5, value: `e` }], 1], + ]) + ) + graph.run() + + // Initial result should have first 2 elements (a, b) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(2) + + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`a`, `b`]) + + // Move offset to 1, keeping limit at 2 (should show b, c) + moveFn!(1, 2) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`b`, `c`]) + + // Move offset to 2, keeping limit at 2 (should show c, d) + moveFn!(2, 2) + graph.run() + + const moveResult2 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues2 = moveResult2.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues2).toEqual([`c`, `d`]) + + // Move offset back to 0, keeping limit at 2 (should show a, b) + moveFn!(0, 2) + graph.run() + + const moveResult3 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues3 = moveResult3.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues3).toEqual([`a`, `b`]) + }) + + test(`should support moving limit while keeping offset constant`, () => { + const graph = new D2() + const input = graph.newInput< + KeyValue< + string, + { + id: number + value: string + } + > + >() + const tracker = new MessageTracker< + [string, [{ id: number; value: string }, string]] + >() + + let moveFn: ((offset: number, limit: number) => void) | null = null + + input.pipe( + orderByWithFractionalIndex((item) => item.value, { + limit: 2, + offset: 1, + setMoveFn: (fn) => { + moveFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[`key1`, { id: 1, value: `a` }], 1], + [[`key2`, { id: 2, value: `b` }], 1], + [[`key3`, { id: 3, value: `c` }], 1], + [[`key4`, { id: 4, value: `d` }], 1], + [[`key5`, { id: 5, value: `e` }], 1], + [[`key6`, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have 2 elements starting from offset 1 (b, c) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(2) + + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`b`, `c`]) + + // Increase limit to 3, keeping offset at 1 (should show b, c, d) + moveFn!(1, 3) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`b`, `c`, `d`]) + + // Decrease limit to 1, keeping offset at 1 (should show just b) + moveFn!(1, 1) + graph.run() + + const moveResult2 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues2 = moveResult2.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues2).toEqual([`b`]) + }) + + test(`should handle edge cases when moving beyond available data`, () => { + const graph = new D2() + const input = graph.newInput< + KeyValue< + string, + { + id: number + value: string + } + > + >() + const tracker = new MessageTracker< + [string, [{ id: number; value: string }, string]] + >() + + let moveFn: ((offset: number, limit: number) => void) | null = null + + input.pipe( + orderByWithFractionalIndex((item) => item.value, { + limit: 2, + offset: 0, + setMoveFn: (fn) => { + moveFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - only 3 elements: a, b, c + input.sendData( + new MultiSet([ + [[`key1`, { id: 1, value: `a` }], 1], + [[`key2`, { id: 2, value: `b` }], 1], + [[`key3`, { id: 3, value: `c` }], 1], + ]) + ) + graph.run() + + // Initial result should have first 2 elements (a, b) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(2) + + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`a`, `b`]) + + // Move to offset 2, limit 2 (should show only c, since we only have 3 total elements) + moveFn!(2, 2) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`c`]) // Only 1 element available at offset 2 + + // Move to offset 5, limit 2 (should show no elements, beyond available data) + moveFn!(5, 2) + graph.run() + + const moveResult2 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues2 = moveResult2.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues2).toEqual([]) // No elements available at offset 5 + + // Move to a negative offset and limit (should show no elements) + moveFn!(-5, 2) + graph.run() + + const moveResult3 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues3 = moveResult3.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues3).toEqual([]) + + // Move back to a valid window + moveFn!(0, 2) + graph.run() + + const moveResult4 = tracker.getResult(compareFractionalIndex) + + const moveSortedValues4 = moveResult4.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues4).toEqual([`a`, `b`]) + }) + }) }) /** From 180c801d6c88db5eb36211fbf45fbdb39a2d34e6 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 9 Oct 2025 12:08:16 +0200 Subject: [PATCH 03/15] Use compareFractionalIndex helper function in topKWithFractionalIndex tests --- .../orderByWithFractionalIndex.test.ts | 11 +- .../operators/topKWithFractionalIndex.test.ts | 169 ++++-------------- packages/db-ivm/tests/test-utils.ts | 9 + 3 files changed, 47 insertions(+), 142 deletions(-) diff --git a/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts index 9f6b7c83b..928bba2fa 100644 --- a/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts @@ -7,18 +7,9 @@ import { } from "../../src/operators/index.js" import { orderByWithFractionalIndexBTree } from "../../src/operators/orderByBTree.js" import { loadBTree } from "../../src/operators/topKWithFractionalIndexBTree.js" -import { MessageTracker } from "../test-utils.js" +import { MessageTracker, compareFractionalIndex } from "../test-utils.js" import type { KeyValue } from "../../src/types.js" -const compareFractionalIndex = ( - r1: [string, [{ id: number; value: string }, string]], - r2: [string, [{ id: number; value: string }, string]] -) => { - const [_key1, [_value1, index1]] = r1 - const [_key2, [_value2, index2]] = r2 - return index1 < index2 ? -1 : index1 > index2 ? 1 : 0 -} - const stripFractionalIndex = ([[key, [value, _index]], multiplicity]: any) => [ key, value, diff --git a/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts index 6a38524ef..5bdbfd558 100644 --- a/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts @@ -7,7 +7,11 @@ import { topKWithFractionalIndexBTree, } from "../../src/operators/topKWithFractionalIndexBTree.js" import { output } from "../../src/operators/index.js" -import { MessageTracker, assertOnlyKeysAffected } from "../test-utils.js" +import { + MessageTracker, + assertOnlyKeysAffected, + compareFractionalIndex, +} from "../test-utils.js" // Helper function to check if indices are in lexicographic order function checkLexicographicOrder(results: Array) { @@ -763,18 +767,12 @@ describe(`Operators`, () => { graph.run() // Initial result should have first 3 elements (a, b, c) - const initialResult = tracker.getResult() + const initialResult = tracker.getResult(compareFractionalIndex) expect(initialResult.sortedResults.length).toBe(3) expect(initialResult.messageCount).toBeLessThanOrEqual(6) // Verify initial order - const initialSortedByIndex = initialResult.sortedResults.sort((a, b) => { - const aIndex = a[1][1] // fractional index - const bIndex = b[1][1] // fractional index - return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 - }) - - const initialSortedValues = initialSortedByIndex.map( + const initialSortedValues = initialResult.sortedResults.map( ([_key, [value, _index]]) => value.value ) expect(initialSortedValues).toEqual([`a`, `b`, `c`]) @@ -788,16 +786,10 @@ describe(`Operators`, () => { moveFn!(3, 3) graph.run() - const moveResult = tracker.getResult() + const moveResult = tracker.getResult(compareFractionalIndex) // Should now show d, e, f - const moveSortedByIndex = moveResult.sortedResults.sort((a, b) => { - const aIndex = a[1][1] - const bIndex = b[1][1] - return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 - }) - - const moveSortedValues = moveSortedByIndex.map( + const moveSortedValues = moveResult.sortedResults.map( ([_key, [value, _index]]) => value.value ) expect(moveSortedValues).toEqual([`d`, `e`, `f`]) @@ -846,18 +838,12 @@ describe(`Operators`, () => { graph.run() // Initial result should have first 3 elements (a, b, c) - const initialResult = tracker.getResult() + const initialResult = tracker.getResult(compareFractionalIndex) expect(initialResult.sortedResults.length).toBe(3) expect(initialResult.messageCount).toBeLessThanOrEqual(6) // Verify initial order - const initialSortedByIndex = initialResult.sortedResults.sort((a, b) => { - const aIndex = a[1][1] // fractional index - const bIndex = b[1][1] // fractional index - return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 - }) - - const initialSortedValues = initialSortedByIndex.map( + const initialSortedValues = initialResult.sortedResults.map( ([_key, [value, _index]]) => value.value ) expect(initialSortedValues).toEqual([`d`, `e`, `f`]) @@ -871,16 +857,10 @@ describe(`Operators`, () => { moveFn!(0, 3) graph.run() - const moveResult = tracker.getResult() + const moveResult = tracker.getResult(compareFractionalIndex) // Should now show d, e, f - const moveSortedByIndex = moveResult.sortedResults.sort((a, b) => { - const aIndex = a[1][1] - const bIndex = b[1][1] - return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 - }) - - const moveSortedValues = moveSortedByIndex.map( + const moveSortedValues = moveResult.sortedResults.map( ([_key, [value, _index]]) => value.value ) expect(moveSortedValues).toEqual([`a`, `b`, `c`]) @@ -928,16 +908,10 @@ describe(`Operators`, () => { graph.run() // Initial result should have first 2 elements (a, b) - const initialResult = tracker.getResult() + const initialResult = tracker.getResult(compareFractionalIndex) expect(initialResult.sortedResults.length).toBe(2) - const initialSortedByIndex = initialResult.sortedResults.sort((a, b) => { - const aIndex = a[1][1] - const bIndex = b[1][1] - return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 - }) - - const initialSortedValues = initialSortedByIndex.map( + const initialSortedValues = initialResult.sortedResults.map( ([_key, [value, _index]]) => value.value ) expect(initialSortedValues).toEqual([`a`, `b`]) @@ -948,16 +922,9 @@ describe(`Operators`, () => { moveFn!(1, 2) graph.run() - const moveResult = tracker.getResult() - // expect(moveResult.messageCount).toBeGreaterThan(0) - - const moveSortedByIndex = moveResult.sortedResults.sort((a, b) => { - const aIndex = a[1][1] - const bIndex = b[1][1] - return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 - }) + const moveResult = tracker.getResult(compareFractionalIndex) - const moveSortedValues = moveSortedByIndex.map( + const moveSortedValues = moveResult.sortedResults.map( ([_key, [value, _index]]) => value.value ) expect(moveSortedValues).toEqual([`b`, `c`]) @@ -968,16 +935,9 @@ describe(`Operators`, () => { moveFn!(2, 2) graph.run() - const moveResult2 = tracker.getResult() - // expect(moveResult2.messageCount).toBeGreaterThan(0) - - const moveSortedByIndex2 = moveResult2.sortedResults.sort((a, b) => { - const aIndex = a[1][1] - const bIndex = b[1][1] - return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 - }) + const moveResult2 = tracker.getResult(compareFractionalIndex) - const moveSortedValues2 = moveSortedByIndex2.map( + const moveSortedValues2 = moveResult2.sortedResults.map( ([_key, [value, _index]]) => value.value ) expect(moveSortedValues2).toEqual([`c`, `d`]) @@ -986,16 +946,9 @@ describe(`Operators`, () => { moveFn!(0, 2) graph.run() - const moveResult3 = tracker.getResult() - // expect(moveResult3.messageCount).toBeGreaterThan(0) + const moveResult3 = tracker.getResult(compareFractionalIndex) - const moveSortedByIndex3 = moveResult3.sortedResults.sort((a, b) => { - const aIndex = a[1][1] - const bIndex = b[1][1] - return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 - }) - - const moveSortedValues3 = moveSortedByIndex3.map( + const moveSortedValues3 = moveResult3.sortedResults.map( ([_key, [value, _index]]) => value.value ) expect(moveSortedValues3).toEqual([`a`, `b`]) @@ -1039,16 +992,10 @@ describe(`Operators`, () => { graph.run() // Initial result should have 2 elements starting from offset 1 (b, c) - const initialResult = tracker.getResult() + const initialResult = tracker.getResult(compareFractionalIndex) expect(initialResult.sortedResults.length).toBe(2) - const initialSortedByIndex = initialResult.sortedResults.sort((a, b) => { - const aIndex = a[1][1] - const bIndex = b[1][1] - return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 - }) - - const initialSortedValues = initialSortedByIndex.map( + const initialSortedValues = initialResult.sortedResults.map( ([_key, [value, _index]]) => value.value ) expect(initialSortedValues).toEqual([`b`, `c`]) @@ -1057,15 +1004,9 @@ describe(`Operators`, () => { moveFn!(1, 3) graph.run() - const moveResult = tracker.getResult() - - const moveSortedByIndex = moveResult.sortedResults.sort((a, b) => { - const aIndex = a[1][1] - const bIndex = b[1][1] - return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 - }) + const moveResult = tracker.getResult(compareFractionalIndex) - const moveSortedValues = moveSortedByIndex.map( + const moveSortedValues = moveResult.sortedResults.map( ([_key, [value, _index]]) => value.value ) expect(moveSortedValues).toEqual([`b`, `c`, `d`]) @@ -1074,15 +1015,9 @@ describe(`Operators`, () => { moveFn!(1, 1) graph.run() - const moveResult2 = tracker.getResult() + const moveResult2 = tracker.getResult(compareFractionalIndex) - const moveSortedByIndex2 = moveResult2.sortedResults.sort((a, b) => { - const aIndex = a[1][1] - const bIndex = b[1][1] - return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 - }) - - const moveSortedValues2 = moveSortedByIndex2.map( + const moveSortedValues2 = moveResult2.sortedResults.map( ([_key, [value, _index]]) => value.value ) expect(moveSortedValues2).toEqual([`b`]) @@ -1123,16 +1058,10 @@ describe(`Operators`, () => { graph.run() // Initial result should have first 2 elements (a, b) - const initialResult = tracker.getResult() + const initialResult = tracker.getResult(compareFractionalIndex) expect(initialResult.sortedResults.length).toBe(2) - const initialSortedByIndex = initialResult.sortedResults.sort((a, b) => { - const aIndex = a[1][1] - const bIndex = b[1][1] - return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 - }) - - const initialSortedValues = initialSortedByIndex.map( + const initialSortedValues = initialResult.sortedResults.map( ([_key, [value, _index]]) => value.value ) expect(initialSortedValues).toEqual([`a`, `b`]) @@ -1141,15 +1070,9 @@ describe(`Operators`, () => { moveFn!(2, 2) graph.run() - const moveResult = tracker.getResult() + const moveResult = tracker.getResult(compareFractionalIndex) - const moveSortedByIndex = moveResult.sortedResults.sort((a, b) => { - const aIndex = a[1][1] - const bIndex = b[1][1] - return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 - }) - - const moveSortedValues = moveSortedByIndex.map( + const moveSortedValues = moveResult.sortedResults.map( ([_key, [value, _index]]) => value.value ) expect(moveSortedValues).toEqual([`c`]) // Only 1 element available at offset 2 @@ -1158,15 +1081,9 @@ describe(`Operators`, () => { moveFn!(5, 2) graph.run() - const moveResult2 = tracker.getResult() + const moveResult2 = tracker.getResult(compareFractionalIndex) - const moveSortedByIndex2 = moveResult2.sortedResults.sort((a, b) => { - const aIndex = a[1][1] - const bIndex = b[1][1] - return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 - }) - - const moveSortedValues2 = moveSortedByIndex2.map( + const moveSortedValues2 = moveResult2.sortedResults.map( ([_key, [value, _index]]) => value.value ) expect(moveSortedValues2).toEqual([]) // No elements available at offset 5 @@ -1175,15 +1092,9 @@ describe(`Operators`, () => { moveFn!(-5, 2) graph.run() - const moveResult3 = tracker.getResult() - - const moveSortedByIndex3 = moveResult3.sortedResults.sort((a, b) => { - const aIndex = a[1][1] - const bIndex = b[1][1] - return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 - }) + const moveResult3 = tracker.getResult(compareFractionalIndex) - const moveSortedValues3 = moveSortedByIndex3.map( + const moveSortedValues3 = moveResult3.sortedResults.map( ([_key, [value, _index]]) => value.value ) expect(moveSortedValues3).toEqual([]) @@ -1192,15 +1103,9 @@ describe(`Operators`, () => { moveFn!(0, 2) graph.run() - const moveResult4 = tracker.getResult() - - const moveSortedByIndex4 = moveResult4.sortedResults.sort((a, b) => { - const aIndex = a[1][1] - const bIndex = b[1][1] - return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 - }) + const moveResult4 = tracker.getResult(compareFractionalIndex) - const moveSortedValues4 = moveSortedByIndex4.map( + const moveSortedValues4 = moveResult4.sortedResults.map( ([_key, [value, _index]]) => value.value ) expect(moveSortedValues4).toEqual([`a`, `b`]) diff --git a/packages/db-ivm/tests/test-utils.ts b/packages/db-ivm/tests/test-utils.ts index 49a8684d7..a7ee0f0f5 100644 --- a/packages/db-ivm/tests/test-utils.ts +++ b/packages/db-ivm/tests/test-utils.ts @@ -299,3 +299,12 @@ export function assertOnlyKeysAffected( ) } } + +export const compareFractionalIndex = ( + r1: [unknown, [unknown, string]], + r2: [unknown, [unknown, string]] +) => { + const [_key1, [_value1, index1]] = r1 + const [_key2, [_value2, index2]] = r2 + return index1 < index2 ? -1 : index1 > index2 ? 1 : 0 +} From dc259e568d048d1a522f859d02ef7c58df66fd8d Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 9 Oct 2025 12:08:47 +0200 Subject: [PATCH 04/15] Helper function used by topK to compute diffs between windows --- packages/db-ivm/src/utils.ts | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/packages/db-ivm/src/utils.ts b/packages/db-ivm/src/utils.ts index 902222584..8ba01bb52 100644 --- a/packages/db-ivm/src/utils.ts +++ b/packages/db-ivm/src/utils.ts @@ -144,3 +144,36 @@ export function* mapIterable( yield fn(t) } } + +export type HRange = [number, number] // half-open [start, end[ i.e. end is exclusive + +/** + * Computes the difference between two half-open ranges. + * @param a - The first half-open range + * @param b - The second half-open range + * @returns The difference between the two ranges + */ +export function diffHalfOpen(a: HRange, b: HRange) { + const [a1, a2] = a + const [b1, b2] = b + + // A \ B can be up to two segments (left and right of the overlap) + const onlyInA: Array = [ + ...range(a1, Math.min(a2, b1)), // left side of A outside B + ...range(Math.max(a1, b2), a2), // right side of A outside B + ] + + // B \ A similarly + const onlyInB: Array = [ + ...range(b1, Math.min(b2, a1)), + ...range(Math.max(b1, a2), b2), + ] + + return { onlyInA, onlyInB } +} + +function range(start: number, end: number): Array { + const out: Array = [] + for (let i = start; i < end; i++) out.push(i) + return out +} From fb9d6bb6a4192e3c8be38bb6ccdb1b5e9997cfca Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 9 Oct 2025 13:51:14 +0200 Subject: [PATCH 05/15] Thread move function from ts/db through to the underlying db-ivm operators --- packages/db/src/query/compiler/index.ts | 6 ++++ packages/db/src/query/compiler/joins.ts | 6 ++++ packages/db/src/query/compiler/order-by.ts | 2 ++ .../db/src/query/live-query-collection.ts | 36 ++++++++++++------- .../query/live/collection-config-builder.ts | 23 ++++++++++-- packages/db/src/types.ts | 16 +++++++-- 6 files changed, 72 insertions(+), 17 deletions(-) diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index b7d060abf..e1c27e1c7 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -58,6 +58,7 @@ export function compileQuery( callbacks: Record, lazyCollections: Set, optimizableOrderByCollections: Record, + setMoveFn: (moveFn: (offset: number, limit: number) => void) => void, cache: QueryCache = new WeakMap(), queryMapping: QueryMapping = new WeakMap() ): CompilationResult { @@ -94,6 +95,7 @@ export function compileQuery( callbacks, lazyCollections, optimizableOrderByCollections, + setMoveFn, cache, queryMapping ) @@ -127,6 +129,7 @@ export function compileQuery( callbacks, lazyCollections, optimizableOrderByCollections, + setMoveFn, rawQuery, compileQuery ) @@ -267,6 +270,7 @@ export function compileQuery( query.select || {}, collections[mainCollectionId]!, optimizableOrderByCollections, + setMoveFn, query.limit, query.offset ) @@ -332,6 +336,7 @@ function processFrom( callbacks: Record, lazyCollections: Set, optimizableOrderByCollections: Record, + setMoveFn: (moveFn: (offset: number, limit: number) => void) => void, cache: QueryCache, queryMapping: QueryMapping ): { alias: string; input: KeyedStream; collectionId: string } { @@ -356,6 +361,7 @@ function processFrom( callbacks, lazyCollections, optimizableOrderByCollections, + setMoveFn, cache, queryMapping ) diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 1beb93728..7635c81d4 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -62,6 +62,7 @@ export function processJoins( callbacks: Record, lazyCollections: Set, optimizableOrderByCollections: Record, + setMoveFn: (moveFn: (offset: number, limit: number) => void) => void, rawQuery: QueryIR, onCompileSubquery: CompileQueryFn ): NamespacedAndKeyedStream { @@ -82,6 +83,7 @@ export function processJoins( callbacks, lazyCollections, optimizableOrderByCollections, + setMoveFn, rawQuery, onCompileSubquery ) @@ -107,6 +109,7 @@ function processJoin( callbacks: Record, lazyCollections: Set, optimizableOrderByCollections: Record, + setMoveFn: (moveFn: (offset: number, limit: number) => void) => void, rawQuery: QueryIR, onCompileSubquery: CompileQueryFn ): NamespacedAndKeyedStream { @@ -123,6 +126,7 @@ function processJoin( callbacks, lazyCollections, optimizableOrderByCollections, + setMoveFn, cache, queryMapping, onCompileSubquery @@ -395,6 +399,7 @@ function processJoinSource( callbacks: Record, lazyCollections: Set, optimizableOrderByCollections: Record, + setMoveFn: (moveFn: (offset: number, limit: number) => void) => void, cache: QueryCache, queryMapping: QueryMapping, onCompileSubquery: CompileQueryFn @@ -420,6 +425,7 @@ function processJoinSource( callbacks, lazyCollections, optimizableOrderByCollections, + setMoveFn, cache, queryMapping ) diff --git a/packages/db/src/query/compiler/order-by.ts b/packages/db/src/query/compiler/order-by.ts index fe3bb7040..56ca40c0c 100644 --- a/packages/db/src/query/compiler/order-by.ts +++ b/packages/db/src/query/compiler/order-by.ts @@ -37,6 +37,7 @@ export function processOrderBy( selectClause: Select, collection: Collection, optimizableOrderByCollections: Record, + setMoveFn: (moveFn: (offset: number, limit: number) => void) => void, limit?: number, offset?: number ): IStreamBuilder> { @@ -187,6 +188,7 @@ export function processOrderBy( offset, comparator: compare, setSizeCallback, + setMoveFn, }) // orderByWithFractionalIndex returns [key, [value, index]] - we keep this format ) diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index c73b12b37..76a1e93e4 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -6,6 +6,7 @@ import type { Collection } from "../collection/index.js" import type { CollectionConfig, CollectionConfigSingleRowOption, + MoveUtils, NonSingleResult, SingleResult, UtilsRecord, @@ -15,16 +16,20 @@ import type { Context, GetResult } from "./builder/types.js" type CollectionConfigForContext< TContext extends Context, TResult extends object, + TUtils extends UtilsRecord = {}, > = TContext extends SingleResult - ? CollectionConfigSingleRowOption & SingleResult - : CollectionConfigSingleRowOption & NonSingleResult + ? CollectionConfigSingleRowOption & + SingleResult + : CollectionConfigSingleRowOption & + NonSingleResult type CollectionForContext< TContext extends Context, TResult extends object, + TUtils extends UtilsRecord = {}, > = TContext extends SingleResult - ? Collection & SingleResult - : Collection & NonSingleResult + ? Collection & SingleResult + : Collection & NonSingleResult /** * Creates live query collection options for use with createCollection @@ -55,14 +60,15 @@ export function liveQueryCollectionOptions< TResult extends object = GetResult, >( config: LiveQueryCollectionConfig -): CollectionConfigForContext { +): CollectionConfigForContext { const collectionConfigBuilder = new CollectionConfigBuilder< TContext, TResult >(config) return collectionConfigBuilder.getConfig() as CollectionConfigForContext< TContext, - TResult + TResult, + MoveUtils > } @@ -106,7 +112,7 @@ export function createLiveQueryCollection< TResult extends object = GetResult, >( query: (q: InitialQueryBuilder) => QueryBuilder -): CollectionForContext +): CollectionForContext // Overload 2: Accept full config object with optional utilities export function createLiveQueryCollection< @@ -115,7 +121,7 @@ export function createLiveQueryCollection< TUtils extends UtilsRecord = {}, >( config: LiveQueryCollectionConfig & { utils?: TUtils } -): CollectionForContext +): CollectionForContext // Implementation export function createLiveQueryCollection< @@ -126,7 +132,7 @@ export function createLiveQueryCollection< configOrQuery: | (LiveQueryCollectionConfig & { utils?: TUtils }) | ((q: InitialQueryBuilder) => QueryBuilder) -): CollectionForContext { +): CollectionForContext { // Determine if the argument is a function (query) or a config object if (typeof configOrQuery === `function`) { // Simple query function case @@ -138,7 +144,8 @@ export function createLiveQueryCollection< const options = liveQueryCollectionOptions(config) return bridgeToCreateCollection(options) as CollectionForContext< TContext, - TResult + TResult, + MoveUtils > } else { // Config object case @@ -149,8 +156,11 @@ export function createLiveQueryCollection< const options = liveQueryCollectionOptions(config) return bridgeToCreateCollection({ ...options, - utils: config.utils, - }) as CollectionForContext + utils: { + ...config.utils, + ...options.utils, + }, + }) as CollectionForContext } } @@ -162,7 +172,7 @@ function bridgeToCreateCollection< TResult extends object, TUtils extends UtilsRecord = {}, >( - options: CollectionConfig & { utils?: TUtils } + options: CollectionConfig ): Collection { // This is the only place we need a type assertion, hidden from user API return createCollection(options as any) as unknown as Collection< diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 6e68a5faa..26ea1e3c5 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -9,6 +9,7 @@ import type { Collection } from "../../collection/index.js" import type { CollectionConfigSingleRowOption, KeyedStream, + MoveUtils, ResultStream, SyncConfig, } from "../../types.js" @@ -54,6 +55,8 @@ export class CollectionConfigBuilder< // Reference to the live query collection for error state transitions private liveQueryCollection?: Collection + + private moveFn: undefined | ((offset: number, limit: number) => void) private graphCache: D2 | undefined private inputsCache: Record> | undefined @@ -90,7 +93,9 @@ export class CollectionConfigBuilder< this.compileBasePipeline() } - getConfig(): CollectionConfigSingleRowOption { + getConfig(): CollectionConfigSingleRowOption & { + utils: MoveUtils + } { return { id: this.id, getKey: @@ -105,9 +110,20 @@ export class CollectionConfigBuilder< onDelete: this.config.onDelete, startSync: this.config.startSync, singleResult: this.query.singleResult, + utils: { + move: this.move.bind(this), + }, } } + move(offset: number, limit: number) { + if (!this.moveFn) { + throw new Error(`Move function not set`) + } + + this.moveFn(offset, limit) + } + // The callback function is called after the graph has run. // This gives the callback a chance to load more data if needed, // that's used to optimize orderBy operators that set a limit, @@ -229,7 +245,10 @@ export class CollectionConfigBuilder< this.subscriptions, this.lazyCollectionsCallbacks, this.lazyCollections, - this.optimizableOrderByCollections + this.optimizableOrderByCollections, + (moveFn: (offset: number, limit: number) => void) => { + this.moveFn = moveFn + } ) this.pipelineCache = pipelineCache diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index b48ab2f5f..b499f6fea 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -503,13 +503,16 @@ export interface BaseCollectionConfig< * } */ onDelete?: DeleteMutationFn + + utils?: TUtils } export interface CollectionConfig< T extends object = Record, TKey extends string | number = string | number, TSchema extends StandardSchemaV1 = never, -> extends BaseCollectionConfig { + TUtils extends UtilsRecord = {}, +> extends BaseCollectionConfig { sync: SyncConfig } @@ -528,12 +531,21 @@ export type MaybeSingleResult = { singleResult?: true } +export type MoveUtils = { + /** + * Moves the offset and limit of an ordered query. + * Is a no-op if the query is not ordered. + */ + move: (offset: number, limit: number) => void +} + // Only used for live query collections export type CollectionConfigSingleRowOption< T extends object = Record, TKey extends string | number = string | number, TSchema extends StandardSchemaV1 = never, -> = CollectionConfig & MaybeSingleResult + TUtils extends UtilsRecord = {}, +> = CollectionConfig & MaybeSingleResult export type ChangesPayload> = Array< ChangeMessage From 677b417ab3d269c1af21c8b70f8697ca904fbe13 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 9 Oct 2025 15:17:07 +0200 Subject: [PATCH 06/15] Fixes to ensure that lazy collections load missing data when an ordered query is moved --- .../src/operators/topKWithFractionalIndex.ts | 36 +++++++++++++++---- packages/db/src/query/compiler/order-by.ts | 20 +++++++++-- .../query/live/collection-config-builder.ts | 15 ++++++-- 3 files changed, 58 insertions(+), 13 deletions(-) diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts index 07aa05917..bebe82b10 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts @@ -26,6 +26,8 @@ export type TopKChanges = { } export type TopKMoveChanges = { + /** Flag that marks whether there were any changes to the topK */ + changes: boolean /** Indicates which elements move into the topK (if any) */ moveIns: Array> /** Indicates which elements move out of the topK (if any) */ @@ -83,10 +85,27 @@ class TopKArray implements TopK { const newRange: HRange = [this.#topKStart, this.#topKEnd] const { onlyInA, onlyInB } = diffHalfOpen(oldRange, newRange) - const moveIns = onlyInB.map((index) => this.#sortedValues[index]!) - const moveOuts = onlyInA.map((index) => this.#sortedValues[index]!) + const moveIns: Array> = [] + onlyInB.forEach((index) => { + const value = this.#sortedValues[index] + if (value) { + moveIns.push(value) + } + }) + + const moveOuts: Array> = [] + onlyInA.forEach((index) => { + const value = this.#sortedValues[index] + if (value) { + moveOuts.push(value) + } + }) - return { moveIns, moveOuts } + // It could be that there are changes (i.e. moveIns or moveOuts) + // but that the collection is lazy so we don't have the data yet that needs to move in/out + // so `moveIns` and `moveOuts` will be empty but `changes` will be true + // this will tell the caller that it needs to run the graph to load more data + return { moveIns, moveOuts, changes: onlyInA.length + onlyInB.length > 0 } } insert(value: V): TopKChanges { @@ -259,12 +278,15 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< const result: Array<[[K, IndexedValue], number]> = [] - const changes = this.#topK.move(offset, limit) + const diff = this.#topK.move(offset, limit) - changes.moveIns.forEach((moveIn) => this.handleMoveIn(moveIn, result)) - changes.moveOuts.forEach((moveOut) => this.handleMoveOut(moveOut, result)) + diff.moveIns.forEach((moveIn) => this.handleMoveIn(moveIn, result)) + diff.moveOuts.forEach((moveOut) => this.handleMoveOut(moveOut, result)) - if (result.length > 0) { + if (diff.changes) { + // There are changes to the topK + // it could be that moveIns and moveOuts are empty + // because the collection is lazy, so we will run the graph again to load the data this.output.sendData(new MultiSet(result)) } } diff --git a/packages/db/src/query/compiler/order-by.ts b/packages/db/src/query/compiler/order-by.ts index 56ca40c0c..2abf46b9a 100644 --- a/packages/db/src/query/compiler/order-by.ts +++ b/packages/db/src/query/compiler/order-by.ts @@ -107,6 +107,8 @@ export function processOrderBy( let setSizeCallback: ((getSize: () => number) => void) | undefined + let orderByOptimizationInfo: OrderByOptimizationInfo | undefined + // Optimize the orderBy operator to lazily load elements // by using the range index of the collection. // Only for orderBy clause on a single column for now (no composite ordering) @@ -156,7 +158,7 @@ export function processOrderBy( if (index && index.supports(`gt`)) { // We found an index that we can use to lazily load ordered data - const orderByOptimizationInfo = { + orderByOptimizationInfo = { offset: offset ?? 0, limit, comparator, @@ -173,7 +175,7 @@ export function processOrderBy( ...optimizableOrderByCollections[followRefCollection.id]!, dataNeeded: () => { const size = getSize() - return Math.max(0, limit - size) + return Math.max(0, orderByOptimizationInfo!.limit - size) }, } } @@ -188,7 +190,19 @@ export function processOrderBy( offset, comparator: compare, setSizeCallback, - setMoveFn, + setMoveFn: (moveFn: (offset: number, limit: number) => void) => { + setMoveFn( + // We wrap the move function such that we update the orderByOptimizationInfo + // because that is used by the `dataNeeded` callback to determine if we need to load more data + (offset, limit) => { + moveFn(offset, limit) + if (orderByOptimizationInfo) { + orderByOptimizationInfo.offset = offset + orderByOptimizationInfo.limit = limit + } + } + ) + }, }) // orderByWithFractionalIndex returns [key, [value, index]] - we keep this format ) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 26ea1e3c5..76ff4dfa7 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -56,7 +56,9 @@ export class CollectionConfigBuilder< // Reference to the live query collection for error state transitions private liveQueryCollection?: Collection - private moveFn: undefined | ((offset: number, limit: number) => void) + private moveFn: ((offset: number, limit: number) => void) | undefined + + private maybeRunGraphFn: (() => void) | undefined private graphCache: D2 | undefined private inputsCache: Record> | undefined @@ -118,10 +120,14 @@ export class CollectionConfigBuilder< move(offset: number, limit: number) { if (!this.moveFn) { - throw new Error(`Move function not set`) + console.log( + `This collection can't be moved because no move function was set` + ) + return } this.moveFn(offset, limit) + this.maybeRunGraphFn?.() } // The callback function is called after the graph has run. @@ -204,8 +210,11 @@ export class CollectionConfigBuilder< fullSyncState ) + this.maybeRunGraphFn = () => + this.maybeRunGraph(config, fullSyncState, loadMoreDataCallbacks) + // Initial run with callback to load more data if needed - this.maybeRunGraph(config, fullSyncState, loadMoreDataCallbacks) + this.maybeRunGraphFn() // Return the unsubscribe function return () => { From d610a21006b105cc5d654ce1d876ca1c3897ccc3 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 9 Oct 2025 15:20:40 +0200 Subject: [PATCH 07/15] Unit tests for moving a live query --- .../tests/query/live-query-collection.test.ts | 425 ++++++++++++++++++ 1 file changed, 425 insertions(+) diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index 14c84d7e7..7827af5be 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -937,4 +937,429 @@ describe(`createLiveQueryCollection`, () => { }) }) }) + + describe(`move functionality`, () => { + it(`should support moving orderBy window past current window using move function`, async () => { + // Create a collection with more users for testing window movement + const extendedUsers = createCollection( + mockSyncCollectionOptions({ + id: `extended-users`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `David`, active: true }, + { id: 5, name: `Eve`, active: true }, + { id: 6, name: `Frank`, active: true }, + ], + }) + ) + + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: extendedUsers }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `desc`) + .limit(3) + .offset(0) + ) + + await activeUsers.preload() + + // Initial result should have first 3 users (Alice, Bob, Charlie) + expect(activeUsers.size).toBe(3) + const initialResults = activeUsers.toArray + expect(initialResults.map((r) => r.name)).toEqual([ + `Frank`, + `Eve`, + `David`, + ]) + + // Move the window to show users David, Eve, Frank (offset: 3, limit: 3) + activeUsers.utils.move(3, 3) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults = activeUsers.toArray + expect(moveResults.map((r) => r.name)).toEqual([ + `Charlie`, + `Bob`, + `Alice`, + ]) + }) + + it(`should support moving orderBy window before current window using move function`, async () => { + const extendedUsers = createCollection( + mockSyncCollectionOptions({ + id: `extended-users-before`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `David`, active: true }, + { id: 5, name: `Eve`, active: true }, + { id: 6, name: `Frank`, active: true }, + ], + }) + ) + + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: extendedUsers }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `asc`) + .limit(3) + .offset(3) + ) + + await activeUsers.preload() + + // Initial result should have users David, Eve, Frank + expect(activeUsers.size).toBe(3) + const initialResults = activeUsers.toArray + expect(initialResults.map((r) => r.name)).toEqual([ + `David`, + `Eve`, + `Frank`, + ]) + + // Move the window to show users Alice, Bob, Charlie (offset: 0, limit: 3) + activeUsers.utils.move(0, 3) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults = activeUsers.toArray + expect(moveResults.map((r) => r.name)).toEqual([ + `Alice`, + `Bob`, + `Charlie`, + ]) + }) + + it(`should support moving offset while keeping limit constant`, async () => { + const extendedUsers = createCollection( + mockSyncCollectionOptions({ + id: `extended-users-offset`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `David`, active: true }, + { id: 5, name: `Eve`, active: true }, + ], + }) + ) + + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: extendedUsers }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `asc`) + .limit(2) + .offset(0) + ) + + await activeUsers.preload() + + // Initial result should have first 2 users (Alice, Bob) + expect(activeUsers.size).toBe(2) + const initialResults = activeUsers.toArray + expect(initialResults.map((r) => r.name)).toEqual([`Alice`, `Bob`]) + + // Move offset to 1, keeping limit at 2 (should show Bob, Charlie) + activeUsers.utils.move(1, 2) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults1 = activeUsers.toArray + expect(moveResults1.map((r) => r.name)).toEqual([`Bob`, `Charlie`]) + + // Move offset to 2, keeping limit at 2 (should show Charlie, David) + activeUsers.utils.move(2, 2) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults2 = activeUsers.toArray + expect(moveResults2.map((r) => r.name)).toEqual([`Charlie`, `David`]) + + // Move offset back to 0, keeping limit at 2 (should show Alice, Bob) + activeUsers.utils.move(0, 2) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults3 = activeUsers.toArray + expect(moveResults3.map((r) => r.name)).toEqual([`Alice`, `Bob`]) + }) + + it(`should support moving limit while keeping offset constant`, async () => { + const extendedUsers = createCollection( + mockSyncCollectionOptions({ + id: `extended-users-limit`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `David`, active: true }, + { id: 5, name: `Eve`, active: true }, + { id: 6, name: `Frank`, active: true }, + ], + }) + ) + + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: extendedUsers }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `asc`) + .limit(2) + .offset(1) + ) + + await activeUsers.preload() + + // Initial result should have 2 users starting from offset 1 (Bob, Charlie) + expect(activeUsers.size).toBe(2) + const initialResults = activeUsers.toArray + expect(initialResults.map((r) => r.name)).toEqual([`Bob`, `Charlie`]) + + // Increase limit to 3, keeping offset at 1 (should show Bob, Charlie, David) + activeUsers.utils.move(1, 3) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults1 = activeUsers.toArray + expect(moveResults1.map((r) => r.name)).toEqual([ + `Bob`, + `Charlie`, + `David`, + ]) + + // Decrease limit to 1, keeping offset at 1 (should show just Bob) + activeUsers.utils.move(1, 1) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults2 = activeUsers.toArray + expect(moveResults2.map((r) => r.name)).toEqual([`Bob`]) + }) + + it(`should handle edge cases when moving beyond available data`, async () => { + const limitedUsers = createCollection( + mockSyncCollectionOptions({ + id: `limited-users`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + ], + }) + ) + + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: limitedUsers }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `asc`) + .limit(2) + .offset(0) + ) + + await activeUsers.preload() + + // Initial result should have first 2 users (Alice, Bob) + expect(activeUsers.size).toBe(2) + const initialResults = activeUsers.toArray + expect(initialResults.map((r) => r.name)).toEqual([`Alice`, `Bob`]) + + // Move to offset 2, limit 2 (should show only Charlie, since we only have 3 total users) + activeUsers.utils.move(2, 2) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults1 = activeUsers.toArray + expect(moveResults1.map((r) => r.name)).toEqual([`Charlie`]) // Only 1 user available at offset 2 + + // Move to offset 5, limit 2 (should show no users, beyond available data) + activeUsers.utils.move(5, 2) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults2 = activeUsers.toArray + expect(moveResults2).toEqual([]) // No users available at offset 5 + + // Move to a negative offset and limit (should show no users) + activeUsers.utils.move(-5, 2) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults3 = activeUsers.toArray + expect(moveResults3).toEqual([]) + + // Move back to a valid window + activeUsers.utils.move(0, 2) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults4 = activeUsers.toArray + expect(moveResults4.map((r) => r.name)).toEqual([`Alice`, `Bob`]) + }) + + it(`should work with descending order`, async () => { + const extendedUsers = createCollection( + mockSyncCollectionOptions({ + id: `extended-users-desc`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `David`, active: true }, + { id: 5, name: `Eve`, active: true }, + { id: 6, name: `Frank`, active: true }, + ], + }) + ) + + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: extendedUsers }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `desc`) + .limit(3) + .offset(0) + ) + + await activeUsers.preload() + + // Initial result should have first 3 users in descending order (Frank, Eve, David) + expect(activeUsers.size).toBe(3) + const initialResults = activeUsers.toArray + expect(initialResults.map((r) => r.name)).toEqual([ + `Frank`, + `Eve`, + `David`, + ]) + + // Move the window to show next 3 users (Charlie, Bob, Alice) + activeUsers.utils.move(3, 3) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults = activeUsers.toArray + expect(moveResults.map((r) => r.name)).toEqual([ + `Charlie`, + `Bob`, + `Alice`, + ]) + }) + + it(`should be a no-op when used on non-ordered queries`, async () => { + const activeUsers = createLiveQueryCollection( + (q) => + q + .from({ user: usersCollection }) + .where(({ user }) => eq(user.active, true)) + // No orderBy clause + ) + + await activeUsers.preload() + + // Initial result should have all active users + expect(activeUsers.size).toBe(2) + const initialResults = activeUsers.toArray + expect(initialResults.length).toBe(2) + + // Move should be a no-op for non-ordered queries + activeUsers.utils.move(1, 1) + + // Wait a bit to ensure no changes occur + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults = activeUsers.toArray + expect(moveResults.length).toBe(2) // Should still have the same number of results + expect(moveResults).toEqual(initialResults) // Should be the same results + }) + + it(`should work with complex queries including joins`, async () => { + type Post = { + id: number + title: string + authorId: number + published: boolean + } + + const posts = createCollection( + mockSyncCollectionOptions({ + id: `posts-for-move-test`, + getKey: (post) => post.id, + initialData: [ + { id: 1, title: `Post A`, authorId: 1, published: true }, + { id: 2, title: `Post B`, authorId: 2, published: true }, + { id: 3, title: `Post C`, authorId: 1, published: true }, + { id: 4, title: `Post D`, authorId: 2, published: true }, + { id: 5, title: `Post E`, authorId: 1, published: true }, + { id: 6, title: `Post F`, authorId: 2, published: true }, + ], + }) + ) + + const userPosts = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .join( + { post: posts }, + ({ user, post }) => eq(user.id, post.authorId), + `inner` + ) + .where( + ({ user, post }) => + eq(user.active, true) && eq(post.published, true) + ) + .orderBy(({ post }) => post.title, `asc`) + .limit(3) + .offset(0) + ) + + await userPosts.preload() + + // Initial result should have first 3 posts (Post A, Post B, Post C) + expect(userPosts.size).toBe(3) + const initialResults = userPosts.toArray + expect(initialResults.map((r) => r.post.title)).toEqual([ + `Post A`, + `Post B`, + `Post C`, + ]) + + // Move the window to show next 3 posts (Post D, Post E, Post F) + userPosts.utils.move(3, 3) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults = userPosts.toArray + expect(moveResults.map((r) => r.post.title)).toEqual([ + `Post D`, + `Post E`, + `Post F`, + ]) + }) + }) }) From 7163960af19553d94ca87e68bfd04ab568b71817 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 9 Oct 2025 15:26:18 +0200 Subject: [PATCH 08/15] Pass necessary setMoveFn to the compileQuery calls in unit tests --- packages/db/tests/query/compiler/basic.test.ts | 12 ++++++++---- packages/db/tests/query/compiler/subqueries.test.ts | 9 ++++++--- .../db/tests/query/compiler/subquery-caching.test.ts | 10 ++++++++++ 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/packages/db/tests/query/compiler/basic.test.ts b/packages/db/tests/query/compiler/basic.test.ts index d4e8afcd0..32c548d8e 100644 --- a/packages/db/tests/query/compiler/basic.test.ts +++ b/packages/db/tests/query/compiler/basic.test.ts @@ -50,7 +50,8 @@ describe(`Query2 Compiler`, () => { {}, {}, new Set(), - {} + {}, + () => {} ) const messages: Array> = [] @@ -105,7 +106,8 @@ describe(`Query2 Compiler`, () => { {}, {}, new Set(), - {} + {}, + () => {} ) const messages: Array> = [] @@ -182,7 +184,8 @@ describe(`Query2 Compiler`, () => { {}, {}, new Set(), - {} + {}, + () => {} ) const messages: Array> = [] @@ -247,7 +250,8 @@ describe(`Query2 Compiler`, () => { {}, {}, new Set(), - {} + {}, + () => {} ) const messages: Array> = [] diff --git a/packages/db/tests/query/compiler/subqueries.test.ts b/packages/db/tests/query/compiler/subqueries.test.ts index 9903f7a6c..4f5998516 100644 --- a/packages/db/tests/query/compiler/subqueries.test.ts +++ b/packages/db/tests/query/compiler/subqueries.test.ts @@ -176,7 +176,8 @@ describe(`Query2 Subqueries`, () => { {}, {}, new Set(), - {} + {}, + () => {} ) const messages: Array> = [] @@ -291,7 +292,8 @@ describe(`Query2 Subqueries`, () => { }, { issues: dummyCallbacks, users: dummyCallbacks }, lazyCollections, - {} + {}, + () => {} ) // Since we're doing a left join, the collection on the right should be handled lazily @@ -359,7 +361,8 @@ describe(`Query2 Subqueries`, () => { {}, {}, new Set(), - {} + {}, + () => {} ) const messages: Array> = [] diff --git a/packages/db/tests/query/compiler/subquery-caching.test.ts b/packages/db/tests/query/compiler/subquery-caching.test.ts index 211452336..87442b43d 100644 --- a/packages/db/tests/query/compiler/subquery-caching.test.ts +++ b/packages/db/tests/query/compiler/subquery-caching.test.ts @@ -55,6 +55,7 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, + () => {}, cache1 ) @@ -72,6 +73,7 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, + () => {}, cache2 ) @@ -91,6 +93,7 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, + () => {}, cache2 ) @@ -110,6 +113,7 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, + () => {}, cache2 ) const subqueryResult2 = compileQuery( @@ -120,6 +124,7 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, + () => {}, cache2 ) @@ -156,6 +161,7 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, + () => {}, sharedCache ) expect(sharedCache.has(subquery)).toBe(true) @@ -169,6 +175,7 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, + () => {}, sharedCache ) expect(result1).toBe(result2) // Should be the exact same object reference @@ -214,6 +221,7 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, + () => {}, sharedCache ) const result2 = compileQuery( @@ -224,6 +232,7 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, + () => {}, sharedCache ) @@ -287,6 +296,7 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, + () => {}, sharedCache ) expect(result).toBeDefined() From aca1b3d9016219e71cbda5b8f5f0e28e160267ee Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 9 Oct 2025 15:31:50 +0200 Subject: [PATCH 09/15] changeset --- .changeset/clever-parks-report.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/clever-parks-report.md diff --git a/.changeset/clever-parks-report.md b/.changeset/clever-parks-report.md new file mode 100644 index 000000000..5fc399af1 --- /dev/null +++ b/.changeset/clever-parks-report.md @@ -0,0 +1,6 @@ +--- +"@tanstack/db-ivm": patch +"@tanstack/db": patch +--- + +Make limit and offset mutable on ordered live queries. From be835c4a6b098f9c2a1be068f3c8c5330ba688f0 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 9 Oct 2025 15:40:06 +0200 Subject: [PATCH 10/15] linting --- packages/db/src/query/compiler/order-by.ts | 8 ++++---- packages/db/tests/collection-subscribe-changes.test.ts | 4 ++-- packages/db/tests/query/live-query-collection.test.ts | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/packages/db/src/query/compiler/order-by.ts b/packages/db/src/query/compiler/order-by.ts index 2abf46b9a..21affbe6c 100644 --- a/packages/db/src/query/compiler/order-by.ts +++ b/packages/db/src/query/compiler/order-by.ts @@ -194,11 +194,11 @@ export function processOrderBy( setMoveFn( // We wrap the move function such that we update the orderByOptimizationInfo // because that is used by the `dataNeeded` callback to determine if we need to load more data - (offset, limit) => { - moveFn(offset, limit) + (newOffset, newLimit) => { + moveFn(newOffset, newLimit) if (orderByOptimizationInfo) { - orderByOptimizationInfo.offset = offset - orderByOptimizationInfo.limit = limit + orderByOptimizationInfo.offset = newOffset + orderByOptimizationInfo.limit = newLimit } } ) diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 36ee2416a..710f2122a 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -1647,7 +1647,7 @@ describe(`Collection.subscribeChanges`, () => { } }) - it(`should emit change events for multiple sync transactions before marking ready`, async () => { + it(`should emit change events for multiple sync transactions before marking ready`, () => { const changeEvents: Array = [] let testSyncFunctions: any = null @@ -1750,7 +1750,7 @@ describe(`Collection.subscribeChanges`, () => { expect(collection.state.get(3)).toEqual({ id: 3, value: `third item` }) }) - it(`should emit change events while collection is loading for filtered subscriptions`, async () => { + it(`should emit change events while collection is loading for filtered subscriptions`, () => { const changeEvents: Array = [] let testSyncFunctions: any = null diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index 7827af5be..458e0f0b6 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest" import { Temporal } from "temporal-polyfill" import { createCollection } from "../../src/collection/index.js" import { + and, createLiveQueryCollection, eq, liveQueryCollectionOptions, @@ -1328,9 +1329,8 @@ describe(`createLiveQueryCollection`, () => { ({ user, post }) => eq(user.id, post.authorId), `inner` ) - .where( - ({ user, post }) => - eq(user.active, true) && eq(post.published, true) + .where(({ user, post }) => + and(eq(user.active, true), eq(post.published, true)) ) .orderBy(({ post }) => post.title, `asc`) .limit(3) From a3e8576f217d39d105d299e0912841628348c26f Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Tue, 14 Oct 2025 19:15:00 +0100 Subject: [PATCH 11/15] fold MoveUtils into LiveQueryCollectionUtils --- packages/db/src/query/live-query-collection.ts | 15 +++++++-------- .../src/query/live/collection-config-builder.ts | 8 ++++++-- packages/db/src/types.ts | 8 -------- 3 files changed, 13 insertions(+), 18 deletions(-) diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index 13c62416e..5e4355a48 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -11,7 +11,6 @@ import type { Collection } from "../collection/index.js" import type { CollectionConfig, CollectionConfigSingleRowOption, - MoveUtils, NonSingleResult, SingleResult, UtilsRecord, @@ -66,7 +65,7 @@ export function liveQueryCollectionOptions< >( config: LiveQueryCollectionConfig ): CollectionConfigForContext & { - utils: LiveQueryCollectionUtils & MoveUtils + utils: LiveQueryCollectionUtils } { const collectionConfigBuilder = new CollectionConfigBuilder< TContext, @@ -75,7 +74,7 @@ export function liveQueryCollectionOptions< return collectionConfigBuilder.getConfig() as CollectionConfigForContext< TContext, TResult - > & { utils: LiveQueryCollectionUtils & MoveUtils } + > & { utils: LiveQueryCollectionUtils } } /** @@ -119,7 +118,7 @@ export function createLiveQueryCollection< >( query: (q: InitialQueryBuilder) => QueryBuilder ): CollectionForContext & { - utils: LiveQueryCollectionUtils & MoveUtils + utils: LiveQueryCollectionUtils } // Overload 2: Accept full config object with optional utilities @@ -130,7 +129,7 @@ export function createLiveQueryCollection< >( config: LiveQueryCollectionConfig & { utils?: TUtils } ): CollectionForContext & { - utils: LiveQueryCollectionUtils & TUtils & MoveUtils + utils: LiveQueryCollectionUtils & TUtils } // Implementation @@ -143,7 +142,7 @@ export function createLiveQueryCollection< | (LiveQueryCollectionConfig & { utils?: TUtils }) | ((q: InitialQueryBuilder) => QueryBuilder) ): CollectionForContext & { - utils: LiveQueryCollectionUtils & TUtils & MoveUtils + utils: LiveQueryCollectionUtils & TUtils } { // Determine if the argument is a function (query) or a config object if (typeof configOrQuery === `function`) { @@ -157,7 +156,7 @@ export function createLiveQueryCollection< return bridgeToCreateCollection(options) as CollectionForContext< TContext, TResult - > & { utils: LiveQueryCollectionUtils & TUtils & MoveUtils } + > & { utils: LiveQueryCollectionUtils & TUtils } } else { // Config object case const config = configOrQuery as LiveQueryCollectionConfig< @@ -174,7 +173,7 @@ export function createLiveQueryCollection< return bridgeToCreateCollection(options) as CollectionForContext< TContext, TResult - > & { utils: LiveQueryCollectionUtils & TUtils & MoveUtils } + > & { utils: LiveQueryCollectionUtils & TUtils } } } diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index b62f99b7d..b5699b087 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -14,7 +14,6 @@ import type { Collection } from "../../collection/index.js" import type { CollectionConfigSingleRowOption, KeyedStream, - MoveUtils, ResultStream, SyncConfig, UtilsRecord, @@ -33,6 +32,11 @@ import type { AllCollectionEvents } from "../../collection/events.js" export type LiveQueryCollectionUtils = UtilsRecord & { getRunCount: () => number getBuilder: () => CollectionConfigBuilder + /** + * Moves the offset and limit of an ordered query. + * Is a no-op if the query is not ordered. + */ + move: (offset: number, limit: number) => void } type PendingGraphRun = { @@ -157,7 +161,7 @@ export class CollectionConfigBuilder< } getConfig(): CollectionConfigSingleRowOption & { - utils: LiveQueryCollectionUtils & MoveUtils + utils: LiveQueryCollectionUtils } { return { id: this.id, diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index b499f6fea..fc444cdf0 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -531,14 +531,6 @@ export type MaybeSingleResult = { singleResult?: true } -export type MoveUtils = { - /** - * Moves the offset and limit of an ordered query. - * Is a no-op if the query is not ordered. - */ - move: (offset: number, limit: number) => void -} - // Only used for live query collections export type CollectionConfigSingleRowOption< T extends object = Record, From 53a982573b63d902d6298fc1b6563e5ae0c7d07c Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Tue, 14 Oct 2025 19:56:24 +0100 Subject: [PATCH 12/15] fix utils types --- packages/db/src/collection/index.ts | 10 +++++----- packages/db/src/types.ts | 16 ++++++++-------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/packages/db/src/collection/index.ts b/packages/db/src/collection/index.ts index 965d02592..e638ec62c 100644 --- a/packages/db/src/collection/index.ts +++ b/packages/db/src/collection/index.ts @@ -48,7 +48,7 @@ import type { IndexProxy } from "../indexes/lazy-index.js" export interface Collection< T extends object = Record, TKey extends string | number = string | number, - TUtils extends UtilsRecord = {}, + TUtils extends UtilsRecord = UtilsRecord, TSchema extends StandardSchemaV1 = StandardSchemaV1, TInsertInput extends object = T, > extends CollectionImpl { @@ -131,7 +131,7 @@ export interface Collection< export function createCollection< T extends StandardSchemaV1, TKey extends string | number = string | number, - TUtils extends UtilsRecord = {}, + TUtils extends UtilsRecord = UtilsRecord, >( options: CollectionConfig, TKey, T> & { schema: T @@ -144,7 +144,7 @@ export function createCollection< export function createCollection< T extends StandardSchemaV1, TKey extends string | number = string | number, - TUtils extends UtilsRecord = {}, + TUtils extends UtilsRecord = UtilsRecord, >( options: CollectionConfig, TKey, T> & { schema: T @@ -158,7 +158,7 @@ export function createCollection< export function createCollection< T extends object, TKey extends string | number = string | number, - TUtils extends UtilsRecord = {}, + TUtils extends UtilsRecord = UtilsRecord, >( options: CollectionConfig & { schema?: never // prohibit schema if an explicit type is provided @@ -171,7 +171,7 @@ export function createCollection< export function createCollection< T extends object, TKey extends string | number = string | number, - TUtils extends UtilsRecord = {}, + TUtils extends UtilsRecord = UtilsRecord, >( options: CollectionConfig & { schema?: never // prohibit schema if an explicit type is provided diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index fc444cdf0..5ce991a83 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -242,7 +242,7 @@ export interface InsertConfig { export type UpdateMutationFnParams< T extends object = Record, TKey extends string | number = string | number, - TUtils extends UtilsRecord = Record, + TUtils extends UtilsRecord = UtilsRecord, > = { transaction: TransactionWithMutations collection: Collection @@ -251,7 +251,7 @@ export type UpdateMutationFnParams< export type InsertMutationFnParams< T extends object = Record, TKey extends string | number = string | number, - TUtils extends UtilsRecord = Record, + TUtils extends UtilsRecord = UtilsRecord, > = { transaction: TransactionWithMutations collection: Collection @@ -259,7 +259,7 @@ export type InsertMutationFnParams< export type DeleteMutationFnParams< T extends object = Record, TKey extends string | number = string | number, - TUtils extends UtilsRecord = Record, + TUtils extends UtilsRecord = UtilsRecord, > = { transaction: TransactionWithMutations collection: Collection @@ -268,21 +268,21 @@ export type DeleteMutationFnParams< export type InsertMutationFn< T extends object = Record, TKey extends string | number = string | number, - TUtils extends UtilsRecord = Record, + TUtils extends UtilsRecord = UtilsRecord, TReturn = any, > = (params: InsertMutationFnParams) => Promise export type UpdateMutationFn< T extends object = Record, TKey extends string | number = string | number, - TUtils extends UtilsRecord = Record, + TUtils extends UtilsRecord = UtilsRecord, TReturn = any, > = (params: UpdateMutationFnParams) => Promise export type DeleteMutationFn< T extends object = Record, TKey extends string | number = string | number, - TUtils extends UtilsRecord = Record, + TUtils extends UtilsRecord = UtilsRecord, TReturn = any, > = (params: DeleteMutationFnParams) => Promise @@ -321,7 +321,7 @@ export interface BaseCollectionConfig< // then it would conflict with the overloads of createCollection which // requires either T to be provided or a schema to be provided but not both! TSchema extends StandardSchemaV1 = never, - TUtils extends UtilsRecord = Record, + TUtils extends UtilsRecord = UtilsRecord, TReturn = any, > { // If an id isn't passed in, a UUID will be @@ -511,7 +511,7 @@ export interface CollectionConfig< T extends object = Record, TKey extends string | number = string | number, TSchema extends StandardSchemaV1 = never, - TUtils extends UtilsRecord = {}, + TUtils extends UtilsRecord = UtilsRecord, > extends BaseCollectionConfig { sync: SyncConfig } From 86f74cef8d718ef72a77d78ec356504c46976aea Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 15 Oct 2025 09:26:17 +0100 Subject: [PATCH 13/15] rename move fn --- packages/db-ivm/src/operators/orderBy.ts | 8 +- .../src/operators/topKWithFractionalIndex.ts | 24 ++++-- .../orderByWithFractionalIndex.test.ts | 74 +++++++++++-------- .../operators/topKWithFractionalIndex.test.ts | 74 +++++++++++-------- packages/db/src/query/compiler/index.ts | 16 ++-- packages/db/src/query/compiler/joins.ts | 14 ++-- packages/db/src/query/compiler/order-by.ts | 19 +++-- packages/db/src/query/compiler/types.ts | 5 ++ .../query/live/collection-config-builder.ts | 19 ++--- .../tests/query/live-query-collection.test.ts | 28 +++---- 10 files changed, 163 insertions(+), 118 deletions(-) diff --git a/packages/db-ivm/src/operators/orderBy.ts b/packages/db-ivm/src/operators/orderBy.ts index f13d03fc1..ce399b3bc 100644 --- a/packages/db-ivm/src/operators/orderBy.ts +++ b/packages/db-ivm/src/operators/orderBy.ts @@ -13,7 +13,9 @@ export interface OrderByOptions { type OrderByWithFractionalIndexOptions = OrderByOptions & { setSizeCallback?: (getSize: () => number) => void - setMoveFn?: (moveFn: (offset: number, limit: number) => void) => void + setWindowFn?: ( + windowFn: (options: { offset?: number; limit?: number }) => void + ) => void } /** @@ -148,7 +150,7 @@ export function orderByWithFractionalIndexBase< const limit = options?.limit ?? Infinity const offset = options?.offset ?? 0 const setSizeCallback = options?.setSizeCallback - const setMoveFn = options?.setMoveFn + const setWindowFn = options?.setWindowFn const comparator = options?.comparator ?? ((a, b) => { @@ -169,7 +171,7 @@ export function orderByWithFractionalIndexBase< limit, offset, setSizeCallback, - setMoveFn, + setWindowFn, } ), consolidate() diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts index bebe82b10..3b75521e3 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts @@ -15,7 +15,9 @@ export interface TopKWithFractionalIndexOptions { limit?: number offset?: number setSizeCallback?: (getSize: () => number) => void - setMoveFn?: (moveFn: (offset: number, limit: number) => void) => void + setWindowFn?: ( + windowFn: (options: { offset?: number; limit?: number }) => void + ) => void } export type TopKChanges = { @@ -76,11 +78,19 @@ class TopKArray implements TopK { /** * Moves the topK window */ - move(offset: number, limit: number): TopKMoveChanges { + move({ + offset, + limit, + }: { + offset?: number + limit?: number + }): TopKMoveChanges { + const oldOffset = this.#topKStart + const oldLimit = this.#topKEnd - this.#topKStart const oldRange: HRange = [this.#topKStart, this.#topKEnd] - this.#topKStart = offset - this.#topKEnd = offset + limit + this.#topKStart = offset ?? oldOffset + this.#topKEnd = this.#topKStart + (limit ?? oldLimit) const newRange: HRange = [this.#topKStart, this.#topKEnd] const { onlyInA, onlyInB } = diffHalfOpen(oldRange, newRange) @@ -254,7 +264,7 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< } this.#topK = this.createTopK(offset, limit, compareTaggedValues) options.setSizeCallback?.(() => this.#topK.size) - options.setMoveFn?.(this.moveTopK.bind(this)) + options.setWindowFn?.(this.moveTopK.bind(this)) } protected createTopK( @@ -269,7 +279,7 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< * Moves the topK window based on the provided offset and limit. * Any changes to the topK are sent to the output. */ - moveTopK(offset: number, limit: number) { + moveTopK({ offset, limit }: { offset?: number; limit?: number }) { if (!(this.#topK instanceof TopKArray)) { throw new Error( `Cannot move B+-tree implementation of TopK with fractional index` @@ -278,7 +288,7 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< const result: Array<[[K, IndexedValue], number]> = [] - const diff = this.#topK.move(offset, limit) + const diff = this.#topK.move({ offset, limit }) diff.moveIns.forEach((moveIn) => this.handleMoveIn(moveIn, result)) diff.moveOuts.forEach((moveOut) => this.handleMoveOut(moveOut, result)) diff --git a/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts index 928bba2fa..5cf574757 100644 --- a/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts @@ -481,7 +481,7 @@ describe(`Operators`, () => { }) describe(`OrderByWithFractionalIndex operator with array`, () => { - test(`should support moving orderBy window past current window using setMoveFn callback`, () => { + test(`should support moving orderBy window past current window using setWindowFn callback`, () => { const graph = new D2() const input = graph.newInput< KeyValue< @@ -496,14 +496,16 @@ describe(`Operators`, () => { [string, [{ id: number; value: string }, string]] >() - let moveFn: ((offset: number, limit: number) => void) | undefined + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | undefined input.pipe( orderByWithFractionalIndex((item) => item.value, { limit: 3, offset: 0, - setMoveFn: (fn) => { - moveFn = fn + setWindowFn: (fn) => { + windowFn = fn }, }), output((message) => { @@ -537,11 +539,11 @@ describe(`Operators`, () => { ) expect(initialSortedValues).toEqual([`a`, `b`, `c`]) - // Verify moveFn was set - expect(moveFn).toBeDefined() + // Verify windowFn was set + expect(windowFn).toBeDefined() // Move the window to show elements d, e, f (offset: 3, limit: 3) - moveFn!(3, 3) + windowFn!({ offset: 3, limit: 3 }) graph.run() const moveResult = tracker.getResult(compareFractionalIndex) @@ -553,7 +555,7 @@ describe(`Operators`, () => { expect(moveSortedValues).toEqual([`d`, `e`, `f`]) }) - test(`should support moving orderBy window before current window using setMoveFn callback`, () => { + test(`should support moving orderBy window before current window using setWindowFn callback`, () => { const graph = new D2() const input = graph.newInput< KeyValue< @@ -568,14 +570,16 @@ describe(`Operators`, () => { [string, [{ id: number; value: string }, string]] >() - let moveFn: ((offset: number, limit: number) => void) | undefined + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | undefined input.pipe( orderByWithFractionalIndex((item) => item.value, { limit: 3, offset: 3, - setMoveFn: (fn) => { - moveFn = fn + setWindowFn: (fn) => { + windowFn = fn }, }), output((message) => { @@ -609,11 +613,11 @@ describe(`Operators`, () => { ) expect(initialSortedValues).toEqual([`d`, `e`, `f`]) - // Verify moveFn was set - expect(moveFn).toBeDefined() + // Verify windowFn was set + expect(windowFn).toBeDefined() // Move the window to show elements a, b, c (offset: 0, limit: 3) - moveFn!(0, 3) + windowFn!({ offset: 0, limit: 3 }) graph.run() const moveResult = tracker.getResult(compareFractionalIndex) @@ -640,14 +644,16 @@ describe(`Operators`, () => { [string, [{ id: number; value: string }, string]] >() - let moveFn: ((offset: number, limit: number) => void) | null = null + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null input.pipe( orderByWithFractionalIndex((item) => item.value, { limit: 2, offset: 0, - setMoveFn: (fn) => { - moveFn = fn + setWindowFn: (fn) => { + windowFn = fn }, }), output((message) => { @@ -679,7 +685,7 @@ describe(`Operators`, () => { expect(initialSortedValues).toEqual([`a`, `b`]) // Move offset to 1, keeping limit at 2 (should show b, c) - moveFn!(1, 2) + windowFn!({ offset: 1, limit: 2 }) graph.run() const moveResult = tracker.getResult(compareFractionalIndex) @@ -690,7 +696,7 @@ describe(`Operators`, () => { expect(moveSortedValues).toEqual([`b`, `c`]) // Move offset to 2, keeping limit at 2 (should show c, d) - moveFn!(2, 2) + windowFn!({ offset: 2, limit: 2 }) graph.run() const moveResult2 = tracker.getResult(compareFractionalIndex) @@ -701,7 +707,7 @@ describe(`Operators`, () => { expect(moveSortedValues2).toEqual([`c`, `d`]) // Move offset back to 0, keeping limit at 2 (should show a, b) - moveFn!(0, 2) + windowFn!({ offset: 0, limit: 2 }) graph.run() const moveResult3 = tracker.getResult(compareFractionalIndex) @@ -727,14 +733,16 @@ describe(`Operators`, () => { [string, [{ id: number; value: string }, string]] >() - let moveFn: ((offset: number, limit: number) => void) | null = null + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null input.pipe( orderByWithFractionalIndex((item) => item.value, { limit: 2, offset: 1, - setMoveFn: (fn) => { - moveFn = fn + setWindowFn: (fn) => { + windowFn = fn }, }), output((message) => { @@ -767,7 +775,7 @@ describe(`Operators`, () => { expect(initialSortedValues).toEqual([`b`, `c`]) // Increase limit to 3, keeping offset at 1 (should show b, c, d) - moveFn!(1, 3) + windowFn!({ offset: 1, limit: 3 }) graph.run() const moveResult = tracker.getResult(compareFractionalIndex) @@ -778,7 +786,7 @@ describe(`Operators`, () => { expect(moveSortedValues).toEqual([`b`, `c`, `d`]) // Decrease limit to 1, keeping offset at 1 (should show just b) - moveFn!(1, 1) + windowFn!({ offset: 1, limit: 1 }) graph.run() const moveResult2 = tracker.getResult(compareFractionalIndex) @@ -804,14 +812,16 @@ describe(`Operators`, () => { [string, [{ id: number; value: string }, string]] >() - let moveFn: ((offset: number, limit: number) => void) | null = null + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null input.pipe( orderByWithFractionalIndex((item) => item.value, { limit: 2, offset: 0, - setMoveFn: (fn) => { - moveFn = fn + setWindowFn: (fn) => { + windowFn = fn }, }), output((message) => { @@ -841,7 +851,7 @@ describe(`Operators`, () => { expect(initialSortedValues).toEqual([`a`, `b`]) // Move to offset 2, limit 2 (should show only c, since we only have 3 total elements) - moveFn!(2, 2) + windowFn!({ offset: 2, limit: 2 }) graph.run() const moveResult = tracker.getResult(compareFractionalIndex) @@ -852,7 +862,7 @@ describe(`Operators`, () => { expect(moveSortedValues).toEqual([`c`]) // Only 1 element available at offset 2 // Move to offset 5, limit 2 (should show no elements, beyond available data) - moveFn!(5, 2) + windowFn!({ offset: 5, limit: 2 }) graph.run() const moveResult2 = tracker.getResult(compareFractionalIndex) @@ -863,7 +873,7 @@ describe(`Operators`, () => { expect(moveSortedValues2).toEqual([]) // No elements available at offset 5 // Move to a negative offset and limit (should show no elements) - moveFn!(-5, 2) + windowFn!({ offset: -5, limit: 2 }) graph.run() const moveResult3 = tracker.getResult(compareFractionalIndex) @@ -874,7 +884,7 @@ describe(`Operators`, () => { expect(moveSortedValues3).toEqual([]) // Move back to a valid window - moveFn!(0, 2) + windowFn!({ offset: 0, limit: 2 }) graph.run() const moveResult4 = tracker.getResult(compareFractionalIndex) diff --git a/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts index 5bdbfd558..5739cfb36 100644 --- a/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts @@ -729,21 +729,23 @@ describe(`Operators`, () => { describe(`Operators`, () => { describe(`TopKWithFractionalIndex operator with array`, () => { - it(`should support moving topK window past current window using setMoveFn callback`, () => { + it(`should support moving topK window past current window using setWindowFn callback`, () => { const graph = new D2() const input = graph.newInput<[number, { id: number; value: string }]>() const tracker = new MessageTracker< [number, [{ id: number; value: string }, string]] >() - let moveFn: ((offset: number, limit: number) => void) | undefined + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | undefined input.pipe( topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { limit: 3, offset: 0, - setMoveFn: (fn) => { - moveFn = fn + setWindowFn: (fn) => { + windowFn = fn }, }), output((message) => { @@ -777,13 +779,13 @@ describe(`Operators`, () => { ) expect(initialSortedValues).toEqual([`a`, `b`, `c`]) - // Verify moveFn was set - expect(moveFn).toBeDefined() + // Verify windowFn was set + expect(windowFn).toBeDefined() const numberOfMessages = tracker.getResult().messageCount // Move the window to show elements d, e, f (offset: 3, limit: 3) - moveFn!(3, 3) + windowFn!({ offset: 3, limit: 3 }) graph.run() const moveResult = tracker.getResult(compareFractionalIndex) @@ -800,21 +802,23 @@ describe(`Operators`, () => { expect(affectedKeys).toEqual(expect.arrayContaining([1, 2, 3, 4, 5, 6])) }) - it(`should support moving topK window before current window using setMoveFn callback`, () => { + it(`should support moving topK window before current window using setWindowFn callback`, () => { const graph = new D2() const input = graph.newInput<[number, { id: number; value: string }]>() const tracker = new MessageTracker< [number, [{ id: number; value: string }, string]] >() - let moveFn: ((offset: number, limit: number) => void) | undefined + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | undefined input.pipe( topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { limit: 3, offset: 3, - setMoveFn: (fn) => { - moveFn = fn + setWindowFn: (fn) => { + windowFn = fn }, }), output((message) => { @@ -848,13 +852,13 @@ describe(`Operators`, () => { ) expect(initialSortedValues).toEqual([`d`, `e`, `f`]) - // Verify moveFn was set - expect(moveFn).toBeDefined() + // Verify windowFn was set + expect(windowFn).toBeDefined() const numberOfMessages = tracker.getResult().messageCount // Move the window to show elements d, e, f (offset: 3, limit: 3) - moveFn!(0, 3) + windowFn!({ offset: 0, limit: 3 }) graph.run() const moveResult = tracker.getResult(compareFractionalIndex) @@ -878,14 +882,16 @@ describe(`Operators`, () => { [number, [{ id: number; value: string }, string]] >() - let moveFn: ((offset: number, limit: number) => void) | null = null + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null input.pipe( topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { limit: 2, offset: 0, - setMoveFn: (fn) => { - moveFn = fn + setWindowFn: (fn) => { + windowFn = fn }, }), output((message) => { @@ -919,7 +925,7 @@ describe(`Operators`, () => { // tracker.reset() // Move offset to 1, keeping limit at 2 (should show b, c) - moveFn!(1, 2) + windowFn!({ offset: 1, limit: 2 }) graph.run() const moveResult = tracker.getResult(compareFractionalIndex) @@ -932,7 +938,7 @@ describe(`Operators`, () => { // tracker.reset() // Move offset to 2, keeping limit at 2 (should show c, d) - moveFn!(2, 2) + windowFn!({ offset: 2, limit: 2 }) graph.run() const moveResult2 = tracker.getResult(compareFractionalIndex) @@ -943,7 +949,7 @@ describe(`Operators`, () => { expect(moveSortedValues2).toEqual([`c`, `d`]) // Move offset back to 0, keeping limit at 2 (should show a, b) - moveFn!(0, 2) + windowFn!({ offset: 0, limit: 2 }) graph.run() const moveResult3 = tracker.getResult(compareFractionalIndex) @@ -961,14 +967,16 @@ describe(`Operators`, () => { [number, [{ id: number; value: string }, string]] >() - let moveFn: ((offset: number, limit: number) => void) | null = null + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null input.pipe( topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { limit: 2, offset: 1, - setMoveFn: (fn) => { - moveFn = fn + setWindowFn: (fn) => { + windowFn = fn }, }), output((message) => { @@ -1001,7 +1009,7 @@ describe(`Operators`, () => { expect(initialSortedValues).toEqual([`b`, `c`]) // Increase limit to 3, keeping offset at 1 (should show b, c, d) - moveFn!(1, 3) + windowFn!({ offset: 1, limit: 3 }) graph.run() const moveResult = tracker.getResult(compareFractionalIndex) @@ -1012,7 +1020,7 @@ describe(`Operators`, () => { expect(moveSortedValues).toEqual([`b`, `c`, `d`]) // Decrease limit to 1, keeping offset at 1 (should show just b) - moveFn!(1, 1) + windowFn!({ offset: 1, limit: 1 }) graph.run() const moveResult2 = tracker.getResult(compareFractionalIndex) @@ -1030,14 +1038,16 @@ describe(`Operators`, () => { [number, [{ id: number; value: string }, string]] >() - let moveFn: ((offset: number, limit: number) => void) | null = null + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null input.pipe( topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { limit: 2, offset: 0, - setMoveFn: (fn) => { - moveFn = fn + setWindowFn: (fn) => { + windowFn = fn }, }), output((message) => { @@ -1067,7 +1077,7 @@ describe(`Operators`, () => { expect(initialSortedValues).toEqual([`a`, `b`]) // Move to offset 2, limit 2 (should show only c, since we only have 3 total elements) - moveFn!(2, 2) + windowFn!({ offset: 2, limit: 2 }) graph.run() const moveResult = tracker.getResult(compareFractionalIndex) @@ -1078,7 +1088,7 @@ describe(`Operators`, () => { expect(moveSortedValues).toEqual([`c`]) // Only 1 element available at offset 2 // Move to offset 5, limit 2 (should show no elements, beyond available data) - moveFn!(5, 2) + windowFn!({ offset: 5, limit: 2 }) graph.run() const moveResult2 = tracker.getResult(compareFractionalIndex) @@ -1089,7 +1099,7 @@ describe(`Operators`, () => { expect(moveSortedValues2).toEqual([]) // No elements available at offset 5 // Move to a negative offset and limit (should show no elements) - moveFn!(-5, 2) + windowFn!({ offset: -5, limit: 2 }) graph.run() const moveResult3 = tracker.getResult(compareFractionalIndex) @@ -1100,7 +1110,7 @@ describe(`Operators`, () => { expect(moveSortedValues3).toEqual([]) // Move back to a valid window - moveFn!(0, 2) + windowFn!({ offset: 0, limit: 2 }) graph.run() const moveResult4 = tracker.getResult(compareFractionalIndex) diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index 033a0b6a8..1cd033453 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -28,7 +28,9 @@ import type { NamespacedAndKeyedStream, ResultStream, } from "../../types.js" -import type { QueryCache, QueryMapping } from "./types.js" +import type { QueryCache, QueryMapping, WindowOptions } from "./types.js" + +export type { WindowOptions } from "./types.js" /** * Result of query compilation including both the pipeline and source-specific WHERE clauses @@ -87,7 +89,7 @@ export function compileQuery( callbacks: Record, lazySources: Set, optimizableOrderByCollections: Record, - setMoveFn: (moveFn: (offset: number, limit: number) => void) => void, + setWindowFn: (windowFn: (options: WindowOptions) => void) => void, cache: QueryCache = new WeakMap(), queryMapping: QueryMapping = new WeakMap() ): CompilationResult { @@ -135,7 +137,7 @@ export function compileQuery( callbacks, lazySources, optimizableOrderByCollections, - setMoveFn, + setWindowFn, cache, queryMapping, aliasToCollectionId, @@ -171,7 +173,7 @@ export function compileQuery( callbacks, lazySources, optimizableOrderByCollections, - setMoveFn, + setWindowFn, rawQuery, compileQuery, aliasToCollectionId, @@ -314,7 +316,7 @@ export function compileQuery( query.select || {}, collections[mainCollectionId]!, optimizableOrderByCollections, - setMoveFn, + setWindowFn, query.limit, query.offset ) @@ -385,7 +387,7 @@ function processFrom( callbacks: Record, lazySources: Set, optimizableOrderByCollections: Record, - setMoveFn: (moveFn: (offset: number, limit: number) => void) => void, + setWindowFn: (windowFn: (options: WindowOptions) => void) => void, cache: QueryCache, queryMapping: QueryMapping, aliasToCollectionId: Record, @@ -417,7 +419,7 @@ function processFrom( callbacks, lazySources, optimizableOrderByCollections, - setMoveFn, + setWindowFn, cache, queryMapping ) diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 4685c7acd..505f90bf3 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -31,7 +31,7 @@ import type { NamespacedAndKeyedStream, NamespacedRow, } from "../../types.js" -import type { QueryCache, QueryMapping } from "./types.js" +import type { QueryCache, QueryMapping, WindowOptions } from "./types.js" import type { CollectionSubscription } from "../../collection/subscription.js" /** Function type for loading specific keys into a lazy collection */ @@ -61,7 +61,7 @@ export function processJoins( callbacks: Record, lazySources: Set, optimizableOrderByCollections: Record, - setMoveFn: (moveFn: (offset: number, limit: number) => void) => void, + setWindowFn: (windowFn: (options: WindowOptions) => void) => void, rawQuery: QueryIR, onCompileSubquery: CompileQueryFn, aliasToCollectionId: Record, @@ -84,7 +84,7 @@ export function processJoins( callbacks, lazySources, optimizableOrderByCollections, - setMoveFn, + setWindowFn, rawQuery, onCompileSubquery, aliasToCollectionId, @@ -113,7 +113,7 @@ function processJoin( callbacks: Record, lazySources: Set, optimizableOrderByCollections: Record, - setMoveFn: (moveFn: (offset: number, limit: number) => void) => void, + setWindowFn: (windowFn: (options: WindowOptions) => void) => void, rawQuery: QueryIR, onCompileSubquery: CompileQueryFn, aliasToCollectionId: Record, @@ -134,7 +134,7 @@ function processJoin( callbacks, lazySources, optimizableOrderByCollections, - setMoveFn, + setWindowFn, cache, queryMapping, onCompileSubquery, @@ -425,7 +425,7 @@ function processJoinSource( callbacks: Record, lazySources: Set, optimizableOrderByCollections: Record, - setMoveFn: (moveFn: (offset: number, limit: number) => void) => void, + setWindowFn: (windowFn: (options: WindowOptions) => void) => void, cache: QueryCache, queryMapping: QueryMapping, onCompileSubquery: CompileQueryFn, @@ -458,7 +458,7 @@ function processJoinSource( callbacks, lazySources, optimizableOrderByCollections, - setMoveFn, + setWindowFn, cache, queryMapping ) diff --git a/packages/db/src/query/compiler/order-by.ts b/packages/db/src/query/compiler/order-by.ts index ee864b121..679dceeda 100644 --- a/packages/db/src/query/compiler/order-by.ts +++ b/packages/db/src/query/compiler/order-by.ts @@ -5,6 +5,7 @@ import { ensureIndexForField } from "../../indexes/auto-index.js" import { findIndexForField } from "../../utils/index-optimization.js" import { compileExpression } from "./evaluators.js" import { replaceAggregatesByRefs } from "./group-by.js" +import type { WindowOptions } from "./types.js" import type { CompiledSingleRowExpression } from "./evaluators.js" import type { OrderBy, OrderByClause, QueryIR, Select } from "../ir.js" import type { NamespacedAndKeyedStream, NamespacedRow } from "../../types.js" @@ -38,7 +39,7 @@ export function processOrderBy( selectClause: Select, collection: Collection, optimizableOrderByCollections: Record, - setMoveFn: (moveFn: (offset: number, limit: number) => void) => void, + setWindowFn: (windowFn: (options: WindowOptions) => void) => void, limit?: number, offset?: number ): IStreamBuilder> { @@ -197,15 +198,19 @@ export function processOrderBy( offset, comparator: compare, setSizeCallback, - setMoveFn: (moveFn: (offset: number, limit: number) => void) => { - setMoveFn( + setWindowFn: ( + windowFn: (options: { offset?: number; limit?: number }) => void + ) => { + setWindowFn( // We wrap the move function such that we update the orderByOptimizationInfo // because that is used by the `dataNeeded` callback to determine if we need to load more data - (newOffset, newLimit) => { - moveFn(newOffset, newLimit) + (options) => { + windowFn(options) if (orderByOptimizationInfo) { - orderByOptimizationInfo.offset = newOffset - orderByOptimizationInfo.limit = newLimit + orderByOptimizationInfo.offset = + options.offset ?? orderByOptimizationInfo.offset + orderByOptimizationInfo.limit = + options.limit ?? orderByOptimizationInfo.limit } } ) diff --git a/packages/db/src/query/compiler/types.ts b/packages/db/src/query/compiler/types.ts index 8e6080e72..f10f953c9 100644 --- a/packages/db/src/query/compiler/types.ts +++ b/packages/db/src/query/compiler/types.ts @@ -10,3 +10,8 @@ export type QueryCache = WeakMap * Mapping from optimized queries back to their original queries for caching */ export type QueryMapping = WeakMap + +export type WindowOptions = { + offset?: number + limit?: number +} diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index b5699b087..476596ca6 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -6,6 +6,7 @@ import { transactionScopedScheduler } from "../../scheduler.js" import { getActiveTransaction } from "../../transactions.js" import { CollectionSubscriber } from "./collection-subscriber.js" import { getCollectionBuilder } from "./collection-registry.js" +import type { WindowOptions } from "../compiler/index.js" import type { SchedulerContextId } from "../../scheduler.js" import type { CollectionSubscription } from "../../collection/subscription.js" import type { RootStreamBuilder } from "@tanstack/db-ivm" @@ -33,10 +34,10 @@ export type LiveQueryCollectionUtils = UtilsRecord & { getRunCount: () => number getBuilder: () => CollectionConfigBuilder /** - * Moves the offset and limit of an ordered query. + * Sets the offset and limit of an ordered query. * Is a no-op if the query is not ordered. */ - move: (offset: number, limit: number) => void + setWindow: (options: WindowOptions) => void } type PendingGraphRun = { @@ -86,7 +87,7 @@ export class CollectionConfigBuilder< // Reference to the live query collection for error state transitions private liveQueryCollection?: Collection - private moveFn: ((offset: number, limit: number) => void) | undefined + private windowFn: ((options: WindowOptions) => void) | undefined private maybeRunGraphFn: (() => void) | undefined @@ -180,20 +181,20 @@ export class CollectionConfigBuilder< utils: { getRunCount: this.getRunCount.bind(this), getBuilder: () => this, - move: this.move.bind(this), + setWindow: this.setWindow.bind(this), }, } } - move(offset: number, limit: number) { - if (!this.moveFn) { + setWindow(options: WindowOptions) { + if (!this.windowFn) { console.log( `This collection can't be moved because no move function was set` ) return } - this.moveFn(offset, limit) + this.windowFn(options) this.maybeRunGraphFn?.() } @@ -542,8 +543,8 @@ export class CollectionConfigBuilder< this.lazySourcesCallbacks, this.lazySources, this.optimizableOrderByCollections, - (moveFn: (offset: number, limit: number) => void) => { - this.moveFn = moveFn + (windowFn: (options: WindowOptions) => void) => { + this.windowFn = windowFn } ) diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index 458e0f0b6..264cb6293 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -978,7 +978,7 @@ describe(`createLiveQueryCollection`, () => { ]) // Move the window to show users David, Eve, Frank (offset: 3, limit: 3) - activeUsers.utils.move(3, 3) + activeUsers.utils.setWindow({ offset: 3, limit: 3 }) // Wait for the move to take effect await new Promise((resolve) => setTimeout(resolve, 10)) @@ -1028,7 +1028,7 @@ describe(`createLiveQueryCollection`, () => { ]) // Move the window to show users Alice, Bob, Charlie (offset: 0, limit: 3) - activeUsers.utils.move(0, 3) + activeUsers.utils.setWindow({ offset: 0, limit: 3 }) // Wait for the move to take effect await new Promise((resolve) => setTimeout(resolve, 10)) @@ -1073,7 +1073,7 @@ describe(`createLiveQueryCollection`, () => { expect(initialResults.map((r) => r.name)).toEqual([`Alice`, `Bob`]) // Move offset to 1, keeping limit at 2 (should show Bob, Charlie) - activeUsers.utils.move(1, 2) + activeUsers.utils.setWindow({ offset: 1, limit: 2 }) // Wait for the move to take effect await new Promise((resolve) => setTimeout(resolve, 10)) @@ -1082,7 +1082,7 @@ describe(`createLiveQueryCollection`, () => { expect(moveResults1.map((r) => r.name)).toEqual([`Bob`, `Charlie`]) // Move offset to 2, keeping limit at 2 (should show Charlie, David) - activeUsers.utils.move(2, 2) + activeUsers.utils.setWindow({ offset: 2, limit: 2 }) // Wait for the move to take effect await new Promise((resolve) => setTimeout(resolve, 10)) @@ -1091,7 +1091,7 @@ describe(`createLiveQueryCollection`, () => { expect(moveResults2.map((r) => r.name)).toEqual([`Charlie`, `David`]) // Move offset back to 0, keeping limit at 2 (should show Alice, Bob) - activeUsers.utils.move(0, 2) + activeUsers.utils.setWindow({ offset: 0, limit: 2 }) // Wait for the move to take effect await new Promise((resolve) => setTimeout(resolve, 10)) @@ -1133,7 +1133,7 @@ describe(`createLiveQueryCollection`, () => { expect(initialResults.map((r) => r.name)).toEqual([`Bob`, `Charlie`]) // Increase limit to 3, keeping offset at 1 (should show Bob, Charlie, David) - activeUsers.utils.move(1, 3) + activeUsers.utils.setWindow({ offset: 1, limit: 3 }) // Wait for the move to take effect await new Promise((resolve) => setTimeout(resolve, 10)) @@ -1146,7 +1146,7 @@ describe(`createLiveQueryCollection`, () => { ]) // Decrease limit to 1, keeping offset at 1 (should show just Bob) - activeUsers.utils.move(1, 1) + activeUsers.utils.setWindow({ offset: 1, limit: 1 }) // Wait for the move to take effect await new Promise((resolve) => setTimeout(resolve, 10)) @@ -1185,7 +1185,7 @@ describe(`createLiveQueryCollection`, () => { expect(initialResults.map((r) => r.name)).toEqual([`Alice`, `Bob`]) // Move to offset 2, limit 2 (should show only Charlie, since we only have 3 total users) - activeUsers.utils.move(2, 2) + activeUsers.utils.setWindow({ offset: 2, limit: 2 }) // Wait for the move to take effect await new Promise((resolve) => setTimeout(resolve, 10)) @@ -1194,7 +1194,7 @@ describe(`createLiveQueryCollection`, () => { expect(moveResults1.map((r) => r.name)).toEqual([`Charlie`]) // Only 1 user available at offset 2 // Move to offset 5, limit 2 (should show no users, beyond available data) - activeUsers.utils.move(5, 2) + activeUsers.utils.setWindow({ offset: 5, limit: 2 }) // Wait for the move to take effect await new Promise((resolve) => setTimeout(resolve, 10)) @@ -1203,7 +1203,7 @@ describe(`createLiveQueryCollection`, () => { expect(moveResults2).toEqual([]) // No users available at offset 5 // Move to a negative offset and limit (should show no users) - activeUsers.utils.move(-5, 2) + activeUsers.utils.setWindow({ offset: -5, limit: 2 }) // Wait for the move to take effect await new Promise((resolve) => setTimeout(resolve, 10)) @@ -1212,7 +1212,7 @@ describe(`createLiveQueryCollection`, () => { expect(moveResults3).toEqual([]) // Move back to a valid window - activeUsers.utils.move(0, 2) + activeUsers.utils.setWindow({ offset: 0, limit: 2 }) // Wait for the move to take effect await new Promise((resolve) => setTimeout(resolve, 10)) @@ -1258,7 +1258,7 @@ describe(`createLiveQueryCollection`, () => { ]) // Move the window to show next 3 users (Charlie, Bob, Alice) - activeUsers.utils.move(3, 3) + activeUsers.utils.setWindow({ offset: 3, limit: 3 }) // Wait for the move to take effect await new Promise((resolve) => setTimeout(resolve, 10)) @@ -1288,7 +1288,7 @@ describe(`createLiveQueryCollection`, () => { expect(initialResults.length).toBe(2) // Move should be a no-op for non-ordered queries - activeUsers.utils.move(1, 1) + activeUsers.utils.setWindow({ offset: 1, limit: 1 }) // Wait a bit to ensure no changes occur await new Promise((resolve) => setTimeout(resolve, 10)) @@ -1349,7 +1349,7 @@ describe(`createLiveQueryCollection`, () => { ]) // Move the window to show next 3 posts (Post D, Post E, Post F) - userPosts.utils.move(3, 3) + userPosts.utils.setWindow({ offset: 3, limit: 3 }) // Wait for the move to take effect await new Promise((resolve) => setTimeout(resolve, 10)) From aac8cdb1dfeb8f56110c3ec9c954ad5df47e95be Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 15 Oct 2025 09:43:40 +0100 Subject: [PATCH 14/15] fix local only utils types --- packages/db/src/local-only.ts | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/packages/db/src/local-only.ts b/packages/db/src/local-only.ts index 756dab21a..11f19a43c 100644 --- a/packages/db/src/local-only.ts +++ b/packages/db/src/local-only.ts @@ -1,15 +1,12 @@ import type { BaseCollectionConfig, CollectionConfig, - DeleteMutationFn, DeleteMutationFnParams, InferSchemaOutput, - InsertMutationFn, InsertMutationFnParams, OperationType, PendingMutation, SyncConfig, - UpdateMutationFn, UpdateMutationFnParams, UtilsRecord, } from "./types" @@ -67,13 +64,7 @@ type LocalOnlyCollectionOptionsResult< T extends object, TKey extends string | number, TSchema extends StandardSchemaV1 | never = never, -> = Omit< - CollectionConfig, - `onInsert` | `onUpdate` | `onDelete` -> & { - onInsert?: InsertMutationFn - onUpdate?: UpdateMutationFn - onDelete?: DeleteMutationFn +> = CollectionConfig & { utils: LocalOnlyCollectionUtils } @@ -191,7 +182,7 @@ export function localOnlyCollectionOptions< const { initialData, onInsert, onUpdate, onDelete, ...restConfig } = config // Create the sync configuration with transaction confirmation capability - const syncResult = createLocalOnlySync(initialData) + const syncResult = createLocalOnlySync(initialData) /** * Create wrapper handlers that call user handlers first, then confirm transactions @@ -279,9 +270,11 @@ export function localOnlyCollectionOptions< onDelete: wrappedOnDelete, utils: { acceptMutations, - } as LocalOnlyCollectionUtils, + }, startSync: true, gcTime: 0, + } as LocalOnlyCollectionOptionsResult & { + schema?: StandardSchemaV1 } } From d4fa02472ed481056a093a3461fb59f8d3a74a6f Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 15 Oct 2025 10:21:51 +0100 Subject: [PATCH 15/15] tweaks --- .changeset/clever-parks-report.md | 16 ++- packages/db/src/errors.ts | 12 ++ .../query/live/collection-config-builder.ts | 10 +- .../tests/query/live-query-collection.test.ts | 125 ++++++++++++++++-- 4 files changed, 145 insertions(+), 18 deletions(-) diff --git a/.changeset/clever-parks-report.md b/.changeset/clever-parks-report.md index 5fc399af1..63a2ec563 100644 --- a/.changeset/clever-parks-report.md +++ b/.changeset/clever-parks-report.md @@ -3,4 +3,18 @@ "@tanstack/db": patch --- -Make limit and offset mutable on ordered live queries. +Add `utils.setWindow()` method to live query collections to dynamically change limit and offset on ordered queries. + +You can now change the pagination window of an ordered live query without recreating the collection: + +```ts +const users = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .orderBy(({ user }) => user.name, "asc") + .limit(10) + .offset(0) +) + +users.utils.setWindow({ offset: 10, limit: 10 }) +``` diff --git a/packages/db/src/errors.ts b/packages/db/src/errors.ts index 209e7b7a6..3858df178 100644 --- a/packages/db/src/errors.ts +++ b/packages/db/src/errors.ts @@ -629,3 +629,15 @@ export class MissingAliasInputsError extends QueryCompilationError { ) } } + +/** + * Error thrown when setWindow is called on a collection without an ORDER BY clause. + */ +export class SetWindowRequiresOrderByError extends QueryCompilationError { + constructor() { + super( + `setWindow() can only be called on collections with an ORDER BY clause. ` + + `Add .orderBy() to your query to enable window movement.` + ) + } +} diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 476596ca6..a7952e998 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -1,7 +1,10 @@ import { D2, output } from "@tanstack/db-ivm" import { compileQuery } from "../compiler/index.js" import { buildQuery, getQueryIR } from "../builder/index.js" -import { MissingAliasInputsError } from "../../errors.js" +import { + MissingAliasInputsError, + SetWindowRequiresOrderByError, +} from "../../errors.js" import { transactionScopedScheduler } from "../../scheduler.js" import { getActiveTransaction } from "../../transactions.js" import { CollectionSubscriber } from "./collection-subscriber.js" @@ -188,10 +191,7 @@ export class CollectionConfigBuilder< setWindow(options: WindowOptions) { if (!this.windowFn) { - console.log( - `This collection can't be moved because no move function was set` - ) - return + throw new SetWindowRequiresOrderByError() } this.windowFn(options) diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index 264cb6293..7e7a31b20 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -1155,6 +1155,112 @@ describe(`createLiveQueryCollection`, () => { expect(moveResults2.map((r) => r.name)).toEqual([`Bob`]) }) + it(`should support changing only offset (keeping limit the same)`, async () => { + const extendedUsers = createCollection( + mockSyncCollectionOptions({ + id: `extended-users-offset-only`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `David`, active: true }, + { id: 5, name: `Eve`, active: true }, + ], + }) + ) + + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: extendedUsers }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `asc`) + .limit(2) + .offset(0) + ) + + await activeUsers.preload() + + // Initial result should have first 2 users (Alice, Bob) + expect(activeUsers.size).toBe(2) + const initialResults = activeUsers.toArray + expect(initialResults.map((r) => r.name)).toEqual([`Alice`, `Bob`]) + + // Change only offset to 2, limit should remain 2 + activeUsers.utils.setWindow({ offset: 2 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults1 = activeUsers.toArray + expect(moveResults1.map((r) => r.name)).toEqual([`Charlie`, `David`]) + + // Change only offset to 1, limit should still be 2 + activeUsers.utils.setWindow({ offset: 1 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults2 = activeUsers.toArray + expect(moveResults2.map((r) => r.name)).toEqual([`Bob`, `Charlie`]) + }) + + it(`should support changing only limit (keeping offset the same)`, async () => { + const extendedUsers = createCollection( + mockSyncCollectionOptions({ + id: `extended-users-limit-only`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + { id: 4, name: `David`, active: true }, + { id: 5, name: `Eve`, active: true }, + { id: 6, name: `Frank`, active: true }, + ], + }) + ) + + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: extendedUsers }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `asc`) + .limit(2) + .offset(1) + ) + + await activeUsers.preload() + + // Initial result should have 2 users starting from offset 1 (Bob, Charlie) + expect(activeUsers.size).toBe(2) + const initialResults = activeUsers.toArray + expect(initialResults.map((r) => r.name)).toEqual([`Bob`, `Charlie`]) + + // Change only limit to 4, offset should remain 1 + activeUsers.utils.setWindow({ limit: 4 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults1 = activeUsers.toArray + expect(moveResults1.map((r) => r.name)).toEqual([ + `Bob`, + `Charlie`, + `David`, + `Eve`, + ]) + + // Change only limit to 1, offset should still be 1 + activeUsers.utils.setWindow({ limit: 1 }) + + // Wait for the move to take effect + await new Promise((resolve) => setTimeout(resolve, 10)) + + const moveResults2 = activeUsers.toArray + expect(moveResults2.map((r) => r.name)).toEqual([`Bob`]) + }) + it(`should handle edge cases when moving beyond available data`, async () => { const limitedUsers = createCollection( mockSyncCollectionOptions({ @@ -1271,7 +1377,7 @@ describe(`createLiveQueryCollection`, () => { ]) }) - it(`should be a no-op when used on non-ordered queries`, async () => { + it(`should throw an error when used on non-ordered queries`, async () => { const activeUsers = createLiveQueryCollection( (q) => q @@ -1284,18 +1390,13 @@ describe(`createLiveQueryCollection`, () => { // Initial result should have all active users expect(activeUsers.size).toBe(2) - const initialResults = activeUsers.toArray - expect(initialResults.length).toBe(2) - // Move should be a no-op for non-ordered queries - activeUsers.utils.setWindow({ offset: 1, limit: 1 }) - - // Wait a bit to ensure no changes occur - await new Promise((resolve) => setTimeout(resolve, 10)) - - const moveResults = activeUsers.toArray - expect(moveResults.length).toBe(2) // Should still have the same number of results - expect(moveResults).toEqual(initialResults) // Should be the same results + // setWindow should throw an error for non-ordered queries + expect(() => { + activeUsers.utils.setWindow({ offset: 1, limit: 1 }) + }).toThrow( + /setWindow\(\) can only be called on collections with an ORDER BY clause/ + ) }) it(`should work with complex queries including joins`, async () => {