Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .changeset/clever-parks-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
"@tanstack/db-ivm": patch
"@tanstack/db": patch
---

Add `utils.setWindow()` method to live query collections to dynamically change limit and offset on ordered queries.

You can now change the pagination window of an ordered live query without recreating the collection:

```ts
const users = createLiveQueryCollection((q) =>
q
.from({ user: usersCollection })
.orderBy(({ user }) => user.name, "asc")
.limit(10)
.offset(0)
)

users.utils.setWindow({ offset: 10, limit: 10 })
```
5 changes: 5 additions & 0 deletions packages/db-ivm/src/operators/orderBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ export interface OrderByOptions<Ve> {

type OrderByWithFractionalIndexOptions<Ve> = OrderByOptions<Ve> & {
setSizeCallback?: (getSize: () => number) => void
setWindowFn?: (
windowFn: (options: { offset?: number; limit?: number }) => void
) => void
}

/**
Expand Down Expand Up @@ -147,6 +150,7 @@ export function orderByWithFractionalIndexBase<
const limit = options?.limit ?? Infinity
const offset = options?.offset ?? 0
const setSizeCallback = options?.setSizeCallback
const setWindowFn = options?.setWindowFn
const comparator =
options?.comparator ??
((a, b) => {
Expand All @@ -167,6 +171,7 @@ export function orderByWithFractionalIndexBase<
limit,
offset,
setSizeCallback,
setWindowFn,
}
),
consolidate()
Expand Down
124 changes: 111 additions & 13 deletions packages/db-ivm/src/operators/topKWithFractionalIndex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@ import { generateKeyBetween } from "fractional-indexing"
import { DifferenceStreamWriter, UnaryOperator } from "../graph.js"
import { StreamBuilder } from "../d2.js"
import { MultiSet } from "../multiset.js"
import { binarySearch, globalObjectIdGenerator } from "../utils.js"
import {
binarySearch,
diffHalfOpen,
globalObjectIdGenerator,
} from "../utils.js"
import type { HRange } from "../utils.js"
import type { DifferenceStreamReader } from "../graph.js"
import type { IStreamBuilder, PipedOperator } from "../types.js"

export interface TopKWithFractionalIndexOptions {
limit?: number
offset?: number
setSizeCallback?: (getSize: () => number) => void
setWindowFn?: (
windowFn: (options: { offset?: number; limit?: number }) => void
) => void
}

export type TopKChanges<V> = {
Expand All @@ -19,6 +27,15 @@ export type TopKChanges<V> = {
moveOut: IndexedValue<V> | null
}

export type TopKMoveChanges<V> = {
/** Flag that marks whether there were any changes to the topK */
changes: boolean
/** Indicates which elements move into the topK (if any) */
moveIns: Array<IndexedValue<V>>
/** Indicates which elements move out of the topK (if any) */
moveOuts: Array<IndexedValue<V>>
}

