From f329fb15cc1960694200ab34de1db8bba5636238 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sun, 12 Oct 2025 18:48:15 +0100 Subject: [PATCH 01/13] wip --- packages/db/src/collection/events.ts | 12 + packages/db/src/collection/index.ts | 24 +- packages/db/src/collection/subscription.ts | 157 +++++++++- packages/db/src/collection/sync.ts | 62 +++- .../query/live/collection-config-builder.ts | 3 +- .../src/query/live/collection-subscriber.ts | 45 ++- .../db/tests/collection-subscription.test.ts | 271 ++++++++++++++++++ packages/db/tests/collection.test.ts | 207 +++++++++++++ .../tests/query/live-query-collection.test.ts | 88 ++++++ 9 files changed, 859 insertions(+), 10 deletions(-) create mode 100644 packages/db/tests/collection-subscription.test.ts diff --git a/packages/db/src/collection/events.ts b/packages/db/src/collection/events.ts index d0f56e744..138c3e907 100644 --- a/packages/db/src/collection/events.ts +++ b/packages/db/src/collection/events.ts @@ -31,9 +31,20 @@ export interface CollectionSubscribersChangeEvent { subscriberCount: number } +/** + * Event emitted when the collection's loading more state changes + */ +export interface CollectionLoadingMoreChangeEvent { + type: `loadingMore:change` + collection: Collection + isLoadingMore: boolean + previousIsLoadingMore: boolean +} + export type AllCollectionEvents = { "status:change": CollectionStatusChangeEvent "subscribers:change": CollectionSubscribersChangeEvent + "loadingMore:change": CollectionLoadingMoreChangeEvent } & { [K in CollectionStatus as `status:${K}`]: CollectionStatusEvent } @@ -42,6 +53,7 @@ export type CollectionEvent = | AllCollectionEvents[keyof AllCollectionEvents] | CollectionStatusChangeEvent | CollectionSubscribersChangeEvent + | CollectionLoadingMoreChangeEvent export type CollectionEventHandler = ( event: AllCollectionEvents[T] diff --git a/packages/db/src/collection/index.ts b/packages/db/src/collection/index.ts index 965d02592..a23e10d83 100644 --- a/packages/db/src/collection/index.ts +++ b/packages/db/src/collection/index.ts @@ -303,6 +303,7 @@ export class CollectionImpl< collection: this, // Required for passing to config.sync callback state: this._state, lifecycle: this._lifecycle, + events: this._events, }) // Only start sync immediately if explicitly enabled @@ -355,6 +356,14 @@ export class CollectionImpl< return this._lifecycle.status === `ready` } + /** + * Check if the collection is currently loading more data + * @returns true if the collection has pending load more operations, false otherwise + */ + public get isLoadingMore(): boolean { + return this._sync.isLoadingMore + } + /** * Start sync immediately - internal method for compiled queries * This bypasses lazy loading for special cases like live query results @@ -368,11 +377,18 @@ export class CollectionImpl< * @param options Options to control what data is being loaded * @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded. * If data loading is synchronous, the data is loaded when the method returns. + * Returns undefined if no sync function is configured. */ - public syncMore(options: OnLoadMoreOptions): void | Promise { - if (this._sync.syncOnLoadMoreFn) { - return this._sync.syncOnLoadMoreFn(options) - } + public syncMore(options: OnLoadMoreOptions): Promise | undefined { + return this._sync.syncMore(options) + } + + /** + * Tracks a load promise for isLoadingMore state. + * @internal This is for internal coordination (e.g., live-query glue code), not for general use. + */ + public trackLoadPromise(promise: Promise): void { + this._sync.trackLoadPromise(promise) } /** diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index f369971c6..592c8e565 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -28,6 +28,32 @@ type CollectionSubscriptionOptions = { onUnsubscribe?: () => void } +type SubscriptionStatus = `ready` | `loadingMore` + +interface SubscriptionStatusChangeEvent { + type: `status:change` + subscription: CollectionSubscription + previousStatus: SubscriptionStatus + status: SubscriptionStatus +} + +interface SubscriptionStatusEvent { + type: `status:${T}` + subscription: CollectionSubscription + previousStatus: SubscriptionStatus + status: T +} + +type AllSubscriptionEvents = { + "status:change": SubscriptionStatusChangeEvent + "status:ready": SubscriptionStatusEvent<`ready`> + "status:loadingMore": SubscriptionStatusEvent<`loadingMore`> +} + +type SubscriptionEventHandler = ( + event: AllSubscriptionEvents[T] +) => void + export class CollectionSubscription { private loadedInitialState = false @@ -42,6 +68,16 @@ export class CollectionSubscription { private orderByIndex: IndexInterface | undefined + // Status tracking + public status: SubscriptionStatus = `ready` + private pendingLoadMorePromises: Set> = new Set() + + // Event emitter + private listeners = new Map< + keyof AllSubscriptionEvents, + Set> + >() + constructor( private collection: CollectionImpl, private callback: (changes: Array>) => void, @@ -71,6 +107,95 @@ export class CollectionSubscription { this.orderByIndex = index } + /** + * Subscribe to a subscription event + */ + on( + event: T, + callback: SubscriptionEventHandler + ) { + if (!this.listeners.has(event)) { + this.listeners.set(event, new Set()) + } + this.listeners.get(event)!.add(callback) + + return () => { + this.listeners.get(event)?.delete(callback) + } + } + + /** + * Subscribe to a subscription event once + */ + once( + event: T, + callback: SubscriptionEventHandler + ) { + const unsubscribe = this.on(event, (eventPayload) => { + callback(eventPayload) + unsubscribe() + }) + return unsubscribe + } + + /** + * Unsubscribe from a subscription event + */ + off( + event: T, + callback: SubscriptionEventHandler + ) { + this.listeners.get(event)?.delete(callback) + } + + /** + * Emit a subscription event + */ + private emit( + event: T, + eventPayload: AllSubscriptionEvents[T] + ) { + this.listeners.get(event)?.forEach((listener) => { + try { + listener(eventPayload) + } catch (error) { + // Re-throw in a microtask to surface the error + queueMicrotask(() => { + throw error + }) + } + }) + } + + /** + * Set subscription status and emit events if changed + */ + private setStatus(newStatus: SubscriptionStatus) { + if (this.status === newStatus) { + return // No change + } + + const previousStatus = this.status + this.status = newStatus + + // Emit status:change event + this.emit(`status:change`, { + type: `status:change`, + subscription: this, + previousStatus, + status: newStatus, + }) + + // Emit specific status event + const eventKey: `status:${SubscriptionStatus}` = `status:${newStatus}` + this.emit(eventKey, { + type: eventKey, + subscription: this, + previousStatus, + status: newStatus, + } as AllSubscriptionEvents[typeof eventKey]) + } + hasLoadedInitialState() { return this.loadedInitialState } @@ -121,10 +246,23 @@ export class CollectionSubscription { // Request the sync layer to load more data // don't await it, we will load the data into the collection when it comes in - this.collection.syncMore({ + const syncPromise = this.collection.syncMore({ where: stateOpts.where, }) + // Track the promise if it exists + if (syncPromise) { + this.pendingLoadMorePromises.add(syncPromise) + this.setStatus(`loadingMore`) + + syncPromise.finally(() => { + this.pendingLoadMorePromises.delete(syncPromise) + if (this.pendingLoadMorePromises.size === 0) { + this.setStatus(`ready`) + } + }) + } + // Also load data immediately from the collection const snapshot = this.collection.currentStateAsChanges(stateOpts) @@ -215,11 +353,24 @@ export class CollectionSubscription { // Request the sync layer to load more data // don't await it, we will load the data into the collection when it comes in - this.collection.syncMore({ + const syncPromise = this.collection.syncMore({ where: whereWithValueFilter, limit, orderBy, }) + + // Track the promise if it exists + if (syncPromise) { + this.pendingLoadMorePromises.add(syncPromise) + this.setStatus(`loadingMore`) + + syncPromise.finally(() => { + this.pendingLoadMorePromises.delete(syncPromise) + if (this.pendingLoadMorePromises.size === 0) { + this.setStatus(`ready`) + } + }) + } } /** @@ -264,6 +415,8 @@ export class CollectionSubscription { } unsubscribe() { + // Clear all event listeners to prevent memory leaks + this.listeners.clear() this.options.onUnsubscribe?.() } } diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index 1958c58f9..5129342fe 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -19,6 +19,7 @@ import type { import type { CollectionImpl } from "./index.js" import type { CollectionStateManager } from "./state" import type { CollectionLifecycleManager } from "./lifecycle" +import type { CollectionEventsManager } from "./events.js" export class CollectionSyncManager< TOutput extends object = Record, @@ -29,6 +30,7 @@ export class CollectionSyncManager< private collection!: CollectionImpl private state!: CollectionStateManager private lifecycle!: CollectionLifecycleManager + private _events!: CollectionEventsManager private config!: CollectionConfig private id: string @@ -38,6 +40,8 @@ export class CollectionSyncManager< | ((options: OnLoadMoreOptions) => void | Promise) | null = null + private pendingLoadMorePromises: Set> = new Set() + /** * Creates a new CollectionSyncManager instance */ @@ -50,10 +54,12 @@ export class CollectionSyncManager< collection: CollectionImpl state: CollectionStateManager lifecycle: CollectionLifecycleManager + events: CollectionEventsManager }) { this.collection = deps.collection this.state = deps.state this.lifecycle = deps.lifecycle + this._events = deps.events } /** @@ -239,16 +245,68 @@ export class CollectionSyncManager< return this.preloadPromise } + /** + * Gets whether the collection is currently loading more data + */ + public get isLoadingMore(): boolean { + return this.pendingLoadMorePromises.size > 0 + } + + /** + * Tracks a load promise for isLoadingMore state. + * @internal This is for internal coordination (e.g., live-query glue code), not for general use. + */ + public trackLoadPromise(promise: Promise): void { + const wasLoading = this.isLoadingMore + this.pendingLoadMorePromises.add(promise) + const isLoadingNow = this.isLoadingMore + + if (!wasLoading && isLoadingNow) { + this._events.emit(`loadingMore:change`, { + type: `loadingMore:change`, + collection: this.collection, + isLoadingMore: true, + previousIsLoadingMore: false, + }) + } + + promise.finally(() => { + // Check loading state BEFORE removing the promise + const wasLoadingBeforeRemoval = this.isLoadingMore + this.pendingLoadMorePromises.delete(promise) + const stillLoading = this.isLoadingMore + + if (wasLoadingBeforeRemoval && !stillLoading) { + this._events.emit(`loadingMore:change`, { + type: `loadingMore:change`, + collection: this.collection, + isLoadingMore: false, + previousIsLoadingMore: true, + }) + } + }) + } + /** * Requests the sync layer to load more data. * @param options Options to control what data is being loaded * @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded. * If data loading is synchronous, the data is loaded when the method returns. + * Returns undefined if no sync function is configured. */ - public syncMore(options: OnLoadMoreOptions): void | Promise { + public syncMore(options: OnLoadMoreOptions): Promise | undefined { if (this.syncOnLoadMoreFn) { - return this.syncOnLoadMoreFn(options) + const result = this.syncOnLoadMoreFn(options) + + // If the result is void (synchronous), wrap in Promise.resolve() + const promise = result === undefined ? Promise.resolve() : result + + // Track the promise + this.trackLoadPromise(promise) + + return promise } + return undefined } public cleanup(): void { diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 6e68a5faa..33aaec172 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -411,7 +411,8 @@ export class CollectionConfigBuilder< collection, config, syncState, - this + this, + config.collection // Pass the result collection ) const subscription = collectionSubscriber.subscribe() diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index c2930faeb..39ab6fafc 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -21,12 +21,19 @@ export class CollectionSubscriber< private collectionAlias: string + // Track deferred promises for subscription loading states + private subscriptionLoadingPromises = new Map< + CollectionSubscription, + { resolve: () => void } + >() + constructor( private collectionId: string, private collection: Collection, private config: Parameters[`sync`]>[0], private syncState: FullSyncState, - private collectionConfigBuilder: CollectionConfigBuilder + private collectionConfigBuilder: CollectionConfigBuilder, + private resultCollection: Collection ) { this.collectionAlias = findCollectionAlias( this.collectionId, @@ -80,7 +87,43 @@ export class CollectionSubscriber< includeInitialState ) } + + // Subscribe to subscription status changes to propagate loading state + const statusUnsubscribe = subscription.on(`status:change`, (event) => { + if (event.status === `loadingMore`) { + // Guard against duplicate transitions + if (!this.subscriptionLoadingPromises.has(subscription)) { + let resolve: () => void + const promise = new Promise((res) => { + resolve = res + }) + + this.subscriptionLoadingPromises.set(subscription, { + resolve: resolve!, + }) + this.resultCollection.trackLoadPromise(promise) + } + } else { + // status is 'ready' + const deferred = this.subscriptionLoadingPromises.get(subscription) + if (deferred) { + // Clear the map entry FIRST (before resolving) + this.subscriptionLoadingPromises.delete(subscription) + deferred.resolve() + } + } + }) + const unsubscribe = () => { + // If subscription has a pending promise, resolve it before unsubscribing + const deferred = this.subscriptionLoadingPromises.get(subscription) + if (deferred) { + // Clear the map entry FIRST (before resolving) + this.subscriptionLoadingPromises.delete(subscription) + deferred.resolve() + } + + statusUnsubscribe() subscription.unsubscribe() } this.syncState.unsubscribeCallbacks.add(unsubscribe) diff --git a/packages/db/tests/collection-subscription.test.ts b/packages/db/tests/collection-subscription.test.ts new file mode 100644 index 000000000..2c5922564 --- /dev/null +++ b/packages/db/tests/collection-subscription.test.ts @@ -0,0 +1,271 @@ +import { describe, expect, it } from "vitest" +import { createCollection } from "../src/collection/index.js" +import { flushPromises } from "./utils" + +describe(`CollectionSubscription status tracking`, () => { + it(`subscription starts with status 'ready'`, () => { + const collection = createCollection<{ id: string; value: string }>({ + id: `test`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + }, + }, + }) + + const subscription = collection.subscribeChanges(() => {}) + + expect(subscription.status).toBe(`ready`) + subscription.unsubscribe() + }) + + it(`status changes to 'loadingMore' when requestSnapshot triggers a promise`, async () => { + let resolveLoadMore: () => void + const loadMorePromise = new Promise((resolve) => { + resolveLoadMore = resolve + }) + + const collection = createCollection<{ id: string; value: string }>({ + id: `test`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + return { + onLoadMore: () => loadMorePromise, + } + }, + }, + }) + + const subscription = collection.subscribeChanges(() => {}, { + includeInitialState: false, + }) + + expect(subscription.status).toBe(`ready`) + + // Trigger a snapshot request that will call syncMore + subscription.requestSnapshot({ optimizedOnly: false }) + + // Status should now be loadingMore + expect(subscription.status).toBe(`loadingMore`) + + // Resolve the load more promise + resolveLoadMore!() + await flushPromises() + + // Status should be back to ready + expect(subscription.status).toBe(`ready`) + + subscription.unsubscribe() + }) + + it(`status changes back to 'ready' when promise resolves`, async () => { + let resolveLoadMore: () => void + const loadMorePromise = new Promise((resolve) => { + resolveLoadMore = resolve + }) + + const collection = createCollection<{ id: string; value: string }>({ + id: `test`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + return { + onLoadMore: () => loadMorePromise, + } + }, + }, + }) + + const subscription = collection.subscribeChanges(() => {}, { + includeInitialState: false, + }) + + subscription.requestSnapshot({ optimizedOnly: false }) + expect(subscription.status).toBe(`loadingMore`) + + resolveLoadMore!() + await flushPromises() + + expect(subscription.status).toBe(`ready`) + subscription.unsubscribe() + }) + + it(`concurrent promises keep status as 'loadingMore' until all resolve`, async () => { + let resolveLoadMore1: () => void + let resolveLoadMore2: () => void + let callCount = 0 + + const collection = createCollection<{ id: string; value: string }>({ + id: `test`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + return { + onLoadMore: () => { + callCount++ + if (callCount === 1) { + return new Promise((resolve) => { + resolveLoadMore1 = resolve + }) + } else { + return new Promise((resolve) => { + resolveLoadMore2 = resolve + }) + } + }, + } + }, + }, + }) + + const subscription = collection.subscribeChanges(() => {}, { + includeInitialState: false, + }) + + // Trigger first load + subscription.requestSnapshot({ optimizedOnly: false }) + expect(subscription.status).toBe(`loadingMore`) + + // Trigger second load + subscription.requestSnapshot({ optimizedOnly: false }) + expect(subscription.status).toBe(`loadingMore`) + + // Resolve first promise + resolveLoadMore1!() + await flushPromises() + + // Should still be loading because second promise is pending + expect(subscription.status).toBe(`loadingMore`) + + // Resolve second promise + resolveLoadMore2!() + await flushPromises() + + // Now should be ready + expect(subscription.status).toBe(`ready`) + subscription.unsubscribe() + }) + + it(`emits 'status:change' event`, async () => { + let resolveLoadMore: () => void + const loadMorePromise = new Promise((resolve) => { + resolveLoadMore = resolve + }) + + const collection = createCollection<{ id: string; value: string }>({ + id: `test`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + return { + onLoadMore: () => loadMorePromise, + } + }, + }, + }) + + const subscription = collection.subscribeChanges(() => {}, { + includeInitialState: false, + }) + + const statusChanges: Array<{ previous: string; current: string }> = [] + + subscription.on(`status:change`, (event) => { + statusChanges.push({ + previous: event.previousStatus, + current: event.status, + }) + }) + + subscription.requestSnapshot({ optimizedOnly: false }) + await flushPromises() + + expect(statusChanges).toHaveLength(1) + expect(statusChanges[0]).toEqual({ + previous: `ready`, + current: `loadingMore`, + }) + + resolveLoadMore!() + await flushPromises() + + expect(statusChanges).toHaveLength(2) + expect(statusChanges[1]).toEqual({ + previous: `loadingMore`, + current: `ready`, + }) + + subscription.unsubscribe() + }) + + it(`promise rejection still cleans up and sets status back to 'ready'`, async () => { + let rejectLoadMore: (error: Error) => void + const loadMorePromise = new Promise((_, reject) => { + rejectLoadMore = reject + }) + // Attach catch handler before rejecting to avoid unhandled rejection + const handledPromise = loadMorePromise.catch(() => {}) + + const collection = createCollection<{ id: string; value: string }>({ + id: `test`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + return { + onLoadMore: () => handledPromise, + } + }, + }, + }) + + const subscription = collection.subscribeChanges(() => {}, { + includeInitialState: false, + }) + + subscription.requestSnapshot({ optimizedOnly: false }) + expect(subscription.status).toBe(`loadingMore`) + + // Reject the promise + rejectLoadMore!(new Error(`Load failed`)) + await flushPromises() + + // Status should still be back to ready + expect(subscription.status).toBe(`ready`) + subscription.unsubscribe() + }) + + it(`unsubscribe clears event listeners`, () => { + const collection = createCollection<{ id: string; value: string }>({ + id: `test`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + }, + }, + }) + + const subscription = collection.subscribeChanges(() => {}, { + includeInitialState: false, + }) + + let eventCount = 0 + subscription.on(`status:change`, () => { + eventCount++ + }) + + subscription.unsubscribe() + + // After unsubscribe, listeners should be cleared + // We can't easily verify this without accessing private members, + // but we can at least verify unsubscribe doesn't throw + expect(eventCount).toBe(0) + }) +}) diff --git a/packages/db/tests/collection.test.ts b/packages/db/tests/collection.test.ts index 77b74ec75..4d9df5178 100644 --- a/packages/db/tests/collection.test.ts +++ b/packages/db/tests/collection.test.ts @@ -1356,3 +1356,210 @@ describe(`Collection`, () => { expect(state.size).toBe(3) }) }) + +describe(`Collection isLoadingMore property`, () => { + it(`isLoadingMore is false initially`, () => { + const collection = createCollection<{ id: string; value: string }>({ + id: `test`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + }, + }, + }) + + expect(collection.isLoadingMore).toBe(false) + }) + + it(`isLoadingMore becomes true when syncMore returns a promise`, async () => { + let resolveLoadMore: () => void + const loadMorePromise = new Promise((resolve) => { + resolveLoadMore = resolve + }) + + const collection = createCollection<{ id: string; value: string }>({ + id: `test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ markReady }) => { + markReady() + return { + onLoadMore: () => loadMorePromise, + } + }, + }, + }) + + expect(collection.isLoadingMore).toBe(false) + + collection.syncMore({}) + expect(collection.isLoadingMore).toBe(true) + + resolveLoadMore!() + await flushPromises() + + expect(collection.isLoadingMore).toBe(false) + }) + + it(`isLoadingMore becomes false when promise resolves`, async () => { + let resolveLoadMore: () => void + const loadMorePromise = new Promise((resolve) => { + resolveLoadMore = resolve + }) + + const collection = createCollection<{ id: string; value: string }>({ + id: `test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ markReady }) => { + markReady() + return { + onLoadMore: () => loadMorePromise, + } + }, + }, + }) + + collection.syncMore({}) + expect(collection.isLoadingMore).toBe(true) + + resolveLoadMore!() + await flushPromises() + + expect(collection.isLoadingMore).toBe(false) + }) + + it(`concurrent syncMore calls keep isLoadingMore true until all resolve`, async () => { + let resolveLoadMore1: () => void + let resolveLoadMore2: () => void + let callCount = 0 + + const collection = createCollection<{ id: string; value: string }>({ + id: `test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ markReady }) => { + markReady() + return { + onLoadMore: () => { + callCount++ + if (callCount === 1) { + return new Promise((resolve) => { + resolveLoadMore1 = resolve + }) + } else { + return new Promise((resolve) => { + resolveLoadMore2 = resolve + }) + } + }, + } + }, + }, + }) + + collection.syncMore({}) + collection.syncMore({}) + + expect(collection.isLoadingMore).toBe(true) + + resolveLoadMore1!() + await flushPromises() + + // Should still be loading because second promise is pending + expect(collection.isLoadingMore).toBe(true) + + resolveLoadMore2!() + await flushPromises() + + // Now should be false + expect(collection.isLoadingMore).toBe(false) + }) + + it(`emits loadingMore:change event`, async () => { + let resolveLoadMore: () => void + const loadMorePromise = new Promise((resolve) => { + resolveLoadMore = resolve + }) + + const collection = createCollection<{ id: string; value: string }>({ + id: `test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ markReady }) => { + markReady() + return { + onLoadMore: () => loadMorePromise, + } + }, + }, + }) + + const loadingChanges: Array<{ + isLoadingMore: boolean + previousIsLoadingMore: boolean + }> = [] + + collection.on(`loadingMore:change`, (event) => { + loadingChanges.push({ + isLoadingMore: event.isLoadingMore, + previousIsLoadingMore: event.previousIsLoadingMore, + }) + }) + + collection.syncMore({}) + await flushPromises() + + expect(loadingChanges).toHaveLength(1) + expect(loadingChanges[0]).toEqual({ + isLoadingMore: true, + previousIsLoadingMore: false, + }) + + resolveLoadMore!() + await flushPromises() + + expect(loadingChanges).toHaveLength(2) + expect(loadingChanges[1]).toEqual({ + isLoadingMore: false, + previousIsLoadingMore: true, + }) + }) + + it(`rejected promises still clean up`, async () => { + let rejectLoadMore: (error: Error) => void + const loadMorePromise = new Promise((_, reject) => { + rejectLoadMore = reject + }) + // Attach catch handler before rejecting to avoid unhandled rejection + const handledPromise = loadMorePromise.catch(() => {}) + + const collection = createCollection<{ id: string; value: string }>({ + id: `test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ markReady }) => { + markReady() + return { + onLoadMore: () => handledPromise, + } + }, + }, + }) + + collection.syncMore({}) + expect(collection.isLoadingMore).toBe(true) + + // Reject the promise + rejectLoadMore!(new Error(`Load failed`)) + await flushPromises() + + expect(collection.isLoadingMore).toBe(false) + }) +}) diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index 14c84d7e7..0e29fffe9 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -937,4 +937,92 @@ describe(`createLiveQueryCollection`, () => { }) }) }) + + describe(`isLoadingMore integration`, () => { + it(`live query result collection has isLoadingMore property`, async () => { + const sourceCollection = createCollection<{ id: string; value: string }>({ + id: `source`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + }, + }, + }) + + const liveQuery = createLiveQueryCollection((q) => + q.from({ item: sourceCollection }) + ) + + await liveQuery.preload() + + expect(liveQuery.isLoadingMore).toBeDefined() + expect(liveQuery.isLoadingMore).toBe(false) + }) + + it(`isLoadingMore property exists and starts as false`, async () => { + const sourceCollection = createCollection<{ id: string; value: string }>({ + id: `source`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + }, + }, + }) + + const liveQuery = createLiveQueryCollection({ + query: (q) => q.from({ item: sourceCollection }), + startSync: true, + }) + + await liveQuery.preload() + + expect(liveQuery.isLoadingMore).toBe(false) + }) + + it(`source collection isLoadingMore is independent`, async () => { + let resolveLoadMore: () => void + const loadMorePromise = new Promise((resolve) => { + resolveLoadMore = resolve + }) + + const sourceCollection = createCollection<{ id: string; value: number }>({ + id: `source`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady, begin, write, commit }) => { + begin() + write({ type: `insert`, value: { id: `1`, value: 1 } }) + commit() + markReady() + return { + onLoadMore: () => loadMorePromise, + } + }, + }, + }) + + const liveQuery = createLiveQueryCollection({ + query: (q) => q.from({ item: sourceCollection }), + startSync: true, + }) + + await liveQuery.preload() + + // Calling syncMore directly on source collection sets its own isLoadingMore + sourceCollection.syncMore({}) + expect(sourceCollection.isLoadingMore).toBe(true) + + // But live query isLoadingMore tracks subscription-driven loads, not direct syncMore calls + // so it remains false unless subscriptions trigger loads via predicate pushdown + expect(liveQuery.isLoadingMore).toBe(false) + + resolveLoadMore!() + await new Promise((resolve) => setTimeout(resolve, 10)) + + expect(sourceCollection.isLoadingMore).toBe(false) + expect(liveQuery.isLoadingMore).toBe(false) + }) + }) }) From dc34f5be4f4521bea21a8a20157a35ff8429f760 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sun, 12 Oct 2025 19:14:11 +0100 Subject: [PATCH 02/13] refactor so both CollectionEventsManager and CollectionSubscription subclass the same event emiiter implimetation --- packages/db/src/collection/events.ts | 86 ++----------- packages/db/src/collection/subscription.ts | 80 +----------- packages/db/src/event-emitter.ts | 118 ++++++++++++++++++ .../collection-subscribe-changes.test.ts | 4 +- 4 files changed, 138 insertions(+), 150 deletions(-) create mode 100644 packages/db/src/event-emitter.ts diff --git a/packages/db/src/collection/events.ts b/packages/db/src/collection/events.ts index 138c3e907..23f2dd87c 100644 --- a/packages/db/src/collection/events.ts +++ b/packages/db/src/collection/events.ts @@ -1,3 +1,4 @@ +import { EventEmitter } from "../event-emitter.js" import type { Collection } from "./index.js" import type { CollectionStatus } from "../types.js" @@ -59,89 +60,26 @@ export type CollectionEventHandler = ( event: AllCollectionEvents[T] ) => void -export class CollectionEventsManager { +export class CollectionEventsManager extends EventEmitter { private collection!: Collection - private listeners = new Map< - keyof AllCollectionEvents, - Set> - >() - constructor() {} + constructor() { + super() + } setDeps(deps: { collection: Collection }) { this.collection = deps.collection } - on( - event: T, - callback: CollectionEventHandler - ) { - if (!this.listeners.has(event)) { - this.listeners.set(event, new Set()) - } - this.listeners.get(event)!.add(callback) - - return () => { - this.listeners.get(event)?.delete(callback) - } - } - - once( - event: T, - callback: CollectionEventHandler - ) { - const unsubscribe = this.on(event, (eventPayload) => { - callback(eventPayload) - unsubscribe() - }) - return unsubscribe - } - - off( - event: T, - callback: CollectionEventHandler - ) { - this.listeners.get(event)?.delete(callback) - } - - waitFor( - event: T, - timeout?: number - ): Promise { - return new Promise((resolve, reject) => { - let timeoutId: NodeJS.Timeout | undefined - const unsubscribe = this.on(event, (eventPayload) => { - if (timeoutId) { - clearTimeout(timeoutId) - timeoutId = undefined - } - resolve(eventPayload) - unsubscribe() - }) - if (timeout) { - timeoutId = setTimeout(() => { - timeoutId = undefined - unsubscribe() - reject(new Error(`Timeout waiting for event ${event}`)) - }, timeout) - } - }) - } - + /** + * Emit an event to all listeners + * Public API for emitting collection events + */ emit( event: T, eventPayload: AllCollectionEvents[T] - ) { - this.listeners.get(event)?.forEach((listener) => { - try { - listener(eventPayload) - } catch (error) { - // Re-throw in a microtask to surface the error - queueMicrotask(() => { - throw error - }) - } - }) + ): void { + this.emitInner(event, eventPayload) } emitStatusChange( @@ -178,6 +116,6 @@ export class CollectionEventsManager { } cleanup() { - this.listeners.clear() + this.clearListeners() } } diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 592c8e565..a762e7563 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -1,6 +1,7 @@ import { ensureIndexForExpression } from "../indexes/auto-index.js" import { and, gt, lt } from "../query/builder/functions.js" import { Value } from "../query/ir.js" +import { EventEmitter } from "../event-emitter.js" import { createFilterFunctionFromExpression, createFilteredCallback, @@ -50,11 +51,7 @@ type AllSubscriptionEvents = { "status:loadingMore": SubscriptionStatusEvent<`loadingMore`> } -type SubscriptionEventHandler = ( - event: AllSubscriptionEvents[T] -) => void - -export class CollectionSubscription { +export class CollectionSubscription extends EventEmitter { private loadedInitialState = false // Flag to indicate that we have sent at least 1 snapshot. @@ -72,17 +69,12 @@ export class CollectionSubscription { public status: SubscriptionStatus = `ready` private pendingLoadMorePromises: Set> = new Set() - // Event emitter - private listeners = new Map< - keyof AllSubscriptionEvents, - Set> - >() - constructor( private collection: CollectionImpl, private callback: (changes: Array>) => void, private options: CollectionSubscriptionOptions ) { + super() // Auto-index for where expressions if enabled if (options.whereExpression) { ensureIndexForExpression(options.whereExpression, this.collection) @@ -107,66 +99,6 @@ export class CollectionSubscription { this.orderByIndex = index } - /** - * Subscribe to a subscription event - */ - on( - event: T, - callback: SubscriptionEventHandler - ) { - if (!this.listeners.has(event)) { - this.listeners.set(event, new Set()) - } - this.listeners.get(event)!.add(callback) - - return () => { - this.listeners.get(event)?.delete(callback) - } - } - - /** - * Subscribe to a subscription event once - */ - once( - event: T, - callback: SubscriptionEventHandler - ) { - const unsubscribe = this.on(event, (eventPayload) => { - callback(eventPayload) - unsubscribe() - }) - return unsubscribe - } - - /** - * Unsubscribe from a subscription event - */ - off( - event: T, - callback: SubscriptionEventHandler - ) { - this.listeners.get(event)?.delete(callback) - } - - /** - * Emit a subscription event - */ - private emit( - event: T, - eventPayload: AllSubscriptionEvents[T] - ) { - this.listeners.get(event)?.forEach((listener) => { - try { - listener(eventPayload) - } catch (error) { - // Re-throw in a microtask to surface the error - queueMicrotask(() => { - throw error - }) - } - }) - } - /** * Set subscription status and emit events if changed */ @@ -179,7 +111,7 @@ export class CollectionSubscription { this.status = newStatus // Emit status:change event - this.emit(`status:change`, { + this.emitInner(`status:change`, { type: `status:change`, subscription: this, previousStatus, @@ -188,7 +120,7 @@ export class CollectionSubscription { // Emit specific status event const eventKey: `status:${SubscriptionStatus}` = `status:${newStatus}` - this.emit(eventKey, { + this.emitInner(eventKey, { type: eventKey, subscription: this, previousStatus, @@ -416,7 +348,7 @@ export class CollectionSubscription { unsubscribe() { // Clear all event listeners to prevent memory leaks - this.listeners.clear() + this.clearListeners() this.options.onUnsubscribe?.() } } diff --git a/packages/db/src/event-emitter.ts b/packages/db/src/event-emitter.ts new file mode 100644 index 000000000..19656e573 --- /dev/null +++ b/packages/db/src/event-emitter.ts @@ -0,0 +1,118 @@ +/** + * Generic type-safe event emitter + * @template TEvents - Record of event names to event payload types + */ +export class EventEmitter> { + private listeners = new Map< + keyof TEvents, + Set<(event: TEvents[keyof TEvents]) => void> + >() + + /** + * Subscribe to an event + * @param event - Event name to listen for + * @param callback - Function to call when event is emitted + * @returns Unsubscribe function + */ + on( + event: T, + callback: (event: TEvents[T]) => void + ): () => void { + if (!this.listeners.has(event)) { + this.listeners.set(event, new Set()) + } + this.listeners.get(event)!.add(callback as (event: any) => void) + + return () => { + this.listeners.get(event)?.delete(callback as (event: any) => void) + } + } + + /** + * Subscribe to an event once (automatically unsubscribes after first emission) + * @param event - Event name to listen for + * @param callback - Function to call when event is emitted + * @returns Unsubscribe function + */ + once( + event: T, + callback: (event: TEvents[T]) => void + ): () => void { + const unsubscribe = this.on(event, (eventPayload) => { + callback(eventPayload) + unsubscribe() + }) + return unsubscribe + } + + /** + * Unsubscribe from an event + * @param event - Event name to stop listening for + * @param callback - Function to remove + */ + off( + event: T, + callback: (event: TEvents[T]) => void + ): void { + this.listeners.get(event)?.delete(callback as (event: any) => void) + } + + /** + * Wait for an event to be emitted + * @param event - Event name to wait for + * @param timeout - Optional timeout in milliseconds + * @returns Promise that resolves with the event payload + */ + waitFor( + event: T, + timeout?: number + ): Promise { + return new Promise((resolve, reject) => { + let timeoutId: NodeJS.Timeout | undefined + const unsubscribe = this.on(event, (eventPayload) => { + if (timeoutId) { + clearTimeout(timeoutId) + timeoutId = undefined + } + resolve(eventPayload) + unsubscribe() + }) + if (timeout) { + timeoutId = setTimeout(() => { + timeoutId = undefined + unsubscribe() + reject(new Error(`Timeout waiting for event ${String(event)}`)) + }, timeout) + } + }) + } + + /** + * Emit an event to all listeners + * @param event - Event name to emit + * @param eventPayload - Event payload + * @internal For use by subclasses - subclasses should wrap this with a public emit if needed + */ + protected emitInner( + event: T, + eventPayload: TEvents[T] + ): void { + this.listeners.get(event)?.forEach((listener) => { + try { + listener(eventPayload) + } catch (error) { + // Re-throw in a microtask to surface the error + queueMicrotask(() => { + throw error + }) + } + }) + } + + /** + * Clear all listeners + */ + protected clearListeners(): void { + this.listeners.clear() + } +} diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 36ee2416a..710f2122a 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -1647,7 +1647,7 @@ describe(`Collection.subscribeChanges`, () => { } }) - it(`should emit change events for multiple sync transactions before marking ready`, async () => { + it(`should emit change events for multiple sync transactions before marking ready`, () => { const changeEvents: Array = [] let testSyncFunctions: any = null @@ -1750,7 +1750,7 @@ describe(`Collection.subscribeChanges`, () => { expect(collection.state.get(3)).toEqual({ id: 3, value: `third item` }) }) - it(`should emit change events while collection is loading for filtered subscriptions`, async () => { + it(`should emit change events while collection is loading for filtered subscriptions`, () => { const changeEvents: Array = [] let testSyncFunctions: any = null From b7f45c81b1a28ea27a18228960aa18007cd0425c Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sun, 12 Oct 2025 19:20:59 +0100 Subject: [PATCH 03/13] changeset --- .changeset/cruel-buckets-shop.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/cruel-buckets-shop.md diff --git a/.changeset/cruel-buckets-shop.md b/.changeset/cruel-buckets-shop.md new file mode 100644 index 000000000..b677b56ee --- /dev/null +++ b/.changeset/cruel-buckets-shop.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +Added `isLoadingMore` property and `loadingMore:change` events to collections and live queries, enabling UIs to display loading indicators when more data is being fetched via `syncMore`. Each live query maintains its own isolated loading state based on its subscriptions, preventing loading status "bleed" between independent queries that share the same source collections. From d6c6e7eb09b4e503ec73b148e246038fa781edd5 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Mon, 13 Oct 2025 18:12:49 +0100 Subject: [PATCH 04/13] rename loadMore to loadSubset --- packages/db/src/collection/index.ts | 14 +--- packages/db/src/collection/subscription.ts | 18 ++--- packages/db/src/collection/sync.ts | 24 +++---- .../query/live/collection-config-builder.ts | 12 ++-- .../src/query/live/collection-subscriber.ts | 4 +- packages/db/src/types.ts | 4 +- .../db/tests/collection-subscription.test.ts | 58 +++++++-------- packages/db/tests/collection.test.ts | 72 +++++++++---------- .../tests/query/live-query-collection.test.ts | 16 ++--- 9 files changed, 105 insertions(+), 117 deletions(-) diff --git a/packages/db/src/collection/index.ts b/packages/db/src/collection/index.ts index a23e10d83..868308833 100644 --- a/packages/db/src/collection/index.ts +++ b/packages/db/src/collection/index.ts @@ -25,7 +25,6 @@ import type { InferSchemaOutput, InsertConfig, NonSingleResult, - OnLoadMoreOptions, OperationConfig, SingleResult, SubscribeChangesOptions, @@ -218,7 +217,7 @@ export class CollectionImpl< private _events: CollectionEventsManager private _changes: CollectionChangesManager public _lifecycle: CollectionLifecycleManager - private _sync: CollectionSyncManager + public _sync: CollectionSyncManager private _indexes: CollectionIndexesManager private _mutations: CollectionMutationsManager< TOutput, @@ -372,17 +371,6 @@ export class CollectionImpl< this._sync.startSync() } - /** - * Requests the sync layer to load more data. - * @param options Options to control what data is being loaded - * @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded. - * If data loading is synchronous, the data is loaded when the method returns. - * Returns undefined if no sync function is configured. - */ - public syncMore(options: OnLoadMoreOptions): Promise | undefined { - return this._sync.syncMore(options) - } - /** * Tracks a load promise for isLoadingMore state. * @internal This is for internal coordination (e.g., live-query glue code), not for general use. diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index a762e7563..d79a72978 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -67,7 +67,7 @@ export class CollectionSubscription extends EventEmitter // Status tracking public status: SubscriptionStatus = `ready` - private pendingLoadMorePromises: Set> = new Set() + private pendingLoadSubsetPromises: Set> = new Set() constructor( private collection: CollectionImpl, @@ -178,18 +178,18 @@ export class CollectionSubscription extends EventEmitter // Request the sync layer to load more data // don't await it, we will load the data into the collection when it comes in - const syncPromise = this.collection.syncMore({ + const syncPromise = this.collection._sync.loadSubset({ where: stateOpts.where, }) // Track the promise if it exists if (syncPromise) { - this.pendingLoadMorePromises.add(syncPromise) + this.pendingLoadSubsetPromises.add(syncPromise) this.setStatus(`loadingMore`) syncPromise.finally(() => { - this.pendingLoadMorePromises.delete(syncPromise) - if (this.pendingLoadMorePromises.size === 0) { + this.pendingLoadSubsetPromises.delete(syncPromise) + if (this.pendingLoadSubsetPromises.size === 0) { this.setStatus(`ready`) } }) @@ -285,7 +285,7 @@ export class CollectionSubscription extends EventEmitter // Request the sync layer to load more data // don't await it, we will load the data into the collection when it comes in - const syncPromise = this.collection.syncMore({ + const syncPromise = this.collection._sync.loadSubset({ where: whereWithValueFilter, limit, orderBy, @@ -293,12 +293,12 @@ export class CollectionSubscription extends EventEmitter // Track the promise if it exists if (syncPromise) { - this.pendingLoadMorePromises.add(syncPromise) + this.pendingLoadSubsetPromises.add(syncPromise) this.setStatus(`loadingMore`) syncPromise.finally(() => { - this.pendingLoadMorePromises.delete(syncPromise) - if (this.pendingLoadMorePromises.size === 0) { + this.pendingLoadSubsetPromises.delete(syncPromise) + if (this.pendingLoadSubsetPromises.size === 0) { this.setStatus(`ready`) } }) diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index 5129342fe..7f65b3e24 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -13,7 +13,7 @@ import type { ChangeMessage, CleanupFn, CollectionConfig, - OnLoadMoreOptions, + LoadSubsetOptions, SyncConfigRes, } from "../types" import type { CollectionImpl } from "./index.js" @@ -36,11 +36,11 @@ export class CollectionSyncManager< public preloadPromise: Promise | null = null public syncCleanupFn: (() => void) | null = null - public syncOnLoadMoreFn: - | ((options: OnLoadMoreOptions) => void | Promise) + public syncLoadSubsetFn: + | ((options: LoadSubsetOptions) => void | Promise) | null = null - private pendingLoadMorePromises: Set> = new Set() + private pendingLoadSubsetPromises: Set> = new Set() /** * Creates a new CollectionSyncManager instance @@ -195,8 +195,8 @@ export class CollectionSyncManager< // Store cleanup function if provided this.syncCleanupFn = syncRes?.cleanup ?? null - // Store onLoadMore function if provided - this.syncOnLoadMoreFn = syncRes?.onLoadMore ?? null + // Store loadSubset function if provided + this.syncLoadSubsetFn = syncRes?.loadSubset ?? null } catch (error) { this.lifecycle.setStatus(`error`) throw error @@ -249,7 +249,7 @@ export class CollectionSyncManager< * Gets whether the collection is currently loading more data */ public get isLoadingMore(): boolean { - return this.pendingLoadMorePromises.size > 0 + return this.pendingLoadSubsetPromises.size > 0 } /** @@ -258,7 +258,7 @@ export class CollectionSyncManager< */ public trackLoadPromise(promise: Promise): void { const wasLoading = this.isLoadingMore - this.pendingLoadMorePromises.add(promise) + this.pendingLoadSubsetPromises.add(promise) const isLoadingNow = this.isLoadingMore if (!wasLoading && isLoadingNow) { @@ -273,7 +273,7 @@ export class CollectionSyncManager< promise.finally(() => { // Check loading state BEFORE removing the promise const wasLoadingBeforeRemoval = this.isLoadingMore - this.pendingLoadMorePromises.delete(promise) + this.pendingLoadSubsetPromises.delete(promise) const stillLoading = this.isLoadingMore if (wasLoadingBeforeRemoval && !stillLoading) { @@ -294,9 +294,9 @@ export class CollectionSyncManager< * If data loading is synchronous, the data is loaded when the method returns. * Returns undefined if no sync function is configured. */ - public syncMore(options: OnLoadMoreOptions): Promise | undefined { - if (this.syncOnLoadMoreFn) { - const result = this.syncOnLoadMoreFn(options) + public loadSubset(options: LoadSubsetOptions): Promise | undefined { + if (this.syncLoadSubsetFn) { + const result = this.syncLoadSubsetFn(options) // If the result is void (synchronous), wrap in Promise.resolve() const promise = result === undefined ? Promise.resolve() : result diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 33aaec172..cb010b179 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -183,13 +183,13 @@ export class CollectionConfigBuilder< syncState ) - const loadMoreDataCallbacks = this.subscribeToAllCollections( + const loadSubsetDataCallbacks = this.subscribeToAllCollections( config, fullSyncState ) // Initial run with callback to load more data if needed - this.maybeRunGraph(config, fullSyncState, loadMoreDataCallbacks) + this.maybeRunGraph(config, fullSyncState, loadSubsetDataCallbacks) // Return the unsubscribe function return () => { @@ -424,16 +424,16 @@ export class CollectionConfigBuilder< }) syncState.unsubscribeCallbacks.add(statusUnsubscribe) - const loadMore = collectionSubscriber.loadMoreIfNeeded.bind( + const loadSubset = collectionSubscriber.loadSubsetIfNeeded.bind( collectionSubscriber, subscription ) - return loadMore + return loadSubset } ) - const loadMoreDataCallback = () => { + const loadSubsetDataCallback = () => { loaders.map((loader) => loader()) return true } @@ -444,7 +444,7 @@ export class CollectionConfigBuilder< // Initial status check after all subscriptions are set up this.updateLiveQueryStatus(config) - return loadMoreDataCallback + return loadSubsetDataCallback } } diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index 39ab6fafc..1974c8ee5 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -232,7 +232,7 @@ export class CollectionSubscriber< // This function is called by maybeRunGraph // after each iteration of the query pipeline // to ensure that the orderBy operator has enough data to work with - loadMoreIfNeeded(subscription: CollectionSubscription) { + loadSubsetIfNeeded(subscription: CollectionSubscription) { const orderByInfo = this.collectionConfigBuilder.optimizableOrderByCollections[ this.collectionId @@ -274,7 +274,7 @@ export class CollectionSubscriber< const trackedChanges = this.trackSentValues(changes, comparator) this.sendChangesToPipeline( trackedChanges, - this.loadMoreIfNeeded.bind(this, subscription) + this.loadSubsetIfNeeded.bind(this, subscription) ) } diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index b48ab2f5f..b2bd66392 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -150,7 +150,7 @@ export type Row = Record> export type OperationType = `insert` | `update` | `delete` -export type OnLoadMoreOptions = { +export type LoadSubsetOptions = { where?: BasicExpression orderBy?: OrderBy limit?: number @@ -160,7 +160,7 @@ export type CleanupFn = () => void export type SyncConfigRes = { cleanup?: CleanupFn - onLoadMore?: (options: OnLoadMoreOptions) => void | Promise + loadSubset?: (options: LoadSubsetOptions) => void | Promise } export interface SyncConfig< T extends object = Record, diff --git a/packages/db/tests/collection-subscription.test.ts b/packages/db/tests/collection-subscription.test.ts index 2c5922564..ea7263f8b 100644 --- a/packages/db/tests/collection-subscription.test.ts +++ b/packages/db/tests/collection-subscription.test.ts @@ -21,9 +21,9 @@ describe(`CollectionSubscription status tracking`, () => { }) it(`status changes to 'loadingMore' when requestSnapshot triggers a promise`, async () => { - let resolveLoadMore: () => void - const loadMorePromise = new Promise((resolve) => { - resolveLoadMore = resolve + let resolveLoadSubset: () => void + const loadSubsetPromise = new Promise((resolve) => { + resolveLoadSubset = resolve }) const collection = createCollection<{ id: string; value: string }>({ @@ -33,7 +33,7 @@ describe(`CollectionSubscription status tracking`, () => { sync: ({ markReady }) => { markReady() return { - onLoadMore: () => loadMorePromise, + loadSubset: () => loadSubsetPromise, } }, }, @@ -45,14 +45,14 @@ describe(`CollectionSubscription status tracking`, () => { expect(subscription.status).toBe(`ready`) - // Trigger a snapshot request that will call syncMore + // Trigger a snapshot request that will call loadSubset subscription.requestSnapshot({ optimizedOnly: false }) // Status should now be loadingMore expect(subscription.status).toBe(`loadingMore`) // Resolve the load more promise - resolveLoadMore!() + resolveLoadSubset!() await flushPromises() // Status should be back to ready @@ -62,9 +62,9 @@ describe(`CollectionSubscription status tracking`, () => { }) it(`status changes back to 'ready' when promise resolves`, async () => { - let resolveLoadMore: () => void - const loadMorePromise = new Promise((resolve) => { - resolveLoadMore = resolve + let resolveLoadSubset: () => void + const loadSubsetPromise = new Promise((resolve) => { + resolveLoadSubset = resolve }) const collection = createCollection<{ id: string; value: string }>({ @@ -74,7 +74,7 @@ describe(`CollectionSubscription status tracking`, () => { sync: ({ markReady }) => { markReady() return { - onLoadMore: () => loadMorePromise, + loadSubset: () => loadSubsetPromise, } }, }, @@ -87,7 +87,7 @@ describe(`CollectionSubscription status tracking`, () => { subscription.requestSnapshot({ optimizedOnly: false }) expect(subscription.status).toBe(`loadingMore`) - resolveLoadMore!() + resolveLoadSubset!() await flushPromises() expect(subscription.status).toBe(`ready`) @@ -95,8 +95,8 @@ describe(`CollectionSubscription status tracking`, () => { }) it(`concurrent promises keep status as 'loadingMore' until all resolve`, async () => { - let resolveLoadMore1: () => void - let resolveLoadMore2: () => void + let resolveLoadSubset1: () => void + let resolveLoadSubset2: () => void let callCount = 0 const collection = createCollection<{ id: string; value: string }>({ @@ -106,15 +106,15 @@ describe(`CollectionSubscription status tracking`, () => { sync: ({ markReady }) => { markReady() return { - onLoadMore: () => { + loadSubset: () => { callCount++ if (callCount === 1) { return new Promise((resolve) => { - resolveLoadMore1 = resolve + resolveLoadSubset1 = resolve }) } else { return new Promise((resolve) => { - resolveLoadMore2 = resolve + resolveLoadSubset2 = resolve }) } }, @@ -136,14 +136,14 @@ describe(`CollectionSubscription status tracking`, () => { expect(subscription.status).toBe(`loadingMore`) // Resolve first promise - resolveLoadMore1!() + resolveLoadSubset1!() await flushPromises() // Should still be loading because second promise is pending expect(subscription.status).toBe(`loadingMore`) // Resolve second promise - resolveLoadMore2!() + resolveLoadSubset2!() await flushPromises() // Now should be ready @@ -152,9 +152,9 @@ describe(`CollectionSubscription status tracking`, () => { }) it(`emits 'status:change' event`, async () => { - let resolveLoadMore: () => void - const loadMorePromise = new Promise((resolve) => { - resolveLoadMore = resolve + let resolveLoadSubset: () => void + const loadSubsetPromise = new Promise((resolve) => { + resolveLoadSubset = resolve }) const collection = createCollection<{ id: string; value: string }>({ @@ -164,7 +164,7 @@ describe(`CollectionSubscription status tracking`, () => { sync: ({ markReady }) => { markReady() return { - onLoadMore: () => loadMorePromise, + loadSubset: () => loadSubsetPromise, } }, }, @@ -192,7 +192,7 @@ describe(`CollectionSubscription status tracking`, () => { current: `loadingMore`, }) - resolveLoadMore!() + resolveLoadSubset!() await flushPromises() expect(statusChanges).toHaveLength(2) @@ -205,12 +205,12 @@ describe(`CollectionSubscription status tracking`, () => { }) it(`promise rejection still cleans up and sets status back to 'ready'`, async () => { - let rejectLoadMore: (error: Error) => void - const loadMorePromise = new Promise((_, reject) => { - rejectLoadMore = reject + let rejectLoadSubset: (error: Error) => void + const loadSubsetPromise = new Promise((_, reject) => { + rejectLoadSubset = reject }) // Attach catch handler before rejecting to avoid unhandled rejection - const handledPromise = loadMorePromise.catch(() => {}) + const handledPromise = loadSubsetPromise.catch(() => {}) const collection = createCollection<{ id: string; value: string }>({ id: `test`, @@ -219,7 +219,7 @@ describe(`CollectionSubscription status tracking`, () => { sync: ({ markReady }) => { markReady() return { - onLoadMore: () => handledPromise, + loadSubset: () => handledPromise, } }, }, @@ -233,7 +233,7 @@ describe(`CollectionSubscription status tracking`, () => { expect(subscription.status).toBe(`loadingMore`) // Reject the promise - rejectLoadMore!(new Error(`Load failed`)) + rejectLoadSubset!(new Error(`Load failed`)) await flushPromises() // Status should still be back to ready diff --git a/packages/db/tests/collection.test.ts b/packages/db/tests/collection.test.ts index 4d9df5178..112ae8730 100644 --- a/packages/db/tests/collection.test.ts +++ b/packages/db/tests/collection.test.ts @@ -1372,10 +1372,10 @@ describe(`Collection isLoadingMore property`, () => { expect(collection.isLoadingMore).toBe(false) }) - it(`isLoadingMore becomes true when syncMore returns a promise`, async () => { - let resolveLoadMore: () => void - const loadMorePromise = new Promise((resolve) => { - resolveLoadMore = resolve + it(`isLoadingMore becomes true when loadSubset returns a promise`, async () => { + let resolveLoadSubset: () => void + const loadSubsetPromise = new Promise((resolve) => { + resolveLoadSubset = resolve }) const collection = createCollection<{ id: string; value: string }>({ @@ -1386,7 +1386,7 @@ describe(`Collection isLoadingMore property`, () => { sync: ({ markReady }) => { markReady() return { - onLoadMore: () => loadMorePromise, + loadSubset: () => loadSubsetPromise, } }, }, @@ -1394,19 +1394,19 @@ describe(`Collection isLoadingMore property`, () => { expect(collection.isLoadingMore).toBe(false) - collection.syncMore({}) + collection._sync.loadSubset({}) expect(collection.isLoadingMore).toBe(true) - resolveLoadMore!() + resolveLoadSubset!() await flushPromises() expect(collection.isLoadingMore).toBe(false) }) it(`isLoadingMore becomes false when promise resolves`, async () => { - let resolveLoadMore: () => void - const loadMorePromise = new Promise((resolve) => { - resolveLoadMore = resolve + let resolveLoadSubset: () => void + const loadSubsetPromise = new Promise((resolve) => { + resolveLoadSubset = resolve }) const collection = createCollection<{ id: string; value: string }>({ @@ -1417,24 +1417,24 @@ describe(`Collection isLoadingMore property`, () => { sync: ({ markReady }) => { markReady() return { - onLoadMore: () => loadMorePromise, + loadSubset: () => loadSubsetPromise, } }, }, }) - collection.syncMore({}) + collection._sync.loadSubset({}) expect(collection.isLoadingMore).toBe(true) - resolveLoadMore!() + resolveLoadSubset!() await flushPromises() expect(collection.isLoadingMore).toBe(false) }) - it(`concurrent syncMore calls keep isLoadingMore true until all resolve`, async () => { - let resolveLoadMore1: () => void - let resolveLoadMore2: () => void + it(`concurrent loadSubset calls keep isLoadingMore true until all resolve`, async () => { + let resolveLoadSubset1: () => void + let resolveLoadSubset2: () => void let callCount = 0 const collection = createCollection<{ id: string; value: string }>({ @@ -1445,15 +1445,15 @@ describe(`Collection isLoadingMore property`, () => { sync: ({ markReady }) => { markReady() return { - onLoadMore: () => { + loadSubset: () => { callCount++ if (callCount === 1) { return new Promise((resolve) => { - resolveLoadMore1 = resolve + resolveLoadSubset1 = resolve }) } else { return new Promise((resolve) => { - resolveLoadMore2 = resolve + resolveLoadSubset2 = resolve }) } }, @@ -1462,18 +1462,18 @@ describe(`Collection isLoadingMore property`, () => { }, }) - collection.syncMore({}) - collection.syncMore({}) + collection._sync.loadSubset({}) + collection._sync.loadSubset({}) expect(collection.isLoadingMore).toBe(true) - resolveLoadMore1!() + resolveLoadSubset1!() await flushPromises() // Should still be loading because second promise is pending expect(collection.isLoadingMore).toBe(true) - resolveLoadMore2!() + resolveLoadSubset2!() await flushPromises() // Now should be false @@ -1481,9 +1481,9 @@ describe(`Collection isLoadingMore property`, () => { }) it(`emits loadingMore:change event`, async () => { - let resolveLoadMore: () => void - const loadMorePromise = new Promise((resolve) => { - resolveLoadMore = resolve + let resolveLoadSubset: () => void + const loadSubsetPromise = new Promise((resolve) => { + resolveLoadSubset = resolve }) const collection = createCollection<{ id: string; value: string }>({ @@ -1494,7 +1494,7 @@ describe(`Collection isLoadingMore property`, () => { sync: ({ markReady }) => { markReady() return { - onLoadMore: () => loadMorePromise, + loadSubset: () => loadSubsetPromise, } }, }, @@ -1512,7 +1512,7 @@ describe(`Collection isLoadingMore property`, () => { }) }) - collection.syncMore({}) + collection._sync.loadSubset({}) await flushPromises() expect(loadingChanges).toHaveLength(1) @@ -1521,7 +1521,7 @@ describe(`Collection isLoadingMore property`, () => { previousIsLoadingMore: false, }) - resolveLoadMore!() + resolveLoadSubset!() await flushPromises() expect(loadingChanges).toHaveLength(2) @@ -1532,12 +1532,12 @@ describe(`Collection isLoadingMore property`, () => { }) it(`rejected promises still clean up`, async () => { - let rejectLoadMore: (error: Error) => void - const loadMorePromise = new Promise((_, reject) => { - rejectLoadMore = reject + let rejectLoadSubset: (error: Error) => void + const loadSubsetPromise = new Promise((_, reject) => { + rejectLoadSubset = reject }) // Attach catch handler before rejecting to avoid unhandled rejection - const handledPromise = loadMorePromise.catch(() => {}) + const handledPromise = loadSubsetPromise.catch(() => {}) const collection = createCollection<{ id: string; value: string }>({ id: `test`, @@ -1547,17 +1547,17 @@ describe(`Collection isLoadingMore property`, () => { sync: ({ markReady }) => { markReady() return { - onLoadMore: () => handledPromise, + loadSubset: () => handledPromise, } }, }, }) - collection.syncMore({}) + collection._sync.loadSubset({}) expect(collection.isLoadingMore).toBe(true) // Reject the promise - rejectLoadMore!(new Error(`Load failed`)) + rejectLoadSubset!(new Error(`Load failed`)) await flushPromises() expect(collection.isLoadingMore).toBe(false) diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index 0e29fffe9..d28fa1e1f 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -982,9 +982,9 @@ describe(`createLiveQueryCollection`, () => { }) it(`source collection isLoadingMore is independent`, async () => { - let resolveLoadMore: () => void - const loadMorePromise = new Promise((resolve) => { - resolveLoadMore = resolve + let resolveLoadSubset: () => void + const loadSubsetPromise = new Promise((resolve) => { + resolveLoadSubset = resolve }) const sourceCollection = createCollection<{ id: string; value: number }>({ @@ -997,7 +997,7 @@ describe(`createLiveQueryCollection`, () => { commit() markReady() return { - onLoadMore: () => loadMorePromise, + loadSubset: () => loadSubsetPromise, } }, }, @@ -1010,15 +1010,15 @@ describe(`createLiveQueryCollection`, () => { await liveQuery.preload() - // Calling syncMore directly on source collection sets its own isLoadingMore - sourceCollection.syncMore({}) + // Calling loadSubset directly on source collection sets its own isLoadingMore + sourceCollection._sync.loadSubset({}) expect(sourceCollection.isLoadingMore).toBe(true) - // But live query isLoadingMore tracks subscription-driven loads, not direct syncMore calls + // But live query isLoadingMore tracks subscription-driven loads, not direct loadSubset calls // so it remains false unless subscriptions trigger loads via predicate pushdown expect(liveQuery.isLoadingMore).toBe(false) - resolveLoadMore!() + resolveLoadSubset!() await new Promise((resolve) => setTimeout(resolve, 10)) expect(sourceCollection.isLoadingMore).toBe(false) From 7b584b1368a401ce70ac6ab7c83141d68101026b Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Mon, 13 Oct 2025 19:20:49 +0100 Subject: [PATCH 05/13] feed the subscription object through to the loadSubset call, and add an unsunbscribed event to it --- packages/db/src/collection/subscription.ts | 54 ++++++------- packages/db/src/types.ts | 92 +++++++++++++++++++++- 2 files changed, 117 insertions(+), 29 deletions(-) diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index d79a72978..75d5ccca1 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -8,7 +8,13 @@ import { } from "./change-events.js" import type { BasicExpression, OrderBy } from "../query/ir.js" import type { IndexInterface } from "../indexes/base-index.js" -import type { ChangeMessage } from "../types.js" +import type { + ChangeMessage, + Subscription, + SubscriptionEvents, + SubscriptionStatus, + SubscriptionUnsubscribedEvent, +} from "../types.js" import type { CollectionImpl } from "./index.js" type RequestSnapshotOptions = { @@ -23,35 +29,17 @@ type RequestLimitedSnapshotOptions = { } type CollectionSubscriptionOptions = { + includeInitialState?: boolean /** Pre-compiled expression for filtering changes */ whereExpression?: BasicExpression /** Callback to call when the subscription is unsubscribed */ - onUnsubscribe?: () => void + onUnsubscribe?: (event: SubscriptionUnsubscribedEvent) => void } -type SubscriptionStatus = `ready` | `loadingMore` - -interface SubscriptionStatusChangeEvent { - type: `status:change` - subscription: CollectionSubscription - previousStatus: SubscriptionStatus - status: SubscriptionStatus -} - -interface SubscriptionStatusEvent { - type: `status:${T}` - subscription: CollectionSubscription - previousStatus: SubscriptionStatus - status: T -} - -type AllSubscriptionEvents = { - "status:change": SubscriptionStatusChangeEvent - "status:ready": SubscriptionStatusEvent<`ready`> - "status:loadingMore": SubscriptionStatusEvent<`loadingMore`> -} - -export class CollectionSubscription extends EventEmitter { +export class CollectionSubscription + extends EventEmitter + implements Subscription +{ private loadedInitialState = false // Flag to indicate that we have sent at least 1 snapshot. @@ -66,7 +54,7 @@ export class CollectionSubscription extends EventEmitter private orderByIndex: IndexInterface | undefined // Status tracking - public status: SubscriptionStatus = `ready` + public readonly status: SubscriptionStatus = `ready` private pendingLoadSubsetPromises: Set> = new Set() constructor( @@ -75,6 +63,10 @@ export class CollectionSubscription extends EventEmitter private options: CollectionSubscriptionOptions ) { super() + if (options.onUnsubscribe) { + this.on(`unsubscribed`, (event) => options.onUnsubscribe!(event)) + } + // Auto-index for where expressions if enabled if (options.whereExpression) { ensureIndexForExpression(options.whereExpression, this.collection) @@ -108,6 +100,7 @@ export class CollectionSubscription extends EventEmitter } const previousStatus = this.status + // Cast to mutable for internal mutation this.status = newStatus // Emit status:change event @@ -125,7 +118,7 @@ export class CollectionSubscription extends EventEmitter subscription: this, previousStatus, status: newStatus, - } as AllSubscriptionEvents[typeof eventKey]) + } as SubscriptionEvents[typeof eventKey]) } hasLoadedInitialState() { @@ -180,6 +173,7 @@ export class CollectionSubscription extends EventEmitter // don't await it, we will load the data into the collection when it comes in const syncPromise = this.collection._sync.loadSubset({ where: stateOpts.where, + subscription: this, }) // Track the promise if it exists @@ -289,6 +283,7 @@ export class CollectionSubscription extends EventEmitter where: whereWithValueFilter, limit, orderBy, + subscription: this, }) // Track the promise if it exists @@ -347,8 +342,11 @@ export class CollectionSubscription extends EventEmitter } unsubscribe() { + this.emitInner(`unsubscribed`, { + type: `unsubscribed`, + subscription: this, + }) // Clear all event listeners to prevent memory leaks this.clearListeners() - this.options.onUnsubscribe?.() } } diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index b2bd66392..275929061 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -150,17 +150,107 @@ export type Row = Record> export type OperationType = `insert` | `update` | `delete` +/** + * Subscription status values + */ +export type SubscriptionStatus = `ready` | `loadingMore` + +/** + * Event emitted when subscription status changes + */ +export interface SubscriptionStatusChangeEvent { + type: `status:change` + subscription: Subscription + previousStatus: SubscriptionStatus + status: SubscriptionStatus +} + +/** + * Event emitted when subscription status changes to a specific status + */ +export interface SubscriptionStatusEvent { + type: `status:${T}` + subscription: Subscription + previousStatus: SubscriptionStatus + status: T +} + +/** + * Event emitted when subscription is unsubscribed + */ +export interface SubscriptionUnsubscribedEvent { + type: `unsubscribed` + subscription: Subscription +} + +/** + * All subscription events + */ +export type SubscriptionEvents = { + "status:change": SubscriptionStatusChangeEvent + "status:ready": SubscriptionStatusEvent<`ready`> + "status:loadingMore": SubscriptionStatusEvent<`loadingMore`> + unsubscribed: SubscriptionUnsubscribedEvent +} + +/** + * Public interface for a collection subscription + * Used by sync implementations to track subscription lifecycle + */ +export interface Subscription { + /** Current status of the subscription */ + readonly status: SubscriptionStatus + + /** Subscribe to a subscription event */ + on: ( + event: T, + callback: (eventPayload: SubscriptionEvents[T]) => void + ) => () => void + + /** Subscribe to a subscription event once */ + once: ( + event: T, + callback: (eventPayload: SubscriptionEvents[T]) => void + ) => () => void + + /** Unsubscribe from a subscription event */ + off: ( + event: T, + callback: (eventPayload: SubscriptionEvents[T]) => void + ) => void + + /** Wait for a subscription event */ + waitFor: ( + event: T, + timeout?: number + ) => Promise +} + export type LoadSubsetOptions = { + /** The where expression to filter the data */ where?: BasicExpression + /** The order by clause to sort the data */ orderBy?: OrderBy + /** The limit of the data to load */ limit?: number + /** + * The subscription that triggered the load. + * Advanced sync implementations can use this for: + * - LRU caching keyed by subscription + * - Reference counting to track active subscriptions + * - Subscribing to subscription events (e.g., finalization/unsubscribe) + * @optional Available when called from CollectionSubscription, may be undefined for direct calls + */ + subscription?: Subscription } +export type LoadSubsetFn = (options: LoadSubsetOptions) => void | Promise + export type CleanupFn = () => void export type SyncConfigRes = { cleanup?: CleanupFn - loadSubset?: (options: LoadSubsetOptions) => void | Promise + loadSubset?: LoadSubsetFn } export interface SyncConfig< T extends object = Record, From 5636c466bc60dbb19e069538d1ca4ea7d91fdd02 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Mon, 13 Oct 2025 20:01:22 +0100 Subject: [PATCH 06/13] feed subscription through to the loadSubset callback, add unsubscribe event to the subscription, fix types --- packages/db/src/collection/subscription.ts | 13 +++++--- packages/db/src/local-only.ts | 2 +- packages/db/src/local-storage.ts | 39 ++++++++++++++++++++-- packages/db/src/types.ts | 27 ++------------- 4 files changed, 47 insertions(+), 34 deletions(-) diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 75d5ccca1..9819cbad0 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -54,9 +54,13 @@ export class CollectionSubscription private orderByIndex: IndexInterface | undefined // Status tracking - public readonly status: SubscriptionStatus = `ready` + private _status: SubscriptionStatus = `ready` private pendingLoadSubsetPromises: Set> = new Set() + public get status(): SubscriptionStatus { + return this._status + } + constructor( private collection: CollectionImpl, private callback: (changes: Array>) => void, @@ -95,13 +99,12 @@ export class CollectionSubscription * Set subscription status and emit events if changed */ private setStatus(newStatus: SubscriptionStatus) { - if (this.status === newStatus) { + if (this._status === newStatus) { return // No change } - const previousStatus = this.status - // Cast to mutable for internal mutation - this.status = newStatus + const previousStatus = this._status + this._status = newStatus // Emit status:change event this.emitInner(`status:change`, { diff --git a/packages/db/src/local-only.ts b/packages/db/src/local-only.ts index 756dab21a..2122d3e1f 100644 --- a/packages/db/src/local-only.ts +++ b/packages/db/src/local-only.ts @@ -191,7 +191,7 @@ export function localOnlyCollectionOptions< const { initialData, onInsert, onUpdate, onDelete, ...restConfig } = config // Create the sync configuration with transaction confirmation capability - const syncResult = createLocalOnlySync(initialData) + const syncResult = createLocalOnlySync(initialData) /** * Create wrapper handlers that call user handlers first, then confirm transactions diff --git a/packages/db/src/local-storage.ts b/packages/db/src/local-storage.ts index cb9e3e2ce..a786a8f6e 100644 --- a/packages/db/src/local-storage.ts +++ b/packages/db/src/local-storage.ts @@ -9,11 +9,14 @@ import { import type { BaseCollectionConfig, CollectionConfig, + DeleteMutationFn, DeleteMutationFnParams, InferSchemaOutput, + InsertMutationFn, InsertMutationFnParams, PendingMutation, SyncConfig, + UpdateMutationFn, UpdateMutationFnParams, UtilsRecord, } from "./types" @@ -223,7 +226,25 @@ export function localStorageCollectionOptions< config: LocalStorageCollectionConfig, T, TKey> & { schema: T } -): CollectionConfig, TKey, T> & { +): Omit< + CollectionConfig, TKey, T>, + `onInsert` | `onUpdate` | `onDelete` +> & { + onInsert?: InsertMutationFn< + InferSchemaOutput, + TKey, + LocalStorageCollectionUtils + > + onUpdate?: UpdateMutationFn< + InferSchemaOutput, + TKey, + LocalStorageCollectionUtils + > + onDelete?: DeleteMutationFn< + InferSchemaOutput, + TKey, + LocalStorageCollectionUtils + > id: string utils: LocalStorageCollectionUtils schema: T @@ -238,7 +259,13 @@ export function localStorageCollectionOptions< config: LocalStorageCollectionConfig & { schema?: never // prohibit schema } -): CollectionConfig & { +): Omit< + CollectionConfig, + `onInsert` | `onUpdate` | `onDelete` +> & { + onInsert?: InsertMutationFn + onUpdate?: UpdateMutationFn + onDelete?: DeleteMutationFn id: string utils: LocalStorageCollectionUtils schema?: never // no schema in the result @@ -246,7 +273,13 @@ export function localStorageCollectionOptions< export function localStorageCollectionOptions( config: LocalStorageCollectionConfig -): Omit, `id`> & { +): Omit< + CollectionConfig, + `id` | `onInsert` | `onUpdate` | `onDelete` +> & { + onInsert?: InsertMutationFn + onUpdate?: UpdateMutationFn + onDelete?: DeleteMutationFn id: string utils: LocalStorageCollectionUtils schema?: StandardSchemaV1 diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index 275929061..ea44638d4 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -3,6 +3,7 @@ import type { Collection } from "./collection/index.js" import type { StandardSchemaV1 } from "@standard-schema/spec" import type { Transaction } from "./transactions" import type { BasicExpression, OrderBy } from "./query/ir.js" +import type { EventEmitter } from "./event-emitter.js" /** * Helper type to extract the output type from a standard schema @@ -197,33 +198,9 @@ export type SubscriptionEvents = { * Public interface for a collection subscription * Used by sync implementations to track subscription lifecycle */ -export interface Subscription { +export interface Subscription extends EventEmitter { /** Current status of the subscription */ readonly status: SubscriptionStatus - - /** Subscribe to a subscription event */ - on: ( - event: T, - callback: (eventPayload: SubscriptionEvents[T]) => void - ) => () => void - - /** Subscribe to a subscription event once */ - once: ( - event: T, - callback: (eventPayload: SubscriptionEvents[T]) => void - ) => () => void - - /** Unsubscribe from a subscription event */ - off: ( - event: T, - callback: (eventPayload: SubscriptionEvents[T]) => void - ) => void - - /** Wait for a subscription event */ - waitFor: ( - event: T, - timeout?: number - ) => Promise } export type LoadSubsetOptions = { From 53fa0273f39560848ef2fc9d9b1b388b50234da8 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Mon, 13 Oct 2025 21:14:20 +0100 Subject: [PATCH 07/13] add sync mode to base colleciton --- packages/db/src/collection/sync.ts | 18 +++++++++++++++++- packages/db/src/types.ts | 19 +++++++++++++++++++ .../db/tests/collection-subscription.test.ts | 5 +++++ packages/db/tests/collection.test.ts | 5 +++++ .../tests/query/live-query-collection.test.ts | 1 + 5 files changed, 47 insertions(+), 1 deletion(-) diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index 7f65b3e24..f5b76d977 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -1,4 +1,5 @@ import { + CollectionConfigurationError, CollectionIsInErrorStateError, DuplicateKeySyncError, NoPendingSyncTransactionCommitError, @@ -33,6 +34,7 @@ export class CollectionSyncManager< private _events!: CollectionEventsManager private config!: CollectionConfig private id: string + private syncMode: `eager` | `on-demand` public preloadPromise: Promise | null = null public syncCleanupFn: (() => void) | null = null @@ -48,6 +50,7 @@ export class CollectionSyncManager< constructor(config: CollectionConfig, id: string) { this.config = config this.id = id + this.syncMode = config.syncMode ?? `eager` } setDeps(deps: { @@ -197,6 +200,14 @@ export class CollectionSyncManager< // Store loadSubset function if provided this.syncLoadSubsetFn = syncRes?.loadSubset ?? null + + // Validate: on-demand mode requires a loadSubset function + if (this.syncMode === `on-demand` && !this.syncLoadSubsetFn) { + throw new CollectionConfigurationError( + `Collection "${this.id}" is configured with syncMode "on-demand" but the sync function did not return a loadSubset handler. ` + + `Either provide a loadSubset handler or use syncMode "eager".` + ) + } } catch (error) { this.lifecycle.setStatus(`error`) throw error @@ -292,9 +303,14 @@ export class CollectionSyncManager< * @param options Options to control what data is being loaded * @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded. * If data loading is synchronous, the data is loaded when the method returns. - * Returns undefined if no sync function is configured. + * Returns undefined if no sync function is configured or if syncMode is 'eager'. */ public loadSubset(options: LoadSubsetOptions): Promise | undefined { + // Bypass loadSubset when syncMode is 'eager' + if (this.syncMode === `eager`) { + return undefined + } + if (this.syncLoadSubsetFn) { const result = this.syncLoadSubsetFn(options) diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index ea44638d4..e8120a5ff 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -380,6 +380,16 @@ export type CollectionStatus = /** Collection has been cleaned up and resources freed */ | `cleaned-up` +/** + * @default `eager` + * @description + * Collections have two modes of sync: eager and on-demand. + * - eager: syncs all data immediately on preload + * - on-demand: syncs data in incremental snapshots when the collection is queried + * The exact implementation of the sync mode is up to the sync implementation. + */ +export type SyncMode = `eager` | `on-demand` + export interface BaseCollectionConfig< T extends object = Record, TKey extends string | number = string | number, @@ -441,6 +451,15 @@ export interface BaseCollectionConfig< * compare: (x, y) => x.createdAt.getTime() - y.createdAt.getTime() */ compare?: (x: T, y: T) => number + /** + * The mode of sync to use for the collection. + * @default `eager` + * @description + * - `eager`: syncs all data immediately on preload + * - `on-demand`: syncs data in incremental snapshots when the collection is queried + * The exact implementation of the sync mode is up to the sync implementation. + */ + syncMode?: SyncMode /** * Optional asynchronous handler function called before an insert operation * @param params Object containing transaction and collection information diff --git a/packages/db/tests/collection-subscription.test.ts b/packages/db/tests/collection-subscription.test.ts index ea7263f8b..94c83f9d5 100644 --- a/packages/db/tests/collection-subscription.test.ts +++ b/packages/db/tests/collection-subscription.test.ts @@ -29,6 +29,7 @@ describe(`CollectionSubscription status tracking`, () => { const collection = createCollection<{ id: string; value: string }>({ id: `test`, getKey: (item) => item.id, + syncMode: `on-demand`, sync: { sync: ({ markReady }) => { markReady() @@ -70,6 +71,7 @@ describe(`CollectionSubscription status tracking`, () => { const collection = createCollection<{ id: string; value: string }>({ id: `test`, getKey: (item) => item.id, + syncMode: `on-demand`, sync: { sync: ({ markReady }) => { markReady() @@ -102,6 +104,7 @@ describe(`CollectionSubscription status tracking`, () => { const collection = createCollection<{ id: string; value: string }>({ id: `test`, getKey: (item) => item.id, + syncMode: `on-demand`, sync: { sync: ({ markReady }) => { markReady() @@ -160,6 +163,7 @@ describe(`CollectionSubscription status tracking`, () => { const collection = createCollection<{ id: string; value: string }>({ id: `test`, getKey: (item) => item.id, + syncMode: `on-demand`, sync: { sync: ({ markReady }) => { markReady() @@ -215,6 +219,7 @@ describe(`CollectionSubscription status tracking`, () => { const collection = createCollection<{ id: string; value: string }>({ id: `test`, getKey: (item) => item.id, + syncMode: `on-demand`, sync: { sync: ({ markReady }) => { markReady() diff --git a/packages/db/tests/collection.test.ts b/packages/db/tests/collection.test.ts index 112ae8730..863ab0bcd 100644 --- a/packages/db/tests/collection.test.ts +++ b/packages/db/tests/collection.test.ts @@ -1381,6 +1381,7 @@ describe(`Collection isLoadingMore property`, () => { const collection = createCollection<{ id: string; value: string }>({ id: `test`, getKey: (item) => item.id, + syncMode: `on-demand`, startSync: true, sync: { sync: ({ markReady }) => { @@ -1412,6 +1413,7 @@ describe(`Collection isLoadingMore property`, () => { const collection = createCollection<{ id: string; value: string }>({ id: `test`, getKey: (item) => item.id, + syncMode: `on-demand`, startSync: true, sync: { sync: ({ markReady }) => { @@ -1440,6 +1442,7 @@ describe(`Collection isLoadingMore property`, () => { const collection = createCollection<{ id: string; value: string }>({ id: `test`, getKey: (item) => item.id, + syncMode: `on-demand`, startSync: true, sync: { sync: ({ markReady }) => { @@ -1489,6 +1492,7 @@ describe(`Collection isLoadingMore property`, () => { const collection = createCollection<{ id: string; value: string }>({ id: `test`, getKey: (item) => item.id, + syncMode: `on-demand`, startSync: true, sync: { sync: ({ markReady }) => { @@ -1542,6 +1546,7 @@ describe(`Collection isLoadingMore property`, () => { const collection = createCollection<{ id: string; value: string }>({ id: `test`, getKey: (item) => item.id, + syncMode: `on-demand`, startSync: true, sync: { sync: ({ markReady }) => { diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index d28fa1e1f..c9b396538 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -990,6 +990,7 @@ describe(`createLiveQueryCollection`, () => { const sourceCollection = createCollection<{ id: string; value: number }>({ id: `source`, getKey: (item) => item.id, + syncMode: `on-demand`, sync: { sync: ({ markReady, begin, write, commit }) => { begin() From f78c8e148c075eb589d0356f34d688451a2f0312 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Tue, 14 Oct 2025 09:50:27 +0100 Subject: [PATCH 08/13] loadSubset fn return promise or true --- packages/db/src/collection/subscription.ts | 24 ++++++++++----------- packages/db/src/collection/sync.ts | 25 ++++++++++------------ packages/db/src/types.ts | 2 +- packages/db/tests/collection.test.ts | 24 +++++++++++++++++++++ 4 files changed, 48 insertions(+), 27 deletions(-) diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 9819cbad0..3fe49dca6 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -174,18 +174,18 @@ export class CollectionSubscription // Request the sync layer to load more data // don't await it, we will load the data into the collection when it comes in - const syncPromise = this.collection._sync.loadSubset({ + const syncResult = this.collection._sync.loadSubset({ where: stateOpts.where, subscription: this, }) - // Track the promise if it exists - if (syncPromise) { - this.pendingLoadSubsetPromises.add(syncPromise) + // Track the promise if it's actually a promise (async work) + if (syncResult instanceof Promise) { + this.pendingLoadSubsetPromises.add(syncResult) this.setStatus(`loadingMore`) - syncPromise.finally(() => { - this.pendingLoadSubsetPromises.delete(syncPromise) + syncResult.finally(() => { + this.pendingLoadSubsetPromises.delete(syncResult) if (this.pendingLoadSubsetPromises.size === 0) { this.setStatus(`ready`) } @@ -282,20 +282,20 @@ export class CollectionSubscription // Request the sync layer to load more data // don't await it, we will load the data into the collection when it comes in - const syncPromise = this.collection._sync.loadSubset({ + const syncResult = this.collection._sync.loadSubset({ where: whereWithValueFilter, limit, orderBy, subscription: this, }) - // Track the promise if it exists - if (syncPromise) { - this.pendingLoadSubsetPromises.add(syncPromise) + // Track the promise if it's actually a promise (async work) + if (syncResult instanceof Promise) { + this.pendingLoadSubsetPromises.add(syncResult) this.setStatus(`loadingMore`) - syncPromise.finally(() => { - this.pendingLoadSubsetPromises.delete(syncPromise) + syncResult.finally(() => { + this.pendingLoadSubsetPromises.delete(syncResult) if (this.pendingLoadSubsetPromises.size === 0) { this.setStatus(`ready`) } diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index f5b76d977..e9f810097 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -39,7 +39,7 @@ export class CollectionSyncManager< public preloadPromise: Promise | null = null public syncCleanupFn: (() => void) | null = null public syncLoadSubsetFn: - | ((options: LoadSubsetOptions) => void | Promise) + | ((options: LoadSubsetOptions) => true | Promise) | null = null private pendingLoadSubsetPromises: Set> = new Set() @@ -302,27 +302,24 @@ export class CollectionSyncManager< * Requests the sync layer to load more data. * @param options Options to control what data is being loaded * @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded. - * If data loading is synchronous, the data is loaded when the method returns. - * Returns undefined if no sync function is configured or if syncMode is 'eager'. + * Returns true if no sync function is configured, if syncMode is 'eager', or if there is no work to do. */ - public loadSubset(options: LoadSubsetOptions): Promise | undefined { + public loadSubset(options: LoadSubsetOptions): Promise | true { // Bypass loadSubset when syncMode is 'eager' if (this.syncMode === `eager`) { - return undefined + return true } if (this.syncLoadSubsetFn) { const result = this.syncLoadSubsetFn(options) - - // If the result is void (synchronous), wrap in Promise.resolve() - const promise = result === undefined ? Promise.resolve() : result - - // Track the promise - this.trackLoadPromise(promise) - - return promise + // If the result is a promise, track it + if (result instanceof Promise) { + this.trackLoadPromise(result) + return result + } } - return undefined + + return true } public cleanup(): void { diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index e8120a5ff..a1cc31e53 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -221,7 +221,7 @@ export type LoadSubsetOptions = { subscription?: Subscription } -export type LoadSubsetFn = (options: LoadSubsetOptions) => void | Promise +export type LoadSubsetFn = (options: LoadSubsetOptions) => true | Promise export type CleanupFn = () => void diff --git a/packages/db/tests/collection.test.ts b/packages/db/tests/collection.test.ts index 863ab0bcd..2dfa0865f 100644 --- a/packages/db/tests/collection.test.ts +++ b/packages/db/tests/collection.test.ts @@ -1567,4 +1567,28 @@ describe(`Collection isLoadingMore property`, () => { expect(collection.isLoadingMore).toBe(false) }) + + it(`isLoadingMore stays false when loadSubset returns true (no work to do)`, () => { + const collection = createCollection<{ id: string; value: string }>({ + id: `test`, + getKey: (item) => item.id, + syncMode: `on-demand`, + startSync: true, + sync: { + sync: ({ markReady }) => { + markReady() + return { + loadSubset: () => true, // No work to do + } + }, + }, + }) + + expect(collection.isLoadingMore).toBe(false) + + // Call loadSubset - it should return true and not track any promise + const result = collection._sync.loadSubset({}) + expect(result).toBe(true) + expect(collection.isLoadingMore).toBe(false) + }) }) From 9ad11697a32145476ec3a7caf3cebd31bed1711f Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Tue, 14 Oct 2025 10:28:51 +0100 Subject: [PATCH 09/13] add comment on setting is loading --- packages/db/src/query/live/collection-subscriber.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index ee00c008c..ea962f354 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -75,6 +75,12 @@ export class CollectionSubscriber< // Subscribe to subscription status changes to propagate loading state const statusUnsubscribe = subscription.on(`status:change`, (event) => { + // TODO: For now we are setting this loading state whenever the subscription + // status changes to 'loadingMore'. But we have discussed it only happening + // when the the live query has it's offset/limit changed, and that triggers the + // subscription to request a snapshot. This will require more work to implement, + // and builds on https://github.com/TanStack/db/pull/663 which this PR + // does not yet depend on. if (event.status === `loadingMore`) { // Guard against duplicate transitions if (!this.subscriptionLoadingPromises.has(subscription)) { From 0a0bbb046b64c84b58c8db7d8128943f3e64df01 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Tue, 14 Oct 2025 17:51:06 +0100 Subject: [PATCH 10/13] address review --- packages/db/src/collection/events.ts | 13 +++-- packages/db/src/collection/index.ts | 6 +- packages/db/src/collection/subscription.ts | 44 +++++++------- packages/db/src/collection/sync.ts | 35 +++++------ .../src/query/live/collection-subscriber.ts | 4 +- packages/db/src/types.ts | 12 +--- .../db/tests/collection-subscription.test.ts | 22 +++---- packages/db/tests/collection.test.ts | 58 +++++++++---------- .../tests/query/live-query-collection.test.ts | 26 ++++----- 9 files changed, 105 insertions(+), 115 deletions(-) diff --git a/packages/db/src/collection/events.ts b/packages/db/src/collection/events.ts index 23f2dd87c..0f6d8cec1 100644 --- a/packages/db/src/collection/events.ts +++ b/packages/db/src/collection/events.ts @@ -35,17 +35,18 @@ export interface CollectionSubscribersChangeEvent { /** * Event emitted when the collection's loading more state changes */ -export interface CollectionLoadingMoreChangeEvent { - type: `loadingMore:change` +export interface CollectionLoadingSubsetChangeEvent { + type: `loadingSubset:change` collection: Collection - isLoadingMore: boolean - previousIsLoadingMore: boolean + isLoadingSubset: boolean + previousIsLoadingSubset: boolean + loadingSubsetTransition: `start` | `end` } export type AllCollectionEvents = { "status:change": CollectionStatusChangeEvent "subscribers:change": CollectionSubscribersChangeEvent - "loadingMore:change": CollectionLoadingMoreChangeEvent + "loadingSubset:change": CollectionLoadingSubsetChangeEvent } & { [K in CollectionStatus as `status:${K}`]: CollectionStatusEvent } @@ -54,7 +55,7 @@ export type CollectionEvent = | AllCollectionEvents[keyof AllCollectionEvents] | CollectionStatusChangeEvent | CollectionSubscribersChangeEvent - | CollectionLoadingMoreChangeEvent + | CollectionLoadingSubsetChangeEvent export type CollectionEventHandler = ( event: AllCollectionEvents[T] diff --git a/packages/db/src/collection/index.ts b/packages/db/src/collection/index.ts index 868308833..769746603 100644 --- a/packages/db/src/collection/index.ts +++ b/packages/db/src/collection/index.ts @@ -359,8 +359,8 @@ export class CollectionImpl< * Check if the collection is currently loading more data * @returns true if the collection has pending load more operations, false otherwise */ - public get isLoadingMore(): boolean { - return this._sync.isLoadingMore + public get isLoadingSubset(): boolean { + return this._sync.isLoadingSubset } /** @@ -372,7 +372,7 @@ export class CollectionImpl< } /** - * Tracks a load promise for isLoadingMore state. + * Tracks a load promise for isLoadingSubset state. * @internal This is for internal coordination (e.g., live-query glue code), not for general use. */ public trackLoadPromise(promise: Promise): void { diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 3fe49dca6..d6a43a256 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -124,6 +124,24 @@ export class CollectionSubscription } as SubscriptionEvents[typeof eventKey]) } + /** + * Track a loadSubset promise and manage loading status + */ + private trackLoadSubsetPromise(syncResult: Promise | true) { + // Track the promise if it's actually a promise (async work) + if (syncResult instanceof Promise) { + this.pendingLoadSubsetPromises.add(syncResult) + this.setStatus(`loadingSubset`) + + syncResult.finally(() => { + this.pendingLoadSubsetPromises.delete(syncResult) + if (this.pendingLoadSubsetPromises.size === 0) { + this.setStatus(`ready`) + } + }) + } + } + hasLoadedInitialState() { return this.loadedInitialState } @@ -179,18 +197,7 @@ export class CollectionSubscription subscription: this, }) - // Track the promise if it's actually a promise (async work) - if (syncResult instanceof Promise) { - this.pendingLoadSubsetPromises.add(syncResult) - this.setStatus(`loadingMore`) - - syncResult.finally(() => { - this.pendingLoadSubsetPromises.delete(syncResult) - if (this.pendingLoadSubsetPromises.size === 0) { - this.setStatus(`ready`) - } - }) - } + this.trackLoadSubsetPromise(syncResult) // Also load data immediately from the collection const snapshot = this.collection.currentStateAsChanges(stateOpts) @@ -289,18 +296,7 @@ export class CollectionSubscription subscription: this, }) - // Track the promise if it's actually a promise (async work) - if (syncResult instanceof Promise) { - this.pendingLoadSubsetPromises.add(syncResult) - this.setStatus(`loadingMore`) - - syncResult.finally(() => { - this.pendingLoadSubsetPromises.delete(syncResult) - if (this.pendingLoadSubsetPromises.size === 0) { - this.setStatus(`ready`) - } - }) - } + this.trackLoadSubsetPromise(syncResult) } /** diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index e9f810097..251fba608 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -259,40 +259,41 @@ export class CollectionSyncManager< /** * Gets whether the collection is currently loading more data */ - public get isLoadingMore(): boolean { + public get isLoadingSubset(): boolean { return this.pendingLoadSubsetPromises.size > 0 } /** - * Tracks a load promise for isLoadingMore state. + * Tracks a load promise for isLoadingSubset state. * @internal This is for internal coordination (e.g., live-query glue code), not for general use. */ public trackLoadPromise(promise: Promise): void { - const wasLoading = this.isLoadingMore + const loadingStarting = !this.isLoadingSubset this.pendingLoadSubsetPromises.add(promise) - const isLoadingNow = this.isLoadingMore - if (!wasLoading && isLoadingNow) { - this._events.emit(`loadingMore:change`, { - type: `loadingMore:change`, + if (loadingStarting) { + this._events.emit(`loadingSubset:change`, { + type: `loadingSubset:change`, collection: this.collection, - isLoadingMore: true, - previousIsLoadingMore: false, + isLoadingSubset: true, + previousIsLoadingSubset: false, + loadingSubsetTransition: `start`, }) } promise.finally(() => { - // Check loading state BEFORE removing the promise - const wasLoadingBeforeRemoval = this.isLoadingMore + const loadingEnding = + this.pendingLoadSubsetPromises.size === 1 && + this.pendingLoadSubsetPromises.has(promise) this.pendingLoadSubsetPromises.delete(promise) - const stillLoading = this.isLoadingMore - if (wasLoadingBeforeRemoval && !stillLoading) { - this._events.emit(`loadingMore:change`, { - type: `loadingMore:change`, + if (loadingEnding) { + this._events.emit(`loadingSubset:change`, { + type: `loadingSubset:change`, collection: this.collection, - isLoadingMore: false, - previousIsLoadingMore: true, + isLoadingSubset: false, + previousIsLoadingSubset: true, + loadingSubsetTransition: `end`, }) } }) diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index ea962f354..a3b2eb700 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -76,12 +76,12 @@ export class CollectionSubscriber< // Subscribe to subscription status changes to propagate loading state const statusUnsubscribe = subscription.on(`status:change`, (event) => { // TODO: For now we are setting this loading state whenever the subscription - // status changes to 'loadingMore'. But we have discussed it only happening + // status changes to 'loadingSubset'. But we have discussed it only happening // when the the live query has it's offset/limit changed, and that triggers the // subscription to request a snapshot. This will require more work to implement, // and builds on https://github.com/TanStack/db/pull/663 which this PR // does not yet depend on. - if (event.status === `loadingMore`) { + if (event.status === `loadingSubset`) { // Guard against duplicate transitions if (!this.subscriptionLoadingPromises.has(subscription)) { let resolve: () => void diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index a1cc31e53..73dfc229e 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -154,7 +154,7 @@ export type OperationType = `insert` | `update` | `delete` /** * Subscription status values */ -export type SubscriptionStatus = `ready` | `loadingMore` +export type SubscriptionStatus = `ready` | `loadingSubset` /** * Event emitted when subscription status changes @@ -190,7 +190,7 @@ export interface SubscriptionUnsubscribedEvent { export type SubscriptionEvents = { "status:change": SubscriptionStatusChangeEvent "status:ready": SubscriptionStatusEvent<`ready`> - "status:loadingMore": SubscriptionStatusEvent<`loadingMore`> + "status:loadingSubset": SubscriptionStatusEvent<`loadingSubset`> unsubscribed: SubscriptionUnsubscribedEvent } @@ -380,14 +380,6 @@ export type CollectionStatus = /** Collection has been cleaned up and resources freed */ | `cleaned-up` -/** - * @default `eager` - * @description - * Collections have two modes of sync: eager and on-demand. - * - eager: syncs all data immediately on preload - * - on-demand: syncs data in incremental snapshots when the collection is queried - * The exact implementation of the sync mode is up to the sync implementation. - */ export type SyncMode = `eager` | `on-demand` export interface BaseCollectionConfig< diff --git a/packages/db/tests/collection-subscription.test.ts b/packages/db/tests/collection-subscription.test.ts index 94c83f9d5..0b9e09384 100644 --- a/packages/db/tests/collection-subscription.test.ts +++ b/packages/db/tests/collection-subscription.test.ts @@ -20,7 +20,7 @@ describe(`CollectionSubscription status tracking`, () => { subscription.unsubscribe() }) - it(`status changes to 'loadingMore' when requestSnapshot triggers a promise`, async () => { + it(`status changes to 'loadingSubset' when requestSnapshot triggers a promise`, async () => { let resolveLoadSubset: () => void const loadSubsetPromise = new Promise((resolve) => { resolveLoadSubset = resolve @@ -49,8 +49,8 @@ describe(`CollectionSubscription status tracking`, () => { // Trigger a snapshot request that will call loadSubset subscription.requestSnapshot({ optimizedOnly: false }) - // Status should now be loadingMore - expect(subscription.status).toBe(`loadingMore`) + // Status should now be loadingSubset + expect(subscription.status).toBe(`loadingSubset`) // Resolve the load more promise resolveLoadSubset!() @@ -87,7 +87,7 @@ describe(`CollectionSubscription status tracking`, () => { }) subscription.requestSnapshot({ optimizedOnly: false }) - expect(subscription.status).toBe(`loadingMore`) + expect(subscription.status).toBe(`loadingSubset`) resolveLoadSubset!() await flushPromises() @@ -96,7 +96,7 @@ describe(`CollectionSubscription status tracking`, () => { subscription.unsubscribe() }) - it(`concurrent promises keep status as 'loadingMore' until all resolve`, async () => { + it(`concurrent promises keep status as 'loadingSubset' until all resolve`, async () => { let resolveLoadSubset1: () => void let resolveLoadSubset2: () => void let callCount = 0 @@ -132,18 +132,18 @@ describe(`CollectionSubscription status tracking`, () => { // Trigger first load subscription.requestSnapshot({ optimizedOnly: false }) - expect(subscription.status).toBe(`loadingMore`) + expect(subscription.status).toBe(`loadingSubset`) // Trigger second load subscription.requestSnapshot({ optimizedOnly: false }) - expect(subscription.status).toBe(`loadingMore`) + expect(subscription.status).toBe(`loadingSubset`) // Resolve first promise resolveLoadSubset1!() await flushPromises() // Should still be loading because second promise is pending - expect(subscription.status).toBe(`loadingMore`) + expect(subscription.status).toBe(`loadingSubset`) // Resolve second promise resolveLoadSubset2!() @@ -193,7 +193,7 @@ describe(`CollectionSubscription status tracking`, () => { expect(statusChanges).toHaveLength(1) expect(statusChanges[0]).toEqual({ previous: `ready`, - current: `loadingMore`, + current: `loadingSubset`, }) resolveLoadSubset!() @@ -201,7 +201,7 @@ describe(`CollectionSubscription status tracking`, () => { expect(statusChanges).toHaveLength(2) expect(statusChanges[1]).toEqual({ - previous: `loadingMore`, + previous: `loadingSubset`, current: `ready`, }) @@ -235,7 +235,7 @@ describe(`CollectionSubscription status tracking`, () => { }) subscription.requestSnapshot({ optimizedOnly: false }) - expect(subscription.status).toBe(`loadingMore`) + expect(subscription.status).toBe(`loadingSubset`) // Reject the promise rejectLoadSubset!(new Error(`Load failed`)) diff --git a/packages/db/tests/collection.test.ts b/packages/db/tests/collection.test.ts index 2dfa0865f..a5a94fb06 100644 --- a/packages/db/tests/collection.test.ts +++ b/packages/db/tests/collection.test.ts @@ -1357,8 +1357,8 @@ describe(`Collection`, () => { }) }) -describe(`Collection isLoadingMore property`, () => { - it(`isLoadingMore is false initially`, () => { +describe(`Collection isLoadingSubset property`, () => { + it(`isLoadingSubset is false initially`, () => { const collection = createCollection<{ id: string; value: string }>({ id: `test`, getKey: (item) => item.id, @@ -1369,10 +1369,10 @@ describe(`Collection isLoadingMore property`, () => { }, }) - expect(collection.isLoadingMore).toBe(false) + expect(collection.isLoadingSubset).toBe(false) }) - it(`isLoadingMore becomes true when loadSubset returns a promise`, async () => { + it(`isLoadingSubset becomes true when loadSubset returns a promise`, async () => { let resolveLoadSubset: () => void const loadSubsetPromise = new Promise((resolve) => { resolveLoadSubset = resolve @@ -1393,18 +1393,18 @@ describe(`Collection isLoadingMore property`, () => { }, }) - expect(collection.isLoadingMore).toBe(false) + expect(collection.isLoadingSubset).toBe(false) collection._sync.loadSubset({}) - expect(collection.isLoadingMore).toBe(true) + expect(collection.isLoadingSubset).toBe(true) resolveLoadSubset!() await flushPromises() - expect(collection.isLoadingMore).toBe(false) + expect(collection.isLoadingSubset).toBe(false) }) - it(`isLoadingMore becomes false when promise resolves`, async () => { + it(`isLoadingSubset becomes false when promise resolves`, async () => { let resolveLoadSubset: () => void const loadSubsetPromise = new Promise((resolve) => { resolveLoadSubset = resolve @@ -1426,15 +1426,15 @@ describe(`Collection isLoadingMore property`, () => { }) collection._sync.loadSubset({}) - expect(collection.isLoadingMore).toBe(true) + expect(collection.isLoadingSubset).toBe(true) resolveLoadSubset!() await flushPromises() - expect(collection.isLoadingMore).toBe(false) + expect(collection.isLoadingSubset).toBe(false) }) - it(`concurrent loadSubset calls keep isLoadingMore true until all resolve`, async () => { + it(`concurrent loadSubset calls keep isLoadingSubset true until all resolve`, async () => { let resolveLoadSubset1: () => void let resolveLoadSubset2: () => void let callCount = 0 @@ -1468,22 +1468,22 @@ describe(`Collection isLoadingMore property`, () => { collection._sync.loadSubset({}) collection._sync.loadSubset({}) - expect(collection.isLoadingMore).toBe(true) + expect(collection.isLoadingSubset).toBe(true) resolveLoadSubset1!() await flushPromises() // Should still be loading because second promise is pending - expect(collection.isLoadingMore).toBe(true) + expect(collection.isLoadingSubset).toBe(true) resolveLoadSubset2!() await flushPromises() // Now should be false - expect(collection.isLoadingMore).toBe(false) + expect(collection.isLoadingSubset).toBe(false) }) - it(`emits loadingMore:change event`, async () => { + it(`emits loadingSubset:change event`, async () => { let resolveLoadSubset: () => void const loadSubsetPromise = new Promise((resolve) => { resolveLoadSubset = resolve @@ -1505,14 +1505,14 @@ describe(`Collection isLoadingMore property`, () => { }) const loadingChanges: Array<{ - isLoadingMore: boolean - previousIsLoadingMore: boolean + isLoadingSubset: boolean + previousIsLoadingSubset: boolean }> = [] - collection.on(`loadingMore:change`, (event) => { + collection.on(`loadingSubset:change`, (event) => { loadingChanges.push({ - isLoadingMore: event.isLoadingMore, - previousIsLoadingMore: event.previousIsLoadingMore, + isLoadingSubset: event.isLoadingSubset, + previousIsLoadingSubset: event.previousIsLoadingSubset, }) }) @@ -1521,8 +1521,8 @@ describe(`Collection isLoadingMore property`, () => { expect(loadingChanges).toHaveLength(1) expect(loadingChanges[0]).toEqual({ - isLoadingMore: true, - previousIsLoadingMore: false, + isLoadingSubset: true, + previousIsLoadingSubset: false, }) resolveLoadSubset!() @@ -1530,8 +1530,8 @@ describe(`Collection isLoadingMore property`, () => { expect(loadingChanges).toHaveLength(2) expect(loadingChanges[1]).toEqual({ - isLoadingMore: false, - previousIsLoadingMore: true, + isLoadingSubset: false, + previousIsLoadingSubset: true, }) }) @@ -1559,16 +1559,16 @@ describe(`Collection isLoadingMore property`, () => { }) collection._sync.loadSubset({}) - expect(collection.isLoadingMore).toBe(true) + expect(collection.isLoadingSubset).toBe(true) // Reject the promise rejectLoadSubset!(new Error(`Load failed`)) await flushPromises() - expect(collection.isLoadingMore).toBe(false) + expect(collection.isLoadingSubset).toBe(false) }) - it(`isLoadingMore stays false when loadSubset returns true (no work to do)`, () => { + it(`isLoadingSubset stays false when loadSubset returns true (no work to do)`, () => { const collection = createCollection<{ id: string; value: string }>({ id: `test`, getKey: (item) => item.id, @@ -1584,11 +1584,11 @@ describe(`Collection isLoadingMore property`, () => { }, }) - expect(collection.isLoadingMore).toBe(false) + expect(collection.isLoadingSubset).toBe(false) // Call loadSubset - it should return true and not track any promise const result = collection._sync.loadSubset({}) expect(result).toBe(true) - expect(collection.isLoadingMore).toBe(false) + expect(collection.isLoadingSubset).toBe(false) }) }) diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index c9b396538..bb1895e2c 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -938,8 +938,8 @@ describe(`createLiveQueryCollection`, () => { }) }) - describe(`isLoadingMore integration`, () => { - it(`live query result collection has isLoadingMore property`, async () => { + describe(`isLoadingSubset integration`, () => { + it(`live query result collection has isLoadingSubset property`, async () => { const sourceCollection = createCollection<{ id: string; value: string }>({ id: `source`, getKey: (item) => item.id, @@ -956,11 +956,11 @@ describe(`createLiveQueryCollection`, () => { await liveQuery.preload() - expect(liveQuery.isLoadingMore).toBeDefined() - expect(liveQuery.isLoadingMore).toBe(false) + expect(liveQuery.isLoadingSubset).toBeDefined() + expect(liveQuery.isLoadingSubset).toBe(false) }) - it(`isLoadingMore property exists and starts as false`, async () => { + it(`isLoadingSubset property exists and starts as false`, async () => { const sourceCollection = createCollection<{ id: string; value: string }>({ id: `source`, getKey: (item) => item.id, @@ -978,10 +978,10 @@ describe(`createLiveQueryCollection`, () => { await liveQuery.preload() - expect(liveQuery.isLoadingMore).toBe(false) + expect(liveQuery.isLoadingSubset).toBe(false) }) - it(`source collection isLoadingMore is independent`, async () => { + it(`source collection isLoadingSubset is independent`, async () => { let resolveLoadSubset: () => void const loadSubsetPromise = new Promise((resolve) => { resolveLoadSubset = resolve @@ -1011,19 +1011,19 @@ describe(`createLiveQueryCollection`, () => { await liveQuery.preload() - // Calling loadSubset directly on source collection sets its own isLoadingMore + // Calling loadSubset directly on source collection sets its own isLoadingSubset sourceCollection._sync.loadSubset({}) - expect(sourceCollection.isLoadingMore).toBe(true) + expect(sourceCollection.isLoadingSubset).toBe(true) - // But live query isLoadingMore tracks subscription-driven loads, not direct loadSubset calls + // But live query isLoadingSubset tracks subscription-driven loads, not direct loadSubset calls // so it remains false unless subscriptions trigger loads via predicate pushdown - expect(liveQuery.isLoadingMore).toBe(false) + expect(liveQuery.isLoadingSubset).toBe(false) resolveLoadSubset!() await new Promise((resolve) => setTimeout(resolve, 10)) - expect(sourceCollection.isLoadingMore).toBe(false) - expect(liveQuery.isLoadingMore).toBe(false) + expect(sourceCollection.isLoadingSubset).toBe(false) + expect(liveQuery.isLoadingSubset).toBe(false) }) }) }) From f8cd6feba6b62bc59ac613345536a467c6d9d4dc Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 15 Oct 2025 11:57:27 +0100 Subject: [PATCH 11/13] remove public trackLoadPromise --- packages/db/src/collection/index.ts | 8 -------- packages/db/src/query/live/collection-subscriber.ts | 2 +- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/packages/db/src/collection/index.ts b/packages/db/src/collection/index.ts index 87830a6d2..5a4ab3c07 100644 --- a/packages/db/src/collection/index.ts +++ b/packages/db/src/collection/index.ts @@ -371,14 +371,6 @@ export class CollectionImpl< this._sync.startSync() } - /** - * Tracks a load promise for isLoadingSubset state. - * @internal This is for internal coordination (e.g., live-query glue code), not for general use. - */ - public trackLoadPromise(promise: Promise): void { - this._sync.trackLoadPromise(promise) - } - /** * Preload the collection data by starting sync if not already started * Multiple concurrent calls will share the same promise diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index a3b2eb700..5835a2f0e 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -92,7 +92,7 @@ export class CollectionSubscriber< this.subscriptionLoadingPromises.set(subscription, { resolve: resolve!, }) - this.collectionConfigBuilder.liveQueryCollection!.trackLoadPromise( + this.collectionConfigBuilder.liveQueryCollection!._sync.trackLoadPromise( promise ) } From 1c54b1b01a8af3eb5367a918c9abcbadf5818f00 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 15 Oct 2025 12:54:28 +0100 Subject: [PATCH 12/13] setWindow returns a promise when it triggers loading subset --- .../query/live/collection-config-builder.ts | 25 ++- .../tests/query/live-query-collection.test.ts | 154 ++++++++++++++++++ 2 files changed, 177 insertions(+), 2 deletions(-) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 81b92a98c..08302fc39 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -39,8 +39,10 @@ export type LiveQueryCollectionUtils = UtilsRecord & { /** * Sets the offset and limit of an ordered query. * Is a no-op if the query is not ordered. + * + * @returns `true` if no subset loading was triggered, or `Promise` that resolves when the subset has been loaded */ - setWindow: (options: WindowOptions) => void + setWindow: (options: WindowOptions) => true | Promise } type PendingGraphRun = { @@ -189,13 +191,32 @@ export class CollectionConfigBuilder< } } - setWindow(options: WindowOptions) { + setWindow(options: WindowOptions): true | Promise { if (!this.windowFn) { throw new SetWindowRequiresOrderByError() } this.windowFn(options) this.maybeRunGraphFn?.() + + // Check if loading a subset was triggered + if (this.liveQueryCollection?.isLoadingSubset) { + // Loading was triggered, return a promise that resolves when it completes + return new Promise((resolve) => { + const unsubscribe = this.liveQueryCollection!.on( + `loadingSubset:change`, + (event) => { + if (!event.isLoadingSubset) { + unsubscribe() + resolve() + } + } + ) + }) + } + + // No loading was triggered + return true } /** diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index cda95c169..87f407f2c 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -1551,5 +1551,159 @@ describe(`createLiveQueryCollection`, () => { `Post F`, ]) }) + + it(`setWindow returns true when no subset loading is triggered`, async () => { + const extendedUsers = createCollection( + mockSyncCollectionOptions({ + id: `users-no-loading`, + getKey: (user) => user.id, + initialData: [ + { id: 1, name: `Alice`, active: true }, + { id: 2, name: `Bob`, active: true }, + { id: 3, name: `Charlie`, active: true }, + ], + }) + ) + + const activeUsers = createLiveQueryCollection((q) => + q + .from({ user: extendedUsers }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `asc`) + .limit(2) + ) + + await activeUsers.preload() + + // setWindow should return true when no loading is triggered + const result = activeUsers.utils.setWindow({ offset: 1, limit: 2 }) + expect(result).toBe(true) + }) + + it(`setWindow returns and resolves a Promise when async loading is triggered`, async () => { + // This is an integration test that validates the full async flow: + // 1. setWindow triggers loading more data + // 2. Returns a Promise (not true) + // 3. The Promise waits for loading to complete + // 4. The Promise resolves once loading is done + + vi.useFakeTimers() + + try { + let loadSubsetCallCount = 0 + + const sourceCollection = createCollection<{ + id: number + value: number + }>({ + id: `source-async-subset-loading`, + getKey: (item) => item.id, + syncMode: `on-demand`, + startSync: true, + sync: { + sync: ({ markReady, begin, write, commit }) => { + // Provide minimal initial data + begin() + write({ type: `insert`, value: { id: 1, value: 1 } }) + write({ type: `insert`, value: { id: 2, value: 2 } }) + write({ type: `insert`, value: { id: 3, value: 3 } }) + commit() + markReady() + + return { + loadSubset: () => { + loadSubsetCallCount++ + + // First call is for the initial window request + if (loadSubsetCallCount === 1) { + return true + } + + // Second call (triggered by setWindow) returns a promise + const loadPromise = new Promise((resolve) => { + // Simulate async data loading with a delay + setTimeout(() => { + begin() + // Load additional items that would be needed for the new window + write({ type: `insert`, value: { id: 4, value: 4 } }) + write({ type: `insert`, value: { id: 5, value: 5 } }) + write({ type: `insert`, value: { id: 6, value: 6 } }) + commit() + resolve() + }, 50) + }) + + return loadPromise + }, + } + }, + }, + }) + + const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ item: sourceCollection }) + .orderBy(({ item }) => item.value, `asc`) + .limit(2) + .offset(0), + startSync: true, + }) + + await liveQuery.preload() + + // Initial state: should have 2 items (values 1, 2) + expect(liveQuery.size).toBe(2) + expect(liveQuery.isLoadingSubset).toBe(false) + expect(loadSubsetCallCount).toBe(1) + + // Move window to offset 3, which requires loading more data + // This should trigger loadSubset and return a Promise + const result = liveQuery.utils.setWindow({ offset: 3, limit: 2 }) + + // CRITICAL VALIDATION: result should be a Promise, not true + expect(result).toBeInstanceOf(Promise) + expect(result).not.toBe(true) + + // Advance just a bit to let the scheduler execute and trigger loadSubset + await vi.advanceTimersByTimeAsync(1) + + // Verify that loading was triggered and is in progress + expect(loadSubsetCallCount).toBeGreaterThan(1) + expect(liveQuery.isLoadingSubset).toBe(true) + + // Track when the promise resolves + let promiseResolved = false + if (result !== true) { + result.then(() => { + promiseResolved = true + }) + } + + // Promise should NOT be resolved yet because loading is still in progress + await vi.advanceTimersByTimeAsync(10) + expect(promiseResolved).toBe(false) + expect(liveQuery.isLoadingSubset).toBe(true) + + // Now advance time to complete the loading (50ms total from loadSubset call) + await vi.advanceTimersByTimeAsync(40) + + // Wait for the promise to resolve + if (result !== true) { + await result + } + + // CRITICAL VALIDATION: Promise has resolved and loading is complete + expect(promiseResolved).toBe(true) + expect(liveQuery.isLoadingSubset).toBe(false) + + // Verify the window was successfully moved and has the right data + expect(liveQuery.size).toBe(2) + const items = liveQuery.toArray + expect(items.map((i) => i.value)).toEqual([4, 5]) + } finally { + vi.useRealTimers() + } + }) }) }) From f31a67e4f6e7ee26d1f1f42df7cbd2a651909c69 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 15 Oct 2025 11:48:26 -0600 Subject: [PATCH 13/13] feat: implement useLiveInfiniteQuery hook for React (#666) * feat: implement useLiveInfiniteQuery hook for React * use the new utils.setWindow to page through the results improve types add test that checks that we detect new pages on more rows syncing changeset tweaks * isFetchingNextPage set by promise from setWindow --------- Co-authored-by: Sam Willis --- .changeset/smooth-goats-ring.md | 58 ++ packages/db/src/query/index.ts | 1 + packages/react-db/src/index.ts | 1 + packages/react-db/src/useLiveInfiniteQuery.ts | 185 ++++ .../tests/useLiveInfiniteQuery.test.tsx | 967 ++++++++++++++++++ 5 files changed, 1212 insertions(+) create mode 100644 .changeset/smooth-goats-ring.md create mode 100644 packages/react-db/src/useLiveInfiniteQuery.ts create mode 100644 packages/react-db/tests/useLiveInfiniteQuery.test.tsx diff --git a/.changeset/smooth-goats-ring.md b/.changeset/smooth-goats-ring.md new file mode 100644 index 000000000..6fc227d34 --- /dev/null +++ b/.changeset/smooth-goats-ring.md @@ -0,0 +1,58 @@ +--- +"@tanstack/react-db": patch +--- + +Add `useLiveInfiniteQuery` hook for infinite scrolling with live updates. + +The new `useLiveInfiniteQuery` hook provides an infinite query pattern similar to TanStack Query's `useInfiniteQuery`, but with live updates from your local collection. It uses `liveQueryCollection.utils.setWindow()` internally to efficiently paginate through ordered data without recreating the query on each page fetch. + +**Key features:** + +- Automatic live updates as data changes in the collection +- Efficient pagination using dynamic window adjustment +- Peek-ahead mechanism to detect when more pages are available +- Compatible with TanStack Query's infinite query API patterns + +**Example usage:** + +```tsx +import { useLiveInfiniteQuery } from "@tanstack/react-db" + +function PostList() { + const { data, pages, fetchNextPage, hasNextPage, isLoading } = + useLiveInfiniteQuery( + (q) => + q + .from({ posts: postsCollection }) + .orderBy(({ posts }) => posts.createdAt, "desc"), + { + pageSize: 20, + getNextPageParam: (lastPage, allPages) => + lastPage.length === 20 ? allPages.length : undefined, + } + ) + + if (isLoading) return
Loading...
+ + return ( +
+ {pages.map((page, i) => ( +
+ {page.map((post) => ( + + ))} +
+ ))} + {hasNextPage && ( + + )} +
+ ) +} +``` + +**Requirements:** + +- Query must include `.orderBy()` for the window mechanism to work +- Returns flattened `data` array and `pages` array for flexible rendering +- Automatically detects new pages when data is synced to the collection diff --git a/packages/db/src/query/index.ts b/packages/db/src/query/index.ts index c5e5873cc..17f4dd8e7 100644 --- a/packages/db/src/query/index.ts +++ b/packages/db/src/query/index.ts @@ -56,3 +56,4 @@ export { } from "./live-query-collection.js" export { type LiveQueryCollectionConfig } from "./live/types.js" +export { type LiveQueryCollectionUtils } from "./live/collection-config-builder.js" diff --git a/packages/react-db/src/index.ts b/packages/react-db/src/index.ts index bd98349f0..bb3cd3ad5 100644 --- a/packages/react-db/src/index.ts +++ b/packages/react-db/src/index.ts @@ -1,5 +1,6 @@ // Re-export all public APIs export * from "./useLiveQuery" +export * from "./useLiveInfiniteQuery" // Re-export everything from @tanstack/db export * from "@tanstack/db" diff --git a/packages/react-db/src/useLiveInfiniteQuery.ts b/packages/react-db/src/useLiveInfiniteQuery.ts new file mode 100644 index 000000000..1e44fd16e --- /dev/null +++ b/packages/react-db/src/useLiveInfiniteQuery.ts @@ -0,0 +1,185 @@ +import { useCallback, useEffect, useMemo, useRef, useState } from "react" +import { useLiveQuery } from "./useLiveQuery" +import type { + Context, + InferResultType, + InitialQueryBuilder, + LiveQueryCollectionUtils, + QueryBuilder, +} from "@tanstack/db" + +/** + * Type guard to check if utils object has setWindow method (LiveQueryCollectionUtils) + */ +function isLiveQueryCollectionUtils( + utils: unknown +): utils is LiveQueryCollectionUtils { + return typeof (utils as any).setWindow === `function` +} + +export type UseLiveInfiniteQueryConfig = { + pageSize?: number + initialPageParam?: number + getNextPageParam: ( + lastPage: Array[number]>, + allPages: Array[number]>>, + lastPageParam: number, + allPageParams: Array + ) => number | undefined +} + +export type UseLiveInfiniteQueryReturn = Omit< + ReturnType>, + `data` +> & { + data: InferResultType + pages: Array[number]>> + pageParams: Array + fetchNextPage: () => void + hasNextPage: boolean + isFetchingNextPage: boolean +} + +/** + * Create an infinite query using a query function with live updates + * + * Uses `utils.setWindow()` to dynamically adjust the limit/offset window + * without recreating the live query collection on each page change. + * + * @param queryFn - Query function that defines what data to fetch. Must include `.orderBy()` for setWindow to work. + * @param config - Configuration including pageSize and getNextPageParam + * @param deps - Array of dependencies that trigger query re-execution when changed + * @returns Object with pages, data, and pagination controls + * + * @example + * // Basic infinite query + * const { data, pages, fetchNextPage, hasNextPage } = useLiveInfiniteQuery( + * (q) => q + * .from({ posts: postsCollection }) + * .orderBy(({ posts }) => posts.createdAt, 'desc') + * .select(({ posts }) => ({ + * id: posts.id, + * title: posts.title + * })), + * { + * pageSize: 20, + * getNextPageParam: (lastPage, allPages) => + * lastPage.length === 20 ? allPages.length : undefined + * } + * ) + * + * @example + * // With dependencies + * const { pages, fetchNextPage } = useLiveInfiniteQuery( + * (q) => q + * .from({ posts: postsCollection }) + * .where(({ posts }) => eq(posts.category, category)) + * .orderBy(({ posts }) => posts.createdAt, 'desc'), + * { + * pageSize: 10, + * getNextPageParam: (lastPage) => + * lastPage.length === 10 ? lastPage.length : undefined + * }, + * [category] + * ) + */ +export function useLiveInfiniteQuery( + queryFn: (q: InitialQueryBuilder) => QueryBuilder, + config: UseLiveInfiniteQueryConfig, + deps: Array = [] +): UseLiveInfiniteQueryReturn { + const pageSize = config.pageSize || 20 + const initialPageParam = config.initialPageParam ?? 0 + + // Track how many pages have been loaded + const [loadedPageCount, setLoadedPageCount] = useState(1) + const [isFetchingNextPage, setIsFetchingNextPage] = useState(false) + + // Stringify deps for comparison + const depsKey = JSON.stringify(deps) + const prevDepsKeyRef = useRef(depsKey) + + // Reset page count when dependencies change + useEffect(() => { + if (prevDepsKeyRef.current !== depsKey) { + setLoadedPageCount(1) + prevDepsKeyRef.current = depsKey + } + }, [depsKey]) + + // Create a live query with initial limit and offset + // The query function is wrapped to add limit/offset to the query + const queryResult = useLiveQuery( + (q) => queryFn(q).limit(pageSize).offset(0), + deps + ) + + // Update the window when loadedPageCount changes + // We fetch one extra item to peek if there's a next page + useEffect(() => { + const newLimit = loadedPageCount * pageSize + 1 // +1 to peek ahead + const utils = queryResult.collection.utils + // setWindow is available on live query collections with orderBy + if (isLiveQueryCollectionUtils(utils)) { + const result = utils.setWindow({ offset: 0, limit: newLimit }) + // setWindow returns true if data is immediately available, or Promise if loading + if (result !== true) { + setIsFetchingNextPage(true) + result.then(() => { + setIsFetchingNextPage(false) + }) + } else { + setIsFetchingNextPage(false) + } + } + }, [loadedPageCount, pageSize, queryResult.collection]) + + // Split the data array into pages and determine if there's a next page + const { pages, pageParams, hasNextPage, flatData } = useMemo(() => { + const dataArray = queryResult.data as InferResultType + const totalItemsRequested = loadedPageCount * pageSize + + // Check if we have more data than requested (the peek ahead item) + const hasMore = dataArray.length > totalItemsRequested + + // Build pages array (without the peek ahead item) + const pagesResult: Array[number]>> = [] + const pageParamsResult: Array = [] + + for (let i = 0; i < loadedPageCount; i++) { + const pageData = dataArray.slice(i * pageSize, (i + 1) * pageSize) + pagesResult.push(pageData) + pageParamsResult.push(initialPageParam + i) + } + + // Flatten the pages for the data return (without peek ahead item) + const flatDataResult = dataArray.slice( + 0, + totalItemsRequested + ) as InferResultType + + return { + pages: pagesResult, + pageParams: pageParamsResult, + hasNextPage: hasMore, + flatData: flatDataResult, + } + }, [queryResult.data, loadedPageCount, pageSize, initialPageParam]) + + // Fetch next page + const fetchNextPage = useCallback(() => { + if (!hasNextPage || isFetchingNextPage) return + + setLoadedPageCount((prev) => prev + 1) + }, [hasNextPage, isFetchingNextPage]) + + return { + ...queryResult, + data: flatData, + pages, + pageParams, + fetchNextPage, + hasNextPage, + isFetchingNextPage, + } +} diff --git a/packages/react-db/tests/useLiveInfiniteQuery.test.tsx b/packages/react-db/tests/useLiveInfiniteQuery.test.tsx new file mode 100644 index 000000000..0a8c3eeca --- /dev/null +++ b/packages/react-db/tests/useLiveInfiniteQuery.test.tsx @@ -0,0 +1,967 @@ +import { describe, expect, it } from "vitest" +import { act, renderHook, waitFor } from "@testing-library/react" +import { createCollection, eq } from "@tanstack/db" +import { useLiveInfiniteQuery } from "../src/useLiveInfiniteQuery" +import { mockSyncCollectionOptions } from "../../db/tests/utils" + +type Post = { + id: string + title: string + content: string + createdAt: number + category: string +} + +const createMockPosts = (count: number): Array => { + const posts: Array = [] + for (let i = 1; i <= count; i++) { + posts.push({ + id: `${i}`, + title: `Post ${i}`, + content: `Content ${i}`, + createdAt: 1000000 - i * 1000, // Descending order + category: i % 2 === 0 ? `tech` : `life`, + }) + } + return posts +} + +describe(`useLiveInfiniteQuery`, () => { + it(`should fetch initial page of data`, async () => { + const posts = createMockPosts(50) + const collection = createCollection( + mockSyncCollectionOptions({ + id: `initial-page-test`, + getKey: (post: Post) => post.id, + initialData: posts, + }) + ) + + const { result } = renderHook(() => { + return useLiveInfiniteQuery( + (q) => + q + .from({ posts: collection }) + .orderBy(({ posts: p }) => p.createdAt, `desc`) + .select(({ posts: p }) => ({ + id: p.id, + title: p.title, + createdAt: p.createdAt, + })), + { + pageSize: 10, + getNextPageParam: (lastPage) => + lastPage.length === 10 ? lastPage.length : undefined, + } + ) + }) + + await waitFor(() => { + expect(result.current.isReady).toBe(true) + }) + + // Should have 1 page initially + expect(result.current.pages).toHaveLength(1) + expect(result.current.pages[0]).toHaveLength(10) + + // Data should be flattened + expect(result.current.data).toHaveLength(10) + + // Should have next page since we have 50 items total + expect(result.current.hasNextPage).toBe(true) + + // First item should be Post 1 (most recent by createdAt) + expect(result.current.pages[0]![0]).toMatchObject({ + id: `1`, + title: `Post 1`, + }) + }) + + it(`should fetch multiple pages`, async () => { + const posts = createMockPosts(50) + const collection = createCollection( + mockSyncCollectionOptions({ + id: `multiple-pages-test`, + getKey: (post: Post) => post.id, + initialData: posts, + }) + ) + + const { result } = renderHook(() => { + return useLiveInfiniteQuery( + (q) => + q + .from({ posts: collection }) + .orderBy(({ posts: p }) => p.createdAt, `desc`), + { + pageSize: 10, + getNextPageParam: (lastPage) => + lastPage.length === 10 ? lastPage.length : undefined, + } + ) + }) + + await waitFor(() => { + expect(result.current.isReady).toBe(true) + }) + + // Initially 1 page + expect(result.current.pages).toHaveLength(1) + expect(result.current.hasNextPage).toBe(true) + + // Fetch next page + act(() => { + result.current.fetchNextPage() + }) + + await waitFor(() => { + expect(result.current.pages).toHaveLength(2) + }) + + expect(result.current.pages[0]).toHaveLength(10) + expect(result.current.pages[1]).toHaveLength(10) + expect(result.current.data).toHaveLength(20) + expect(result.current.hasNextPage).toBe(true) + + // Fetch another page + act(() => { + result.current.fetchNextPage() + }) + + await waitFor(() => { + expect(result.current.pages).toHaveLength(3) + }) + + expect(result.current.data).toHaveLength(30) + expect(result.current.hasNextPage).toBe(true) + }) + + it(`should detect when no more pages available`, async () => { + const posts = createMockPosts(25) + const collection = createCollection( + mockSyncCollectionOptions({ + id: `no-more-pages-test`, + getKey: (post: Post) => post.id, + initialData: posts, + }) + ) + + const { result } = renderHook(() => { + return useLiveInfiniteQuery( + (q) => + q + .from({ posts: collection }) + .orderBy(({ posts: p }) => p.createdAt, `desc`), + { + pageSize: 10, + getNextPageParam: (lastPage) => + lastPage.length === 10 ? lastPage.length : undefined, + } + ) + }) + + await waitFor(() => { + expect(result.current.isReady).toBe(true) + }) + + // Page 1: 10 items, has more + expect(result.current.pages).toHaveLength(1) + expect(result.current.hasNextPage).toBe(true) + + // Fetch page 2 + act(() => { + result.current.fetchNextPage() + }) + + await waitFor(() => { + expect(result.current.pages).toHaveLength(2) + }) + + // Page 2: 10 items, has more + expect(result.current.pages[1]).toHaveLength(10) + expect(result.current.hasNextPage).toBe(true) + + // Fetch page 3 + act(() => { + result.current.fetchNextPage() + }) + + await waitFor(() => { + expect(result.current.pages).toHaveLength(3) + }) + + // Page 3: 5 items, no more + expect(result.current.pages[2]).toHaveLength(5) + expect(result.current.data).toHaveLength(25) + expect(result.current.hasNextPage).toBe(false) + }) + + it(`should handle empty results`, async () => { + const collection = createCollection( + mockSyncCollectionOptions({ + id: `empty-results-test`, + getKey: (post: Post) => post.id, + initialData: [], + }) + ) + + const { result } = renderHook(() => { + return useLiveInfiniteQuery( + (q) => + q + .from({ posts: collection }) + .orderBy(({ posts: p }) => p.createdAt, `desc`), + { + pageSize: 10, + getNextPageParam: (lastPage) => + lastPage.length === 10 ? lastPage.length : undefined, + } + ) + }) + + await waitFor(() => { + expect(result.current.isReady).toBe(true) + }) + + // With no data, we still have 1 page (which is empty) + expect(result.current.pages).toHaveLength(1) + expect(result.current.pages[0]).toHaveLength(0) + expect(result.current.data).toHaveLength(0) + expect(result.current.hasNextPage).toBe(false) + }) + + it(`should update pages when underlying data changes`, async () => { + const posts = createMockPosts(30) + const collection = createCollection( + mockSyncCollectionOptions({ + id: `live-updates-test`, + getKey: (post: Post) => post.id, + initialData: posts, + }) + ) + + const { result } = renderHook(() => { + return useLiveInfiniteQuery( + (q) => + q + .from({ posts: collection }) + .orderBy(({ posts: p }) => p.createdAt, `desc`), + { + pageSize: 10, + getNextPageParam: (lastPage) => + lastPage.length === 10 ? lastPage.length : undefined, + } + ) + }) + + await waitFor(() => { + expect(result.current.isReady).toBe(true) + }) + + // Fetch 2 pages + act(() => { + result.current.fetchNextPage() + }) + + await waitFor(() => { + expect(result.current.pages).toHaveLength(2) + }) + + expect(result.current.data).toHaveLength(20) + + // Insert a new post with most recent timestamp + act(() => { + collection.utils.begin() + collection.utils.write({ + type: `insert`, + value: { + id: `new-1`, + title: `New Post`, + content: `New Content`, + createdAt: 1000001, // Most recent + category: `tech`, + }, + }) + collection.utils.commit() + }) + + await waitFor(() => { + // New post should be first + expect(result.current.pages[0]![0]).toMatchObject({ + id: `new-1`, + title: `New Post`, + }) + }) + + // Still showing 2 pages (20 items), but content has shifted + // The new item is included, pushing the last item out of view + expect(result.current.pages).toHaveLength(2) + expect(result.current.data).toHaveLength(20) + expect(result.current.pages[0]).toHaveLength(10) + expect(result.current.pages[1]).toHaveLength(10) + }) + + it(`should handle deletions across pages`, async () => { + const posts = createMockPosts(25) + const collection = createCollection( + mockSyncCollectionOptions({ + id: `deletions-test`, + getKey: (post: Post) => post.id, + initialData: posts, + }) + ) + + const { result } = renderHook(() => { + return useLiveInfiniteQuery( + (q) => + q + .from({ posts: collection }) + .orderBy(({ posts: p }) => p.createdAt, `desc`), + { + pageSize: 10, + getNextPageParam: (lastPage) => + lastPage.length === 10 ? lastPage.length : undefined, + } + ) + }) + + await waitFor(() => { + expect(result.current.isReady).toBe(true) + }) + + // Fetch 2 pages + act(() => { + result.current.fetchNextPage() + }) + + await waitFor(() => { + expect(result.current.pages).toHaveLength(2) + }) + + expect(result.current.data).toHaveLength(20) + const firstItemId = result.current.data[0]!.id + + // Delete the first item + act(() => { + collection.utils.begin() + collection.utils.write({ + type: `delete`, + value: posts[0]!, + }) + collection.utils.commit() + }) + + await waitFor(() => { + // First item should have changed + expect(result.current.data[0]!.id).not.toBe(firstItemId) + }) + + // Still showing 2 pages, each pulls from remaining 24 items + // Page 1: items 0-9 (10 items) + // Page 2: items 10-19 (10 items) + // Total: 20 items (item 20-23 are beyond our loaded pages) + expect(result.current.pages).toHaveLength(2) + expect(result.current.data).toHaveLength(20) + expect(result.current.pages[0]).toHaveLength(10) + expect(result.current.pages[1]).toHaveLength(10) + }) + + it(`should work with where clauses`, async () => { + const posts = createMockPosts(50) + const collection = createCollection( + mockSyncCollectionOptions({ + id: `where-clause-test`, + getKey: (post: Post) => post.id, + initialData: posts, + }) + ) + + const { result } = renderHook(() => { + return useLiveInfiniteQuery( + (q) => + q + .from({ posts: collection }) + .where(({ posts: p }) => eq(p.category, `tech`)) + .orderBy(({ posts: p }) => p.createdAt, `desc`), + { + pageSize: 5, + getNextPageParam: (lastPage) => + lastPage.length === 5 ? lastPage.length : undefined, + } + ) + }) + + await waitFor(() => { + expect(result.current.isReady).toBe(true) + }) + + // Should only have tech posts (every even ID) + expect(result.current.pages).toHaveLength(1) + expect(result.current.pages[0]).toHaveLength(5) + + // All items should be tech category + result.current.pages[0]!.forEach((post) => { + expect(post.category).toBe(`tech`) + }) + + // Should have more pages + expect(result.current.hasNextPage).toBe(true) + + // Fetch next page + act(() => { + result.current.fetchNextPage() + }) + + await waitFor(() => { + expect(result.current.pages).toHaveLength(2) + }) + + expect(result.current.data).toHaveLength(10) + }) + + it(`should re-execute query when dependencies change`, async () => { + const posts = createMockPosts(50) + const collection = createCollection( + mockSyncCollectionOptions({ + id: `deps-change-test`, + getKey: (post: Post) => post.id, + initialData: posts, + }) + ) + + const { result, rerender } = renderHook( + ({ category }: { category: string }) => { + return useLiveInfiniteQuery( + (q) => + q + .from({ posts: collection }) + .where(({ posts: p }) => eq(p.category, category)) + .orderBy(({ posts: p }) => p.createdAt, `desc`), + { + pageSize: 5, + getNextPageParam: (lastPage) => + lastPage.length === 5 ? lastPage.length : undefined, + }, + [category] + ) + }, + { initialProps: { category: `tech` } } + ) + + await waitFor(() => { + expect(result.current.isReady).toBe(true) + }) + + // Fetch 2 pages of tech posts + act(() => { + result.current.fetchNextPage() + }) + + await waitFor(() => { + expect(result.current.pages).toHaveLength(2) + }) + + // Change category to life + act(() => { + rerender({ category: `life` }) + }) + + await waitFor(() => { + // Should reset to 1 page with life posts + expect(result.current.pages).toHaveLength(1) + }) + + // All items should be life category + result.current.pages[0]!.forEach((post) => { + expect(post.category).toBe(`life`) + }) + }) + + it(`should track pageParams correctly`, async () => { + const posts = createMockPosts(30) + const collection = createCollection( + mockSyncCollectionOptions({ + id: `page-params-test`, + getKey: (post: Post) => post.id, + initialData: posts, + }) + ) + + const { result } = renderHook(() => { + return useLiveInfiniteQuery( + (q) => + q + .from({ posts: collection }) + .orderBy(({ posts: p }) => p.createdAt, `desc`), + { + pageSize: 10, + initialPageParam: 0, + getNextPageParam: (lastPage, allPages, lastPageParam) => + lastPage.length === 10 ? lastPageParam + 1 : undefined, + } + ) + }) + + await waitFor(() => { + expect(result.current.isReady).toBe(true) + }) + + expect(result.current.pageParams).toEqual([0]) + + // Fetch next page + act(() => { + result.current.fetchNextPage() + }) + + await waitFor(() => { + expect(result.current.pageParams).toEqual([0, 1]) + }) + + // Fetch another page + act(() => { + result.current.fetchNextPage() + }) + + await waitFor(() => { + expect(result.current.pageParams).toEqual([0, 1, 2]) + }) + }) + + it(`should handle exact page size boundaries`, async () => { + const posts = createMockPosts(20) // Exactly 2 pages + const collection = createCollection( + mockSyncCollectionOptions({ + id: `exact-boundary-test`, + getKey: (post: Post) => post.id, + initialData: posts, + }) + ) + + const { result } = renderHook(() => { + return useLiveInfiniteQuery( + (q) => + q + .from({ posts: collection }) + .orderBy(({ posts: p }) => p.createdAt, `desc`), + { + pageSize: 10, + // Better getNextPageParam that checks against total data available + getNextPageParam: (lastPage, allPages) => { + // If last page is not full, we're done + if (lastPage.length < 10) return undefined + // Check if we've likely loaded all data (this is a heuristic) + // In a real app with backend, you'd check response metadata + const totalLoaded = allPages.flat().length + // If we have less than a full page left, no more pages + return totalLoaded + }, + } + ) + }) + + await waitFor(() => { + expect(result.current.isReady).toBe(true) + }) + + expect(result.current.hasNextPage).toBe(true) + + // Fetch page 2 + act(() => { + result.current.fetchNextPage() + }) + + await waitFor(() => { + expect(result.current.pages).toHaveLength(2) + }) + + expect(result.current.pages[1]).toHaveLength(10) + // With setWindow peek-ahead, we can now detect no more pages immediately + // We request 21 items (2 * 10 + 1 peek) but only get 20, so we know there's no more + expect(result.current.hasNextPage).toBe(false) + + // Verify total data + expect(result.current.data).toHaveLength(20) + }) + + it(`should not fetch when already fetching`, async () => { + const posts = createMockPosts(50) + const collection = createCollection( + mockSyncCollectionOptions({ + id: `concurrent-fetch-test`, + getKey: (post: Post) => post.id, + initialData: posts, + }) + ) + + const { result } = renderHook(() => { + return useLiveInfiniteQuery( + (q) => + q + .from({ posts: collection }) + .orderBy(({ posts: p }) => p.createdAt, `desc`), + { + pageSize: 10, + getNextPageParam: (lastPage) => + lastPage.length === 10 ? lastPage.length : undefined, + } + ) + }) + + await waitFor(() => { + expect(result.current.isReady).toBe(true) + }) + + expect(result.current.pages).toHaveLength(1) + + // With sync data, all fetches complete immediately, so all 3 calls will succeed + // The key is that they won't cause race conditions or errors + act(() => { + result.current.fetchNextPage() + }) + + await waitFor(() => { + expect(result.current.pages).toHaveLength(2) + }) + + act(() => { + result.current.fetchNextPage() + }) + + await waitFor(() => { + expect(result.current.pages).toHaveLength(3) + }) + + act(() => { + result.current.fetchNextPage() + }) + + await waitFor(() => { + expect(result.current.pages).toHaveLength(4) + }) + + // All fetches should have succeeded + expect(result.current.pages).toHaveLength(4) + expect(result.current.data).toHaveLength(40) + }) + + it(`should not fetch when hasNextPage is false`, async () => { + const posts = createMockPosts(5) + const collection = createCollection( + mockSyncCollectionOptions({ + id: `no-fetch-when-done-test`, + getKey: (post: Post) => post.id, + initialData: posts, + }) + ) + + const { result } = renderHook(() => { + return useLiveInfiniteQuery( + (q) => + q + .from({ posts: collection }) + .orderBy(({ posts: p }) => p.createdAt, `desc`), + { + pageSize: 10, + getNextPageParam: (lastPage) => + lastPage.length === 10 ? lastPage.length : undefined, + } + ) + }) + + await waitFor(() => { + expect(result.current.isReady).toBe(true) + }) + + expect(result.current.hasNextPage).toBe(false) + expect(result.current.pages).toHaveLength(1) + + // Try to fetch when there's no next page + act(() => { + result.current.fetchNextPage() + }) + + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Should still have only 1 page + expect(result.current.pages).toHaveLength(1) + }) + + it(`should support custom initialPageParam`, async () => { + const posts = createMockPosts(30) + const collection = createCollection( + mockSyncCollectionOptions({ + id: `initial-param-test`, + getKey: (post: Post) => post.id, + initialData: posts, + }) + ) + + const { result } = renderHook(() => { + return useLiveInfiniteQuery( + (q) => + q + .from({ posts: collection }) + .orderBy(({ posts: p }) => p.createdAt, `desc`), + { + pageSize: 10, + initialPageParam: 100, + getNextPageParam: (lastPage, allPages, lastPageParam) => + lastPage.length === 10 ? lastPageParam + 1 : undefined, + } + ) + }) + + await waitFor(() => { + expect(result.current.isReady).toBe(true) + }) + + expect(result.current.pageParams).toEqual([100]) + + act(() => { + result.current.fetchNextPage() + }) + + await waitFor(() => { + expect(result.current.pageParams).toEqual([100, 101]) + }) + }) + + it(`should detect hasNextPage change when new items are synced`, async () => { + // Start with exactly 20 items (2 pages) + const posts = createMockPosts(20) + const collection = createCollection( + mockSyncCollectionOptions({ + id: `sync-detection-test`, + getKey: (post: Post) => post.id, + initialData: posts, + }) + ) + + const { result } = renderHook(() => { + return useLiveInfiniteQuery( + (q) => + q + .from({ posts: collection }) + .orderBy(({ posts: p }) => p.createdAt, `desc`), + { + pageSize: 10, + getNextPageParam: (lastPage) => + lastPage.length === 10 ? lastPage.length : undefined, + } + ) + }) + + await waitFor(() => { + expect(result.current.isReady).toBe(true) + }) + + // Load both pages + act(() => { + result.current.fetchNextPage() + }) + + await waitFor(() => { + expect(result.current.pages).toHaveLength(2) + }) + + // Should have no next page (exactly 20 items, 2 full pages, peek returns nothing) + expect(result.current.hasNextPage).toBe(false) + expect(result.current.data).toHaveLength(20) + + // Add 5 more items to the collection + act(() => { + collection.utils.begin() + for (let i = 0; i < 5; i++) { + collection.utils.write({ + type: `insert`, + value: { + id: `new-${i}`, + title: `New Post ${i}`, + content: `Content ${i}`, + createdAt: Date.now() + i, + category: `tech`, + }, + }) + } + collection.utils.commit() + }) + + // Should now detect that there's a next page available + await waitFor(() => { + expect(result.current.hasNextPage).toBe(true) + }) + + // Data should still be 20 items (we haven't fetched the next page yet) + expect(result.current.data).toHaveLength(20) + expect(result.current.pages).toHaveLength(2) + + // Fetch the next page + act(() => { + result.current.fetchNextPage() + }) + + await waitFor(() => { + expect(result.current.pages).toHaveLength(3) + }) + + // Third page should have the new items + expect(result.current.pages[2]).toHaveLength(5) + expect(result.current.data).toHaveLength(25) + + // No more pages available now + expect(result.current.hasNextPage).toBe(false) + }) + + it(`should set isFetchingNextPage to false when data is immediately available`, async () => { + const posts = createMockPosts(50) + const collection = createCollection( + mockSyncCollectionOptions({ + id: `immediate-data-test`, + getKey: (post: Post) => post.id, + initialData: posts, + }) + ) + + const { result } = renderHook(() => { + return useLiveInfiniteQuery( + (q) => + q + .from({ posts: collection }) + .orderBy(({ posts: p }) => p.createdAt, `desc`), + { + pageSize: 10, + getNextPageParam: (lastPage) => + lastPage.length === 10 ? lastPage.length : undefined, + } + ) + }) + + await waitFor(() => { + expect(result.current.isReady).toBe(true) + }) + + // Initially 1 page and not fetching + expect(result.current.pages).toHaveLength(1) + expect(result.current.isFetchingNextPage).toBe(false) + + // Fetch next page - should remain false because data is immediately available + act(() => { + result.current.fetchNextPage() + }) + + // Since data is *synchronously* available, isFetchingNextPage should be false + expect(result.current.pages).toHaveLength(2) + expect(result.current.isFetchingNextPage).toBe(false) + }) + + it(`should track isFetchingNextPage when async loading is triggered`, async () => { + let loadSubsetCallCount = 0 + + const collection = createCollection({ + id: `async-loading-test`, + getKey: (post: Post) => post.id, + syncMode: `on-demand`, + startSync: true, + sync: { + sync: ({ markReady, begin, write, commit }) => { + // Provide initial data + begin() + for (let i = 1; i <= 15; i++) { + write({ + type: `insert`, + value: { + id: `${i}`, + title: `Post ${i}`, + content: `Content ${i}`, + createdAt: 1000000 - i * 1000, + category: i % 2 === 0 ? `tech` : `life`, + }, + }) + } + commit() + markReady() + + return { + loadSubset: () => { + loadSubsetCallCount++ + + // First few calls return true (initial load + window setup) + if (loadSubsetCallCount <= 2) { + return true + } + + // Subsequent calls simulate async loading with a real timeout + const loadPromise = new Promise((resolve) => { + setTimeout(() => { + begin() + // Load more data + for (let i = 16; i <= 30; i++) { + write({ + type: `insert`, + value: { + id: `${i}`, + title: `Post ${i}`, + content: `Content ${i}`, + createdAt: 1000000 - i * 1000, + category: i % 2 === 0 ? `tech` : `life`, + }, + }) + } + commit() + resolve() + }, 50) + }) + + return loadPromise + }, + } + }, + }, + }) + + const { result } = renderHook(() => { + return useLiveInfiniteQuery( + (q) => + q + .from({ posts: collection }) + .orderBy(({ posts: p }) => p.createdAt, `desc`), + { + pageSize: 10, + getNextPageParam: (lastPage) => + lastPage.length === 10 ? lastPage.length : undefined, + } + ) + }) + + await waitFor(() => { + expect(result.current.isReady).toBe(true) + }) + + // Wait for initial window setup to complete + await waitFor(() => { + expect(result.current.isFetchingNextPage).toBe(false) + }) + + expect(result.current.pages).toHaveLength(1) + + // Fetch next page which will trigger async loading + act(() => { + result.current.fetchNextPage() + }) + + // Should be fetching now and so isFetchingNextPage should be true *synchronously!* + expect(result.current.isFetchingNextPage).toBe(true) + + // Wait for loading to complete + await waitFor( + () => { + expect(result.current.isFetchingNextPage).toBe(false) + }, + { timeout: 200 } + ) + + // Should have 2 pages now + expect(result.current.pages).toHaveLength(2) + expect(result.current.data).toHaveLength(20) + }, 10000) +})