diff --git a/.changeset/soft-lazy-deps.md b/.changeset/soft-lazy-deps.md new file mode 100644 index 000000000..94b1e667c --- /dev/null +++ b/.changeset/soft-lazy-deps.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +Fix scheduler handling of lazy left-join/live-query dependencies: treat non-enqueued lazy deps as satisfied to avoid unresolved-dependency deadlocks, and block only when a dep actually has pending work. diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index af3cb2400..dd28be356 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -406,6 +406,16 @@ export class CollectionConfigBuilder< return Array.from(deps) })() + // Ensure dependent builders are actually scheduled in this context so that + // dependency edges always point to a real job (or a deduped no-op if already scheduled). + if (contextId) { + for (const dep of dependentBuilders) { + if (typeof dep.scheduleGraphRun === `function`) { + dep.scheduleGraphRun(undefined, { contextId }) + } + } + } + // We intentionally scope deduplication to the builder instance. Each instance // owns caches and compiled pipelines, so sharing work across instances that // merely reuse the same string id would execute the wrong builder's graph. @@ -451,6 +461,13 @@ export class CollectionConfigBuilder< this.pendingGraphRuns.delete(contextId) } + /** + * Returns true if this builder has a pending graph run for the given context. + */ + hasPendingGraphRun(contextId: SchedulerContextId): boolean { + return this.pendingGraphRuns.has(contextId) + } + /** * Executes a pending graph run. Called by the scheduler when dependencies are satisfied. * Clears the pending state BEFORE execution so that any re-schedules during the run diff --git a/packages/db/src/scheduler.ts b/packages/db/src/scheduler.ts index de67593ed..30222f9e4 100644 --- a/packages/db/src/scheduler.ts +++ b/packages/db/src/scheduler.ts @@ -25,6 +25,18 @@ interface SchedulerContextState { completed: Set } +interface PendingAwareJob { + hasPendingGraphRun: (contextId: SchedulerContextId) => boolean +} + +function isPendingAwareJob(dep: any): dep is PendingAwareJob { + return ( + typeof dep === `object` && + dep !== null && + typeof dep.hasPendingGraphRun === `function` + ) +} + /** * Scoped scheduler that coalesces work by context and job. * @@ -119,7 +131,19 @@ export class Scheduler { if (deps) { ready = true for (const dep of deps) { - if (dep !== jobId && !completed.has(dep)) { + if (dep === jobId) continue + + const depHasPending = + isPendingAwareJob(dep) && dep.hasPendingGraphRun(contextId) + + // Treat dependencies as blocking if the dep has a pending run in this + // context or if it's enqueued and not yet complete. If the dep is + // neither pending nor enqueued, consider it satisfied to avoid deadlocks + // on lazy sources that never schedule work. + if ( + (jobs.has(dep) && !completed.has(dep)) || + (!jobs.has(dep) && depHasPending) + ) { ready = false break } diff --git a/packages/db/tests/query/scheduler.test.ts b/packages/db/tests/query/scheduler.test.ts index a2b6026c5..5088a8a2c 100644 --- a/packages/db/tests/query/scheduler.test.ts +++ b/packages/db/tests/query/scheduler.test.ts @@ -1,9 +1,11 @@ import { afterEach, describe, expect, it, vi } from "vitest" import { createCollection } from "../../src/collection/index.js" -import { createLiveQueryCollection, eq } from "../../src/query/index.js" +import { createLiveQueryCollection, eq, isNull } from "../../src/query/index.js" import { createTransaction } from "../../src/transactions.js" +import { createOptimisticAction } from "../../src/optimistic-action.js" import { transactionScopedScheduler } from "../../src/scheduler.js" import { CollectionConfigBuilder } from "../../src/query/live/collection-config-builder.js" +import { mockSyncCollectionOptions } from "../utils.js" import type { FullSyncState } from "../../src/query/live/types.js" import type { SyncConfig } from "../../src/types.js" @@ -524,4 +526,237 @@ describe(`live query scheduler`, () => { maybeRunGraphSpy.mockRestore() }) + + it(`should handle optimistic mutations with nested left joins without scheduler errors`, async () => { + // This test verifies that optimistic mutations on collections with nested live query + // collections using left joins complete successfully without scheduler errors. + // + // Expected behavior: + // 1. Collections are pre-populated with initialData (via mockSyncCollectionOptions) + // 2. Nested live query collections use left joins + // 3. An optimistic action updates an existing item using draft mutations + // 4. The scheduler should flush the transaction successfully without detecting unresolved dependencies + + interface Account { + id: string + user_id: string + name: string + } + + interface UserProfile { + id: string + profile: string + } + + interface Team { + id: string + account_id: string + deleted_ts: string | null + } + + // Use mockSyncCollectionOptions with initialData to match the failing test + // Note: mockSyncCollectionOptions already sets startSync: true internally + const accounts = createCollection( + mockSyncCollectionOptions({ + id: `left-join-bug-accounts`, + getKey: (account) => account.id, + initialData: [ + { id: `account-1`, user_id: `user-1`, name: `Account 1` }, + ], + }) + ) + + const users = createCollection( + mockSyncCollectionOptions({ + id: `left-join-bug-users`, + getKey: (user) => user.id, + initialData: [{ id: `user-1`, profile: `Profile 1` }], + }) + ) + + const teams = createCollection( + mockSyncCollectionOptions({ + id: `left-join-bug-teams`, + getKey: (team) => team.id, + initialData: [ + { + id: `team-1`, + account_id: `account-1`, + deleted_ts: null as string | null, + }, + ], + }) + ) + + // Create nested live query collections similar to the bug report + const accountsWithUsers = createLiveQueryCollection({ + id: `left-join-bug-accounts-with-users`, + startSync: true, + query: (q) => + q + .from({ account: accounts }) + .join({ user: users }, ({ user, account }) => + eq(user.id, account.user_id) + ) + .select(({ account, user }) => ({ + account: account, + profile: user?.profile, + })), + }) + + const activeTeams = createLiveQueryCollection({ + id: `left-join-bug-active-teams`, + startSync: true, + query: (q) => + q + .from({ team: teams }) + .where(({ team }) => isNull(team.deleted_ts)) + .select(({ team }) => ({ team })), + }) + + const accountsWithTeams = createLiveQueryCollection({ + id: `left-join-bug-accounts-with-teams`, + startSync: true, + query: (q) => + q + .from({ accountWithUser: accountsWithUsers }) + .leftJoin({ team: activeTeams }, ({ accountWithUser, team }) => + eq(team.team.account_id, accountWithUser.account.id) + ) + .select(({ accountWithUser, team }) => ({ + account: accountWithUser.account, + profile: accountWithUser.profile, + team: team?.team, + })), + }) + + // Wait for all queries to be ready + await Promise.all([ + accountsWithUsers.preload(), + activeTeams.preload(), + accountsWithTeams.preload(), + ]) + + // Create an optimistic action that mutates using draft + const testAction = createOptimisticAction({ + onMutate: (id) => { + // Update existing data using draft mutation + accounts.update(id, (draft) => { + draft.name = `new name here` + }) + }, + mutationFn: (_id, _params) => { + return Promise.resolve({ txid: 0 }) + }, + }) + + // Execute the optimistic action and flush - this should complete without scheduler errors + let error: Error | undefined + let transaction: any + + try { + transaction = testAction(`account-1`) + + // Wait for the transaction to process + await new Promise((resolve) => setTimeout(resolve, 10)) + + // The scheduler should flush successfully without detecting unresolved dependencies + transactionScopedScheduler.flushAll() + } catch (e) { + error = e as Error + } + + // The scheduler should not throw unresolved dependency errors + expect(error).toBeUndefined() + + // Verify the transaction was created successfully + expect(transaction).toBeDefined() + }) + + it(`should prevent stale data when lazy source also depends on modified collection`, async () => { + interface BaseItem { + id: string + value: number + } + + // Base collection + const baseCollection = createCollection( + mockSyncCollectionOptions({ + id: `race-base`, + getKey: (item) => item.id, + initialData: [{ id: `1`, value: 10 }], + }) + ) + + // QueryA: depends on base + const queryA = createLiveQueryCollection({ + id: `race-queryA`, + startSync: true, + query: (q) => + q.from({ item: baseCollection }).select(({ item }) => ({ + id: item.id, + value: item.value, + })), + }) + + // QueryB: also depends on base (independent from queryA) + const queryB = createLiveQueryCollection({ + id: `race-queryB`, + startSync: true, + query: (q) => + q.from({ item: baseCollection }).select(({ item }) => ({ + id: item.id, + value: item.value, + })), + }) + + // QueryC: depends on queryA, left joins queryB (lazy) + const queryC = createLiveQueryCollection({ + id: `race-queryC`, + startSync: true, + query: (q) => + q + .from({ a: queryA }) + .leftJoin({ b: queryB }, ({ a, b }) => eq(a.id, b.id)) + .select(({ a, b }) => ({ + id: a.id, + aValue: a.value, + bValue: b?.value ?? null, + })), + }) + + // Wait for initial sync + await Promise.all([queryA.preload(), queryB.preload(), queryC.preload()]) + + // Verify initial state + const initialC = [...queryC.values()][0] + expect(initialC?.aValue).toBe(10) + expect(initialC?.bValue).toBe(10) + + // Mutate the base collection + const action = createOptimisticAction({ + autoCommit: false, + onMutate: (id) => { + baseCollection.update(id, (draft) => { + draft.value = 100 + }) + }, + mutationFn: (_id) => Promise.resolve({ txid: 0 }), + }) + + let error: Error | undefined + try { + action(`1`) + await new Promise((resolve) => setTimeout(resolve, 10)) + transactionScopedScheduler.flushAll() + } catch (e) { + error = e as Error + } + + expect(error).toBeUndefined() + + const finalC = [...queryC.values()][0] + expect(finalC?.aValue).toBe(100) + expect(finalC?.bValue).toBe(100) + }) })