Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/soft-lazy-deps.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

Fix scheduler handling of lazy left-join/live-query dependencies: treat non-enqueued lazy deps as satisfied to avoid unresolved-dependency deadlocks, and block only when a dep actually has pending work.
17 changes: 17 additions & 0 deletions packages/db/src/query/live/collection-config-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,16 @@ export class CollectionConfigBuilder<
return Array.from(deps)
})()

// Ensure dependent builders are actually scheduled in this context so that
// dependency edges always point to a real job (or a deduped no-op if already scheduled).
if (contextId) {
for (const dep of dependentBuilders) {
if (typeof dep.scheduleGraphRun === `function`) {
dep.scheduleGraphRun(undefined, { contextId })
}
}
}

// We intentionally scope deduplication to the builder instance. Each instance
// owns caches and compiled pipelines, so sharing work across instances that
// merely reuse the same string id would execute the wrong builder's graph.
Expand Down Expand Up @@ -451,6 +461,13 @@ export class CollectionConfigBuilder<
this.pendingGraphRuns.delete(contextId)
}

/**
* Returns true if this builder has a pending graph run for the given context.
*/
hasPendingGraphRun(contextId: SchedulerContextId): boolean {
return this.pendingGraphRuns.has(contextId)
}

/**
* Executes a pending graph run. Called by the scheduler when dependencies are satisfied.
* Clears the pending state BEFORE execution so that any re-schedules during the run
Expand Down
26 changes: 25 additions & 1 deletion packages/db/src/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ interface SchedulerContextState {
completed: Set<unknown>
}

interface PendingAwareJob {
hasPendingGraphRun: (contextId: SchedulerContextId) => boolean
}

function isPendingAwareJob(dep: any): dep is PendingAwareJob {
return (
typeof dep === `object` &&
dep !== null &&
typeof dep.hasPendingGraphRun === `function`
)
}

/**
* Scoped scheduler that coalesces work by context and job.
*
Expand Down Expand Up @@ -119,7 +131,19 @@ export class Scheduler {
if (deps) {
ready = true
for (const dep of deps) {
if (dep !== jobId && !completed.has(dep)) {
if (dep === jobId) continue

const depHasPending =
isPendingAwareJob(dep) && dep.hasPendingGraphRun(contextId)

// Treat dependencies as blocking if the dep has a pending run in this
// context or if it's enqueued and not yet complete. If the dep is
// neither pending nor enqueued, consider it satisfied to avoid deadlocks
// on lazy sources that never schedule work.
if (
(jobs.has(dep) && !completed.has(dep)) ||
(!jobs.has(dep) && depHasPending)
) {
ready = false
break
}
Expand Down
237 changes: 236 additions & 1 deletion packages/db/tests/query/scheduler.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { afterEach, describe, expect, it, vi } from "vitest"
import { createCollection } from "../../src/collection/index.js"
import { createLiveQueryCollection, eq } from "../../src/query/index.js"
import { createLiveQueryCollection, eq, isNull } from "../../src/query/index.js"
import { createTransaction } from "../../src/transactions.js"
import { createOptimisticAction } from "../../src/optimistic-action.js"
import { transactionScopedScheduler } from "../../src/scheduler.js"
import { CollectionConfigBuilder } from "../../src/query/live/collection-config-builder.js"
import { mockSyncCollectionOptions } from "../utils.js"
import type { FullSyncState } from "../../src/query/live/types.js"
import type { SyncConfig } from "../../src/types.js"

Expand Down Expand Up @@ -524,4 +526,237 @@ describe(`live query scheduler`, () => {

maybeRunGraphSpy.mockRestore()
})

it(`should handle optimistic mutations with nested left joins without scheduler errors`, async () => {
// This test verifies that optimistic mutations on collections with nested live query
// collections using left joins complete successfully without scheduler errors.
//
// Expected behavior:
// 1. Collections are pre-populated with initialData (via mockSyncCollectionOptions)
// 2. Nested live query collections use left joins
// 3. An optimistic action updates an existing item using draft mutations
// 4. The scheduler should flush the transaction successfully without detecting unresolved dependencies

interface Account {
id: string
user_id: string
name: string
}

interface UserProfile {
id: string
profile: string
}

interface Team {
id: string
account_id: string
deleted_ts: string | null
}

// Use mockSyncCollectionOptions with initialData to match the failing test
// Note: mockSyncCollectionOptions already sets startSync: true internally
const accounts = createCollection<Account>(
mockSyncCollectionOptions({
id: `left-join-bug-accounts`,
getKey: (account) => account.id,
initialData: [
{ id: `account-1`, user_id: `user-1`, name: `Account 1` },
],
})
)

const users = createCollection<UserProfile>(
mockSyncCollectionOptions({
id: `left-join-bug-users`,
getKey: (user) => user.id,
initialData: [{ id: `user-1`, profile: `Profile 1` }],
})
)

const teams = createCollection<Team>(
mockSyncCollectionOptions({
id: `left-join-bug-teams`,
getKey: (team) => team.id,
initialData: [
{
id: `team-1`,
account_id: `account-1`,
deleted_ts: null as string | null,
},
],
})
)

// Create nested live query collections similar to the bug report
const accountsWithUsers = createLiveQueryCollection({
id: `left-join-bug-accounts-with-users`,
startSync: true,
query: (q) =>
q
.from({ account: accounts })
.join({ user: users }, ({ user, account }) =>
eq(user.id, account.user_id)
)
.select(({ account, user }) => ({
account: account,
profile: user?.profile,
})),
})

const activeTeams = createLiveQueryCollection({
id: `left-join-bug-active-teams`,
startSync: true,
query: (q) =>
q
.from({ team: teams })
.where(({ team }) => isNull(team.deleted_ts))
.select(({ team }) => ({ team })),
})

const accountsWithTeams = createLiveQueryCollection({
id: `left-join-bug-accounts-with-teams`,
startSync: true,
query: (q) =>
q
.from({ accountWithUser: accountsWithUsers })
.leftJoin({ team: activeTeams }, ({ accountWithUser, team }) =>
eq(team.team.account_id, accountWithUser.account.id)
)
.select(({ accountWithUser, team }) => ({
account: accountWithUser.account,
profile: accountWithUser.profile,
team: team?.team,
})),
})

// Wait for all queries to be ready
await Promise.all([
accountsWithUsers.preload(),
activeTeams.preload(),
accountsWithTeams.preload(),
])

// Create an optimistic action that mutates using draft
const testAction = createOptimisticAction<string>({
onMutate: (id) => {
// Update existing data using draft mutation
accounts.update(id, (draft) => {
draft.name = `new name here`
})
},
mutationFn: (_id, _params) => {
return Promise.resolve({ txid: 0 })
},
})

// Execute the optimistic action and flush - this should complete without scheduler errors
let error: Error | undefined
let transaction: any

try {
transaction = testAction(`account-1`)

// Wait for the transaction to process
await new Promise((resolve) => setTimeout(resolve, 10))

// The scheduler should flush successfully without detecting unresolved dependencies
transactionScopedScheduler.flushAll()
} catch (e) {
error = e as Error
}

// The scheduler should not throw unresolved dependency errors
expect(error).toBeUndefined()

// Verify the transaction was created successfully
expect(transaction).toBeDefined()
})

it(`should prevent stale data when lazy source also depends on modified collection`, async () => {
interface BaseItem {
id: string
value: number
}

// Base collection
const baseCollection = createCollection<BaseItem>(
mockSyncCollectionOptions({
id: `race-base`,
getKey: (item) => item.id,
initialData: [{ id: `1`, value: 10 }],
})
)

// QueryA: depends on base
const queryA = createLiveQueryCollection({
id: `race-queryA`,
startSync: true,
query: (q) =>
q.from({ item: baseCollection }).select(({ item }) => ({
id: item.id,
value: item.value,
})),
})

// QueryB: also depends on base (independent from queryA)
const queryB = createLiveQueryCollection({
id: `race-queryB`,
startSync: true,
query: (q) =>
q.from({ item: baseCollection }).select(({ item }) => ({
id: item.id,
value: item.value,
})),
})

// QueryC: depends on queryA, left joins queryB (lazy)
const queryC = createLiveQueryCollection({
id: `race-queryC`,
startSync: true,
query: (q) =>
q
.from({ a: queryA })
.leftJoin({ b: queryB }, ({ a, b }) => eq(a.id, b.id))
.select(({ a, b }) => ({
id: a.id,
aValue: a.value,
bValue: b?.value ?? null,
})),
})

// Wait for initial sync
await Promise.all([queryA.preload(), queryB.preload(), queryC.preload()])

// Verify initial state
const initialC = [...queryC.values()][0]
expect(initialC?.aValue).toBe(10)
expect(initialC?.bValue).toBe(10)

// Mutate the base collection
const action = createOptimisticAction<string>({
autoCommit: false,
onMutate: (id) => {
baseCollection.update(id, (draft) => {
draft.value = 100
})
},
mutationFn: (_id) => Promise.resolve({ txid: 0 }),
})

let error: Error | undefined
try {
action(`1`)
await new Promise((resolve) => setTimeout(resolve, 10))
transactionScopedScheduler.flushAll()
} catch (e) {
error = e as Error
}

expect(error).toBeUndefined()

const finalC = [...queryC.values()][0]
expect(finalC?.aValue).toBe(100)
expect(finalC?.bValue).toBe(100)
})
})
Loading