/**
* A topK data structure that supports insertions and deletions
* and returns changes to the topK.
Expand Down Expand Up @@ -58,6 +75,49 @@ class TopKArray<V> implements TopK<V> {
return Math.max(0, Math.min(limit, available))
}

/**
* Moves the topK window
*/
move({
offset,
limit,
}: {
offset?: number
limit?: number
}): TopKMoveChanges<V> {
const oldOffset = this.#topKStart
const oldLimit = this.#topKEnd - this.#topKStart
const oldRange: HRange = [this.#topKStart, this.#topKEnd]

this.#topKStart = offset ?? oldOffset
this.#topKEnd = this.#topKStart + (limit ?? oldLimit)

const newRange: HRange = [this.#topKStart, this.#topKEnd]
const { onlyInA, onlyInB } = diffHalfOpen(oldRange, newRange)

const moveIns: Array<IndexedValue<V>> = []
onlyInB.forEach((index) => {
const value = this.#sortedValues[index]
if (value) {
moveIns.push(value)
}
})

const moveOuts: Array<IndexedValue<V>> = []
onlyInA.forEach((index) => {
const value = this.#sortedValues[index]
if (value) {
moveOuts.push(value)
}
})

// It could be that there are changes (i.e. moveIns or moveOuts)
// but that the collection is lazy so we don't have the data yet that needs to move in/out
// so `moveIns` and `moveOuts` will be empty but `changes` will be true
// this will tell the caller that it needs to run the graph to load more data
return { moveIns, moveOuts, changes: onlyInA.length + onlyInB.length > 0 }
}

insert(value: V): TopKChanges<V> {
const result: TopKChanges<V> = { moveIn: null, moveOut: null }

Expand Down Expand Up @@ -178,8 +238,6 @@ export class TopKWithFractionalIndexOperator<K, T> extends UnaryOperator<
*/
#topK: TopK<TaggedValue<K, T>>

#limit: number

constructor(
id: number,
inputA: DifferenceStreamReader<[K, T]>,
Expand All @@ -188,7 +246,7 @@ export class TopKWithFractionalIndexOperator<K, T> extends UnaryOperator<
options: TopKWithFractionalIndexOptions
) {
super(id, inputA, output)
this.#limit = options.limit ?? Infinity
const limit = options.limit ?? Infinity
const offset = options.offset ?? 0
const compareTaggedValues = (
a: TaggedValue<K, T>,
Expand All @@ -204,8 +262,9 @@ export class TopKWithFractionalIndexOperator<K, T> extends UnaryOperator<
const tieBreakerB = getTag(b)
return tieBreakerA - tieBreakerB
}
this.#topK = this.createTopK(offset, this.#limit, compareTaggedValues)
this.#topK = this.createTopK(offset, limit, compareTaggedValues)
options.setSizeCallback?.(() => this.#topK.size)
options.setWindowFn?.(this.moveTopK.bind(this))
}

protected createTopK(
Expand All @@ -216,6 +275,32 @@ export class TopKWithFractionalIndexOperator<K, T> extends UnaryOperator<
return new TopKArray(offset, limit, comparator)
}

/**
* Moves the topK window based on the provided offset and limit.
* Any changes to the topK are sent to the output.
*/
moveTopK({ offset, limit }: { offset?: number; limit?: number }) {
if (!(this.#topK instanceof TopKArray)) {
throw new Error(
`Cannot move B+-tree implementation of TopK with fractional index`
)
}

const result: Array<[[K, IndexedValue<T>], number]> = []

const diff = this.#topK.move({ offset, limit })

diff.moveIns.forEach((moveIn) => this.handleMoveIn(moveIn, result))
diff.moveOuts.forEach((moveOut) => this.handleMoveOut(moveOut, result))

if (diff.changes) {
// There are changes to the topK
// it could be that moveIns and moveOuts are empty
// because the collection is lazy, so we will run the graph again to load the data
this.output.sendData(new MultiSet(result))
}
}

run(): void {
const result: Array<[[K, IndexedValue<T>], number]> = []
for (const message of this.inputMessages()) {
Expand Down Expand Up @@ -258,23 +343,36 @@ export class TopKWithFractionalIndexOperator<K, T> extends UnaryOperator<
// so it doesn't affect the topK
}

if (res.moveIn) {
const index = getIndex(res.moveIn)
const taggedValue = getValue(res.moveIn)
this.handleMoveIn(res.moveIn, result)
this.handleMoveOut(res.moveOut, result)

return
}

private handleMoveIn(
moveIn: IndexedValue<TaggedValue<K, T>> | null,
result: Array<[[K, IndexedValue<T>], number]>
) {
if (moveIn) {
const index = getIndex(moveIn)
const taggedValue = getValue(moveIn)
const k = getKey(taggedValue)
const val = getVal(taggedValue)
result.push([[k, [val, index]], 1])
}
}

if (res.moveOut) {
const index = getIndex(res.moveOut)
const taggedValue = getValue(res.moveOut)
private handleMoveOut(
moveOut: IndexedValue<TaggedValue<K, T>> | null,
result: Array<[[K, IndexedValue<T>], number]>
) {
if (moveOut) {
const index = getIndex(moveOut)
const taggedValue = getValue(moveOut)
const k = getKey(taggedValue)
const val = getVal(taggedValue)
result.push([[k, [val, index]], -1])
}

return
}

private getMultiplicity(key: K): number {
Expand Down
33 changes: 33 additions & 0 deletions packages/db-ivm/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,36 @@ export function* mapIterable<T, U>(
yield fn(t)
}
}

export type HRange = [number, number] // half-open [start, end[ i.e. end is exclusive

/**
* Computes the difference between two half-open ranges.
* @param a - The first half-open range
* @param b - The second half-open range
* @returns The difference between the two ranges
*/
export function diffHalfOpen(a: HRange, b: HRange) {
const [a1, a2] = a
const [b1, b2] = b

// A \ B can be up to two segments (left and right of the overlap)
const onlyInA: Array<number> = [
...range(a1, Math.min(a2, b1)), // left side of A outside B
...range(Math.max(a1, b2), a2), // right side of A outside B
]

// B \ A similarly
const onlyInB: Array<number> = [
...range(b1, Math.min(b2, a1)),
...range(Math.max(b1, a2), b2),
]

return { onlyInA, onlyInB }
}

function range(start: number, end: number): Array<number> {
const out: Array<number> = []
for (let i = start; i < end; i++) out.push(i)
return out
}
Loading
Loading