diff --git a/.changeset/cruel-buckets-shop.md b/.changeset/cruel-buckets-shop.md
new file mode 100644
index 000000000..b677b56ee
--- /dev/null
+++ b/.changeset/cruel-buckets-shop.md
@@ -0,0 +1,5 @@
+---
+"@tanstack/db": patch
+---
+
+Added an `isLoadingSubset` property and `loadingSubset:change` events to collections and live queries, enabling UIs to display loading indicators while additional data is being fetched via `loadSubset`. Each live query maintains its own isolated loading state based on its subscriptions, preventing loading status "bleed" between independent queries that share the same source collections.
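+
+For example, a UI can read the flag synchronously or subscribe to the transition events (a minimal sketch; `todoCollection` and `setSpinnerVisible` are placeholders, not part of the API):
+
+```ts
+// Check the current state directly
+if (todoCollection.isLoadingSubset) {
+  setSpinnerVisible(true)
+}
+
+// Or react to transitions; `on` returns an unsubscribe function
+const unsubscribe = todoCollection.on("loadingSubset:change", (event) => {
+  // event.loadingSubsetTransition is "start" or "end"
+  setSpinnerVisible(event.isLoadingSubset)
+})
+```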
diff --git a/.changeset/smooth-goats-ring.md b/.changeset/smooth-goats-ring.md
new file mode 100644
index 000000000..6fc227d34
--- /dev/null
+++ b/.changeset/smooth-goats-ring.md
@@ -0,0 +1,58 @@
+---
+"@tanstack/react-db": patch
+---
+
+Add `useLiveInfiniteQuery` hook for infinite scrolling with live updates.
+
+The new `useLiveInfiniteQuery` hook provides an infinite query pattern similar to TanStack Query's `useInfiniteQuery`, but with live updates from your local collection. It uses `liveQueryCollection.utils.setWindow()` internally to efficiently paginate through ordered data without recreating the query on each page fetch.
+
+**Key features:**
+
+- Automatic live updates as data changes in the collection
+- Efficient pagination using dynamic window adjustment
+- Peek-ahead mechanism to detect when more pages are available
+- Compatible with TanStack Query's infinite query API patterns
+
+**Example usage:**
+
+```tsx
+import { useLiveInfiniteQuery } from "@tanstack/react-db"
+
+function PostList() {
+ const { data, pages, fetchNextPage, hasNextPage, isLoading } =
+ useLiveInfiniteQuery(
+ (q) =>
+ q
+ .from({ posts: postsCollection })
+ .orderBy(({ posts }) => posts.createdAt, "desc"),
+ {
+ pageSize: 20,
+ getNextPageParam: (lastPage, allPages) =>
+ lastPage.length === 20 ? allPages.length : undefined,
+ }
+ )
+
+  if (isLoading) return <div>Loading...</div>
+
+  return (
+    <div>
+      {pages.map((page, i) => (
+        <div key={i}>
+          {page.map((post) => (
+            <article key={post.id}>{post.title}</article>
+          ))}
+        </div>
+      ))}
+      {hasNextPage && (
+        <button onClick={() => fetchNextPage()}>Load more</button>
+      )}
+    </div>
+  )
+}
+```
+
+**Requirements:**
+
+- Query must include `.orderBy()` for the window mechanism to work
+- Returns flattened `data` array and `pages` array for flexible rendering
+- Automatically detects new pages when data is synced to the collection
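+
+Internally, the hook always asks the live query window for one extra row beyond the pages it exposes; if that extra row comes back, another page exists. Roughly, the bookkeeping looks like this (a simplified sketch of the hook's internals; `splitIntoPages` is an illustrative helper, not an exported API):
+
+```ts
+// `data` is the flat, ordered result for a window requested with
+// limit = loadedPageCount * pageSize + 1 (the "+1" is the peek-ahead row)
+function splitIntoPages<T>(
+  data: Array<T>,
+  loadedPageCount: number,
+  pageSize: number
+) {
+  const totalItemsRequested = loadedPageCount * pageSize
+
+  // The peek-ahead row only shows up when the collection has more data
+  const hasNextPage = data.length > totalItemsRequested
+
+  // Slice the flat result into pages, dropping the peek-ahead row
+  const pages = Array.from({ length: loadedPageCount }, (_, i) =>
+    data.slice(i * pageSize, (i + 1) * pageSize)
+  )
+
+  return { pages, hasNextPage, flatData: data.slice(0, totalItemsRequested) }
+}
+```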
diff --git a/packages/db/src/collection/events.ts b/packages/db/src/collection/events.ts
index d0f56e744..0f6d8cec1 100644
--- a/packages/db/src/collection/events.ts
+++ b/packages/db/src/collection/events.ts
@@ -1,3 +1,4 @@
+import { EventEmitter } from "../event-emitter.js"
import type { Collection } from "./index.js"
import type { CollectionStatus } from "../types.js"
@@ -31,9 +32,21 @@ export interface CollectionSubscribersChangeEvent {
subscriberCount: number
}
+/**
+ * Event emitted when the collection's subset-loading state changes
+ */
+export interface CollectionLoadingSubsetChangeEvent {
+ type: `loadingSubset:change`
+ collection: Collection
+ isLoadingSubset: boolean
+ previousIsLoadingSubset: boolean
+ loadingSubsetTransition: `start` | `end`
+}
+
export type AllCollectionEvents = {
"status:change": CollectionStatusChangeEvent
"subscribers:change": CollectionSubscribersChangeEvent
+ "loadingSubset:change": CollectionLoadingSubsetChangeEvent
} & {
[K in CollectionStatus as `status:${K}`]: CollectionStatusEvent
}
@@ -42,94 +55,32 @@ export type CollectionEvent =
| AllCollectionEvents[keyof AllCollectionEvents]
| CollectionStatusChangeEvent
| CollectionSubscribersChangeEvent
+ | CollectionLoadingSubsetChangeEvent
export type CollectionEventHandler<T extends keyof AllCollectionEvents> = (
event: AllCollectionEvents[T]
) => void
-export class CollectionEventsManager {
+export class CollectionEventsManager extends EventEmitter<AllCollectionEvents> {
private collection!: Collection
- private listeners = new Map<
- keyof AllCollectionEvents,
- Set<CollectionEventHandler<any>>
- >()
- constructor() {}
+ constructor() {
+ super()
+ }
setDeps(deps: { collection: Collection }) {
this.collection = deps.collection
}
- on<T extends keyof AllCollectionEvents>(
- event: T,
- callback: CollectionEventHandler<T>
- ) {
- if (!this.listeners.has(event)) {
- this.listeners.set(event, new Set())
- }
- this.listeners.get(event)!.add(callback)
-
- return () => {
- this.listeners.get(event)?.delete(callback)
- }
- }
-
- once<T extends keyof AllCollectionEvents>(
- event: T,
- callback: CollectionEventHandler<T>
- ) {
- const unsubscribe = this.on(event, (eventPayload) => {
- callback(eventPayload)
- unsubscribe()
- })
- return unsubscribe
- }
-
- off<T extends keyof AllCollectionEvents>(
- event: T,
- callback: CollectionEventHandler<T>
- ) {
- this.listeners.get(event)?.delete(callback)
- }
-
- waitFor<T extends keyof AllCollectionEvents>(
- event: T,
- timeout?: number
- ): Promise<AllCollectionEvents[T]> {
- return new Promise((resolve, reject) => {
- let timeoutId: NodeJS.Timeout | undefined
- const unsubscribe = this.on(event, (eventPayload) => {
- if (timeoutId) {
- clearTimeout(timeoutId)
- timeoutId = undefined
- }
- resolve(eventPayload)
- unsubscribe()
- })
- if (timeout) {
- timeoutId = setTimeout(() => {
- timeoutId = undefined
- unsubscribe()
- reject(new Error(`Timeout waiting for event ${event}`))
- }, timeout)
- }
- })
- }
-
+ /**
+ * Emit an event to all listeners
+ * Public API for emitting collection events
+ */
emit<T extends keyof AllCollectionEvents>(
event: T,
eventPayload: AllCollectionEvents[T]
- ) {
- this.listeners.get(event)?.forEach((listener) => {
- try {
- listener(eventPayload)
- } catch (error) {
- // Re-throw in a microtask to surface the error
- queueMicrotask(() => {
- throw error
- })
- }
- })
+ ): void {
+ this.emitInner(event, eventPayload)
}
emitStatusChange(
@@ -166,6 +117,6 @@ export class CollectionEventsManager {
}
cleanup() {
- this.listeners.clear()
+ this.clearListeners()
}
}
diff --git a/packages/db/src/collection/index.ts b/packages/db/src/collection/index.ts
index e638ec62c..5a4ab3c07 100644
--- a/packages/db/src/collection/index.ts
+++ b/packages/db/src/collection/index.ts
@@ -25,7 +25,6 @@ import type {
InferSchemaOutput,
InsertConfig,
NonSingleResult,
- OnLoadMoreOptions,
OperationConfig,
SingleResult,
SubscribeChangesOptions,
@@ -218,7 +217,7 @@ export class CollectionImpl<
private _events: CollectionEventsManager
private _changes: CollectionChangesManager
public _lifecycle: CollectionLifecycleManager
- private _sync: CollectionSyncManager
+ public _sync: CollectionSyncManager
private _indexes: CollectionIndexesManager
private _mutations: CollectionMutationsManager<
TOutput,
@@ -303,6 +302,7 @@ export class CollectionImpl<
collection: this, // Required for passing to config.sync callback
state: this._state,
lifecycle: this._lifecycle,
+ events: this._events,
})
// Only start sync immediately if explicitly enabled
@@ -356,23 +356,19 @@ export class CollectionImpl<
}
/**
- * Start sync immediately - internal method for compiled queries
- * This bypasses lazy loading for special cases like live query results
+ * Check whether the collection is currently loading a subset of data
+ * @returns true if the collection has pending loadSubset operations, false otherwise
*/
- public startSyncImmediate(): void {
- this._sync.startSync()
+ public get isLoadingSubset(): boolean {
+ return this._sync.isLoadingSubset
}
/**
- * Requests the sync layer to load more data.
- * @param options Options to control what data is being loaded
- * @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded.
- * If data loading is synchronous, the data is loaded when the method returns.
+ * Start sync immediately - internal method for compiled queries
+ * This bypasses lazy loading for special cases like live query results
*/
- public syncMore(options: OnLoadMoreOptions): void | Promise {
- if (this._sync.syncOnLoadMoreFn) {
- return this._sync.syncOnLoadMoreFn(options)
- }
+ public startSyncImmediate(): void {
+ this._sync.startSync()
}
/**
diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts
index f369971c6..d6a43a256 100644
--- a/packages/db/src/collection/subscription.ts
+++ b/packages/db/src/collection/subscription.ts
@@ -1,13 +1,20 @@
import { ensureIndexForExpression } from "../indexes/auto-index.js"
import { and, gt, lt } from "../query/builder/functions.js"
import { Value } from "../query/ir.js"
+import { EventEmitter } from "../event-emitter.js"
import {
createFilterFunctionFromExpression,
createFilteredCallback,
} from "./change-events.js"
import type { BasicExpression, OrderBy } from "../query/ir.js"
import type { IndexInterface } from "../indexes/base-index.js"
-import type { ChangeMessage } from "../types.js"
+import type {
+ ChangeMessage,
+ Subscription,
+ SubscriptionEvents,
+ SubscriptionStatus,
+ SubscriptionUnsubscribedEvent,
+} from "../types.js"
import type { CollectionImpl } from "./index.js"
type RequestSnapshotOptions = {
@@ -22,13 +29,17 @@ type RequestLimitedSnapshotOptions = {
}
type CollectionSubscriptionOptions = {
+ includeInitialState?: boolean
/** Pre-compiled expression for filtering changes */
whereExpression?: BasicExpression
/** Callback to call when the subscription is unsubscribed */
- onUnsubscribe?: () => void
+ onUnsubscribe?: (event: SubscriptionUnsubscribedEvent) => void
}
-export class CollectionSubscription {
+export class CollectionSubscription
+ extends EventEmitter<SubscriptionEvents>
+ implements Subscription
+{
private loadedInitialState = false
// Flag to indicate that we have sent at least 1 snapshot.
@@ -42,11 +53,24 @@ export class CollectionSubscription {
private orderByIndex: IndexInterface | undefined
+ // Status tracking
+ private _status: SubscriptionStatus = `ready`
+ private pendingLoadSubsetPromises: Set<Promise<void>> = new Set()
+
+ public get status(): SubscriptionStatus {
+ return this._status
+ }
+
constructor(
private collection: CollectionImpl,
private callback: (changes: Array<ChangeMessage<any>>) => void,
private options: CollectionSubscriptionOptions
) {
+ super()
+ if (options.onUnsubscribe) {
+ this.on(`unsubscribed`, (event) => options.onUnsubscribe!(event))
+ }
+
// Auto-index for where expressions if enabled
if (options.whereExpression) {
ensureIndexForExpression(options.whereExpression, this.collection)
@@ -71,6 +95,53 @@ export class CollectionSubscription {
this.orderByIndex = index
}
+ /**
+ * Set subscription status and emit events if changed
+ */
+ private setStatus(newStatus: SubscriptionStatus) {
+ if (this._status === newStatus) {
+ return // No change
+ }
+
+ const previousStatus = this._status
+ this._status = newStatus
+
+ // Emit status:change event
+ this.emitInner(`status:change`, {
+ type: `status:change`,
+ subscription: this,
+ previousStatus,
+ status: newStatus,
+ })
+
+ // Emit specific status event
+ const eventKey: `status:${SubscriptionStatus}` = `status:${newStatus}`
+ this.emitInner(eventKey, {
+ type: eventKey,
+ subscription: this,
+ previousStatus,
+ status: newStatus,
+ } as SubscriptionEvents[typeof eventKey])
+ }
+
+ /**
+ * Track a loadSubset promise and manage loading status
+ */
+ private trackLoadSubsetPromise(syncResult: Promise<void> | true) {
+ // Track the promise if it's actually a promise (async work)
+ if (syncResult instanceof Promise) {
+ this.pendingLoadSubsetPromises.add(syncResult)
+ this.setStatus(`loadingSubset`)
+
+ syncResult.finally(() => {
+ this.pendingLoadSubsetPromises.delete(syncResult)
+ if (this.pendingLoadSubsetPromises.size === 0) {
+ this.setStatus(`ready`)
+ }
+ })
+ }
+ }
+
hasLoadedInitialState() {
return this.loadedInitialState
}
@@ -121,10 +192,13 @@ export class CollectionSubscription {
// Request the sync layer to load more data
// don't await it, we will load the data into the collection when it comes in
- this.collection.syncMore({
+ const syncResult = this.collection._sync.loadSubset({
where: stateOpts.where,
+ subscription: this,
})
+ this.trackLoadSubsetPromise(syncResult)
+
// Also load data immediately from the collection
const snapshot = this.collection.currentStateAsChanges(stateOpts)
@@ -215,11 +289,14 @@ export class CollectionSubscription {
// Request the sync layer to load more data
// don't await it, we will load the data into the collection when it comes in
- this.collection.syncMore({
+ const syncResult = this.collection._sync.loadSubset({
where: whereWithValueFilter,
limit,
orderBy,
+ subscription: this,
})
+
+ this.trackLoadSubsetPromise(syncResult)
}
/**
@@ -264,6 +341,11 @@ export class CollectionSubscription {
}
unsubscribe() {
- this.options.onUnsubscribe?.()
+ this.emitInner(`unsubscribed`, {
+ type: `unsubscribed`,
+ subscription: this,
+ })
+ // Clear all event listeners to prevent memory leaks
+ this.clearListeners()
}
}
diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts
index 1958c58f9..251fba608 100644
--- a/packages/db/src/collection/sync.ts
+++ b/packages/db/src/collection/sync.ts
@@ -1,4 +1,5 @@
import {
+ CollectionConfigurationError,
CollectionIsInErrorStateError,
DuplicateKeySyncError,
NoPendingSyncTransactionCommitError,
@@ -13,12 +14,13 @@ import type {
ChangeMessage,
CleanupFn,
CollectionConfig,
- OnLoadMoreOptions,
+ LoadSubsetOptions,
SyncConfigRes,
} from "../types"
import type { CollectionImpl } from "./index.js"
import type { CollectionStateManager } from "./state"
import type { CollectionLifecycleManager } from "./lifecycle"
+import type { CollectionEventsManager } from "./events.js"
export class CollectionSyncManager<
TOutput extends object = Record<string, unknown>,
@@ -29,31 +31,38 @@ export class CollectionSyncManager<
private collection!: CollectionImpl
private state!: CollectionStateManager
private lifecycle!: CollectionLifecycleManager
+ private _events!: CollectionEventsManager
private config!: CollectionConfig
private id: string
+ private syncMode: `eager` | `on-demand`
public preloadPromise: Promise<void> | null = null
public syncCleanupFn: (() => void) | null = null
- public syncOnLoadMoreFn:
- | ((options: OnLoadMoreOptions) => void | Promise<void>)
+ public syncLoadSubsetFn:
+ | ((options: LoadSubsetOptions) => true | Promise<void>)
| null = null
+ private pendingLoadSubsetPromises: Set<Promise<void>> = new Set()
+
/**
* Creates a new CollectionSyncManager instance
*/
constructor(config: CollectionConfig, id: string) {
this.config = config
this.id = id
+ this.syncMode = config.syncMode ?? `eager`
}
setDeps(deps: {
collection: CollectionImpl
state: CollectionStateManager
lifecycle: CollectionLifecycleManager
+ events: CollectionEventsManager
}) {
this.collection = deps.collection
this.state = deps.state
this.lifecycle = deps.lifecycle
+ this._events = deps.events
}
/**
@@ -189,8 +198,16 @@ export class CollectionSyncManager<
// Store cleanup function if provided
this.syncCleanupFn = syncRes?.cleanup ?? null
- // Store onLoadMore function if provided
- this.syncOnLoadMoreFn = syncRes?.onLoadMore ?? null
+ // Store loadSubset function if provided
+ this.syncLoadSubsetFn = syncRes?.loadSubset ?? null
+
+ // Validate: on-demand mode requires a loadSubset function
+ if (this.syncMode === `on-demand` && !this.syncLoadSubsetFn) {
+ throw new CollectionConfigurationError(
+ `Collection "${this.id}" is configured with syncMode "on-demand" but the sync function did not return a loadSubset handler. ` +
+ `Either provide a loadSubset handler or use syncMode "eager".`
+ )
+ }
} catch (error) {
this.lifecycle.setStatus(`error`)
throw error
@@ -239,16 +256,71 @@ export class CollectionSyncManager<
return this.preloadPromise
}
+ /**
+ * Gets whether the collection is currently loading a subset of data
+ */
+ public get isLoadingSubset(): boolean {
+ return this.pendingLoadSubsetPromises.size > 0
+ }
+
+ /**
+ * Tracks a load promise for isLoadingSubset state.
+ * @internal This is for internal coordination (e.g., live-query glue code), not for general use.
+ */
+ public trackLoadPromise(promise: Promise<void>): void {
+ const loadingStarting = !this.isLoadingSubset
+ this.pendingLoadSubsetPromises.add(promise)
+
+ if (loadingStarting) {
+ this._events.emit(`loadingSubset:change`, {
+ type: `loadingSubset:change`,
+ collection: this.collection,
+ isLoadingSubset: true,
+ previousIsLoadingSubset: false,
+ loadingSubsetTransition: `start`,
+ })
+ }
+
+ promise.finally(() => {
+ const loadingEnding =
+ this.pendingLoadSubsetPromises.size === 1 &&
+ this.pendingLoadSubsetPromises.has(promise)
+ this.pendingLoadSubsetPromises.delete(promise)
+
+ if (loadingEnding) {
+ this._events.emit(`loadingSubset:change`, {
+ type: `loadingSubset:change`,
+ collection: this.collection,
+ isLoadingSubset: false,
+ previousIsLoadingSubset: true,
+ loadingSubsetTransition: `end`,
+ })
+ }
+ })
+ }
+
/**
* Requests the sync layer to load more data.
* @param options Options to control what data is being loaded
* @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded.
- * If data loading is synchronous, the data is loaded when the method returns.
+ * Returns true if no sync function is configured, if syncMode is 'eager', or if there is no work to do.
*/
- public syncMore(options: OnLoadMoreOptions): void | Promise<void> {
- if (this.syncOnLoadMoreFn) {
- return this.syncOnLoadMoreFn(options)
+ public loadSubset(options: LoadSubsetOptions): Promise<void> | true {
+ // Bypass loadSubset when syncMode is 'eager'
+ if (this.syncMode === `eager`) {
+ return true
+ }
+
+ if (this.syncLoadSubsetFn) {
+ const result = this.syncLoadSubsetFn(options)
+ // If the result is a promise, track it
+ if (result instanceof Promise) {
+ this.trackLoadPromise(result)
+ return result
+ }
}
+
+ return true
}
public cleanup(): void {
diff --git a/packages/db/src/event-emitter.ts b/packages/db/src/event-emitter.ts
new file mode 100644
index 000000000..19656e573
--- /dev/null
+++ b/packages/db/src/event-emitter.ts
@@ -0,0 +1,118 @@
+/**
+ * Generic type-safe event emitter
+ * @template TEvents - Record of event names to event payload types
+ */
+export class EventEmitter<TEvents extends Record<string, unknown>> {
+ private listeners = new Map<
+ keyof TEvents,
+ Set<(event: TEvents[keyof TEvents]) => void>
+ >()
+
+ /**
+ * Subscribe to an event
+ * @param event - Event name to listen for
+ * @param callback - Function to call when event is emitted
+ * @returns Unsubscribe function
+ */
+ on<T extends keyof TEvents>(
+ event: T,
+ callback: (event: TEvents[T]) => void
+ ): () => void {
+ if (!this.listeners.has(event)) {
+ this.listeners.set(event, new Set())
+ }
+ this.listeners.get(event)!.add(callback as (event: any) => void)
+
+ return () => {
+ this.listeners.get(event)?.delete(callback as (event: any) => void)
+ }
+ }
+
+ /**
+ * Subscribe to an event once (automatically unsubscribes after first emission)
+ * @param event - Event name to listen for
+ * @param callback - Function to call when event is emitted
+ * @returns Unsubscribe function
+ */
+ once<T extends keyof TEvents>(
+ event: T,
+ callback: (event: TEvents[T]) => void
+ ): () => void {
+ const unsubscribe = this.on(event, (eventPayload) => {
+ callback(eventPayload)
+ unsubscribe()
+ })
+ return unsubscribe
+ }
+
+ /**
+ * Unsubscribe from an event
+ * @param event - Event name to stop listening for
+ * @param callback - Function to remove
+ */
+ off<T extends keyof TEvents>(
+ event: T,
+ callback: (event: TEvents[T]) => void
+ ): void {
+ this.listeners.get(event)?.delete(callback as (event: any) => void)
+ }
+
+ /**
+ * Wait for an event to be emitted
+ * @param event - Event name to wait for
+ * @param timeout - Optional timeout in milliseconds
+ * @returns Promise that resolves with the event payload
+ */
+ waitFor<T extends keyof TEvents>(
+ event: T,
+ timeout?: number
+ ): Promise<TEvents[T]> {
+ return new Promise((resolve, reject) => {
+ let timeoutId: NodeJS.Timeout | undefined
+ const unsubscribe = this.on(event, (eventPayload) => {
+ if (timeoutId) {
+ clearTimeout(timeoutId)
+ timeoutId = undefined
+ }
+ resolve(eventPayload)
+ unsubscribe()
+ })
+ if (timeout) {
+ timeoutId = setTimeout(() => {
+ timeoutId = undefined
+ unsubscribe()
+ reject(new Error(`Timeout waiting for event ${String(event)}`))
+ }, timeout)
+ }
+ })
+ }
+
+ /**
+ * Emit an event to all listeners
+ * @param event - Event name to emit
+ * @param eventPayload - Event payload
+ * @internal For use by subclasses - subclasses should wrap this with a public emit if needed
+ */
+ protected emitInner<T extends keyof TEvents>(
+ event: T,
+ eventPayload: TEvents[T]
+ ): void {
+ this.listeners.get(event)?.forEach((listener) => {
+ try {
+ listener(eventPayload)
+ } catch (error) {
+ // Re-throw in a microtask to surface the error
+ queueMicrotask(() => {
+ throw error
+ })
+ }
+ })
+ }
+
+ /**
+ * Clear all listeners
+ */
+ protected clearListeners(): void {
+ this.listeners.clear()
+ }
+}
diff --git a/packages/db/src/query/index.ts b/packages/db/src/query/index.ts
index c5e5873cc..17f4dd8e7 100644
--- a/packages/db/src/query/index.ts
+++ b/packages/db/src/query/index.ts
@@ -56,3 +56,4 @@ export {
} from "./live-query-collection.js"
export { type LiveQueryCollectionConfig } from "./live/types.js"
+export { type LiveQueryCollectionUtils } from "./live/collection-config-builder.js"
diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts
index a7952e998..08302fc39 100644
--- a/packages/db/src/query/live/collection-config-builder.ts
+++ b/packages/db/src/query/live/collection-config-builder.ts
@@ -39,8 +39,10 @@ export type LiveQueryCollectionUtils = UtilsRecord & {
/**
* Sets the offset and limit of an ordered query.
* Is a no-op if the query is not ordered.
+ *
+ * @returns `true` if no subset loading was triggered, or `Promise` that resolves when the subset has been loaded
*/
- setWindow: (options: WindowOptions) => void
+ setWindow: (options: WindowOptions) => true | Promise<void>
}
type PendingGraphRun = {
@@ -88,7 +90,7 @@ export class CollectionConfigBuilder<
private isInErrorState = false
// Reference to the live query collection for error state transitions
- private liveQueryCollection?: Collection
+ public liveQueryCollection?: Collection
private windowFn: ((options: WindowOptions) => void) | undefined
@@ -189,13 +191,32 @@ export class CollectionConfigBuilder<
}
}
- setWindow(options: WindowOptions) {
+ setWindow(options: WindowOptions): true | Promise<void> {
if (!this.windowFn) {
throw new SetWindowRequiresOrderByError()
}
this.windowFn(options)
this.maybeRunGraphFn?.()
+
+ // Check if loading a subset was triggered
+ if (this.liveQueryCollection?.isLoadingSubset) {
+ // Loading was triggered, return a promise that resolves when it completes
+ return new Promise<void>((resolve) => {
+ const unsubscribe = this.liveQueryCollection!.on(
+ `loadingSubset:change`,
+ (event) => {
+ if (!event.isLoadingSubset) {
+ unsubscribe()
+ resolve()
+ }
+ }
+ )
+ })
+ }
+
+ // No loading was triggered
+ return true
}
/**
@@ -475,15 +496,15 @@ export class CollectionConfigBuilder<
}
)
- const loadMoreDataCallbacks = this.subscribeToAllCollections(
+ const loadSubsetDataCallbacks = this.subscribeToAllCollections(
config,
fullSyncState
)
- this.maybeRunGraphFn = () => this.scheduleGraphRun(loadMoreDataCallbacks)
+ this.maybeRunGraphFn = () => this.scheduleGraphRun(loadSubsetDataCallbacks)
// Initial run with callback to load more data if needed
- this.scheduleGraphRun(loadMoreDataCallbacks)
+ this.scheduleGraphRun(loadSubsetDataCallbacks)
// Return the unsubscribe function
return () => {
@@ -792,7 +813,7 @@ export class CollectionConfigBuilder<
// Combine all loaders into a single callback that initiates loading more data
// from any source that needs it. Returns true once all loaders have been called,
// but the actual async loading may still be in progress.
- const loadMoreDataCallback = () => {
+ const loadSubsetDataCallbacks = () => {
loaders.map((loader) => loader())
return true
}
@@ -804,7 +825,7 @@ export class CollectionConfigBuilder<
// Initial status check after all subscriptions are set up
this.updateLiveQueryStatus(config)
- return loadMoreDataCallback
+ return loadSubsetDataCallbacks
}
}
diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts
index 0abed0022..5835a2f0e 100644
--- a/packages/db/src/query/live/collection-subscriber.ts
+++ b/packages/db/src/query/live/collection-subscriber.ts
@@ -24,6 +24,12 @@ export class CollectionSubscriber<
// Keep track of the biggest value we've sent so far (needed for orderBy optimization)
private biggest: any = undefined
+ // Track deferred promises for subscription loading states
+ private subscriptionLoadingPromises = new Map<
+ CollectionSubscription,
+ { resolve: () => void }
+ >()
+
constructor(
private alias: string,
private collectionId: string,
@@ -66,7 +72,51 @@ export class CollectionSubscriber<
includeInitialState
)
}
+
+ // Subscribe to subscription status changes to propagate loading state
+ const statusUnsubscribe = subscription.on(`status:change`, (event) => {
+ // TODO: For now we are setting this loading state whenever the subscription
+ // status changes to 'loadingSubset'. But we have discussed it only happening
+ // when the live query has its offset/limit changed, and that triggers the
+ // subscription to request a snapshot. This will require more work to implement,
+ // and builds on https://github.com/TanStack/db/pull/663 which this PR
+ // does not yet depend on.
+ if (event.status === `loadingSubset`) {
+ // Guard against duplicate transitions
+ if (!this.subscriptionLoadingPromises.has(subscription)) {
+ let resolve: () => void
+ const promise = new Promise<void>((res) => {
+ resolve = res
+ })
+
+ this.subscriptionLoadingPromises.set(subscription, {
+ resolve: resolve!,
+ })
+ this.collectionConfigBuilder.liveQueryCollection!._sync.trackLoadPromise(
+ promise
+ )
+ }
+ } else {
+ // status is 'ready'
+ const deferred = this.subscriptionLoadingPromises.get(subscription)
+ if (deferred) {
+ // Clear the map entry FIRST (before resolving)
+ this.subscriptionLoadingPromises.delete(subscription)
+ deferred.resolve()
+ }
+ }
+ })
+
const unsubscribe = () => {
+ // If subscription has a pending promise, resolve it before unsubscribing
+ const deferred = this.subscriptionLoadingPromises.get(subscription)
+ if (deferred) {
+ // Clear the map entry FIRST (before resolving)
+ this.subscriptionLoadingPromises.delete(subscription)
+ deferred.resolve()
+ }
+
+ statusUnsubscribe()
subscription.unsubscribe()
}
// currentSyncState is always defined when subscribe() is called
diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts
index 5ce991a83..73c1fc4fe 100644
--- a/packages/db/src/types.ts
+++ b/packages/db/src/types.ts
@@ -3,6 +3,7 @@ import type { Collection } from "./collection/index.js"
import type { StandardSchemaV1 } from "@standard-schema/spec"
import type { Transaction } from "./transactions"
import type { BasicExpression, OrderBy } from "./query/ir.js"
+import type { EventEmitter } from "./event-emitter.js"
/**
* Helper type to extract the output type from a standard schema
@@ -150,17 +151,83 @@ export type Row = Record>
export type OperationType = `insert` | `update` | `delete`
-export type OnLoadMoreOptions = {
+/**
+ * Subscription status values
+ */
+export type SubscriptionStatus = `ready` | `loadingSubset`
+
+/**
+ * Event emitted when subscription status changes
+ */
+export interface SubscriptionStatusChangeEvent {
+ type: `status:change`
+ subscription: Subscription
+ previousStatus: SubscriptionStatus
+ status: SubscriptionStatus
+}
+
+/**
+ * Event emitted when subscription status changes to a specific status
+ */
+export interface SubscriptionStatusEvent<T extends SubscriptionStatus> {
+ type: `status:${T}`
+ subscription: Subscription
+ previousStatus: SubscriptionStatus
+ status: T
+}
+
+/**
+ * Event emitted when subscription is unsubscribed
+ */
+export interface SubscriptionUnsubscribedEvent {
+ type: `unsubscribed`
+ subscription: Subscription
+}
+
+/**
+ * All subscription events
+ */
+export type SubscriptionEvents = {
+ "status:change": SubscriptionStatusChangeEvent
+ "status:ready": SubscriptionStatusEvent<`ready`>
+ "status:loadingSubset": SubscriptionStatusEvent<`loadingSubset`>
+ unsubscribed: SubscriptionUnsubscribedEvent
+}
+
+/**
+ * Public interface for a collection subscription
+ * Used by sync implementations to track subscription lifecycle
+ */
+export interface Subscription extends EventEmitter<SubscriptionEvents> {
+ /** Current status of the subscription */
+ readonly status: SubscriptionStatus
+}
+
+export type LoadSubsetOptions = {
+ /** The where expression to filter the data */
where?: BasicExpression
+ /** The order by clause to sort the data */
orderBy?: OrderBy
+ /** The limit of the data to load */
limit?: number
+ /**
+ * The subscription that triggered the load.
+ * Advanced sync implementations can use this for:
+ * - LRU caching keyed by subscription
+ * - Reference counting to track active subscriptions
+ * - Subscribing to subscription events (e.g., finalization/unsubscribe)
+ * @optional Available when called from CollectionSubscription, may be undefined for direct calls
+ */
+ subscription?: Subscription
}
+export type LoadSubsetFn = (options: LoadSubsetOptions) => true | Promise<void>
+
export type CleanupFn = () => void
export type SyncConfigRes = {
cleanup?: CleanupFn
- onLoadMore?: (options: OnLoadMoreOptions) => void | Promise<void>
+ loadSubset?: LoadSubsetFn
}
export interface SyncConfig<
T extends object = Record<string, unknown>,
@@ -313,6 +380,8 @@ export type CollectionStatus =
/** Collection has been cleaned up and resources freed */
| `cleaned-up`
+export type SyncMode = `eager` | `on-demand`
+
export interface BaseCollectionConfig<
T extends object = Record<string, unknown>,
TKey extends string | number = string | number,
@@ -374,6 +443,15 @@ export interface BaseCollectionConfig<
* compare: (x, y) => x.createdAt.getTime() - y.createdAt.getTime()
*/
compare?: (x: T, y: T) => number
+ /**
+ * The mode of sync to use for the collection.
+ * @default `eager`
+ * @description
+ * - `eager`: syncs all data immediately on preload
+ * - `on-demand`: syncs data in incremental snapshots when the collection is queried
+ * The exact implementation of the sync mode is up to the sync implementation.
+ */
+ syncMode?: SyncMode
/**
* Optional asynchronous handler function called before an insert operation
* @param params Object containing transaction and collection information
diff --git a/packages/db/tests/collection-subscription.test.ts b/packages/db/tests/collection-subscription.test.ts
new file mode 100644
index 000000000..0b9e09384
--- /dev/null
+++ b/packages/db/tests/collection-subscription.test.ts
@@ -0,0 +1,276 @@
+import { describe, expect, it } from "vitest"
+import { createCollection } from "../src/collection/index.js"
+import { flushPromises } from "./utils"
+
+describe(`CollectionSubscription status tracking`, () => {
+ it(`subscription starts with status 'ready'`, () => {
+ const collection = createCollection<{ id: string; value: string }>({
+ id: `test`,
+ getKey: (item) => item.id,
+ sync: {
+ sync: ({ markReady }) => {
+ markReady()
+ },
+ },
+ })
+
+ const subscription = collection.subscribeChanges(() => {})
+
+ expect(subscription.status).toBe(`ready`)
+ subscription.unsubscribe()
+ })
+
+ it(`status changes to 'loadingSubset' when requestSnapshot triggers a promise`, async () => {
+ let resolveLoadSubset: () => void
+ const loadSubsetPromise = new Promise<void>((resolve) => {
+ resolveLoadSubset = resolve
+ })
+
+ const collection = createCollection<{ id: string; value: string }>({
+ id: `test`,
+ getKey: (item) => item.id,
+ syncMode: `on-demand`,
+ sync: {
+ sync: ({ markReady }) => {
+ markReady()
+ return {
+ loadSubset: () => loadSubsetPromise,
+ }
+ },
+ },
+ })
+
+ const subscription = collection.subscribeChanges(() => {}, {
+ includeInitialState: false,
+ })
+
+ expect(subscription.status).toBe(`ready`)
+
+ // Trigger a snapshot request that will call loadSubset
+ subscription.requestSnapshot({ optimizedOnly: false })
+
+ // Status should now be loadingSubset
+ expect(subscription.status).toBe(`loadingSubset`)
+
+ // Resolve the load more promise
+ resolveLoadSubset!()
+ await flushPromises()
+
+ // Status should be back to ready
+ expect(subscription.status).toBe(`ready`)
+
+ subscription.unsubscribe()
+ })
+
+ it(`status changes back to 'ready' when promise resolves`, async () => {
+ let resolveLoadSubset: () => void
+ const loadSubsetPromise = new Promise<void>((resolve) => {
+ resolveLoadSubset = resolve
+ })
+
+ const collection = createCollection<{ id: string; value: string }>({
+ id: `test`,
+ getKey: (item) => item.id,
+ syncMode: `on-demand`,
+ sync: {
+ sync: ({ markReady }) => {
+ markReady()
+ return {
+ loadSubset: () => loadSubsetPromise,
+ }
+ },
+ },
+ })
+
+ const subscription = collection.subscribeChanges(() => {}, {
+ includeInitialState: false,
+ })
+
+ subscription.requestSnapshot({ optimizedOnly: false })
+ expect(subscription.status).toBe(`loadingSubset`)
+
+ resolveLoadSubset!()
+ await flushPromises()
+
+ expect(subscription.status).toBe(`ready`)
+ subscription.unsubscribe()
+ })
+
+ it(`concurrent promises keep status as 'loadingSubset' until all resolve`, async () => {
+ let resolveLoadSubset1: () => void
+ let resolveLoadSubset2: () => void
+ let callCount = 0
+
+ const collection = createCollection<{ id: string; value: string }>({
+ id: `test`,
+ getKey: (item) => item.id,
+ syncMode: `on-demand`,
+ sync: {
+ sync: ({ markReady }) => {
+ markReady()
+ return {
+ loadSubset: () => {
+ callCount++
+ if (callCount === 1) {
+ return new Promise<void>((resolve) => {
+ resolveLoadSubset1 = resolve
+ })
+ } else {
+ return new Promise<void>((resolve) => {
+ resolveLoadSubset2 = resolve
+ })
+ }
+ },
+ }
+ },
+ },
+ })
+
+ const subscription = collection.subscribeChanges(() => {}, {
+ includeInitialState: false,
+ })
+
+ // Trigger first load
+ subscription.requestSnapshot({ optimizedOnly: false })
+ expect(subscription.status).toBe(`loadingSubset`)
+
+ // Trigger second load
+ subscription.requestSnapshot({ optimizedOnly: false })
+ expect(subscription.status).toBe(`loadingSubset`)
+
+ // Resolve first promise
+ resolveLoadSubset1!()
+ await flushPromises()
+
+ // Should still be loading because second promise is pending
+ expect(subscription.status).toBe(`loadingSubset`)
+
+ // Resolve second promise
+ resolveLoadSubset2!()
+ await flushPromises()
+
+ // Now should be ready
+ expect(subscription.status).toBe(`ready`)
+ subscription.unsubscribe()
+ })
+
+ it(`emits 'status:change' event`, async () => {
+ let resolveLoadSubset: () => void
+ const loadSubsetPromise = new Promise<void>((resolve) => {
+ resolveLoadSubset = resolve
+ })
+
+ const collection = createCollection<{ id: string; value: string }>({
+ id: `test`,
+ getKey: (item) => item.id,
+ syncMode: `on-demand`,
+ sync: {
+ sync: ({ markReady }) => {
+ markReady()
+ return {
+ loadSubset: () => loadSubsetPromise,
+ }
+ },
+ },
+ })
+
+ const subscription = collection.subscribeChanges(() => {}, {
+ includeInitialState: false,
+ })
+
+ const statusChanges: Array<{ previous: string; current: string }> = []
+
+ subscription.on(`status:change`, (event) => {
+ statusChanges.push({
+ previous: event.previousStatus,
+ current: event.status,
+ })
+ })
+
+ subscription.requestSnapshot({ optimizedOnly: false })
+ await flushPromises()
+
+ expect(statusChanges).toHaveLength(1)
+ expect(statusChanges[0]).toEqual({
+ previous: `ready`,
+ current: `loadingSubset`,
+ })
+
+ resolveLoadSubset!()
+ await flushPromises()
+
+ expect(statusChanges).toHaveLength(2)
+ expect(statusChanges[1]).toEqual({
+ previous: `loadingSubset`,
+ current: `ready`,
+ })
+
+ subscription.unsubscribe()
+ })
+
+ it(`promise rejection still cleans up and sets status back to 'ready'`, async () => {
+ let rejectLoadSubset: (error: Error) => void
+ const loadSubsetPromise = new Promise<void>((_, reject) => {
+ rejectLoadSubset = reject
+ })
+ // Attach catch handler before rejecting to avoid unhandled rejection
+ const handledPromise = loadSubsetPromise.catch(() => {})
+
+ const collection = createCollection<{ id: string; value: string }>({
+ id: `test`,
+ getKey: (item) => item.id,
+ syncMode: `on-demand`,
+ sync: {
+ sync: ({ markReady }) => {
+ markReady()
+ return {
+ loadSubset: () => handledPromise,
+ }
+ },
+ },
+ })
+
+ const subscription = collection.subscribeChanges(() => {}, {
+ includeInitialState: false,
+ })
+
+ subscription.requestSnapshot({ optimizedOnly: false })
+ expect(subscription.status).toBe(`loadingSubset`)
+
+ // Reject the promise
+ rejectLoadSubset!(new Error(`Load failed`))
+ await flushPromises()
+
+ // Status should still be back to ready
+ expect(subscription.status).toBe(`ready`)
+ subscription.unsubscribe()
+ })
+
+ it(`unsubscribe clears event listeners`, () => {
+ const collection = createCollection<{ id: string; value: string }>({
+ id: `test`,
+ getKey: (item) => item.id,
+ sync: {
+ sync: ({ markReady }) => {
+ markReady()
+ },
+ },
+ })
+
+ const subscription = collection.subscribeChanges(() => {}, {
+ includeInitialState: false,
+ })
+
+ let eventCount = 0
+ subscription.on(`status:change`, () => {
+ eventCount++
+ })
+
+ subscription.unsubscribe()
+
+ // After unsubscribe, listeners should be cleared
+ // We can't easily verify this without accessing private members,
+ // but we can at least verify unsubscribe doesn't throw
+ expect(eventCount).toBe(0)
+ })
+})
diff --git a/packages/db/tests/collection.test.ts b/packages/db/tests/collection.test.ts
index 77b74ec75..a5a94fb06 100644
--- a/packages/db/tests/collection.test.ts
+++ b/packages/db/tests/collection.test.ts
@@ -1356,3 +1356,239 @@ describe(`Collection`, () => {
expect(state.size).toBe(3)
})
})
+
+describe(`Collection isLoadingSubset property`, () => {
+ it(`isLoadingSubset is false initially`, () => {
+ const collection = createCollection<{ id: string; value: string }>({
+ id: `test`,
+ getKey: (item) => item.id,
+ sync: {
+ sync: ({ markReady }) => {
+ markReady()
+ },
+ },
+ })
+
+ expect(collection.isLoadingSubset).toBe(false)
+ })
+
+ it(`isLoadingSubset becomes true when loadSubset returns a promise`, async () => {
+ let resolveLoadSubset: () => void
+ const loadSubsetPromise = new Promise<void>((resolve) => {
+ resolveLoadSubset = resolve
+ })
+
+ const collection = createCollection<{ id: string; value: string }>({
+ id: `test`,
+ getKey: (item) => item.id,
+ syncMode: `on-demand`,
+ startSync: true,
+ sync: {
+ sync: ({ markReady }) => {
+ markReady()
+ return {
+ loadSubset: () => loadSubsetPromise,
+ }
+ },
+ },
+ })
+
+ expect(collection.isLoadingSubset).toBe(false)
+
+ collection._sync.loadSubset({})
+ expect(collection.isLoadingSubset).toBe(true)
+
+ resolveLoadSubset!()
+ await flushPromises()
+
+ expect(collection.isLoadingSubset).toBe(false)
+ })
+
+ it(`isLoadingSubset becomes false when promise resolves`, async () => {
+ let resolveLoadSubset: () => void
+ const loadSubsetPromise = new Promise<void>((resolve) => {
+ resolveLoadSubset = resolve
+ })
+
+ const collection = createCollection<{ id: string; value: string }>({
+ id: `test`,
+ getKey: (item) => item.id,
+ syncMode: `on-demand`,
+ startSync: true,
+ sync: {
+ sync: ({ markReady }) => {
+ markReady()
+ return {
+ loadSubset: () => loadSubsetPromise,
+ }
+ },
+ },
+ })
+
+ collection._sync.loadSubset({})
+ expect(collection.isLoadingSubset).toBe(true)
+
+ resolveLoadSubset!()
+ await flushPromises()
+
+ expect(collection.isLoadingSubset).toBe(false)
+ })
+
+ it(`concurrent loadSubset calls keep isLoadingSubset true until all resolve`, async () => {
+ let resolveLoadSubset1: () => void
+ let resolveLoadSubset2: () => void
+ let callCount = 0
+
+ const collection = createCollection<{ id: string; value: string }>({
+ id: `test`,
+ getKey: (item) => item.id,
+ syncMode: `on-demand`,
+ startSync: true,
+ sync: {
+ sync: ({ markReady }) => {
+ markReady()
+ return {
+ loadSubset: () => {
+ callCount++
+ if (callCount === 1) {
+ return new Promise<void>((resolve) => {
+ resolveLoadSubset1 = resolve
+ })
+ } else {
+ return new Promise<void>((resolve) => {
+ resolveLoadSubset2 = resolve
+ })
+ }
+ },
+ }
+ },
+ },
+ })
+
+ collection._sync.loadSubset({})
+ collection._sync.loadSubset({})
+
+ expect(collection.isLoadingSubset).toBe(true)
+
+ resolveLoadSubset1!()
+ await flushPromises()
+
+ // Should still be loading because second promise is pending
+ expect(collection.isLoadingSubset).toBe(true)
+
+ resolveLoadSubset2!()
+ await flushPromises()
+
+ // Now should be false
+ expect(collection.isLoadingSubset).toBe(false)
+ })
+
+ it(`emits loadingSubset:change event`, async () => {
+ let resolveLoadSubset: () => void
+ const loadSubsetPromise = new Promise<void>((resolve) => {
+ resolveLoadSubset = resolve
+ })
+
+ const collection = createCollection<{ id: string; value: string }>({
+ id: `test`,
+ getKey: (item) => item.id,
+ syncMode: `on-demand`,
+ startSync: true,
+ sync: {
+ sync: ({ markReady }) => {
+ markReady()
+ return {
+ loadSubset: () => loadSubsetPromise,
+ }
+ },
+ },
+ })
+
+ const loadingChanges: Array<{
+ isLoadingSubset: boolean
+ previousIsLoadingSubset: boolean
+ }> = []
+
+ collection.on(`loadingSubset:change`, (event) => {
+ loadingChanges.push({
+ isLoadingSubset: event.isLoadingSubset,
+ previousIsLoadingSubset: event.previousIsLoadingSubset,
+ })
+ })
+
+ collection._sync.loadSubset({})
+ await flushPromises()
+
+ expect(loadingChanges).toHaveLength(1)
+ expect(loadingChanges[0]).toEqual({
+ isLoadingSubset: true,
+ previousIsLoadingSubset: false,
+ })
+
+ resolveLoadSubset!()
+ await flushPromises()
+
+ expect(loadingChanges).toHaveLength(2)
+ expect(loadingChanges[1]).toEqual({
+ isLoadingSubset: false,
+ previousIsLoadingSubset: true,
+ })
+ })
+
+ it(`rejected promises still clean up`, async () => {
+ let rejectLoadSubset: (error: Error) => void
+ const loadSubsetPromise = new Promise<void>((_, reject) => {
+ rejectLoadSubset = reject
+ })
+ // Attach catch handler before rejecting to avoid unhandled rejection
+ const handledPromise = loadSubsetPromise.catch(() => {})
+
+ const collection = createCollection<{ id: string; value: string }>({
+ id: `test`,
+ getKey: (item) => item.id,
+ syncMode: `on-demand`,
+ startSync: true,
+ sync: {
+ sync: ({ markReady }) => {
+ markReady()
+ return {
+ loadSubset: () => handledPromise,
+ }
+ },
+ },
+ })
+
+ collection._sync.loadSubset({})
+ expect(collection.isLoadingSubset).toBe(true)
+
+ // Reject the promise
+ rejectLoadSubset!(new Error(`Load failed`))
+ await flushPromises()
+
+ expect(collection.isLoadingSubset).toBe(false)
+ })
+
+ it(`isLoadingSubset stays false when loadSubset returns true (no work to do)`, () => {
+ const collection = createCollection<{ id: string; value: string }>({
+ id: `test`,
+ getKey: (item) => item.id,
+ syncMode: `on-demand`,
+ startSync: true,
+ sync: {
+ sync: ({ markReady }) => {
+ markReady()
+ return {
+ loadSubset: () => true, // No work to do
+ }
+ },
+ },
+ })
+
+ expect(collection.isLoadingSubset).toBe(false)
+
+ // Call loadSubset - it should return true and not track any promise
+ const result = collection._sync.loadSubset({})
+ expect(result).toBe(true)
+ expect(collection.isLoadingSubset).toBe(false)
+ })
+})
diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts
index 7e7a31b20..87f407f2c 100644
--- a/packages/db/tests/query/live-query-collection.test.ts
+++ b/packages/db/tests/query/live-query-collection.test.ts
@@ -939,6 +939,95 @@ describe(`createLiveQueryCollection`, () => {
})
})
+ describe(`isLoadingSubset integration`, () => {
+ it(`live query result collection has isLoadingSubset property`, async () => {
+ const sourceCollection = createCollection<{ id: string; value: string }>({
+ id: `source`,
+ getKey: (item) => item.id,
+ sync: {
+ sync: ({ markReady }) => {
+ markReady()
+ },
+ },
+ })
+
+ const liveQuery = createLiveQueryCollection((q) =>
+ q.from({ item: sourceCollection })
+ )
+
+ await liveQuery.preload()
+
+ expect(liveQuery.isLoadingSubset).toBeDefined()
+ expect(liveQuery.isLoadingSubset).toBe(false)
+ })
+
+ it(`isLoadingSubset property exists and starts as false`, async () => {
+ const sourceCollection = createCollection<{ id: string; value: string }>({
+ id: `source`,
+ getKey: (item) => item.id,
+ sync: {
+ sync: ({ markReady }) => {
+ markReady()
+ },
+ },
+ })
+
+ const liveQuery = createLiveQueryCollection({
+ query: (q) => q.from({ item: sourceCollection }),
+ startSync: true,
+ })
+
+ await liveQuery.preload()
+
+ expect(liveQuery.isLoadingSubset).toBe(false)
+ })
+
+ it(`source collection isLoadingSubset is independent`, async () => {
+ let resolveLoadSubset: () => void
+ const loadSubsetPromise = new Promise<void>((resolve) => {
+ resolveLoadSubset = resolve
+ })
+
+ const sourceCollection = createCollection<{ id: string; value: number }>({
+ id: `source`,
+ getKey: (item) => item.id,
+ syncMode: `on-demand`,
+ sync: {
+ sync: ({ markReady, begin, write, commit }) => {
+ begin()
+ write({ type: `insert`, value: { id: `1`, value: 1 } })
+ commit()
+ markReady()
+ return {
+ loadSubset: () => loadSubsetPromise,
+ }
+ },
+ },
+ })
+
+ const liveQuery = createLiveQueryCollection({
+ query: (q) => q.from({ item: sourceCollection }),
+ startSync: true,
+ })
+
+ await liveQuery.preload()
+
+ // Calling loadSubset directly on source collection sets its own isLoadingSubset
+ sourceCollection._sync.loadSubset({})
+ expect(sourceCollection.isLoadingSubset).toBe(true)
+
+ // But live query isLoadingSubset tracks subscription-driven loads, not direct loadSubset calls
+ // so it remains false unless subscriptions trigger loads via predicate pushdown
+ expect(liveQuery.isLoadingSubset).toBe(false)
+
+ resolveLoadSubset!()
+ await new Promise((resolve) => setTimeout(resolve, 10))
+
+ expect(sourceCollection.isLoadingSubset).toBe(false)
+ expect(liveQuery.isLoadingSubset).toBe(false)
+ })
+ })
+
describe(`move functionality`, () => {
it(`should support moving orderBy window past current window using move function`, async () => {
// Create a collection with more users for testing window movement
@@ -1462,5 +1551,159 @@ describe(`createLiveQueryCollection`, () => {
`Post F`,
])
})
+
+ it(`setWindow returns true when no subset loading is triggered`, async () => {
+ const extendedUsers = createCollection(
+ mockSyncCollectionOptions({
+ id: `users-no-loading`,
+ getKey: (user) => user.id,
+ initialData: [
+ { id: 1, name: `Alice`, active: true },
+ { id: 2, name: `Bob`, active: true },
+ { id: 3, name: `Charlie`, active: true },
+ ],
+ })
+ )
+
+ const activeUsers = createLiveQueryCollection((q) =>
+ q
+ .from({ user: extendedUsers })
+ .where(({ user }) => eq(user.active, true))
+ .orderBy(({ user }) => user.name, `asc`)
+ .limit(2)
+ )
+
+ await activeUsers.preload()
+
+ // setWindow should return true when no loading is triggered
+ const result = activeUsers.utils.setWindow({ offset: 1, limit: 2 })
+ expect(result).toBe(true)
+ })
+
+ it(`setWindow returns and resolves a Promise when async loading is triggered`, async () => {
+ // This is an integration test that validates the full async flow:
+ // 1. setWindow triggers loading more data
+ // 2. Returns a Promise (not true)
+ // 3. The Promise waits for loading to complete
+ // 4. The Promise resolves once loading is done
+
+ vi.useFakeTimers()
+
+ try {
+ let loadSubsetCallCount = 0
+
+ const sourceCollection = createCollection<{
+ id: number
+ value: number
+ }>({
+ id: `source-async-subset-loading`,
+ getKey: (item) => item.id,
+ syncMode: `on-demand`,
+ startSync: true,
+ sync: {
+ sync: ({ markReady, begin, write, commit }) => {
+ // Provide minimal initial data
+ begin()
+ write({ type: `insert`, value: { id: 1, value: 1 } })
+ write({ type: `insert`, value: { id: 2, value: 2 } })
+ write({ type: `insert`, value: { id: 3, value: 3 } })
+ commit()
+ markReady()
+
+ return {
+ loadSubset: () => {
+ loadSubsetCallCount++
+
+ // First call is for the initial window request
+ if (loadSubsetCallCount === 1) {
+ return true
+ }
+
+ // Second call (triggered by setWindow) returns a promise
+ const loadPromise = new Promise<void>((resolve) => {
+ // Simulate async data loading with a delay
+ setTimeout(() => {
+ begin()
+ // Load additional items that would be needed for the new window
+ write({ type: `insert`, value: { id: 4, value: 4 } })
+ write({ type: `insert`, value: { id: 5, value: 5 } })
+ write({ type: `insert`, value: { id: 6, value: 6 } })
+ commit()
+ resolve()
+ }, 50)
+ })
+
+ return loadPromise
+ },
+ }
+ },
+ },
+ })
+
+ const liveQuery = createLiveQueryCollection({
+ query: (q) =>
+ q
+ .from({ item: sourceCollection })
+ .orderBy(({ item }) => item.value, `asc`)
+ .limit(2)
+ .offset(0),
+ startSync: true,
+ })
+
+ await liveQuery.preload()
+
+ // Initial state: should have 2 items (values 1, 2)
+ expect(liveQuery.size).toBe(2)
+ expect(liveQuery.isLoadingSubset).toBe(false)
+ expect(loadSubsetCallCount).toBe(1)
+
+ // Move window to offset 3, which requires loading more data
+ // This should trigger loadSubset and return a Promise
+ const result = liveQuery.utils.setWindow({ offset: 3, limit: 2 })
+
+ // CRITICAL VALIDATION: result should be a Promise, not true
+ expect(result).toBeInstanceOf(Promise)
+ expect(result).not.toBe(true)
+
+ // Advance just a bit to let the scheduler execute and trigger loadSubset
+ await vi.advanceTimersByTimeAsync(1)
+
+ // Verify that loading was triggered and is in progress
+ expect(loadSubsetCallCount).toBeGreaterThan(1)
+ expect(liveQuery.isLoadingSubset).toBe(true)
+
+ // Track when the promise resolves
+ let promiseResolved = false
+ if (result !== true) {
+ result.then(() => {
+ promiseResolved = true
+ })
+ }
+
+ // Promise should NOT be resolved yet because loading is still in progress
+ await vi.advanceTimersByTimeAsync(10)
+ expect(promiseResolved).toBe(false)
+ expect(liveQuery.isLoadingSubset).toBe(true)
+
+ // Now advance time to complete the loading (50ms total from loadSubset call)
+ await vi.advanceTimersByTimeAsync(40)
+
+ // Wait for the promise to resolve
+ if (result !== true) {
+ await result
+ }
+
+ // CRITICAL VALIDATION: Promise has resolved and loading is complete
+ expect(promiseResolved).toBe(true)
+ expect(liveQuery.isLoadingSubset).toBe(false)
+
+ // Verify the window was successfully moved and has the right data
+ expect(liveQuery.size).toBe(2)
+ const items = liveQuery.toArray
+ expect(items.map((i) => i.value)).toEqual([4, 5])
+ } finally {
+ vi.useRealTimers()
+ }
+ })
})
})
diff --git a/packages/react-db/src/index.ts b/packages/react-db/src/index.ts
index bd98349f0..bb3cd3ad5 100644
--- a/packages/react-db/src/index.ts
+++ b/packages/react-db/src/index.ts
@@ -1,5 +1,6 @@
// Re-export all public APIs
export * from "./useLiveQuery"
+export * from "./useLiveInfiniteQuery"
// Re-export everything from @tanstack/db
export * from "@tanstack/db"
diff --git a/packages/react-db/src/useLiveInfiniteQuery.ts b/packages/react-db/src/useLiveInfiniteQuery.ts
new file mode 100644
index 000000000..1e44fd16e
--- /dev/null
+++ b/packages/react-db/src/useLiveInfiniteQuery.ts
@@ -0,0 +1,185 @@
+import { useCallback, useEffect, useMemo, useRef, useState } from "react"
+import { useLiveQuery } from "./useLiveQuery"
+import type {
+ Context,
+ InferResultType,
+ InitialQueryBuilder,
+ LiveQueryCollectionUtils,
+ QueryBuilder,
+} from "@tanstack/db"
+
+/**
+ * Type guard to check if utils object has setWindow method (LiveQueryCollectionUtils)
+ */
+function isLiveQueryCollectionUtils(
+ utils: unknown
+): utils is LiveQueryCollectionUtils {
+ return typeof (utils as any).setWindow === `function`
+}
+
+export type UseLiveInfiniteQueryConfig<TContext extends Context> = {
+ pageSize?: number
+ initialPageParam?: number
+ getNextPageParam: (
+ lastPage: Array<InferResultType<TContext>[number]>,
+ allPages: Array<Array<InferResultType<TContext>[number]>>,
+ lastPageParam: number,
+ allPageParams: Array<number>
+ ) => number | undefined
+}
+
+export type UseLiveInfiniteQueryReturn<TContext extends Context> = Omit<
+ ReturnType<typeof useLiveQuery<TContext>>,
+ `data`
+> & {
+ data: InferResultType<TContext>
+ pages: Array<Array<InferResultType<TContext>[number]>>
+ pageParams: Array<number>
+ fetchNextPage: () => void
+ hasNextPage: boolean
+ isFetchingNextPage: boolean
+}
+
+/**
+ * Create an infinite query using a query function with live updates
+ *
+ * Uses `utils.setWindow()` to dynamically adjust the limit/offset window
+ * without recreating the live query collection on each page change.
+ *
+ * @param queryFn - Query function that defines what data to fetch. Must include `.orderBy()` for setWindow to work.
+ * @param config - Configuration including pageSize and getNextPageParam
+ * @param deps - Array of dependencies that trigger query re-execution when changed
+ * @returns Object with pages, data, and pagination controls
+ *
+ * @example
+ * // Basic infinite query
+ * const { data, pages, fetchNextPage, hasNextPage } = useLiveInfiniteQuery(
+ * (q) => q
+ * .from({ posts: postsCollection })
+ * .orderBy(({ posts }) => posts.createdAt, 'desc')
+ * .select(({ posts }) => ({
+ * id: posts.id,
+ * title: posts.title
+ * })),
+ * {
+ * pageSize: 20,
+ * getNextPageParam: (lastPage, allPages) =>
+ * lastPage.length === 20 ? allPages.length : undefined
+ * }
+ * )
+ *
+ * @example
+ * // With dependencies
+ * const { pages, fetchNextPage } = useLiveInfiniteQuery(
+ * (q) => q
+ * .from({ posts: postsCollection })
+ * .where(({ posts }) => eq(posts.category, category))
+ * .orderBy(({ posts }) => posts.createdAt, 'desc'),
+ * {
+ * pageSize: 10,
+ * getNextPageParam: (lastPage) =>
+ * lastPage.length === 10 ? lastPage.length : undefined
+ * },
+ * [category]
+ * )
+ */
+export function useLiveInfiniteQuery<TContext extends Context>(
+ queryFn: (q: InitialQueryBuilder) => QueryBuilder<TContext>,
+ config: UseLiveInfiniteQueryConfig<TContext>,
+ deps: Array<unknown> = []
+): UseLiveInfiniteQueryReturn<TContext> {
+ const pageSize = config.pageSize || 20
+ const initialPageParam = config.initialPageParam ?? 0
+
+ // Track how many pages have been loaded
+ const [loadedPageCount, setLoadedPageCount] = useState(1)
+ const [isFetchingNextPage, setIsFetchingNextPage] = useState(false)
+
+ // Stringify deps for comparison
+ const depsKey = JSON.stringify(deps)
+ const prevDepsKeyRef = useRef(depsKey)
+
+ // Reset page count when dependencies change
+ useEffect(() => {
+ if (prevDepsKeyRef.current !== depsKey) {
+ setLoadedPageCount(1)
+ prevDepsKeyRef.current = depsKey
+ }
+ }, [depsKey])
+
+ // Create a live query with initial limit and offset
+ // The query function is wrapped to add limit/offset to the query
+ const queryResult = useLiveQuery(
+ (q) => queryFn(q).limit(pageSize).offset(0),
+ deps
+ )
+
+ // Update the window when loadedPageCount changes
+ // We fetch one extra item to peek if there's a next page
+ useEffect(() => {
+ const newLimit = loadedPageCount * pageSize + 1 // +1 to peek ahead
+ const utils = queryResult.collection.utils
+ // setWindow is available on live query collections with orderBy
+ if (isLiveQueryCollectionUtils(utils)) {
+ const result = utils.setWindow({ offset: 0, limit: newLimit })
+ // setWindow returns true if data is immediately available, or Promise if loading
+ if (result !== true) {
+ setIsFetchingNextPage(true)
+ result.then(() => {
+ setIsFetchingNextPage(false)
+ })
+ } else {
+ setIsFetchingNextPage(false)
+ }
+ }
+ }, [loadedPageCount, pageSize, queryResult.collection])
+
+ // Split the data array into pages and determine if there's a next page
+ const { pages, pageParams, hasNextPage, flatData } = useMemo(() => {
+ const dataArray = queryResult.data as InferResultType<TContext>
+ const totalItemsRequested = loadedPageCount * pageSize
+
+ // Check if we have more data than requested (the peek ahead item)
+ const hasMore = dataArray.length > totalItemsRequested
+
+ // Build pages array (without the peek ahead item)
+ const pagesResult: Array<Array<InferResultType<TContext>[number]>> = []
+ const pageParamsResult: Array<number> = []
+
+ for (let i = 0; i < loadedPageCount; i++) {
+ const pageData = dataArray.slice(i * pageSize, (i + 1) * pageSize)
+ pagesResult.push(pageData)
+ pageParamsResult.push(initialPageParam + i)
+ }
+
+ // Flatten the pages for the data return (without peek ahead item)
+ const flatDataResult = dataArray.slice(
+ 0,
+ totalItemsRequested
+ ) as InferResultType<TContext>
+
+ return {
+ pages: pagesResult,
+ pageParams: pageParamsResult,
+ hasNextPage: hasMore,
+ flatData: flatDataResult,
+ }
+ }, [queryResult.data, loadedPageCount, pageSize, initialPageParam])
+
+ // Fetch next page
+ const fetchNextPage = useCallback(() => {
+ if (!hasNextPage || isFetchingNextPage) return
+
+ setLoadedPageCount((prev) => prev + 1)
+ }, [hasNextPage, isFetchingNextPage])
+
+ return {
+ ...queryResult,
+ data: flatData,
+ pages,
+ pageParams,
+ fetchNextPage,
+ hasNextPage,
+ isFetchingNextPage,
+ }
+}
diff --git a/packages/react-db/tests/useLiveInfiniteQuery.test.tsx b/packages/react-db/tests/useLiveInfiniteQuery.test.tsx
new file mode 100644
index 000000000..0a8c3eeca
--- /dev/null
+++ b/packages/react-db/tests/useLiveInfiniteQuery.test.tsx
@@ -0,0 +1,967 @@
+import { describe, expect, it } from "vitest"
+import { act, renderHook, waitFor } from "@testing-library/react"
+import { createCollection, eq } from "@tanstack/db"
+import { useLiveInfiniteQuery } from "../src/useLiveInfiniteQuery"
+import { mockSyncCollectionOptions } from "../../db/tests/utils"
+
+type Post = {
+ id: string
+ title: string
+ content: string
+ createdAt: number
+ category: string
+}
+
+const createMockPosts = (count: number): Array<Post> => {
+  const posts: Array<Post> = []
+ for (let i = 1; i <= count; i++) {
+ posts.push({
+ id: `${i}`,
+ title: `Post ${i}`,
+ content: `Content ${i}`,
+ createdAt: 1000000 - i * 1000, // Descending order
+ category: i % 2 === 0 ? `tech` : `life`,
+ })
+ }
+ return posts
+}
+
+describe(`useLiveInfiniteQuery`, () => {
+ it(`should fetch initial page of data`, async () => {
+ const posts = createMockPosts(50)
+ const collection = createCollection(
+ mockSyncCollectionOptions({
+ id: `initial-page-test`,
+ getKey: (post: Post) => post.id,
+ initialData: posts,
+ })
+ )
+
+ const { result } = renderHook(() => {
+ return useLiveInfiniteQuery(
+ (q) =>
+ q
+ .from({ posts: collection })
+ .orderBy(({ posts: p }) => p.createdAt, `desc`)
+ .select(({ posts: p }) => ({
+ id: p.id,
+ title: p.title,
+ createdAt: p.createdAt,
+ })),
+ {
+ pageSize: 10,
+ getNextPageParam: (lastPage) =>
+ lastPage.length === 10 ? lastPage.length : undefined,
+ }
+ )
+ })
+
+ await waitFor(() => {
+ expect(result.current.isReady).toBe(true)
+ })
+
+ // Should have 1 page initially
+ expect(result.current.pages).toHaveLength(1)
+ expect(result.current.pages[0]).toHaveLength(10)
+
+ // Data should be flattened
+ expect(result.current.data).toHaveLength(10)
+
+ // Should have next page since we have 50 items total
+ expect(result.current.hasNextPage).toBe(true)
+
+ // First item should be Post 1 (most recent by createdAt)
+ expect(result.current.pages[0]![0]).toMatchObject({
+ id: `1`,
+ title: `Post 1`,
+ })
+ })
+
+ it(`should fetch multiple pages`, async () => {
+ const posts = createMockPosts(50)
+ const collection = createCollection(
+ mockSyncCollectionOptions({
+ id: `multiple-pages-test`,
+ getKey: (post: Post) => post.id,
+ initialData: posts,
+ })
+ )
+
+ const { result } = renderHook(() => {
+ return useLiveInfiniteQuery(
+ (q) =>
+ q
+ .from({ posts: collection })
+ .orderBy(({ posts: p }) => p.createdAt, `desc`),
+ {
+ pageSize: 10,
+ getNextPageParam: (lastPage) =>
+ lastPage.length === 10 ? lastPage.length : undefined,
+ }
+ )
+ })
+
+ await waitFor(() => {
+ expect(result.current.isReady).toBe(true)
+ })
+
+ // Initially 1 page
+ expect(result.current.pages).toHaveLength(1)
+ expect(result.current.hasNextPage).toBe(true)
+
+ // Fetch next page
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await waitFor(() => {
+ expect(result.current.pages).toHaveLength(2)
+ })
+
+ expect(result.current.pages[0]).toHaveLength(10)
+ expect(result.current.pages[1]).toHaveLength(10)
+ expect(result.current.data).toHaveLength(20)
+ expect(result.current.hasNextPage).toBe(true)
+
+ // Fetch another page
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await waitFor(() => {
+ expect(result.current.pages).toHaveLength(3)
+ })
+
+ expect(result.current.data).toHaveLength(30)
+ expect(result.current.hasNextPage).toBe(true)
+ })
+
+ it(`should detect when no more pages available`, async () => {
+ const posts = createMockPosts(25)
+ const collection = createCollection(
+ mockSyncCollectionOptions({
+ id: `no-more-pages-test`,
+ getKey: (post: Post) => post.id,
+ initialData: posts,
+ })
+ )
+
+ const { result } = renderHook(() => {
+ return useLiveInfiniteQuery(
+ (q) =>
+ q
+ .from({ posts: collection })
+ .orderBy(({ posts: p }) => p.createdAt, `desc`),
+ {
+ pageSize: 10,
+ getNextPageParam: (lastPage) =>
+ lastPage.length === 10 ? lastPage.length : undefined,
+ }
+ )
+ })
+
+ await waitFor(() => {
+ expect(result.current.isReady).toBe(true)
+ })
+
+ // Page 1: 10 items, has more
+ expect(result.current.pages).toHaveLength(1)
+ expect(result.current.hasNextPage).toBe(true)
+
+ // Fetch page 2
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await waitFor(() => {
+ expect(result.current.pages).toHaveLength(2)
+ })
+
+ // Page 2: 10 items, has more
+ expect(result.current.pages[1]).toHaveLength(10)
+ expect(result.current.hasNextPage).toBe(true)
+
+ // Fetch page 3
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await waitFor(() => {
+ expect(result.current.pages).toHaveLength(3)
+ })
+
+ // Page 3: 5 items, no more
+ expect(result.current.pages[2]).toHaveLength(5)
+ expect(result.current.data).toHaveLength(25)
+ expect(result.current.hasNextPage).toBe(false)
+ })
+
+ it(`should handle empty results`, async () => {
+ const collection = createCollection(
+ mockSyncCollectionOptions({
+ id: `empty-results-test`,
+ getKey: (post: Post) => post.id,
+ initialData: [],
+ })
+ )
+
+ const { result } = renderHook(() => {
+ return useLiveInfiniteQuery(
+ (q) =>
+ q
+ .from({ posts: collection })
+ .orderBy(({ posts: p }) => p.createdAt, `desc`),
+ {
+ pageSize: 10,
+ getNextPageParam: (lastPage) =>
+ lastPage.length === 10 ? lastPage.length : undefined,
+ }
+ )
+ })
+
+ await waitFor(() => {
+ expect(result.current.isReady).toBe(true)
+ })
+
+ // With no data, we still have 1 page (which is empty)
+ expect(result.current.pages).toHaveLength(1)
+ expect(result.current.pages[0]).toHaveLength(0)
+ expect(result.current.data).toHaveLength(0)
+ expect(result.current.hasNextPage).toBe(false)
+ })
+
+ it(`should update pages when underlying data changes`, async () => {
+ const posts = createMockPosts(30)
+ const collection = createCollection(
+ mockSyncCollectionOptions({
+ id: `live-updates-test`,
+ getKey: (post: Post) => post.id,
+ initialData: posts,
+ })
+ )
+
+ const { result } = renderHook(() => {
+ return useLiveInfiniteQuery(
+ (q) =>
+ q
+ .from({ posts: collection })
+ .orderBy(({ posts: p }) => p.createdAt, `desc`),
+ {
+ pageSize: 10,
+ getNextPageParam: (lastPage) =>
+ lastPage.length === 10 ? lastPage.length : undefined,
+ }
+ )
+ })
+
+ await waitFor(() => {
+ expect(result.current.isReady).toBe(true)
+ })
+
+ // Fetch 2 pages
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await waitFor(() => {
+ expect(result.current.pages).toHaveLength(2)
+ })
+
+ expect(result.current.data).toHaveLength(20)
+
+ // Insert a new post with most recent timestamp
+ act(() => {
+ collection.utils.begin()
+ collection.utils.write({
+ type: `insert`,
+ value: {
+ id: `new-1`,
+ title: `New Post`,
+ content: `New Content`,
+ createdAt: 1000001, // Most recent
+ category: `tech`,
+ },
+ })
+ collection.utils.commit()
+ })
+
+ await waitFor(() => {
+ // New post should be first
+ expect(result.current.pages[0]![0]).toMatchObject({
+ id: `new-1`,
+ title: `New Post`,
+ })
+ })
+
+ // Still showing 2 pages (20 items), but content has shifted
+ // The new item is included, pushing the last item out of view
+ expect(result.current.pages).toHaveLength(2)
+ expect(result.current.data).toHaveLength(20)
+ expect(result.current.pages[0]).toHaveLength(10)
+ expect(result.current.pages[1]).toHaveLength(10)
+ })
+
+ it(`should handle deletions across pages`, async () => {
+ const posts = createMockPosts(25)
+ const collection = createCollection(
+ mockSyncCollectionOptions({
+ id: `deletions-test`,
+ getKey: (post: Post) => post.id,
+ initialData: posts,
+ })
+ )
+
+ const { result } = renderHook(() => {
+ return useLiveInfiniteQuery(
+ (q) =>
+ q
+ .from({ posts: collection })
+ .orderBy(({ posts: p }) => p.createdAt, `desc`),
+ {
+ pageSize: 10,
+ getNextPageParam: (lastPage) =>
+ lastPage.length === 10 ? lastPage.length : undefined,
+ }
+ )
+ })
+
+ await waitFor(() => {
+ expect(result.current.isReady).toBe(true)
+ })
+
+ // Fetch 2 pages
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await waitFor(() => {
+ expect(result.current.pages).toHaveLength(2)
+ })
+
+ expect(result.current.data).toHaveLength(20)
+ const firstItemId = result.current.data[0]!.id
+
+ // Delete the first item
+ act(() => {
+ collection.utils.begin()
+ collection.utils.write({
+ type: `delete`,
+ value: posts[0]!,
+ })
+ collection.utils.commit()
+ })
+
+ await waitFor(() => {
+ // First item should have changed
+ expect(result.current.data[0]!.id).not.toBe(firstItemId)
+ })
+
+ // Still showing 2 pages, each pulls from remaining 24 items
+ // Page 1: items 0-9 (10 items)
+ // Page 2: items 10-19 (10 items)
+    // Total: 20 items (items 20-23 are beyond our loaded pages)
+ expect(result.current.pages).toHaveLength(2)
+ expect(result.current.data).toHaveLength(20)
+ expect(result.current.pages[0]).toHaveLength(10)
+ expect(result.current.pages[1]).toHaveLength(10)
+ })
+
+ it(`should work with where clauses`, async () => {
+ const posts = createMockPosts(50)
+ const collection = createCollection(
+ mockSyncCollectionOptions({
+ id: `where-clause-test`,
+ getKey: (post: Post) => post.id,
+ initialData: posts,
+ })
+ )
+
+ const { result } = renderHook(() => {
+ return useLiveInfiniteQuery(
+ (q) =>
+ q
+ .from({ posts: collection })
+ .where(({ posts: p }) => eq(p.category, `tech`))
+ .orderBy(({ posts: p }) => p.createdAt, `desc`),
+ {
+ pageSize: 5,
+ getNextPageParam: (lastPage) =>
+ lastPage.length === 5 ? lastPage.length : undefined,
+ }
+ )
+ })
+
+ await waitFor(() => {
+ expect(result.current.isReady).toBe(true)
+ })
+
+ // Should only have tech posts (every even ID)
+ expect(result.current.pages).toHaveLength(1)
+ expect(result.current.pages[0]).toHaveLength(5)
+
+ // All items should be tech category
+ result.current.pages[0]!.forEach((post) => {
+ expect(post.category).toBe(`tech`)
+ })
+
+ // Should have more pages
+ expect(result.current.hasNextPage).toBe(true)
+
+ // Fetch next page
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await waitFor(() => {
+ expect(result.current.pages).toHaveLength(2)
+ })
+
+ expect(result.current.data).toHaveLength(10)
+ })
+
+ it(`should re-execute query when dependencies change`, async () => {
+ const posts = createMockPosts(50)
+ const collection = createCollection(
+ mockSyncCollectionOptions({
+ id: `deps-change-test`,
+ getKey: (post: Post) => post.id,
+ initialData: posts,
+ })
+ )
+
+ const { result, rerender } = renderHook(
+ ({ category }: { category: string }) => {
+ return useLiveInfiniteQuery(
+ (q) =>
+ q
+ .from({ posts: collection })
+ .where(({ posts: p }) => eq(p.category, category))
+ .orderBy(({ posts: p }) => p.createdAt, `desc`),
+ {
+ pageSize: 5,
+ getNextPageParam: (lastPage) =>
+ lastPage.length === 5 ? lastPage.length : undefined,
+ },
+ [category]
+ )
+ },
+ { initialProps: { category: `tech` } }
+ )
+
+ await waitFor(() => {
+ expect(result.current.isReady).toBe(true)
+ })
+
+ // Fetch 2 pages of tech posts
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await waitFor(() => {
+ expect(result.current.pages).toHaveLength(2)
+ })
+
+ // Change category to life
+ act(() => {
+ rerender({ category: `life` })
+ })
+
+ await waitFor(() => {
+ // Should reset to 1 page with life posts
+ expect(result.current.pages).toHaveLength(1)
+ })
+
+ // All items should be life category
+ result.current.pages[0]!.forEach((post) => {
+ expect(post.category).toBe(`life`)
+ })
+ })
+
+ it(`should track pageParams correctly`, async () => {
+ const posts = createMockPosts(30)
+ const collection = createCollection(
+ mockSyncCollectionOptions({
+ id: `page-params-test`,
+ getKey: (post: Post) => post.id,
+ initialData: posts,
+ })
+ )
+
+ const { result } = renderHook(() => {
+ return useLiveInfiniteQuery(
+ (q) =>
+ q
+ .from({ posts: collection })
+ .orderBy(({ posts: p }) => p.createdAt, `desc`),
+ {
+ pageSize: 10,
+ initialPageParam: 0,
+ getNextPageParam: (lastPage, allPages, lastPageParam) =>
+ lastPage.length === 10 ? lastPageParam + 1 : undefined,
+ }
+ )
+ })
+
+ await waitFor(() => {
+ expect(result.current.isReady).toBe(true)
+ })
+
+ expect(result.current.pageParams).toEqual([0])
+
+ // Fetch next page
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await waitFor(() => {
+ expect(result.current.pageParams).toEqual([0, 1])
+ })
+
+ // Fetch another page
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await waitFor(() => {
+ expect(result.current.pageParams).toEqual([0, 1, 2])
+ })
+ })
+
+ it(`should handle exact page size boundaries`, async () => {
+ const posts = createMockPosts(20) // Exactly 2 pages
+ const collection = createCollection(
+ mockSyncCollectionOptions({
+ id: `exact-boundary-test`,
+ getKey: (post: Post) => post.id,
+ initialData: posts,
+ })
+ )
+
+ const { result } = renderHook(() => {
+ return useLiveInfiniteQuery(
+ (q) =>
+ q
+ .from({ posts: collection })
+ .orderBy(({ posts: p }) => p.createdAt, `desc`),
+ {
+ pageSize: 10,
+          // The hook derives hasNextPage from its peek-ahead window, so this
+          // callback only needs to keep signaling "possibly more" while the
+          // last page is still full. With a real backend you would typically
+          // check response metadata instead of page length.
+          getNextPageParam: (lastPage, allPages) =>
+            lastPage.length < 10 ? undefined : allPages.flat().length,
+ }
+ )
+ })
+
+ await waitFor(() => {
+ expect(result.current.isReady).toBe(true)
+ })
+
+ expect(result.current.hasNextPage).toBe(true)
+
+ // Fetch page 2
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await waitFor(() => {
+ expect(result.current.pages).toHaveLength(2)
+ })
+
+ expect(result.current.pages[1]).toHaveLength(10)
+ // With setWindow peek-ahead, we can now detect no more pages immediately
+ // We request 21 items (2 * 10 + 1 peek) but only get 20, so we know there's no more
+ expect(result.current.hasNextPage).toBe(false)
+
+ // Verify total data
+ expect(result.current.data).toHaveLength(20)
+ })
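+
+  // Hypothetical sketch (not exercised by these tests): with a backend that
+  // returns a cursor per row, getNextPageParam could hand it back directly,
+  // e.g. `getNextPageParam: (lastPage) => lastPage.at(-1)?.cursor`, instead
+  // of inferring from page length.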
+
+  it(`should handle repeated fetchNextPage calls without race conditions`, async () => {
+ const posts = createMockPosts(50)
+ const collection = createCollection(
+ mockSyncCollectionOptions({
+ id: `concurrent-fetch-test`,
+ getKey: (post: Post) => post.id,
+ initialData: posts,
+ })
+ )
+
+ const { result } = renderHook(() => {
+ return useLiveInfiniteQuery(
+ (q) =>
+ q
+ .from({ posts: collection })
+ .orderBy(({ posts: p }) => p.createdAt, `desc`),
+ {
+ pageSize: 10,
+ getNextPageParam: (lastPage) =>
+ lastPage.length === 10 ? lastPage.length : undefined,
+ }
+ )
+ })
+
+ await waitFor(() => {
+ expect(result.current.isReady).toBe(true)
+ })
+
+ expect(result.current.pages).toHaveLength(1)
+
+ // With sync data, all fetches complete immediately, so all 3 calls will succeed
+ // The key is that they won't cause race conditions or errors
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await waitFor(() => {
+ expect(result.current.pages).toHaveLength(2)
+ })
+
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await waitFor(() => {
+ expect(result.current.pages).toHaveLength(3)
+ })
+
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await waitFor(() => {
+ expect(result.current.pages).toHaveLength(4)
+ })
+
+ // All fetches should have succeeded
+ expect(result.current.pages).toHaveLength(4)
+ expect(result.current.data).toHaveLength(40)
+ })
+
+ it(`should not fetch when hasNextPage is false`, async () => {
+ const posts = createMockPosts(5)
+ const collection = createCollection(
+ mockSyncCollectionOptions({
+ id: `no-fetch-when-done-test`,
+ getKey: (post: Post) => post.id,
+ initialData: posts,
+ })
+ )
+
+ const { result } = renderHook(() => {
+ return useLiveInfiniteQuery(
+ (q) =>
+ q
+ .from({ posts: collection })
+ .orderBy(({ posts: p }) => p.createdAt, `desc`),
+ {
+ pageSize: 10,
+ getNextPageParam: (lastPage) =>
+ lastPage.length === 10 ? lastPage.length : undefined,
+ }
+ )
+ })
+
+ await waitFor(() => {
+ expect(result.current.isReady).toBe(true)
+ })
+
+ expect(result.current.hasNextPage).toBe(false)
+ expect(result.current.pages).toHaveLength(1)
+
+ // Try to fetch when there's no next page
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await new Promise((resolve) => setTimeout(resolve, 50))
+
+ // Should still have only 1 page
+ expect(result.current.pages).toHaveLength(1)
+ })
+
+ it(`should support custom initialPageParam`, async () => {
+ const posts = createMockPosts(30)
+ const collection = createCollection(
+ mockSyncCollectionOptions({
+ id: `initial-param-test`,
+ getKey: (post: Post) => post.id,
+ initialData: posts,
+ })
+ )
+
+ const { result } = renderHook(() => {
+ return useLiveInfiniteQuery(
+ (q) =>
+ q
+ .from({ posts: collection })
+ .orderBy(({ posts: p }) => p.createdAt, `desc`),
+ {
+ pageSize: 10,
+ initialPageParam: 100,
+ getNextPageParam: (lastPage, allPages, lastPageParam) =>
+ lastPage.length === 10 ? lastPageParam + 1 : undefined,
+ }
+ )
+ })
+
+ await waitFor(() => {
+ expect(result.current.isReady).toBe(true)
+ })
+
+ expect(result.current.pageParams).toEqual([100])
+
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await waitFor(() => {
+ expect(result.current.pageParams).toEqual([100, 101])
+ })
+ })
+
+ it(`should detect hasNextPage change when new items are synced`, async () => {
+ // Start with exactly 20 items (2 pages)
+ const posts = createMockPosts(20)
+ const collection = createCollection(
+ mockSyncCollectionOptions({
+ id: `sync-detection-test`,
+ getKey: (post: Post) => post.id,
+ initialData: posts,
+ })
+ )
+
+ const { result } = renderHook(() => {
+ return useLiveInfiniteQuery(
+ (q) =>
+ q
+ .from({ posts: collection })
+ .orderBy(({ posts: p }) => p.createdAt, `desc`),
+ {
+ pageSize: 10,
+ getNextPageParam: (lastPage) =>
+ lastPage.length === 10 ? lastPage.length : undefined,
+ }
+ )
+ })
+
+ await waitFor(() => {
+ expect(result.current.isReady).toBe(true)
+ })
+
+ // Load both pages
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await waitFor(() => {
+ expect(result.current.pages).toHaveLength(2)
+ })
+
+ // Should have no next page (exactly 20 items, 2 full pages, peek returns nothing)
+ expect(result.current.hasNextPage).toBe(false)
+ expect(result.current.data).toHaveLength(20)
+
+ // Add 5 more items to the collection
+ act(() => {
+ collection.utils.begin()
+ for (let i = 0; i < 5; i++) {
+ collection.utils.write({
+ type: `insert`,
+ value: {
+ id: `new-${i}`,
+ title: `New Post ${i}`,
+ content: `Content ${i}`,
+ createdAt: Date.now() + i,
+ category: `tech`,
+ },
+ })
+ }
+ collection.utils.commit()
+ })
+
+ // Should now detect that there's a next page available
+ await waitFor(() => {
+ expect(result.current.hasNextPage).toBe(true)
+ })
+
+ // Data should still be 20 items (we haven't fetched the next page yet)
+ expect(result.current.data).toHaveLength(20)
+ expect(result.current.pages).toHaveLength(2)
+
+ // Fetch the next page
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ await waitFor(() => {
+ expect(result.current.pages).toHaveLength(3)
+ })
+
+ // Third page should have the new items
+ expect(result.current.pages[2]).toHaveLength(5)
+ expect(result.current.data).toHaveLength(25)
+
+ // No more pages available now
+ expect(result.current.hasNextPage).toBe(false)
+ })
+
+ it(`should set isFetchingNextPage to false when data is immediately available`, async () => {
+ const posts = createMockPosts(50)
+ const collection = createCollection(
+ mockSyncCollectionOptions({
+ id: `immediate-data-test`,
+ getKey: (post: Post) => post.id,
+ initialData: posts,
+ })
+ )
+
+ const { result } = renderHook(() => {
+ return useLiveInfiniteQuery(
+ (q) =>
+ q
+ .from({ posts: collection })
+ .orderBy(({ posts: p }) => p.createdAt, `desc`),
+ {
+ pageSize: 10,
+ getNextPageParam: (lastPage) =>
+ lastPage.length === 10 ? lastPage.length : undefined,
+ }
+ )
+ })
+
+ await waitFor(() => {
+ expect(result.current.isReady).toBe(true)
+ })
+
+ // Initially 1 page and not fetching
+ expect(result.current.pages).toHaveLength(1)
+ expect(result.current.isFetchingNextPage).toBe(false)
+
+ // Fetch next page - should remain false because data is immediately available
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+ // Since data is *synchronously* available, isFetchingNextPage should be false
+ expect(result.current.pages).toHaveLength(2)
+ expect(result.current.isFetchingNextPage).toBe(false)
+ })
+
+ it(`should track isFetchingNextPage when async loading is triggered`, async () => {
+ let loadSubsetCallCount = 0
+
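+    // The mock's loadSubset returns `true` when the requested rows are
+    // already local and a Promise when more data has to be fetched; the hook
+    // surfaces the Promise case below as isFetchingNextPage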
+ const collection = createCollection({
+ id: `async-loading-test`,
+ getKey: (post: Post) => post.id,
+ syncMode: `on-demand`,
+ startSync: true,
+ sync: {
+ sync: ({ markReady, begin, write, commit }) => {
+ // Provide initial data
+ begin()
+ for (let i = 1; i <= 15; i++) {
+ write({
+ type: `insert`,
+ value: {
+ id: `${i}`,
+ title: `Post ${i}`,
+ content: `Content ${i}`,
+ createdAt: 1000000 - i * 1000,
+ category: i % 2 === 0 ? `tech` : `life`,
+ },
+ })
+ }
+ commit()
+ markReady()
+
+ return {
+ loadSubset: () => {
+ loadSubsetCallCount++
+
+            // The first two calls return true (initial load + window setup)
+ if (loadSubsetCallCount <= 2) {
+ return true
+ }
+
+ // Subsequent calls simulate async loading with a real timeout
+            const loadPromise = new Promise<void>((resolve) => {
+ setTimeout(() => {
+ begin()
+ // Load more data
+ for (let i = 16; i <= 30; i++) {
+ write({
+ type: `insert`,
+ value: {
+ id: `${i}`,
+ title: `Post ${i}`,
+ content: `Content ${i}`,
+ createdAt: 1000000 - i * 1000,
+ category: i % 2 === 0 ? `tech` : `life`,
+ },
+ })
+ }
+ commit()
+ resolve()
+ }, 50)
+ })
+
+ return loadPromise
+ },
+ }
+ },
+ },
+ })
+
+ const { result } = renderHook(() => {
+ return useLiveInfiniteQuery(
+ (q) =>
+ q
+ .from({ posts: collection })
+ .orderBy(({ posts: p }) => p.createdAt, `desc`),
+ {
+ pageSize: 10,
+ getNextPageParam: (lastPage) =>
+ lastPage.length === 10 ? lastPage.length : undefined,
+ }
+ )
+ })
+
+ await waitFor(() => {
+ expect(result.current.isReady).toBe(true)
+ })
+
+ // Wait for initial window setup to complete
+ await waitFor(() => {
+ expect(result.current.isFetchingNextPage).toBe(false)
+ })
+
+ expect(result.current.pages).toHaveLength(1)
+
+ // Fetch next page which will trigger async loading
+ act(() => {
+ result.current.fetchNextPage()
+ })
+
+    // The async loadSubset is now in flight, so isFetchingNextPage flips to
+    // true synchronously
+ expect(result.current.isFetchingNextPage).toBe(true)
+
+ // Wait for loading to complete
+ await waitFor(
+ () => {
+ expect(result.current.isFetchingNextPage).toBe(false)
+ },
+ { timeout: 200 }
+ )
+
+ // Should have 2 pages now
+ expect(result.current.pages).toHaveLength(2)
+ expect(result.current.data).toHaveLength(20)
+ }, 10000)
+})