From 88505207de02dbb4f8e98f52bbcadc26803309b4 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 2 Oct 2025 15:08:22 +0100 Subject: [PATCH] add schedular address feedback with large refactor tweaks --- .changeset/olive-crews-love.md | 5 + .../db/src/query/live-query-collection.ts | 53 +- .../query/live/collection-config-builder.ts | 268 ++++++++- .../db/src/query/live/collection-registry.ts | 47 ++ .../src/query/live/collection-subscriber.ts | 49 +- packages/db/src/scheduler.ts | 198 +++++++ packages/db/src/transactions.ts | 13 +- packages/db/tests/query/scheduler.test.ts | 527 ++++++++++++++++++ 8 files changed, 1118 insertions(+), 42 deletions(-) create mode 100644 .changeset/olive-crews-love.md create mode 100644 packages/db/src/query/live/collection-registry.ts create mode 100644 packages/db/src/scheduler.ts create mode 100644 packages/db/tests/query/scheduler.test.ts diff --git a/.changeset/olive-crews-love.md b/.changeset/olive-crews-love.md new file mode 100644 index 000000000..428880964 --- /dev/null +++ b/.changeset/olive-crews-love.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +Add a scheduler that ensures that if a transaction touches multiple collections that feed into a single live query, the live query only emits a single batch of updates. This fixes an issue where multiple renders could be triggered from a live query under this situation. diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index c73b12b37..dac5ee8c1 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -1,5 +1,10 @@ import { createCollection } from "../collection/index.js" import { CollectionConfigBuilder } from "./live/collection-config-builder.js" +import { + getBuilderFromConfig, + registerCollectionBuilder, +} from "./live/collection-registry.js" +import type { LiveQueryCollectionUtils } from "./live/collection-config-builder.js" import type { LiveQueryCollectionConfig } from "./live/types.js" import type { InitialQueryBuilder, QueryBuilder } from "./builder/index.js" import type { Collection } from "../collection/index.js" @@ -55,7 +60,9 @@ export function liveQueryCollectionOptions< TResult extends object = GetResult, >( config: LiveQueryCollectionConfig -): CollectionConfigForContext { +): CollectionConfigForContext & { + utils: LiveQueryCollectionUtils +} { const collectionConfigBuilder = new CollectionConfigBuilder< TContext, TResult @@ -63,7 +70,7 @@ export function liveQueryCollectionOptions< return collectionConfigBuilder.getConfig() as CollectionConfigForContext< TContext, TResult - > + > & { utils: LiveQueryCollectionUtils } } /** @@ -106,7 +113,9 @@ export function createLiveQueryCollection< TResult extends object = GetResult, >( query: (q: InitialQueryBuilder) => QueryBuilder -): CollectionForContext +): CollectionForContext & { + utils: LiveQueryCollectionUtils +} // Overload 2: Accept full config object with optional utilities export function createLiveQueryCollection< @@ -115,7 +124,9 @@ export function createLiveQueryCollection< TUtils extends UtilsRecord = {}, >( config: LiveQueryCollectionConfig & { utils?: TUtils } -): CollectionForContext +): CollectionForContext & { + utils: LiveQueryCollectionUtils & TUtils +} // Implementation export function createLiveQueryCollection< @@ -126,7 +137,9 @@ export function createLiveQueryCollection< configOrQuery: | (LiveQueryCollectionConfig & { utils?: TUtils }) | ((q: InitialQueryBuilder) => QueryBuilder) -): CollectionForContext { +): CollectionForContext & { + utils: LiveQueryCollectionUtils & TUtils +} { // Determine if the argument is a function (query) or a config object if (typeof configOrQuery === `function`) { // Simple query function case @@ -139,7 +152,7 @@ export function createLiveQueryCollection< return bridgeToCreateCollection(options) as CollectionForContext< TContext, TResult - > + > & { utils: LiveQueryCollectionUtils & TUtils } } else { // Config object case const config = configOrQuery as LiveQueryCollectionConfig< @@ -147,10 +160,16 @@ export function createLiveQueryCollection< TResult > & { utils?: TUtils } const options = liveQueryCollectionOptions(config) - return bridgeToCreateCollection({ - ...options, - utils: config.utils, - }) as CollectionForContext + + // Merge custom utils if provided, preserving the getBuilder() method for dependency tracking + if (config.utils) { + options.utils = { ...options.utils, ...config.utils } + } + + return bridgeToCreateCollection(options) as CollectionForContext< + TContext, + TResult + > & { utils: LiveQueryCollectionUtils & TUtils } } } @@ -162,12 +181,18 @@ function bridgeToCreateCollection< TResult extends object, TUtils extends UtilsRecord = {}, >( - options: CollectionConfig & { utils?: TUtils } + options: CollectionConfig & { utils: TUtils } ): Collection { - // This is the only place we need a type assertion, hidden from user API - return createCollection(options as any) as unknown as Collection< + const collection = createCollection(options as any) as unknown as Collection< TResult, string | number, - TUtils + LiveQueryCollectionUtils > + + const builder = getBuilderFromConfig(options) + if (builder) { + registerCollectionBuilder(collection, builder) + } + + return collection as unknown as Collection } diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index f22e7a932..273ddd1d1 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -2,7 +2,11 @@ import { D2, output } from "@tanstack/db-ivm" import { compileQuery } from "../compiler/index.js" import { buildQuery, getQueryIR } from "../builder/index.js" import { MissingAliasInputsError } from "../../errors.js" +import { transactionScopedScheduler } from "../../scheduler.js" +import { getActiveTransaction } from "../../transactions.js" import { CollectionSubscriber } from "./collection-subscriber.js" +import { getCollectionBuilder } from "./collection-registry.js" +import type { SchedulerContextId } from "../../scheduler.js" import type { CollectionSubscription } from "../../collection/subscription.js" import type { RootStreamBuilder } from "@tanstack/db-ivm" import type { OrderByOptimizationInfo } from "../compiler/order-by.js" @@ -12,6 +16,7 @@ import type { KeyedStream, ResultStream, SyncConfig, + UtilsRecord, } from "../../types.js" import type { Context, GetResult } from "../builder/types.js" import type { BasicExpression, QueryIR } from "../ir.js" @@ -24,6 +29,15 @@ import type { } from "./types.js" import type { AllCollectionEvents } from "../../collection/events.js" +export type LiveQueryCollectionUtils = UtilsRecord & { + getRunCount: () => number + getBuilder: () => CollectionConfigBuilder +} + +type PendingGraphRun = { + loadCallbacks: Set<() => boolean> +} + // Global counter for auto-generated collection IDs let liveQueryCollectionCounter = 0 @@ -52,6 +66,14 @@ export class CollectionConfigBuilder< private readonly compare?: (val1: TResult, val2: TResult) => number private isGraphRunning = false + private runCount = 0 + + // Current sync session state (set when sync starts, cleared when it stops) + // Public for testing purposes (CollectionConfigBuilder is internal, not public API) + public currentSyncConfig: + | Parameters[`sync`]>[0] + | undefined + public currentSyncState: FullSyncState | undefined // Error state tracking private isInErrorState = false @@ -59,6 +81,28 @@ export class CollectionConfigBuilder< // Reference to the live query collection for error state transitions private liveQueryCollection?: Collection + private readonly aliasDependencies: Record< + string, + Array> + > = {} + + private readonly builderDependencies = new Set< + CollectionConfigBuilder + >() + + // Pending graph runs per scheduler context (e.g., per transaction) + // The builder manages its own state; the scheduler just orchestrates execution order + // Only stores callbacks - if sync ends, pending jobs gracefully no-op + private readonly pendingGraphRuns = new Map< + SchedulerContextId, + PendingGraphRun + >() + + // Unsubscribe function for scheduler's onClear listener + // Registered when sync starts, unregistered when sync stops + // Prevents memory leaks by releasing the scheduler's reference to this builder + private unsubscribeFromSchedulerClears?: () => void + private graphCache: D2 | undefined private inputsCache: Record> | undefined private pipelineCache: ResultStream | undefined @@ -107,7 +151,9 @@ export class CollectionConfigBuilder< this.compileBasePipeline() } - getConfig(): CollectionConfigSingleRowOption { + getConfig(): CollectionConfigSingleRowOption & { + utils: LiveQueryCollectionUtils + } { return { id: this.id, getKey: @@ -122,6 +168,10 @@ export class CollectionConfigBuilder< onDelete: this.config.onDelete, startSync: this.config.startSync, singleResult: this.query.singleResult, + utils: { + getRunCount: this.getRunCount.bind(this), + getBuilder: () => this, + }, } } @@ -159,12 +209,8 @@ export class CollectionConfigBuilder< // That can happen because even though we load N rows, the pipeline might filter some of these rows out // causing the orderBy operator to receive less than N rows or even no rows at all. // So this callback would notice that it doesn't have enough rows and load some more. - // The callback returns a boolean, when it's true it's done loading data. - maybeRunGraph( - config: SyncMethods, - syncState: FullSyncState, - callback?: () => boolean - ) { + // The callback returns a boolean, when it's true it's done loading data and we can mark the collection as ready. + maybeRunGraph(callback?: () => boolean) { if (this.isGraphRunning) { // no nested runs of the graph // which is possible if the `callback` @@ -172,10 +218,18 @@ export class CollectionConfigBuilder< return } + // Should only be called when sync is active + if (!this.currentSyncConfig || !this.currentSyncState) { + throw new Error( + `maybeRunGraph called without active sync session. This should not happen.` + ) + } + this.isGraphRunning = true try { - const { begin, commit } = config + const { begin, commit } = this.currentSyncConfig + const syncState = this.currentSyncState // Don't run if the live query is in an error state if (this.isInErrorState) { @@ -196,7 +250,7 @@ export class CollectionConfigBuilder< commit() // After initial commit, check if we should mark ready // (in case all sources were already ready before we subscribed) - this.updateLiveQueryStatus(config) + this.updateLiveQueryStatus(this.currentSyncConfig) } } } finally { @@ -204,6 +258,158 @@ export class CollectionConfigBuilder< } } + /** + * Schedules a graph run with the transaction-scoped scheduler. + * Ensures each builder runs at most once per transaction, with automatic dependency tracking + * to run parent queries before child queries. Outside a transaction, runs immediately. + * + * Multiple calls during a transaction are coalesced into a single execution. + * Dependencies are auto-discovered from subscribed live queries, or can be overridden. + * Load callbacks are combined when entries merge. + * + * Uses the current sync session's config and syncState from instance properties. + * + * @param callback - Optional callback to load more data if needed (returns true when done) + * @param options - Optional scheduling configuration + * @param options.contextId - Transaction ID to group work; defaults to active transaction + * @param options.jobId - Unique identifier for this job; defaults to this builder instance + * @param options.alias - Source alias that triggered this schedule; adds alias-specific dependencies + * @param options.dependencies - Explicit dependency list; overrides auto-discovered dependencies + */ + scheduleGraphRun( + callback?: () => boolean, + options?: { + contextId?: SchedulerContextId + jobId?: unknown + alias?: string + dependencies?: Array> + } + ) { + const contextId = options?.contextId ?? getActiveTransaction()?.id + // Use the builder instance as the job ID for deduplication. This is memory-safe + // because the scheduler's context Map is deleted after flushing (no long-term retention). + const jobId = options?.jobId ?? this + const dependentBuilders = (() => { + if (options?.dependencies) { + return options.dependencies + } + + const deps = new Set(this.builderDependencies) + if (options?.alias) { + const aliasDeps = this.aliasDependencies[options.alias] + if (aliasDeps) { + for (const dep of aliasDeps) { + deps.add(dep) + } + } + } + + deps.delete(this) + + return Array.from(deps) + })() + + // We intentionally scope deduplication to the builder instance. Each instance + // owns caches and compiled pipelines, so sharing work across instances that + // merely reuse the same string id would execute the wrong builder's graph. + + if (!this.currentSyncConfig || !this.currentSyncState) { + throw new Error( + `scheduleGraphRun called without active sync session. This should not happen.` + ) + } + + // Manage our own state - get or create pending callbacks for this context + let pending = contextId ? this.pendingGraphRuns.get(contextId) : undefined + if (!pending) { + pending = { + loadCallbacks: new Set(), + } + if (contextId) { + this.pendingGraphRuns.set(contextId, pending) + } + } + + // Add callback if provided (this is what accumulates between schedules) + if (callback) { + pending.loadCallbacks.add(callback) + } + + // Schedule execution (scheduler just orchestrates order, we manage state) + // For immediate execution (no contextId), pass pending directly since it won't be in the map + const pendingToPass = contextId ? undefined : pending + transactionScopedScheduler.schedule({ + contextId, + jobId, + dependencies: dependentBuilders, + run: () => this.executeGraphRun(contextId, pendingToPass), + }) + } + + /** + * Clears pending graph run state for a specific context. + * Called when the scheduler clears a context (e.g., transaction rollback/abort). + */ + clearPendingGraphRun(contextId: SchedulerContextId): void { + this.pendingGraphRuns.delete(contextId) + } + + /** + * Executes a pending graph run. Called by the scheduler when dependencies are satisfied. + * Clears the pending state BEFORE execution so that any re-schedules during the run + * create fresh state and don't interfere with the current execution. + * Uses instance sync state - if sync has ended, gracefully returns without executing. + * + * @param contextId - Optional context ID to look up pending state + * @param pendingParam - For immediate execution (no context), pending state is passed directly + */ + private executeGraphRun( + contextId?: SchedulerContextId, + pendingParam?: PendingGraphRun + ): void { + // Get pending state: either from parameter (no context) or from map (with context) + // Remove from map BEFORE checking sync state to prevent leaking entries when sync ends + // before the transaction flushes (e.g., unsubscribe during in-flight transaction) + const pending = + pendingParam ?? + (contextId ? this.pendingGraphRuns.get(contextId) : undefined) + if (contextId) { + this.pendingGraphRuns.delete(contextId) + } + + // If no pending state, nothing to execute (context was cleared) + if (!pending) { + return + } + + // If sync session has ended, don't execute (graph is finalized, subscriptions cleared) + if (!this.currentSyncConfig || !this.currentSyncState) { + return + } + + this.incrementRunCount() + + const combinedLoader = () => { + let allDone = true + let firstError: unknown + pending.loadCallbacks.forEach((loader) => { + try { + allDone = loader() && allDone + } catch (error) { + allDone = false + firstError ??= error + } + }) + if (firstError) { + throw firstError + } + // Returning false signals that callers should schedule another pass. + return allDone + } + + this.maybeRunGraph(combinedLoader) + } + private getSyncConfig(): SyncConfig { return { rowUpdateMode: `full`, @@ -211,9 +417,19 @@ export class CollectionConfigBuilder< } } + incrementRunCount() { + this.runCount++ + } + + getRunCount() { + return this.runCount + } + private syncFn(config: SyncMethods) { // Store reference to the live query collection for error state transitions this.liveQueryCollection = config.collection + // Store config and syncState as instance properties for the duration of this sync session + this.currentSyncConfig = config const syncState: SyncState = { messagesCount: 0, @@ -226,6 +442,15 @@ export class CollectionConfigBuilder< config, syncState ) + this.currentSyncState = fullSyncState + + // Listen for scheduler context clears to clean up our pending state + // Re-register on each sync start so the listener is active for the sync session's lifetime + this.unsubscribeFromSchedulerClears = transactionScopedScheduler.onClear( + (contextId) => { + this.clearPendingGraphRun(contextId) + } + ) const loadMoreDataCallbacks = this.subscribeToAllCollections( config, @@ -233,12 +458,20 @@ export class CollectionConfigBuilder< ) // Initial run with callback to load more data if needed - this.maybeRunGraph(config, fullSyncState, loadMoreDataCallbacks) + this.scheduleGraphRun(loadMoreDataCallbacks) // Return the unsubscribe function return () => { syncState.unsubscribeCallbacks.forEach((unsubscribe) => unsubscribe()) + // Clear current sync session state + this.currentSyncConfig = undefined + this.currentSyncState = undefined + + // Clear all pending graph runs to prevent memory leaks from in-flight transactions + // that may flush after the sync session ends + this.pendingGraphRuns.clear() + // Reset caches so a fresh graph/pipeline is compiled on next start // This avoids reusing a finalized D2 graph across GC restarts this.graphCache = undefined @@ -257,6 +490,11 @@ export class CollectionConfigBuilder< (key) => delete this.subscriptions[key] ) this.compiledAliasToCollectionId = {} + + // Unregister from scheduler's onClear listener to prevent memory leaks + // The scheduler's listener Set would otherwise keep a strong reference to this builder + this.unsubscribeFromSchedulerClears?.() + this.unsubscribeFromSchedulerClears = undefined } } @@ -486,14 +724,20 @@ export class CollectionConfigBuilder< const collection = this.collectionByAlias[alias] ?? this.collections[collectionId]! + const dependencyBuilder = getCollectionBuilder(collection) + if (dependencyBuilder && dependencyBuilder !== this) { + this.aliasDependencies[alias] = [dependencyBuilder] + this.builderDependencies.add(dependencyBuilder) + } else { + this.aliasDependencies[alias] = [] + } + // CollectionSubscriber handles the actual subscription to the source collection // and feeds data into the D2 graph inputs for this specific alias const collectionSubscriber = new CollectionSubscriber( alias, collectionId, collection, - config, - syncState, this ) diff --git a/packages/db/src/query/live/collection-registry.ts b/packages/db/src/query/live/collection-registry.ts new file mode 100644 index 000000000..72921604b --- /dev/null +++ b/packages/db/src/query/live/collection-registry.ts @@ -0,0 +1,47 @@ +import type { Collection } from "../../collection/index.js" +import type { CollectionConfigBuilder } from "./collection-config-builder.js" + +const collectionBuilderRegistry = new WeakMap< + Collection, + CollectionConfigBuilder +>() + +/** + * Retrieves the builder attached to a config object via its utils.getBuilder() method. + * + * @param config - The collection config object + * @returns The attached builder, or `undefined` if none exists + */ +export function getBuilderFromConfig( + config: object +): CollectionConfigBuilder | undefined { + return (config as any).utils?.getBuilder?.() +} + +/** + * Registers a builder for a collection in the global registry. + * Used to detect when a live query depends on another live query, + * enabling the scheduler to ensure parent queries run first. + * + * @param collection - The collection to register the builder for + * @param builder - The builder that produces this collection + */ +export function registerCollectionBuilder( + collection: Collection, + builder: CollectionConfigBuilder +): void { + collectionBuilderRegistry.set(collection, builder) +} + +/** + * Retrieves the builder registered for a collection. + * Used to discover dependencies when a live query subscribes to another live query. + * + * @param collection - The collection to look up + * @returns The registered builder, or `undefined` if none exists + */ +export function getCollectionBuilder( + collection: Collection +): CollectionConfigBuilder | undefined { + return collectionBuilderRegistry.get(collection) +} diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index c3d071564..0abed0022 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -4,16 +4,19 @@ import { convertToBasicExpression, } from "../compiler/expressions.js" import { WhereClauseConversionError } from "../../errors.js" -import type { FullSyncState } from "./types.js" import type { MultiSetArray, RootStreamBuilder } from "@tanstack/db-ivm" import type { Collection } from "../../collection/index.js" -import type { ChangeMessage, SyncConfig } from "../../types.js" +import type { ChangeMessage } from "../../types.js" import type { Context, GetResult } from "../builder/types.js" import type { BasicExpression } from "../ir.js" import type { OrderByOptimizationInfo } from "../compiler/order-by.js" import type { CollectionConfigBuilder } from "./collection-config-builder.js" import type { CollectionSubscription } from "../../collection/subscription.js" +const loadMoreCallbackSymbol = Symbol.for( + `@tanstack/db.collection-config-builder` +) + export class CollectionSubscriber< TContext extends Context, TResult extends object = GetResult, @@ -25,8 +28,6 @@ export class CollectionSubscriber< private alias: string, private collectionId: string, private collection: Collection, - private config: Parameters[`sync`]>[0], - private syncState: FullSyncState, private collectionConfigBuilder: CollectionConfigBuilder ) {} @@ -68,7 +69,11 @@ export class CollectionSubscriber< const unsubscribe = () => { subscription.unsubscribe() } - this.syncState.unsubscribeCallbacks.add(unsubscribe) + // currentSyncState is always defined when subscribe() is called + // (called during sync session setup) + this.collectionConfigBuilder.currentSyncState!.unsubscribeCallbacks.add( + unsubscribe + ) return subscription } @@ -76,7 +81,10 @@ export class CollectionSubscriber< changes: Iterable>, callback?: () => boolean ) { - const input = this.syncState.inputs[this.alias]! + // currentSyncState and input are always defined when this method is called + // (only called from active subscriptions during a sync session) + const input = + this.collectionConfigBuilder.currentSyncState!.inputs[this.alias]! const sentChanges = sendChangesToInput( input, changes, @@ -88,14 +96,12 @@ export class CollectionSubscriber< // otherwise we end up in an infinite loop trying to load more data const dataLoader = sentChanges > 0 ? callback : undefined - // Always call maybeRunGraph to process changes eagerly. - // The graph will run unless the live query is in an error state. - // Status management is handled separately via status:change event listeners. - this.collectionConfigBuilder.maybeRunGraph( - this.config, - this.syncState, - dataLoader - ) + // We need to schedule a graph run even if there's no data to load + // because we need to mark the collection as ready if it's not already + // and that's only done in `scheduleGraphRun` + this.collectionConfigBuilder.scheduleGraphRun(dataLoader, { + alias: this.alias, + }) } private subscribeToMatchingChanges( @@ -212,9 +218,22 @@ export class CollectionSubscriber< } const trackedChanges = this.trackSentValues(changes, orderByInfo.comparator) + + // Cache the loadMoreIfNeeded callback on the subscription using a symbol property. + // This ensures we pass the same function instance to the scheduler each time, + // allowing it to deduplicate callbacks when multiple changes arrive during a transaction. + type SubscriptionWithLoader = CollectionSubscription & { + [loadMoreCallbackSymbol]?: () => boolean + } + + const subscriptionWithLoader = subscription as SubscriptionWithLoader + + subscriptionWithLoader[loadMoreCallbackSymbol] ??= + this.loadMoreIfNeeded.bind(this, subscription) + this.sendChangesToPipeline( trackedChanges, - this.loadMoreIfNeeded.bind(this, subscription) + subscriptionWithLoader[loadMoreCallbackSymbol] ) } diff --git a/packages/db/src/scheduler.ts b/packages/db/src/scheduler.ts new file mode 100644 index 000000000..de67593ed --- /dev/null +++ b/packages/db/src/scheduler.ts @@ -0,0 +1,198 @@ +/** + * Identifier used to scope scheduled work. Maps to a transaction id for live queries. + */ +export type SchedulerContextId = string | symbol + +/** + * Options for {@link Scheduler.schedule}. Jobs are identified by `jobId` within a context + * and may declare dependencies. + */ +interface ScheduleOptions { + contextId?: SchedulerContextId + jobId: unknown + dependencies?: Iterable + run: () => void +} + +/** + * State per context. Queue preserves order, jobs hold run functions, dependencies track + * prerequisites, and completed records which jobs have run during the current flush. + */ +interface SchedulerContextState { + queue: Array + jobs: Map void> + dependencies: Map> + completed: Set +} + +/** + * Scoped scheduler that coalesces work by context and job. + * + * - **context** (e.g. transaction id) defines the batching boundary; work is queued until flushed. + * - **job id** deduplicates work within a context; scheduling the same job replaces the previous run function. + * - Without a context id, work executes immediately. + * + * Callers manage their own state; the scheduler only orchestrates execution order. + */ +export class Scheduler { + private contexts = new Map() + private clearListeners = new Set<(contextId: SchedulerContextId) => void>() + + /** + * Get or create the state bucket for a context. + */ + private getOrCreateContext( + contextId: SchedulerContextId + ): SchedulerContextState { + let context = this.contexts.get(contextId) + if (!context) { + context = { + queue: [], + jobs: new Map(), + dependencies: new Map(), + completed: new Set(), + } + this.contexts.set(contextId, context) + } + return context + } + + /** + * Schedule work. Without a context id, executes immediately. + * Otherwise queues the job to be flushed once dependencies are satisfied. + * Scheduling the same jobId again replaces the previous run function. + */ + schedule({ contextId, jobId, dependencies, run }: ScheduleOptions): void { + if (typeof contextId === `undefined`) { + run() + return + } + + const context = this.getOrCreateContext(contextId) + + // If this is a new job, add it to the queue + if (!context.jobs.has(jobId)) { + context.queue.push(jobId) + } + + // Store or replace the run function + context.jobs.set(jobId, run) + + // Update dependencies + if (dependencies) { + const depSet = new Set(dependencies) + depSet.delete(jobId) + context.dependencies.set(jobId, depSet) + } else if (!context.dependencies.has(jobId)) { + context.dependencies.set(jobId, new Set()) + } + + // Clear completion status since we're rescheduling + context.completed.delete(jobId) + } + + /** + * Flush all queued work for a context. Jobs with unmet dependencies are retried. + * Throws if a pass completes without running any job (dependency cycle). + */ + flush(contextId: SchedulerContextId): void { + const context = this.contexts.get(contextId) + if (!context) return + + const { queue, jobs, dependencies, completed } = context + + while (queue.length > 0) { + let ranThisPass = false + const jobsThisPass = queue.length + + for (let i = 0; i < jobsThisPass; i++) { + const jobId = queue.shift()! + const run = jobs.get(jobId) + if (!run) { + dependencies.delete(jobId) + completed.delete(jobId) + continue + } + + const deps = dependencies.get(jobId) + let ready = !deps + if (deps) { + ready = true + for (const dep of deps) { + if (dep !== jobId && !completed.has(dep)) { + ready = false + break + } + } + } + + if (ready) { + jobs.delete(jobId) + dependencies.delete(jobId) + // Run the job. If it throws, we don't mark it complete, allowing the + // error to propagate while maintaining scheduler state consistency. + run() + completed.add(jobId) + ranThisPass = true + } else { + queue.push(jobId) + } + } + + if (!ranThisPass) { + throw new Error( + `Scheduler detected unresolved dependencies for context ${String( + contextId + )}.` + ) + } + } + + this.contexts.delete(contextId) + } + + /** + * Flush all contexts with pending work. Useful during tear-down. + */ + flushAll(): void { + for (const contextId of Array.from(this.contexts.keys())) { + this.flush(contextId) + } + } + + /** Clear all scheduled jobs for a context. */ + clear(contextId: SchedulerContextId): void { + this.contexts.delete(contextId) + // Notify listeners that this context was cleared + this.clearListeners.forEach((listener) => listener(contextId)) + } + + /** Register a listener to be notified when a context is cleared. */ + onClear(listener: (contextId: SchedulerContextId) => void): () => void { + this.clearListeners.add(listener) + return () => this.clearListeners.delete(listener) + } + + /** Check if a context has pending jobs. */ + hasPendingJobs(contextId: SchedulerContextId): boolean { + const context = this.contexts.get(contextId) + return !!context && context.jobs.size > 0 + } + + /** Remove a single job from a context and clean up its dependencies. */ + clearJob(contextId: SchedulerContextId, jobId: unknown): void { + const context = this.contexts.get(contextId) + if (!context) return + + context.jobs.delete(jobId) + context.dependencies.delete(jobId) + context.completed.delete(jobId) + context.queue = context.queue.filter((id) => id !== jobId) + + if (context.jobs.size === 0) { + this.contexts.delete(contextId) + } + } +} + +export const transactionScopedScheduler = new Scheduler() diff --git a/packages/db/src/transactions.ts b/packages/db/src/transactions.ts index 8d4500c25..9fc79238b 100644 --- a/packages/db/src/transactions.ts +++ b/packages/db/src/transactions.ts @@ -5,6 +5,7 @@ import { TransactionNotPendingCommitError, TransactionNotPendingMutateError, } from "./errors" +import { transactionScopedScheduler } from "./scheduler.js" import type { Deferred } from "./deferred" import type { MutationFn, @@ -179,11 +180,21 @@ export function getActiveTransaction(): Transaction | undefined { } function registerTransaction(tx: Transaction) { + // Clear any stale work that may have been left behind if a previous mutate + // scope aborted before we could flush. + transactionScopedScheduler.clear(tx.id) transactionStack.push(tx) } function unregisterTransaction(tx: Transaction) { - transactionStack = transactionStack.filter((t) => t.id !== tx.id) + // Always flush pending work for this transaction before removing it from + // the ambient stack – this runs even if the mutate callback throws. + // If flush throws (e.g., due to a job error), we still clean up the stack. + try { + transactionScopedScheduler.flush(tx.id) + } finally { + transactionStack = transactionStack.filter((t) => t.id !== tx.id) + } } function removeFromPendingList(tx: Transaction) { diff --git a/packages/db/tests/query/scheduler.test.ts b/packages/db/tests/query/scheduler.test.ts new file mode 100644 index 000000000..a2b6026c5 --- /dev/null +++ b/packages/db/tests/query/scheduler.test.ts @@ -0,0 +1,527 @@ +import { afterEach, describe, expect, it, vi } from "vitest" +import { createCollection } from "../../src/collection/index.js" +import { createLiveQueryCollection, eq } from "../../src/query/index.js" +import { createTransaction } from "../../src/transactions.js" +import { transactionScopedScheduler } from "../../src/scheduler.js" +import { CollectionConfigBuilder } from "../../src/query/live/collection-config-builder.js" +import type { FullSyncState } from "../../src/query/live/types.js" +import type { SyncConfig } from "../../src/types.js" + +interface ChangeMessageLike { + type: string + value: any +} + +interface User { + id: number + name: string +} + +interface Task { + id: number + userId: number + title: string +} + +function setupLiveQueryCollections(id: string) { + const users = createCollection({ + id: `${id}-users`, + getKey: (user) => user.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + }, + }, + }) + + const tasks = createCollection({ + id: `${id}-tasks`, + getKey: (task) => task.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + }, + }, + }) + + const assignments = createLiveQueryCollection({ + id: `${id}-assignments`, + startSync: true, + query: (q) => + q + .from({ user: users }) + .join({ task: tasks }, ({ user, task }) => eq(user.id, task.userId)) + .select(({ user, task }) => ({ + userId: user.id, + taskId: task?.id, + title: task?.title, + })), + }) + + return { users, tasks, assignments } +} + +function recordBatches(collection: any) { + const batches: Array> = [] + const subscription = collection.subscribeChanges((changes: any) => { + batches.push(changes as Array) + }) + return { + batches, + unsubscribe: () => subscription.unsubscribe(), + } +} + +afterEach(() => { + transactionScopedScheduler.flushAll() +}) + +describe(`live query scheduler`, () => { + it(`runs the live query graph once per transaction that touches multiple collections`, async () => { + const { users, tasks, assignments } = + setupLiveQueryCollections(`single-batch`) + await assignments.preload() + + const recorder = recordBatches(assignments) + + const transaction = createTransaction({ + mutationFn: async () => {}, + autoCommit: false, + }) + + transaction.mutate(() => { + users.insert({ id: 1, name: `Alice` }) + tasks.insert({ id: 1, userId: 1, title: `Write tests` }) + }) + + expect(recorder.batches).toHaveLength(1) + expect(recorder.batches[0]).toHaveLength(1) + expect(recorder.batches[0]![0]).toMatchObject({ + type: `insert`, + value: { + userId: 1, + taskId: 1, + title: `Write tests`, + }, + }) + + recorder.unsubscribe() + transaction.rollback() + }) + + it(`handles nested transactions without emitting duplicate batches`, async () => { + const { users, tasks, assignments } = setupLiveQueryCollections(`nested`) + await assignments.preload() + + const recorder = recordBatches(assignments) + + const outerTx = createTransaction({ + mutationFn: async () => {}, + autoCommit: false, + }) + const innerTx = createTransaction({ + mutationFn: async () => {}, + autoCommit: false, + }) + + outerTx.mutate(() => { + users.insert({ id: 11, name: `Nested User` }) + innerTx.mutate(() => { + tasks.insert({ id: 21, userId: 11, title: `Nested Task` }) + }) + }) + + expect(recorder.batches).toHaveLength(1) + expect(recorder.batches[0]![0]).toMatchObject({ + value: { + userId: 11, + taskId: 21, + title: `Nested Task`, + }, + }) + + recorder.unsubscribe() + innerTx.rollback() + outerTx.rollback() + }) + + it(`clears pending jobs when a transaction rolls back due to an error`, async () => { + const { users, tasks, assignments } = setupLiveQueryCollections(`rollback`) + await assignments.preload() + + const recorder = recordBatches(assignments) + const tx = createTransaction({ + mutationFn: async () => {}, + autoCommit: false, + }) + + expect(() => { + tx.mutate(() => { + users.insert({ id: 31, name: `Temp` }) + tasks.insert({ id: 41, userId: 31, title: `Temp Task` }) + throw new Error(`boom`) + }) + }).toThrowError(`boom`) + + tx.rollback() + + const batchesBeforeFlush = recorder.batches.length + transactionScopedScheduler.flush(tx.id) + expect(recorder.batches.length).toBeGreaterThanOrEqual(batchesBeforeFlush) + if (recorder.batches.length > batchesBeforeFlush) { + const latestBatch = recorder.batches.at(-1)! + expect(latestBatch[0]?.type).toBe(`delete`) + } + expect(transactionScopedScheduler.hasPendingJobs(tx.id)).toBe(false) + // We emit the optimistic insert and, after the explicit rollback, possibly a + // compensating delete – but no duplicate inserts. + expect(recorder.batches[0]![0]).toMatchObject({ type: `insert` }) + + recorder.unsubscribe() + }) + + it(`dedupes batches across multiple subscribers`, async () => { + const { users, tasks, assignments } = + setupLiveQueryCollections(`multi-subscriber`) + await assignments.preload() + + const first = recordBatches(assignments) + const second = recordBatches(assignments) + + const tx = createTransaction({ + mutationFn: async () => {}, + autoCommit: false, + }) + tx.mutate(() => { + users.insert({ id: 51, name: `Multi` }) + tasks.insert({ id: 61, userId: 51, title: `Subscriber Task` }) + }) + + expect(first.batches).toHaveLength(1) + expect(second.batches).toHaveLength(1) + expect(first.batches[0]![0]).toMatchObject({ + value: { + userId: 51, + taskId: 61, + title: `Subscriber Task`, + }, + }) + + first.unsubscribe() + second.unsubscribe() + tx.rollback() + }) + + it(`runs join live queries once after their parent queries settle`, async () => { + const collectionA = createCollection<{ id: number; value: string }>({ + id: `diamond-A`, + getKey: (row) => row.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + }, + }, + }) + + const collectionB = createCollection<{ id: number; value: string }>({ + id: `diamond-B`, + getKey: (row) => row.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + }, + }, + }) + + const liveQueryA = createLiveQueryCollection({ + id: `diamond-lqA`, + startSync: true, + query: (q) => + q + .from({ a: collectionA }) + .select(({ a }) => ({ id: a.id, value: a.value })), + }) + + const liveQueryB = createLiveQueryCollection({ + id: `diamond-lqB`, + startSync: true, + query: (q) => + q + .from({ b: collectionB }) + .select(({ b }) => ({ id: b.id, value: b.value })), + }) + + const liveQueryJoin = createLiveQueryCollection({ + id: `diamond-join`, + startSync: true, + query: (q) => + q + .from({ left: liveQueryA }) + .join( + { right: liveQueryB }, + ({ left, right }) => eq(left.id, right.id), + `full` + ) + .select(({ left, right }) => ({ + left: left?.value, + right: right?.value, + })), + }) + + await Promise.all([ + liveQueryA.preload(), + liveQueryB.preload(), + liveQueryJoin.preload(), + ]) + const baseRunCount = liveQueryJoin.utils.getRunCount() + + const tx = createTransaction({ + mutationFn: async () => {}, + autoCommit: false, + }) + + tx.mutate(() => { + collectionA.insert({ id: 1, value: `A1` }) + collectionB.insert({ id: 1, value: `B1` }) + }) + + expect(liveQueryJoin.toArray).toEqual([{ left: `A1`, right: `B1` }]) + expect(liveQueryJoin.utils.getRunCount()).toBe(baseRunCount + 1) + + tx.mutate(() => { + collectionA.update(1, (draft) => { + draft.value = `A1b` + }) + collectionB.update(1, (draft) => { + draft.value = `B1b` + }) + }) + + expect(liveQueryJoin.toArray).toEqual([{ left: `A1b`, right: `B1b` }]) + expect(liveQueryJoin.utils.getRunCount()).toBe(baseRunCount + 2) + tx.rollback() + }) + + it(`runs hybrid joins once when they observe both a live query and a collection`, async () => { + const collectionA = createCollection<{ id: number; value: string }>({ + id: `hybrid-A`, + getKey: (row) => row.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + }, + }, + }) + + const collectionB = createCollection<{ id: number; value: string }>({ + id: `hybrid-B`, + getKey: (row) => row.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + }, + }, + }) + + const liveQueryA = createLiveQueryCollection({ + id: `hybrid-lqA`, + startSync: true, + query: (q) => + q + .from({ a: collectionA }) + .select(({ a }) => ({ id: a.id, value: a.value })), + }) + + const hybridJoin = createLiveQueryCollection({ + id: `hybrid-join`, + startSync: true, + query: (q) => + q + .from({ left: liveQueryA }) + .join( + { right: collectionB }, + ({ left, right }) => eq(left.id, right.id), + `full` + ) + .select(({ left, right }) => ({ + left: left?.value, + right: right?.value, + })), + }) + + await Promise.all([liveQueryA.preload(), hybridJoin.preload()]) + const baseRunCount = hybridJoin.utils.getRunCount() + + const tx = createTransaction({ + mutationFn: async () => {}, + autoCommit: false, + }) + + tx.mutate(() => { + collectionA.insert({ id: 7, value: `A7` }) + collectionB.insert({ id: 7, value: `B7` }) + }) + + expect(hybridJoin.toArray).toEqual([{ left: `A7`, right: `B7` }]) + expect(hybridJoin.utils.getRunCount()).toBe(baseRunCount + 1) + + tx.mutate(() => { + collectionA.update(7, (draft) => { + draft.value = `A7b` + }) + collectionB.update(7, (draft) => { + draft.value = `B7b` + }) + }) + + expect(hybridJoin.toArray).toEqual([{ left: `A7b`, right: `B7b` }]) + expect(hybridJoin.utils.getRunCount()).toBe(baseRunCount + 2) + tx.rollback() + }) + + it(`currently single batch when the join sees right-side data before the left`, async () => { + const collectionA = createCollection<{ id: number; value: string }>({ + id: `ordering-A`, + getKey: (row) => row.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + }, + }, + }) + + const collectionB = createCollection<{ id: number; value: string }>({ + id: `ordering-B`, + getKey: (row) => row.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + }, + }, + }) + + const liveQueryA = createLiveQueryCollection({ + id: `ordering-lqA`, + startSync: true, + query: (q) => + q + .from({ a: collectionA }) + .select(({ a }) => ({ id: a.id, value: a.value })), + }) + + const join = createLiveQueryCollection({ + id: `ordering-join`, + startSync: true, + query: (q) => + q + .from({ left: liveQueryA }) + .join( + { right: collectionB }, + ({ left, right }) => eq(left.id, right.id), + `full` + ) + .select(({ left, right }) => ({ + left: left?.value, + right: right?.value, + })), + }) + + await Promise.all([liveQueryA.preload(), join.preload()]) + const baseRunCount = join.utils.getRunCount() + + const tx = createTransaction({ + mutationFn: async () => {}, + autoCommit: false, + }) + + tx.mutate(() => { + collectionB.insert({ id: 42, value: `right-first` }) + collectionA.insert({ id: 42, value: `left-later` }) + }) + + expect(join.toArray).toEqual([{ left: `left-later`, right: `right-first` }]) + expect(join.utils.getRunCount()).toBe(baseRunCount + 1) + tx.rollback() + }) + + it(`coalesces load-more callbacks scheduled within the same context`, () => { + const baseCollection = createCollection({ + id: `loader-users`, + getKey: (user) => user.id, + sync: { + sync: () => () => {}, + }, + }) + + const builder = new CollectionConfigBuilder({ + id: `loader-builder`, + query: (q) => q.from({ user: baseCollection }), + }) + + const contextId = Symbol(`loader-context`) + const loader = vi.fn(() => true) + const config = { + begin: vi.fn(), + write: vi.fn(), + commit: vi.fn(), + markReady: vi.fn(), + truncate: vi.fn(), + } as unknown as Parameters[`sync`]>[0] + + const syncState = { + messagesCount: 0, + subscribedToAllCollections: true, + unsubscribeCallbacks: new Set<() => void>(), + graph: { + pendingWork: () => false, + run: vi.fn(), + }, + inputs: {}, + pipeline: {}, + } as unknown as FullSyncState + + const maybeRunGraphSpy = vi + .spyOn(builder, `maybeRunGraph`) + .mockImplementation((combinedLoader) => { + combinedLoader?.() + }) + + // Set instance properties since this test calls scheduleGraphRun directly + builder.currentSyncConfig = config + builder.currentSyncState = syncState + + builder.scheduleGraphRun(loader, { contextId }) + builder.scheduleGraphRun(loader, { contextId }) + + transactionScopedScheduler.flush(contextId) + + expect(loader).toHaveBeenCalledTimes(1) + expect(maybeRunGraphSpy).toHaveBeenCalledTimes(1) + + maybeRunGraphSpy.mockRestore() + }) +})