From 1249a0810d91d283ead0c88efa07e35cfe745fdc Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 1 Oct 2025 19:10:46 +0100 Subject: [PATCH 01/14] wip --- .../query/live/collection-config-builder.ts | 43 +++++ .../src/query/live/collection-subscriber.ts | 112 ++++++------ packages/db/tests/query/join.test.ts | 166 ++++++++++++++++++ 3 files changed, 259 insertions(+), 62 deletions(-) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 72d90f905..db304c9b2 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -32,6 +32,7 @@ export class CollectionConfigBuilder< private readonly id: string readonly query: QueryIR private readonly collections: Record> + private readonly collectionAliasesById: Map> // WeakMap to store the keys of the results // so that we can retrieve them in the getKey function @@ -68,6 +69,7 @@ export class CollectionConfigBuilder< this.query = buildQueryFromConfig(config) this.collections = extractCollectionsFromQuery(this.query) + this.collectionAliasesById = extractCollectionAliases(this.query) // Create compare function for ordering if the query has orderBy if (this.query.orderBy && this.query.orderBy.length > 0) { @@ -96,6 +98,11 @@ export class CollectionConfigBuilder< } } + getCollectionAliases(collectionId: string): Array { + const aliases = this.collectionAliasesById.get(collectionId) + return aliases ? Array.from(aliases) : [] + } + // The callback function is called after the graph has run. // This gives the callback a chance to load more data if needed, // that's used to optimize orderBy operators that set a limit, @@ -444,6 +451,42 @@ function extractCollectionsFromQuery( return collections } +function extractCollectionAliases(query: QueryIR): Map> { + const aliasesById = new Map>() + + function recordAlias(source: any) { + if (!source) return + + if (source.type === `collectionRef`) { + const { id } = source.collection + const existing = aliasesById.get(id) + if (existing) { + existing.add(source.alias) + } else { + aliasesById.set(id, new Set([source.alias])) + } + } else if (source.type === `queryRef`) { + traverse(source.query) + } + } + + function traverse(q: QueryIR) { + if (!q) return + + recordAlias(q.from) + + if (q.join) { + for (const joinClause of q.join) { + recordAlias(joinClause.from) + } + } + } + + traverse(query) + + return aliasesById +} + function accumulateChanges( acc: Map>, [[key, tupleData], multiplicity]: [ diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index c1fb88b3a..4f6897aa1 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -1,5 +1,6 @@ import { MultiSet } from "@tanstack/db-ivm" import { convertToBasicExpression } from "../compiler/expressions.js" +import { Func } from "../ir.js" import type { FullSyncState } from "./types.js" import type { MultiSetArray, RootStreamBuilder } from "@tanstack/db-ivm" import type { Collection } from "../../collection/index.js" @@ -25,34 +26,14 @@ export class CollectionSubscriber< ) {} subscribe(): CollectionSubscription { - const collectionAlias = findCollectionAlias( - this.collectionId, - this.collectionConfigBuilder.query - ) - const whereClause = this.getWhereClauseFromAlias(collectionAlias) - - if (whereClause) { - // Convert WHERE clause to BasicExpression format for collection subscription - const whereExpression = convertToBasicExpression( - whereClause, - collectionAlias! - ) + const whereExpression = this.buildCollectionFilter() - if (whereExpression) { - // Use index optimization for this collection - return this.subscribeToChanges(whereExpression) - } else { - // This should not happen - if we have a whereClause but can't create whereExpression, - // it indicates a bug in our optimization logic - throw new Error( - `Failed to convert WHERE clause to collection filter for collection '${this.collectionId}'. ` + - `This indicates a bug in the query optimization logic.` - ) - } - } else { - // No WHERE clause for this collection, use regular subscription - return this.subscribeToChanges() + if (whereExpression) { + return this.subscribeToChanges(whereExpression) } + + // No applicable filter for this collection, use regular subscription + return this.subscribeToChanges() } private subscribeToChanges(whereExpression?: BasicExpression) { @@ -240,15 +221,38 @@ export class CollectionSubscriber< }) } - private getWhereClauseFromAlias( - collectionAlias: string | undefined - ): BasicExpression | undefined { + private buildCollectionFilter(): BasicExpression | undefined { const collectionWhereClausesCache = this.collectionConfigBuilder.collectionWhereClausesCache - if (collectionAlias && collectionWhereClausesCache) { - return collectionWhereClausesCache.get(collectionAlias) + if (!collectionWhereClausesCache) { + return undefined } - return undefined + + const aliases = this.collectionConfigBuilder.getCollectionAliases( + this.collectionId + ) + if (aliases.length === 0) { + return undefined + } + + const convertedClauses: Array> = [] + for (const alias of aliases) { + const clause = collectionWhereClausesCache.get(alias) + if (!clause) { + // At least one alias requires the full collection, so we cannot safely filter + return undefined + } + + const converted = convertToBasicExpression(clause, alias) + if (!converted) { + // Conversion failed which means we cannot use this filter at the subscription level + return undefined + } + + convertedClauses.push(converted) + } + + return combineWithOr(convertedClauses) } private *trackSentValues( @@ -267,36 +271,6 @@ export class CollectionSubscriber< } } -/** - * Finds the alias for a collection ID in the query - */ -function findCollectionAlias( - collectionId: string, - query: any -): string | undefined { - // Check FROM clause - if ( - query.from?.type === `collectionRef` && - query.from.collection?.id === collectionId - ) { - return query.from.alias - } - - // Check JOIN clauses - if (query.join) { - for (const joinClause of query.join) { - if ( - joinClause.from?.type === `collectionRef` && - joinClause.from.collection?.id === collectionId - ) { - return joinClause.from.alias - } - } - } - - return undefined -} - /** * Helper function to send changes to a D2 input stream */ @@ -326,6 +300,20 @@ function sendChangesToInput( return multiSetArray.length } +function combineWithOr( + expressions: Array> +): BasicExpression | undefined { + if (expressions.length === 0) { + return undefined + } + + if (expressions.length === 1) { + return expressions[0]! + } + + return new Func(`or`, expressions) +} + /** Splits updates into a delete of the old value and an insert of the new value */ function* splitUpdates< T extends object = Record, diff --git a/packages/db/tests/query/join.test.ts b/packages/db/tests/query/join.test.ts index ab8c2442b..3b4dc81e9 100644 --- a/packages/db/tests/query/join.test.ts +++ b/packages/db/tests/query/join.test.ts @@ -4,6 +4,9 @@ import { createLiveQueryCollection, eq, isUndefined, + lt, + gt, + not, } from "../../src/query/index.js" import { createCollection } from "../../src/collection/index.js" import { mockSyncCollectionOptions } from "../utils.js" @@ -1475,6 +1478,169 @@ function createJoinTests(autoIndex: `off` | `eager`): void { const userIds = results.map((r) => r.user.id).sort() expect(userIds).toEqual([2, 3, 4]) }) + + test(`should handle where clause on a self-join query`, () => { + // This test reproduces the bug where a WHERE clause combined with a LEFT JOIN + // on the same collection causes the joined parent to be undefined + type Event = { + id: string + parent_id: string | undefined + name: string + } + + const sampleEvents: Array = [ + { + id: `ba224e71-a464-418d-a0a9-5959b490775d`, + parent_id: undefined, + name: `Parent Event`, + }, + { + id: `3770a4a6-3260-4566-9f79-f50864ebdd46`, + parent_id: `ba224e71-a464-418d-a0a9-5959b490775d`, + name: `Child Event`, + }, + { + id: `another-child-id`, + parent_id: `ba224e71-a464-418d-a0a9-5959b490775d`, + name: `Another Child`, + }, + ] + + const eventCollection = createCollection( + mockSyncCollectionOptions({ + id: `test-events-self-join-bug`, + getKey: (event) => event.id, + initialData: sampleEvents, + autoIndex, + }) + ) + + const queryWithWhere = createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ event: eventCollection }) + .where(({ event }) => + eq(event.id, `3770a4a6-3260-4566-9f79-f50864ebdd46`) + ) + .join( + { parent: eventCollection }, + ({ event, parent }) => eq(parent.id, event.parent_id), + `left` + ) + .select(({ event, parent }) => ({ + id: event.id, + parent_id: event.parent_id, + parent: { + id: parent?.id, + }, + })), + }) + + const resultsWithWhere = queryWithWhere.toArray + expect(resultsWithWhere).toHaveLength(1) + + const childEventWithWhere = resultsWithWhere[0]! + expect(childEventWithWhere).toBeDefined() + + expect(childEventWithWhere.id).toBe( + `3770a4a6-3260-4566-9f79-f50864ebdd46` + ) + expect(childEventWithWhere.parent_id).toBe( + `ba224e71-a464-418d-a0a9-5959b490775d` + ) + expect(childEventWithWhere.parent.id).toBe( + `ba224e71-a464-418d-a0a9-5959b490775d` + ) + }) + }) + + test(`should handle multiple joins with where clauses to the same source collection`, () => { + type Collection1 = { + id: number + value: number + } + + type Collection2 = { + id: number + value: number + other: number + } + + const collection1Data: Array = [{ id: 1, value: 1 }] + + const collection2Data: Array = [ + { id: 1, value: 1, other: 10 }, + { id: 2, value: 1, other: 30 }, + ] + + const collection1 = createCollection( + mockSyncCollectionOptions({ + id: `test-collection1-multiple-joins`, + getKey: (item) => item.id, + initialData: collection1Data, + autoIndex, + }) + ) + + const collection2 = createCollection( + mockSyncCollectionOptions({ + id: `test-collection2-multiple-joins`, + getKey: (item) => item.id, + initialData: collection2Data, + autoIndex, + }) + ) + + const multipleJoinQuery = createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ main: collection1 }) + .join( + { + join1: q + .from({ join1: collection2 }) + .where(({ join1 }) => not(gt(join1.other, 20))), + }, + ({ main, join1 }) => eq(main.value, join1.value), + "left" + ) + .join( + { + join2: q + .from({ join2: collection2 }) + .where(({ join2 }) => not(lt(join2.other, 20))), + }, + ({ main, join2 }) => eq(main.value, join2.value), + "left" + ), + }) + + const multipleResults = multipleJoinQuery.toArray + + console.log(multipleResults) + + // This should work - we're filtering for records where join1 has 'a' AND join2 has 'b' + // But it might fail due to the sequential WHERE clause issue + expect(multipleResults).toHaveLength(1) + + const result = multipleResults[0]! + expect(result).toBeDefined() + + // Should have the main item + expect(result.main.id).toBe(1) + + // Should have both joined items with their respective filters + expect(result.join1).toBeDefined() + expect(result.join1!.id).toBe(1) + expect(result.join1!.value).toBe(1) + expect(result.join1!.other).toBe(10) + + expect(result.join2).toBeDefined() + expect(result.join2!.id).toBe(2) + expect(result.join2!.value).toBe(1) + expect(result.join2!.other).toBe(30) }) } From 8baf80cec2bee6d0f6dc7be0296db13be6c5b18e Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 1 Oct 2025 20:38:06 +0100 Subject: [PATCH 02/14] convert to a subscription per alias --- packages/db/src/errors.ts | 15 ++- packages/db/src/query/compiler/index.ts | 4 +- packages/db/src/query/compiler/joins.ts | 16 ++- packages/db/src/query/compiler/order-by.ts | 7 ++ .../query/live/collection-config-builder.ts | 43 +++++-- .../src/query/live/collection-subscriber.ts | 116 +++++++----------- packages/db/tests/query/join.test.ts | 10 +- 7 files changed, 113 insertions(+), 98 deletions(-) diff --git a/packages/db/src/errors.ts b/packages/db/src/errors.ts index 7e107d0b8..431df1160 100644 --- a/packages/db/src/errors.ts +++ b/packages/db/src/errors.ts @@ -350,8 +350,11 @@ export class LimitOffsetRequireOrderByError extends QueryCompilationError { } export class CollectionInputNotFoundError extends QueryCompilationError { - constructor(collectionId: string) { - super(`Input for collection "${collectionId}" not found in inputs map`) + constructor(alias: string, collectionId?: string) { + const details = collectionId + ? `alias "${alias}" (collection "${collectionId}")` + : `collection "${alias}"` + super(`Input for ${details} not found in inputs map`) } } @@ -563,3 +566,11 @@ export class CannotCombineEmptyExpressionListError extends QueryOptimizerError { super(`Cannot combine empty expression list`) } } + +export class WhereClauseConversionError extends QueryOptimizerError { + constructor(collectionId: string, alias: string) { + super( + `Failed to convert WHERE clause to collection filter for collection '${collectionId}' alias '${alias}'. This indicates a bug in the query optimization logic.` + ) + } +} diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index b7d060abf..c4e9cb6a5 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -337,9 +337,9 @@ function processFrom( ): { alias: string; input: KeyedStream; collectionId: string } { switch (from.type) { case `collectionRef`: { - const input = allInputs[from.collection.id] + const input = allInputs[from.alias] ?? allInputs[from.collection.id] if (!input) { - throw new CollectionInputNotFoundError(from.collection.id) + throw new CollectionInputNotFoundError(from.alias, from.collection.id) } return { alias: from.alias, input, collectionId: from.collection.id } } diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 1beb93728..c9c158ae9 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -253,12 +253,16 @@ function processJoin( [key: unknown, [originalKey: string, namespacedRow: NamespacedRow]] > = activePipeline.pipe( tap((data) => { - const lazyCollectionSubscription = subscriptions[lazyCollection.id] + const lazyAliasCandidate = + activeCollection === `main` ? joinedTableAlias : mainTableAlias + const lazyCollectionSubscription = + subscriptions[lazyAliasCandidate] ?? + subscriptions[`__collection:${lazyCollection.id}`] if (!lazyCollectionSubscription) { - throw new Error( - `Internal error: subscription for collection is missing in join pipeline. Make sure the live query collection sets the subscription before running the pipeline.` - ) + // The alias was not subscribed (e.g. belongs to a nested subquery), + // so we skip the lazy loading optimization for this join. + return } if (lazyCollectionSubscription.hasLoadedInitialState()) { @@ -401,9 +405,9 @@ function processJoinSource( ): { alias: string; input: KeyedStream; collectionId: string } { switch (from.type) { case `collectionRef`: { - const input = allInputs[from.collection.id] + const input = allInputs[from.alias] ?? allInputs[from.collection.id] if (!input) { - throw new CollectionInputNotFoundError(from.collection.id) + throw new CollectionInputNotFoundError(from.alias, from.collection.id) } return { alias: from.alias, input, collectionId: from.collection.id } } diff --git a/packages/db/src/query/compiler/order-by.ts b/packages/db/src/query/compiler/order-by.ts index 240b7e856..13ac6be0b 100644 --- a/packages/db/src/query/compiler/order-by.ts +++ b/packages/db/src/query/compiler/order-by.ts @@ -13,6 +13,7 @@ import type { BaseIndex } from "../../indexes/base-index.js" import type { Collection } from "../../collection/index.js" export type OrderByOptimizationInfo = { + alias: string offset: number limit: number comparator: ( @@ -157,7 +158,13 @@ export function processOrderBy( if (index && index.supports(`gt`)) { // We found an index that we can use to lazily load ordered data + const orderByAlias = + orderByExpression.path.length > 1 + ? String(orderByExpression.path[0]) + : rawQuery.from.alias + const orderByOptimizationInfo = { + alias: orderByAlias, offset: offset ?? 0, limit, comparator, diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index db304c9b2..c19496118 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -32,7 +32,8 @@ export class CollectionConfigBuilder< private readonly id: string readonly query: QueryIR private readonly collections: Record> - private readonly collectionAliasesById: Map> + private readonly aliasToCollectionId: Record + private readonly collectionByAlias: Record> // WeakMap to store the keys of the results // so that we can retrieve them in the getKey function @@ -52,7 +53,7 @@ export class CollectionConfigBuilder< | Map> | undefined - // Map of collection ID to subscription + // Map of collection alias to subscription readonly subscriptions: Record = {} // Map of collection IDs to functions that load keys for that lazy collection lazyCollectionsCallbacks: Record = {} @@ -69,7 +70,18 @@ export class CollectionConfigBuilder< this.query = buildQueryFromConfig(config) this.collections = extractCollectionsFromQuery(this.query) - this.collectionAliasesById = extractCollectionAliases(this.query) + const collectionAliasesById = extractCollectionAliases(this.query) + + this.aliasToCollectionId = {} + this.collectionByAlias = {} + for (const [collectionId, aliases] of collectionAliasesById.entries()) { + const collection = this.collections[collectionId] + if (!collection) continue + for (const alias of aliases) { + this.aliasToCollectionId[alias] = collectionId + this.collectionByAlias[alias] = collection + } + } // Create compare function for ordering if the query has orderBy if (this.query.orderBy && this.query.orderBy.length > 0) { @@ -98,9 +110,12 @@ export class CollectionConfigBuilder< } } - getCollectionAliases(collectionId: string): Array { - const aliases = this.collectionAliasesById.get(collectionId) - return aliases ? Array.from(aliases) : [] + getCollectionIdForAlias(alias: string): string { + const collectionId = this.aliasToCollectionId[alias] + if (!collectionId) { + throw new Error(`Unknown collection alias "${alias}"`) + } + return collectionId } // The callback function is called after the graph has run. @@ -203,8 +218,8 @@ export class CollectionConfigBuilder< private compileBasePipeline() { this.graphCache = new D2() this.inputsCache = Object.fromEntries( - Object.entries(this.collections).map(([key]) => [ - key, + Object.keys(this.collectionByAlias).map((alias) => [ + alias, this.graphCache!.newInput(), ]) ) @@ -340,9 +355,11 @@ export class CollectionConfigBuilder< config: Parameters[`sync`]>[0], syncState: FullSyncState ) { - const loaders = Object.entries(this.collections).map( - ([collectionId, collection]) => { + const loaders = Object.entries(this.collectionByAlias).map( + ([alias, collection]) => { + const collectionId = this.aliasToCollectionId[alias]! const collectionSubscriber = new CollectionSubscriber( + alias, collectionId, collection, config, @@ -351,7 +368,9 @@ export class CollectionConfigBuilder< ) const subscription = collectionSubscriber.subscribe() - this.subscriptions[collectionId] = subscription + this.subscriptions[alias] = subscription + const collectionKey = `__collection:${collectionId}` + this.subscriptions[collectionKey] = subscription const loadMore = collectionSubscriber.loadMoreIfNeeded.bind( collectionSubscriber, @@ -470,7 +489,7 @@ function extractCollectionAliases(query: QueryIR): Map> { } } - function traverse(q: QueryIR) { + function traverse(q?: QueryIR) { if (!q) return recordAlias(q.from) diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index 4f6897aa1..ab85efc3d 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -1,12 +1,13 @@ import { MultiSet } from "@tanstack/db-ivm" import { convertToBasicExpression } from "../compiler/expressions.js" -import { Func } from "../ir.js" +import { WhereClauseConversionError } from "../../errors.js" import type { FullSyncState } from "./types.js" import type { MultiSetArray, RootStreamBuilder } from "@tanstack/db-ivm" import type { Collection } from "../../collection/index.js" import type { ChangeMessage, SyncConfig } from "../../types.js" import type { Context, GetResult } from "../builder/types.js" import type { BasicExpression } from "../ir.js" +import type { OrderByOptimizationInfo } from "../compiler/order-by.js" import type { CollectionConfigBuilder } from "./collection-config-builder.js" import type { CollectionSubscription } from "../../collection/subscription.js" @@ -18,6 +19,7 @@ export class CollectionSubscriber< private biggest: any = undefined constructor( + private alias: string, private collectionId: string, private collection: Collection, private config: Parameters[`sync`]>[0], @@ -26,25 +28,29 @@ export class CollectionSubscriber< ) {} subscribe(): CollectionSubscription { - const whereExpression = this.buildCollectionFilter() + const whereClause = this.getWhereClauseForAlias() - if (whereExpression) { - return this.subscribeToChanges(whereExpression) + if (whereClause) { + const whereExpression = convertToBasicExpression(whereClause, this.alias) + + if (whereExpression) { + return this.subscribeToChanges(whereExpression) + } + + throw new WhereClauseConversionError(this.collectionId, this.alias) } - // No applicable filter for this collection, use regular subscription return this.subscribeToChanges() } private subscribeToChanges(whereExpression?: BasicExpression) { let subscription: CollectionSubscription - if ( - Object.hasOwn( - this.collectionConfigBuilder.optimizableOrderByCollections, - this.collectionId + const orderByInfo = this.getOrderByInfo() + if (orderByInfo) { + subscription = this.subscribeToOrderedChanges( + whereExpression, + orderByInfo ) - ) { - subscription = this.subscribeToOrderedChanges(whereExpression) } else { // If the collection is lazy then we should not include the initial state const includeInitialState = @@ -66,7 +72,7 @@ export class CollectionSubscriber< changes: Iterable>, callback?: () => boolean ) { - const input = this.syncState.inputs[this.collectionId]! + const input = this.syncState.inputs[this.alias]! const sentChanges = sendChangesToInput( input, changes, @@ -107,12 +113,10 @@ export class CollectionSubscriber< } private subscribeToOrderedChanges( - whereExpression: BasicExpression | undefined + whereExpression: BasicExpression | undefined, + orderByInfo: OrderByOptimizationInfo ) { - const { offset, limit, comparator, dataNeeded, index } = - this.collectionConfigBuilder.optimizableOrderByCollections[ - this.collectionId - ]! + const { offset, limit, comparator, dataNeeded, index } = orderByInfo const sendChangesInRange = ( changes: Iterable> @@ -122,7 +126,7 @@ export class CollectionSubscriber< // because they can't affect the topK (and if later we need more data, we will dynamically load more data) const splittedChanges = splitUpdates(changes) let filteredChanges = splittedChanges - if (dataNeeded!() === 0) { + if (dataNeeded && dataNeeded() === 0) { // If the topK is full [..., maxSentValue] then we do not need to send changes > maxSentValue // because they can never make it into the topK. // However, if the topK isn't full yet, we need to also send changes > maxSentValue @@ -158,10 +162,7 @@ export class CollectionSubscriber< // after each iteration of the query pipeline // to ensure that the orderBy operator has enough data to work with loadMoreIfNeeded(subscription: CollectionSubscription) { - const orderByInfo = - this.collectionConfigBuilder.optimizableOrderByCollections[ - this.collectionId - ] + const orderByInfo = this.getOrderByInfo() if (!orderByInfo) { // This query has no orderBy operator @@ -192,11 +193,13 @@ export class CollectionSubscriber< changes: Iterable>, subscription: CollectionSubscription ) { - const { comparator } = - this.collectionConfigBuilder.optimizableOrderByCollections[ - this.collectionId - ]! - const trackedChanges = this.trackSentValues(changes, comparator) + const orderByInfo = this.getOrderByInfo() + if (!orderByInfo) { + this.sendChangesToPipeline(changes) + return + } + + const trackedChanges = this.trackSentValues(changes, orderByInfo.comparator) this.sendChangesToPipeline( trackedChanges, this.loadMoreIfNeeded.bind(this, subscription) @@ -206,10 +209,11 @@ export class CollectionSubscriber< // Loads the next `n` items from the collection // starting from the biggest item it has sent private loadNextItems(n: number, subscription: CollectionSubscription) { - const { valueExtractorForRawRow } = - this.collectionConfigBuilder.optimizableOrderByCollections[ - this.collectionId - ]! + const orderByInfo = this.getOrderByInfo() + if (!orderByInfo) { + return + } + const { valueExtractorForRawRow } = orderByInfo const biggestSentRow = this.biggest const biggestSentValue = biggestSentRow ? valueExtractorForRawRow(biggestSentRow) @@ -221,38 +225,24 @@ export class CollectionSubscriber< }) } - private buildCollectionFilter(): BasicExpression | undefined { + private getWhereClauseForAlias(): BasicExpression | undefined { const collectionWhereClausesCache = this.collectionConfigBuilder.collectionWhereClausesCache if (!collectionWhereClausesCache) { return undefined } + return collectionWhereClausesCache.get(this.alias) + } - const aliases = this.collectionConfigBuilder.getCollectionAliases( - this.collectionId - ) - if (aliases.length === 0) { - return undefined - } - - const convertedClauses: Array> = [] - for (const alias of aliases) { - const clause = collectionWhereClausesCache.get(alias) - if (!clause) { - // At least one alias requires the full collection, so we cannot safely filter - return undefined - } - - const converted = convertToBasicExpression(clause, alias) - if (!converted) { - // Conversion failed which means we cannot use this filter at the subscription level - return undefined - } - - convertedClauses.push(converted) + private getOrderByInfo(): OrderByOptimizationInfo | undefined { + const info = + this.collectionConfigBuilder.optimizableOrderByCollections[ + this.collectionId + ] + if (info && info.alias === this.alias) { + return info } - - return combineWithOr(convertedClauses) + return undefined } private *trackSentValues( @@ -300,20 +290,6 @@ function sendChangesToInput( return multiSetArray.length } -function combineWithOr( - expressions: Array> -): BasicExpression | undefined { - if (expressions.length === 0) { - return undefined - } - - if (expressions.length === 1) { - return expressions[0]! - } - - return new Func(`or`, expressions) -} - /** Splits updates into a delete of the old value and an insert of the new value */ function* splitUpdates< T extends object = Record, diff --git a/packages/db/tests/query/join.test.ts b/packages/db/tests/query/join.test.ts index 3b4dc81e9..22c8ee738 100644 --- a/packages/db/tests/query/join.test.ts +++ b/packages/db/tests/query/join.test.ts @@ -3,9 +3,9 @@ import { concat, createLiveQueryCollection, eq, + gt, isUndefined, lt, - gt, not, } from "../../src/query/index.js" import { createCollection } from "../../src/collection/index.js" @@ -1554,7 +1554,7 @@ function createJoinTests(autoIndex: `off` | `eager`): void { ) }) }) - + test(`should handle multiple joins with where clauses to the same source collection`, () => { type Collection1 = { id: number @@ -1604,7 +1604,7 @@ function createJoinTests(autoIndex: `off` | `eager`): void { .where(({ join1 }) => not(gt(join1.other, 20))), }, ({ main, join1 }) => eq(main.value, join1.value), - "left" + `left` ) .join( { @@ -1613,14 +1613,12 @@ function createJoinTests(autoIndex: `off` | `eager`): void { .where(({ join2 }) => not(lt(join2.other, 20))), }, ({ main, join2 }) => eq(main.value, join2.value), - "left" + `left` ), }) const multipleResults = multipleJoinQuery.toArray - console.log(multipleResults) - // This should work - we're filtering for records where join1 has 'a' AND join2 has 'b' // But it might fail due to the sequential WHERE clause issue expect(multipleResults).toHaveLength(1) From 42f81bb56b3937c3b995f6896b044b7bfa6ee4da Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 2 Oct 2025 09:58:04 +0100 Subject: [PATCH 03/14] change to subscription per source alias, rather than per colleciton --- packages/db/src/query/compiler/index.ts | 20 ++- packages/db/src/query/compiler/joins.ts | 33 +++-- .../query/live/collection-config-builder.ts | 117 ++++++++++++------ .../tests/query/compiler/subqueries.test.ts | 29 ++++- 4 files changed, 144 insertions(+), 55 deletions(-) diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index c4e9cb6a5..3868d5945 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -40,6 +40,8 @@ export interface CompilationResult { pipeline: ResultStream /** Map of collection aliases to their WHERE clauses for index optimization */ collectionWhereClauses: Map> + /** Map of alias to underlying collection id used during compilation */ + aliasToCollectionId: Record } /** @@ -78,6 +80,10 @@ export function compileQuery( // Create a copy of the inputs map to avoid modifying the original const allInputs = { ...inputs } + // Track alias to collection id relationships discovered during compilation so + // the live layer can subscribe to every alias the optimiser introduces. + const aliasToCollectionId: Record = {} + // Create a map of table aliases to inputs const tables: Record = {} @@ -95,7 +101,8 @@ export function compileQuery( lazyCollections, optimizableOrderByCollections, cache, - queryMapping + queryMapping, + aliasToCollectionId ) tables[mainTableAlias] = mainInput @@ -128,7 +135,8 @@ export function compileQuery( lazyCollections, optimizableOrderByCollections, rawQuery, - compileQuery + compileQuery, + aliasToCollectionId ) } @@ -287,6 +295,7 @@ export function compileQuery( collectionId: mainCollectionId, pipeline: result, collectionWhereClauses, + aliasToCollectionId, } cache.set(rawQuery, compilationResult) @@ -315,6 +324,7 @@ export function compileQuery( collectionId: mainCollectionId, pipeline: result, collectionWhereClauses, + aliasToCollectionId, } cache.set(rawQuery, compilationResult) @@ -333,7 +343,8 @@ function processFrom( lazyCollections: Set, optimizableOrderByCollections: Record, cache: QueryCache, - queryMapping: QueryMapping + queryMapping: QueryMapping, + aliasToCollectionId: Record ): { alias: string; input: KeyedStream; collectionId: string } { switch (from.type) { case `collectionRef`: { @@ -341,6 +352,7 @@ function processFrom( if (!input) { throw new CollectionInputNotFoundError(from.alias, from.collection.id) } + aliasToCollectionId[from.alias] = from.collection.id return { alias: from.alias, input, collectionId: from.collection.id } } case `queryRef`: { @@ -360,6 +372,8 @@ function processFrom( queryMapping ) + Object.assign(aliasToCollectionId, subQueryResult.aliasToCollectionId) + // Extract the pipeline from the compilation result const subQueryInput = subQueryResult.pipeline diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index c9c158ae9..49e0b5835 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -63,7 +63,8 @@ export function processJoins( lazyCollections: Set, optimizableOrderByCollections: Record, rawQuery: QueryIR, - onCompileSubquery: CompileQueryFn + onCompileSubquery: CompileQueryFn, + aliasToCollectionId: Record ): NamespacedAndKeyedStream { let resultPipeline = pipeline @@ -83,7 +84,8 @@ export function processJoins( lazyCollections, optimizableOrderByCollections, rawQuery, - onCompileSubquery + onCompileSubquery, + aliasToCollectionId ) } @@ -108,8 +110,11 @@ function processJoin( lazyCollections: Set, optimizableOrderByCollections: Record, rawQuery: QueryIR, - onCompileSubquery: CompileQueryFn + onCompileSubquery: CompileQueryFn, + aliasToCollectionId: Record ): NamespacedAndKeyedStream { + const isCollectionRef = joinClause.from.type === `collectionRef` + // Get the joined table alias and input stream const { alias: joinedTableAlias, @@ -125,11 +130,17 @@ function processJoin( optimizableOrderByCollections, cache, queryMapping, - onCompileSubquery + onCompileSubquery, + aliasToCollectionId ) // Add the joined table to the tables map tables[joinedTableAlias] = joinedInput + if (isCollectionRef) { + // Only direct collection references form new alias bindings. Subquery + // aliases reuse the mapping returned from the recursive compilation above. + aliasToCollectionId[joinedTableAlias] = joinedCollectionId + } const mainCollection = collections[mainTableId] const joinedCollection = collections[joinedCollectionId] @@ -260,9 +271,9 @@ function processJoin( subscriptions[`__collection:${lazyCollection.id}`] if (!lazyCollectionSubscription) { - // The alias was not subscribed (e.g. belongs to a nested subquery), - // so we skip the lazy loading optimization for this join. - return + throw new Error( + `Internal error: subscription for collection is missing in join pipeline. Make sure the live query collection sets the subscription before running the pipeline.` + ) } if (lazyCollectionSubscription.hasLoadedInitialState()) { @@ -401,7 +412,8 @@ function processJoinSource( optimizableOrderByCollections: Record, cache: QueryCache, queryMapping: QueryMapping, - onCompileSubquery: CompileQueryFn + onCompileSubquery: CompileQueryFn, + aliasToCollectionId: Record ): { alias: string; input: KeyedStream; collectionId: string } { switch (from.type) { case `collectionRef`: { @@ -409,6 +421,7 @@ function processJoinSource( if (!input) { throw new CollectionInputNotFoundError(from.alias, from.collection.id) } + aliasToCollectionId[from.alias] = from.collection.id return { alias: from.alias, input, collectionId: from.collection.id } } case `queryRef`: { @@ -428,6 +441,10 @@ function processJoinSource( queryMapping ) + // Pull the nested alias map up so the caller can subscribe to those aliases + // and keep the current alias pointing at the subquery's collection. + Object.assign(aliasToCollectionId, subQueryResult.aliasToCollectionId) + // Extract the pipeline from the compilation result const subQueryInput = subQueryResult.pipeline diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index c19496118..2efec87cf 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -32,8 +32,9 @@ export class CollectionConfigBuilder< private readonly id: string readonly query: QueryIR private readonly collections: Record> - private readonly aliasToCollectionId: Record private readonly collectionByAlias: Record> + // Populated during compilation to include optimizer-generated aliases + private compiledAliasToCollectionId: Record = {} // WeakMap to store the keys of the results // so that we can retrieve them in the getKey function @@ -72,13 +73,11 @@ export class CollectionConfigBuilder< this.collections = extractCollectionsFromQuery(this.query) const collectionAliasesById = extractCollectionAliases(this.query) - this.aliasToCollectionId = {} this.collectionByAlias = {} for (const [collectionId, aliases] of collectionAliasesById.entries()) { const collection = this.collections[collectionId] if (!collection) continue for (const alias of aliases) { - this.aliasToCollectionId[alias] = collectionId this.collectionByAlias[alias] = collection } } @@ -111,11 +110,15 @@ export class CollectionConfigBuilder< } getCollectionIdForAlias(alias: string): string { - const collectionId = this.aliasToCollectionId[alias] - if (!collectionId) { - throw new Error(`Unknown collection alias "${alias}"`) + const compiled = this.compiledAliasToCollectionId[alias] + if (compiled) { + return compiled } - return collectionId + const collection = this.collectionByAlias[alias] + if (collection) { + return collection.id + } + throw new Error(`Unknown collection alias "${alias}"`) } // The callback function is called after the graph has run. @@ -224,11 +227,8 @@ export class CollectionConfigBuilder< ]) ) - // Compile the query and get both pipeline and collection WHERE clauses - const { - pipeline: pipelineCache, - collectionWhereClauses: collectionWhereClausesCache, - } = compileQuery( + // Compile the query and capture alias metadata produced during optimisation + let compilation = compileQuery( this.query, this.inputsCache as Record, this.collections, @@ -238,8 +238,37 @@ export class CollectionConfigBuilder< this.optimizableOrderByCollections ) - this.pipelineCache = pipelineCache - this.collectionWhereClausesCache = collectionWhereClausesCache + this.pipelineCache = compilation.pipeline + this.collectionWhereClausesCache = compilation.collectionWhereClauses + this.compiledAliasToCollectionId = compilation.aliasToCollectionId + // Optimized queries can introduce aliases beyond those declared on the + // builder. If that happens, provision inputs for the missing aliases and + // recompile so the pipeline is fully wired before execution. + const missingAliases = Object.keys(this.compiledAliasToCollectionId).filter( + (alias) => !Object.hasOwn(this.inputsCache!, alias) + ) + + if (missingAliases.length > 0) { + for (const alias of missingAliases) { + this.inputsCache[alias] = this.graphCache.newInput() + } + + compilation = compileQuery( + this.query, + this.inputsCache as Record, + this.collections, + this.subscriptions, + this.lazyCollectionsCallbacks, + this.lazyCollections, + this.optimizableOrderByCollections, + new WeakMap(), + new WeakMap() + ) + + this.pipelineCache = compilation.pipeline + this.collectionWhereClausesCache = compilation.collectionWhereClauses + this.compiledAliasToCollectionId = compilation.aliasToCollectionId + } } private maybeCompileBasePipeline() { @@ -355,31 +384,41 @@ export class CollectionConfigBuilder< config: Parameters[`sync`]>[0], syncState: FullSyncState ) { - const loaders = Object.entries(this.collectionByAlias).map( - ([alias, collection]) => { - const collectionId = this.aliasToCollectionId[alias]! - const collectionSubscriber = new CollectionSubscriber( - alias, - collectionId, - collection, - config, - syncState, - this - ) - - const subscription = collectionSubscriber.subscribe() - this.subscriptions[alias] = subscription - const collectionKey = `__collection:${collectionId}` - this.subscriptions[collectionKey] = subscription - - const loadMore = collectionSubscriber.loadMoreIfNeeded.bind( - collectionSubscriber, - subscription - ) - - return loadMore - } - ) + const compiledAliases = Object.entries(this.compiledAliasToCollectionId) + if (compiledAliases.length === 0) { + throw new Error( + `Compiler returned no alias metadata for query '${this.id}'. This should not happen; please report.` + ) + } + + // Subscribe to each alias the compiler reported. + const aliasEntries = compiledAliases + + const loaders = aliasEntries.map(([alias, collectionId]) => { + const collection = + this.collectionByAlias[alias] ?? this.collections[collectionId]! + + const collectionSubscriber = new CollectionSubscriber( + alias, + collectionId, + collection, + config, + syncState, + this + ) + + const subscription = collectionSubscriber.subscribe() + this.subscriptions[alias] = subscription + const collectionKey = `__collection:${collectionId}` + this.subscriptions[collectionKey] = subscription + + const loadMore = collectionSubscriber.loadMoreIfNeeded.bind( + collectionSubscriber, + subscription + ) + + return loadMore + }) const loadMoreDataCallback = () => { loaders.map((loader) => loader()) diff --git a/packages/db/tests/query/compiler/subqueries.test.ts b/packages/db/tests/query/compiler/subqueries.test.ts index 9903f7a6c..a33df324d 100644 --- a/packages/db/tests/query/compiler/subqueries.test.ts +++ b/packages/db/tests/query/compiler/subqueries.test.ts @@ -4,6 +4,7 @@ import { Query, getQueryIR } from "../../../src/query/builder/index.js" import { compileQuery } from "../../../src/query/compiler/index.js" import { CollectionImpl } from "../../../src/collection/index.js" import { avg, count, eq } from "../../../src/query/builder/functions.js" +import type { CollectionSubscription } from "../../../src/collection/subscription.js" // Test schema types interface Issue { @@ -272,27 +273,45 @@ describe(`Query2 Subqueries`, () => { const usersSubscription = usersCollection.subscribeChanges(() => {}) const issuesSubscription = issuesCollection.subscribeChanges(() => {}) + const subscriptions: Record = { + [usersCollection.id]: usersSubscription, + [issuesCollection.id]: issuesSubscription, + } // Compile and execute the query const graph = new D2() const issuesInput = createIssueInput(graph) const usersInput = createUserInput(graph) const lazyCollections = new Set() - const { pipeline } = compileQuery( + const compilation = compileQuery( builtQuery, { issues: issuesInput, users: usersInput, }, { issues: issuesCollection, users: usersCollection }, - { - [usersCollection.id]: usersSubscription, - [issuesCollection.id]: issuesSubscription, - }, + subscriptions, { issues: dummyCallbacks, users: dummyCallbacks }, lazyCollections, {} ) + const { pipeline } = compilation + + for (const [alias, collectionId] of Object.entries( + compilation.aliasToCollectionId + )) { + if (!subscriptions[alias]) { + subscriptions[alias] = + collectionId === usersCollection.id + ? usersSubscription + : issuesSubscription + } + + const collectionKey = `__collection:${collectionId}` + if (!subscriptions[collectionKey]) { + subscriptions[collectionKey] = subscriptions[alias] + } + } // Since we're doing a left join, the collection on the right should be handled lazily expect(lazyCollections).contains(usersCollection.id) From 85f51e99e9260b893622921e7045743dfb07ff10 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 2 Oct 2025 10:32:25 +0100 Subject: [PATCH 04/14] address gpt5 review --- packages/db/src/errors.ts | 11 ++- packages/db/src/query/builder/types.ts | 2 +- packages/db/src/query/compiler/index.ts | 20 +++--- packages/db/src/query/compiler/joins.ts | 9 ++- .../query/live/collection-config-builder.ts | 19 +++-- .../src/query/live/collection-subscriber.ts | 8 +-- packages/db/src/query/optimizer.ts | 26 +++---- .../tests/query/compiler/subqueries.test.ts | 71 +++++++++++++++++++ packages/db/tests/query/order-by.test.ts | 52 ++++++++++++++ 9 files changed, 183 insertions(+), 35 deletions(-) diff --git a/packages/db/src/errors.ts b/packages/db/src/errors.ts index 431df1160..4ad8d673f 100644 --- a/packages/db/src/errors.ts +++ b/packages/db/src/errors.ts @@ -350,11 +350,18 @@ export class LimitOffsetRequireOrderByError extends QueryCompilationError { } export class CollectionInputNotFoundError extends QueryCompilationError { - constructor(alias: string, collectionId?: string) { + constructor( + alias: string, + collectionId?: string, + availableKeys?: Array + ) { const details = collectionId ? `alias "${alias}" (collection "${collectionId}")` : `collection "${alias}"` - super(`Input for ${details} not found in inputs map`) + const availableKeysMsg = availableKeys?.length + ? `. Available keys: ${availableKeys.join(`, `)}` + : `` + super(`Input for ${details} not found in inputs map${availableKeysMsg}`) } } diff --git a/packages/db/src/query/builder/types.ts b/packages/db/src/query/builder/types.ts index bef1c2bed..972301e93 100644 --- a/packages/db/src/query/builder/types.ts +++ b/packages/db/src/query/builder/types.ts @@ -104,7 +104,7 @@ export type SchemaFromSource = Prettify<{ * GetAliases - Extracts all table aliases available in a query context * * Simple utility type that returns the keys of the schema, representing - * all table/collection aliases that can be referenced in the current query. + * all table/source aliases that can be referenced in the current query. */ export type GetAliases = keyof TContext[`schema`] diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index 3868d5945..864089156 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -38,8 +38,8 @@ export interface CompilationResult { collectionId: string /** The compiled query pipeline */ pipeline: ResultStream - /** Map of collection aliases to their WHERE clauses for index optimization */ - collectionWhereClauses: Map> + /** Map of source aliases to their WHERE clauses for index optimization */ + sourceWhereClauses: Map> /** Map of alias to underlying collection id used during compilation */ aliasToCollectionId: Record } @@ -70,8 +70,7 @@ export function compileQuery( } // Optimize the query before compilation - const { optimizedQuery: query, collectionWhereClauses } = - optimizeQuery(rawQuery) + const { optimizedQuery: query, sourceWhereClauses } = optimizeQuery(rawQuery) // Create mapping from optimized query to original for caching queryMapping.set(query, rawQuery) @@ -81,10 +80,11 @@ export function compileQuery( const allInputs = { ...inputs } // Track alias to collection id relationships discovered during compilation so - // the live layer can subscribe to every alias the optimiser introduces. + // the live layer can subscribe to every alias the optimizer introduces. const aliasToCollectionId: Record = {} // Create a map of table aliases to inputs + // Note: alias keys take precedence over collection keys for input resolution const tables: Record = {} // Process the FROM clause to get the main table @@ -294,7 +294,7 @@ export function compileQuery( const compilationResult = { collectionId: mainCollectionId, pipeline: result, - collectionWhereClauses, + sourceWhereClauses, aliasToCollectionId, } cache.set(rawQuery, compilationResult) @@ -323,7 +323,7 @@ export function compileQuery( const compilationResult = { collectionId: mainCollectionId, pipeline: result, - collectionWhereClauses, + sourceWhereClauses, aliasToCollectionId, } cache.set(rawQuery, compilationResult) @@ -350,7 +350,11 @@ function processFrom( case `collectionRef`: { const input = allInputs[from.alias] ?? allInputs[from.collection.id] if (!input) { - throw new CollectionInputNotFoundError(from.alias, from.collection.id) + throw new CollectionInputNotFoundError( + from.alias, + from.collection.id, + Object.keys(allInputs) + ) } aliasToCollectionId[from.alias] = from.collection.id return { alias: from.alias, input, collectionId: from.collection.id } diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 49e0b5835..193fc7b3d 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -264,6 +264,9 @@ function processJoin( [key: unknown, [originalKey: string, namespacedRow: NamespacedRow]] > = activePipeline.pipe( tap((data) => { + // For outer joins (LEFT/RIGHT), the driving side determines which alias's + // subscription we consult for lazy loading. The main table drives LEFT joins, + // joined table drives RIGHT joins. const lazyAliasCandidate = activeCollection === `main` ? joinedTableAlias : mainTableAlias const lazyCollectionSubscription = @@ -419,7 +422,11 @@ function processJoinSource( case `collectionRef`: { const input = allInputs[from.alias] ?? allInputs[from.collection.id] if (!input) { - throw new CollectionInputNotFoundError(from.alias, from.collection.id) + throw new CollectionInputNotFoundError( + from.alias, + from.collection.id, + Object.keys(allInputs) + ) } aliasToCollectionId[from.alias] = from.collection.id return { alias: from.alias, input, collectionId: from.collection.id } diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 2efec87cf..c7c01f749 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -50,11 +50,11 @@ export class CollectionConfigBuilder< private graphCache: D2 | undefined private inputsCache: Record> | undefined private pipelineCache: ResultStream | undefined - public collectionWhereClausesCache: + public sourceWhereClausesCache: | Map> | undefined - // Map of collection alias to subscription + // Map of source alias to subscription readonly subscriptions: Record = {} // Map of collection IDs to functions that load keys for that lazy collection lazyCollectionsCallbacks: Record = {} @@ -118,7 +118,7 @@ export class CollectionConfigBuilder< if (collection) { return collection.id } - throw new Error(`Unknown collection alias "${alias}"`) + throw new Error(`Unknown source alias "${alias}"`) } // The callback function is called after the graph has run. @@ -209,12 +209,19 @@ export class CollectionConfigBuilder< this.graphCache = undefined this.inputsCache = undefined this.pipelineCache = undefined - this.collectionWhereClausesCache = undefined + this.sourceWhereClausesCache = undefined // Reset lazy collection state this.lazyCollections.clear() this.optimizableOrderByCollections = {} this.lazyCollectionsCallbacks = {} + + // Clear subscription references to prevent memory leaks + // Note: Individual subscriptions are already unsubscribed via unsubscribeCallbacks + Object.keys(this.subscriptions).forEach( + (key) => delete this.subscriptions[key] + ) + this.compiledAliasToCollectionId = {} } } @@ -239,7 +246,7 @@ export class CollectionConfigBuilder< ) this.pipelineCache = compilation.pipeline - this.collectionWhereClausesCache = compilation.collectionWhereClauses + this.sourceWhereClausesCache = compilation.sourceWhereClauses this.compiledAliasToCollectionId = compilation.aliasToCollectionId // Optimized queries can introduce aliases beyond those declared on the // builder. If that happens, provision inputs for the missing aliases and @@ -266,7 +273,7 @@ export class CollectionConfigBuilder< ) this.pipelineCache = compilation.pipeline - this.collectionWhereClausesCache = compilation.collectionWhereClauses + this.sourceWhereClausesCache = compilation.sourceWhereClauses this.compiledAliasToCollectionId = compilation.aliasToCollectionId } } diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index ab85efc3d..3e9460bad 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -226,12 +226,12 @@ export class CollectionSubscriber< } private getWhereClauseForAlias(): BasicExpression | undefined { - const collectionWhereClausesCache = - this.collectionConfigBuilder.collectionWhereClausesCache - if (!collectionWhereClausesCache) { + const sourceWhereClausesCache = + this.collectionConfigBuilder.sourceWhereClausesCache + if (!sourceWhereClausesCache) { return undefined } - return collectionWhereClausesCache.get(this.alias) + return sourceWhereClausesCache.get(this.alias) } private getOrderByInfo(): OrderByOptimizationInfo | undefined { diff --git a/packages/db/src/query/optimizer.ts b/packages/db/src/query/optimizer.ts index 55b3c083b..d024093a1 100644 --- a/packages/db/src/query/optimizer.ts +++ b/packages/db/src/query/optimizer.ts @@ -162,8 +162,8 @@ export interface GroupedWhereClauses { export interface OptimizationResult { /** The optimized query with WHERE clauses potentially moved to subqueries */ optimizedQuery: QueryIR - /** Map of collection aliases to their extracted WHERE clauses for index optimization */ - collectionWhereClauses: Map> + /** Map of source aliases to their extracted WHERE clauses for index optimization */ + sourceWhereClauses: Map> } /** @@ -184,14 +184,14 @@ export interface OptimizationResult { * where: [eq(u.dept_id, 1), gt(p.views, 100)] * } * - * const { optimizedQuery, collectionWhereClauses } = optimizeQuery(originalQuery) + * const { optimizedQuery, sourceWhereClauses } = optimizeQuery(originalQuery) * // Result: Single-source clauses moved to deepest possible subqueries - * // collectionWhereClauses: Map { 'u' => eq(u.dept_id, 1), 'p' => gt(p.views, 100) } + * // sourceWhereClauses: Map { 'u' => eq(u.dept_id, 1), 'p' => gt(p.views, 100) } * ``` */ export function optimizeQuery(query: QueryIR): OptimizationResult { - // First, extract collection WHERE clauses before optimization - const collectionWhereClauses = extractCollectionWhereClauses(query) + // First, extract source WHERE clauses before optimization + const sourceWhereClauses = extractSourceWhereClauses(query) // Apply multi-level predicate pushdown with iterative convergence let optimized = query @@ -214,7 +214,7 @@ export function optimizeQuery(query: QueryIR): OptimizationResult { return { optimizedQuery: cleaned, - collectionWhereClauses, + sourceWhereClauses, } } @@ -224,16 +224,16 @@ export function optimizeQuery(query: QueryIR): OptimizationResult { * to specific collections, but only for simple queries without joins. * * @param query - The original QueryIR to analyze - * @returns Map of collection aliases to their WHERE clauses + * @returns Map of source aliases to their WHERE clauses */ -function extractCollectionWhereClauses( +function extractSourceWhereClauses( query: QueryIR ): Map> { - const collectionWhereClauses = new Map>() + const sourceWhereClauses = new Map>() // Only analyze queries that have WHERE clauses if (!query.where || query.where.length === 0) { - return collectionWhereClauses + return sourceWhereClauses } // Split all AND clauses at the root level for granular analysis @@ -254,12 +254,12 @@ function extractCollectionWhereClauses( if (isCollectionReference(query, sourceAlias)) { // Check if the WHERE clause can be converted to collection-compatible format if (isConvertibleToCollectionFilter(whereClause)) { - collectionWhereClauses.set(sourceAlias, whereClause) + sourceWhereClauses.set(sourceAlias, whereClause) } } } - return collectionWhereClauses + return sourceWhereClauses } /** diff --git a/packages/db/tests/query/compiler/subqueries.test.ts b/packages/db/tests/query/compiler/subqueries.test.ts index a33df324d..c96641164 100644 --- a/packages/db/tests/query/compiler/subqueries.test.ts +++ b/packages/db/tests/query/compiler/subqueries.test.ts @@ -352,6 +352,77 @@ describe(`Query2 Subqueries`, () => { }) describe(`Complex composable queries`, () => { + it(`exports alias metadata from nested subqueries`, () => { + // Create a nested subquery structure to test alias metadata propagation + const innerQuery = new Query() + .from({ user: usersCollection }) + .where(({ user }) => eq(user.status, `active`)) + + const middleQuery = new Query() + .from({ activeUser: innerQuery }) + .select(({ activeUser }) => ({ + id: activeUser.id, + name: activeUser.name, + })) + + const outerQuery = new Query() + .from({ issue: issuesCollection }) + .join({ userInfo: middleQuery }, ({ issue, userInfo }) => + eq(issue.userId, userInfo.id) + ) + .select(({ issue, userInfo }) => ({ + issueId: issue.id, + issueTitle: issue.title, + userName: userInfo?.name, + })) + + const builtQuery = getQueryIR(outerQuery) + + const usersSubscription = usersCollection.subscribeChanges(() => {}) + const issuesSubscription = issuesCollection.subscribeChanges(() => {}) + const subscriptions: Record = { + [usersCollection.id]: usersSubscription, + [issuesCollection.id]: issuesSubscription, + } + + const dummyCallbacks = { + loadKeys: (_: any) => {}, + loadInitialState: () => {}, + } + + // Compile the query + const graph = new D2() + const issuesInput = createIssueInput(graph) + const usersInput = createUserInput(graph) + const lazyCollections = new Set() + const compilation = compileQuery( + builtQuery, + { + issues: issuesInput, + users: usersInput, + }, + { issues: issuesCollection, users: usersCollection }, + subscriptions, + { issues: dummyCallbacks, users: dummyCallbacks }, + lazyCollections, + {} + ) + + // Verify that alias metadata includes aliases from the query + const aliasToCollectionId = compilation.aliasToCollectionId + + // Should include the main table alias (note: alias is 'issue', not 'issues') + expect(aliasToCollectionId.issue).toBe(issuesCollection.id) + + // Should include the user alias from the subquery + expect(aliasToCollectionId.user).toBe(usersCollection.id) + + // Verify that the compiler correctly maps aliases to collection IDs + expect(Object.keys(aliasToCollectionId)).toHaveLength(2) + expect(aliasToCollectionId.issue).toBe(issuesCollection.id) + expect(aliasToCollectionId.user).toBe(usersCollection.id) + }) + it(`executes simple aggregate subquery`, () => { // Create a base query that filters issues for project 1 const baseQuery = new Query() diff --git a/packages/db/tests/query/order-by.test.ts b/packages/db/tests/query/order-by.test.ts index b26603c6a..20bf0353e 100644 --- a/packages/db/tests/query/order-by.test.ts +++ b/packages/db/tests/query/order-by.test.ts @@ -1467,6 +1467,58 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { } ) + itWhenAutoIndex( + `optimizes orderBy with alias paths in joins`, + async () => { + // Patch getConfig to expose the builder on the returned config for test access + const { CollectionConfigBuilder } = await import( + `../../src/query/live/collection-config-builder.js` + ) + const originalGetConfig = CollectionConfigBuilder.prototype.getConfig + + CollectionConfigBuilder.prototype.getConfig = function (this: any) { + const cfg = originalGetConfig.call(this) + ;(cfg as any).__builder = this + return cfg + } + + try { + const collection = createLiveQueryCollection((q) => + q + .from({ employees: employeesCollection }) + .join( + { departments: departmentsCollection }, + ({ employees, departments }) => + eq(employees.department_id, departments.id) + ) + .orderBy(({ departments }) => departments.name, `asc`) + .limit(5) + .select(({ employees, departments }) => ({ + employeeId: employees.id, + employeeName: employees.name, + departmentName: departments.name, + })) + ) + + await collection.preload() + + const builder = (collection as any).config.__builder + expect(builder).toBeTruthy() + + // Verify that the order-by optimization is scoped to the departments alias + const orderByInfo = Object.values( + builder.optimizableOrderByCollections + )[0] as any + expect(orderByInfo).toBeDefined() + expect(orderByInfo.alias).toBe(`departments`) + expect(orderByInfo.offset).toBe(0) + expect(orderByInfo.limit).toBe(5) + } finally { + CollectionConfigBuilder.prototype.getConfig = originalGetConfig + } + } + ) + itWhenAutoIndex( `optimizes single-column orderBy when passed as array with single element`, async () => { From d9b2014628d2407b974bb2e0c653c2e4ed87237e Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 2 Oct 2025 11:05:58 +0100 Subject: [PATCH 05/14] wip --- packages/db/src/query/live/collection-config-builder.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index c7c01f749..cf38f43d7 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -260,6 +260,9 @@ export class CollectionConfigBuilder< this.inputsCache[alias] = this.graphCache.newInput() } + // Note: Using fresh WeakMaps here loses cached subquery results, but ensures + // clean compilation with the new alias inputs. For complex queries with many + // subqueries, this could be optimized to preserve the cache. compilation = compileQuery( this.query, this.inputsCache as Record, @@ -416,6 +419,8 @@ export class CollectionConfigBuilder< const subscription = collectionSubscriber.subscribe() this.subscriptions[alias] = subscription + // Also store under collection key for backward compatibility with join logic + // that may reference collection-level subscriptions const collectionKey = `__collection:${collectionId}` this.subscriptions[collectionKey] = subscription From 9a794a3107e639edaf385bb24b20facf94fd20b2 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 2 Oct 2025 12:49:52 +0100 Subject: [PATCH 06/14] rename stuff --- packages/db/src/query/compiler/index.ts | 35 ++-- packages/db/src/query/compiler/joins.ts | 155 +++++++++--------- .../query/live/collection-config-builder.ts | 30 ++-- .../src/query/live/collection-subscriber.ts | 7 +- .../tests/query/compiler/subqueries.test.ts | 15 +- .../query/compiler/subquery-caching.test.ts | 47 ++++-- packages/db/tests/query/join.test.ts | 82 +++++++++ 7 files changed, 236 insertions(+), 135 deletions(-) diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index 864089156..3d02a5afb 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -58,7 +58,7 @@ export function compileQuery( collections: Record>, subscriptions: Record, callbacks: Record, - lazyCollections: Set, + lazySources: Set, optimizableOrderByCollections: Record, cache: QueryCache = new WeakMap(), queryMapping: QueryMapping = new WeakMap() @@ -83,13 +83,16 @@ export function compileQuery( // the live layer can subscribe to every alias the optimizer introduces. const aliasToCollectionId: Record = {} - // Create a map of table aliases to inputs - // Note: alias keys take precedence over collection keys for input resolution - const tables: Record = {} + // Create a map of source aliases to input streams. + // Note: During input resolution, alias keys take precedence over collection ID keys. + // This enables per-alias subscriptions: when looking up an input stream, we first check + // for `inputs[alias]` before falling back to `inputs[collectionId]`. This allows different + // aliases of the same collection (e.g., self-joins) to have independent filtered streams. + const sources: Record = {} - // Process the FROM clause to get the main table + // Process the FROM clause to get the main source const { - alias: mainTableAlias, + alias: mainSource, input: mainInput, collectionId: mainCollectionId, } = processFrom( @@ -98,19 +101,19 @@ export function compileQuery( collections, subscriptions, callbacks, - lazyCollections, + lazySources, optimizableOrderByCollections, cache, queryMapping, aliasToCollectionId ) - tables[mainTableAlias] = mainInput + sources[mainSource] = mainInput - // Prepare the initial pipeline with the main table wrapped in its alias + // Prepare the initial pipeline with the main source wrapped in its alias let pipeline: NamespacedAndKeyedStream = mainInput.pipe( map(([key, row]) => { // Initialize the record with a nested structure - const ret = [key, { [mainTableAlias]: row }] as [ + const ret = [key, { [mainSource]: row }] as [ string, Record, ] @@ -123,16 +126,16 @@ export function compileQuery( pipeline = processJoins( pipeline, query.join, - tables, + sources, mainCollectionId, - mainTableAlias, + mainSource, allInputs, cache, queryMapping, collections, subscriptions, callbacks, - lazyCollections, + lazySources, optimizableOrderByCollections, rawQuery, compileQuery, @@ -193,7 +196,7 @@ export function compileQuery( map(([key, namespacedRow]) => { const selectResults = !query.join && !query.groupBy - ? namespacedRow[mainTableAlias] + ? namespacedRow[mainSource] : namespacedRow return [ @@ -340,7 +343,7 @@ function processFrom( collections: Record, subscriptions: Record, callbacks: Record, - lazyCollections: Set, + lazySources: Set, optimizableOrderByCollections: Record, cache: QueryCache, queryMapping: QueryMapping, @@ -370,7 +373,7 @@ function processFrom( collections, subscriptions, callbacks, - lazyCollections, + lazySources, optimizableOrderByCollections, cache, queryMapping diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 193fc7b3d..c156ebf76 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -51,16 +51,16 @@ export type LazyCollectionCallbacks = { export function processJoins( pipeline: NamespacedAndKeyedStream, joinClauses: Array, - tables: Record, - mainTableId: string, - mainTableAlias: string, + sources: Record, + mainCollectionId: string, + mainSource: string, allInputs: Record, cache: QueryCache, queryMapping: QueryMapping, collections: Record, subscriptions: Record, callbacks: Record, - lazyCollections: Set, + lazySources: Set, optimizableOrderByCollections: Record, rawQuery: QueryIR, onCompileSubquery: CompileQueryFn, @@ -72,16 +72,16 @@ export function processJoins( resultPipeline = processJoin( resultPipeline, joinClause, - tables, - mainTableId, - mainTableAlias, + sources, + mainCollectionId, + mainSource, allInputs, cache, queryMapping, collections, subscriptions, callbacks, - lazyCollections, + lazySources, optimizableOrderByCollections, rawQuery, onCompileSubquery, @@ -98,16 +98,16 @@ export function processJoins( function processJoin( pipeline: NamespacedAndKeyedStream, joinClause: JoinClause, - tables: Record, - mainTableId: string, - mainTableAlias: string, + sources: Record, + mainCollectionId: string, + mainSource: string, allInputs: Record, cache: QueryCache, queryMapping: QueryMapping, collections: Record, subscriptions: Record, callbacks: Record, - lazyCollections: Set, + lazySources: Set, optimizableOrderByCollections: Record, rawQuery: QueryIR, onCompileSubquery: CompileQueryFn, @@ -115,9 +115,9 @@ function processJoin( ): NamespacedAndKeyedStream { const isCollectionRef = joinClause.from.type === `collectionRef` - // Get the joined table alias and input stream + // Get the joined source alias and input stream const { - alias: joinedTableAlias, + alias: joinedSource, input: joinedInput, collectionId: joinedCollectionId, } = processJoinSource( @@ -126,7 +126,7 @@ function processJoin( collections, subscriptions, callbacks, - lazyCollections, + lazySources, optimizableOrderByCollections, cache, queryMapping, @@ -134,38 +134,38 @@ function processJoin( aliasToCollectionId ) - // Add the joined table to the tables map - tables[joinedTableAlias] = joinedInput + // Add the joined source to the sources map + sources[joinedSource] = joinedInput if (isCollectionRef) { // Only direct collection references form new alias bindings. Subquery // aliases reuse the mapping returned from the recursive compilation above. - aliasToCollectionId[joinedTableAlias] = joinedCollectionId + aliasToCollectionId[joinedSource] = joinedCollectionId } - const mainCollection = collections[mainTableId] + const mainCollection = collections[mainCollectionId] const joinedCollection = collections[joinedCollectionId] if (!mainCollection) { - throw new JoinCollectionNotFoundError(mainTableId) + throw new JoinCollectionNotFoundError(mainCollectionId) } if (!joinedCollection) { throw new JoinCollectionNotFoundError(joinedCollectionId) } - const { activeCollection, lazyCollection } = getActiveAndLazyCollections( + const { activeSource, lazySource } = getActiveAndLazySources( joinClause.type, mainCollection, joinedCollection ) - // Analyze which table each expression refers to and swap if necessary - const availableTableAliases = Object.keys(tables) + // Analyze which source each expression refers to and swap if necessary + const availableSources = Object.keys(sources) const { mainExpr, joinedExpr } = analyzeJoinExpressions( joinClause.left, joinClause.right, - availableTableAliases, - joinedTableAlias + availableSources, + joinedSource ) // Pre-compile the join expressions @@ -190,7 +190,7 @@ function processJoin( let joinedPipeline = joinedInput.pipe( map(([currentKey, row]) => { // Wrap the row in a namespaced structure - const namespacedRow: NamespacedRow = { [joinedTableAlias]: row } + const namespacedRow: NamespacedRow = { [joinedSource]: row } // Extract the join key from the joined table expression const joinedKey = compiledJoinedExpr(namespacedRow) @@ -208,13 +208,12 @@ function processJoin( throw new UnsupportedJoinTypeError(joinClause.type) } - if (activeCollection) { + if (activeSource) { // If the lazy collection comes from a subquery that has a limit and/or an offset clause // then we need to deoptimize the join because we don't know which rows are in the result set // since we simply lookup matching keys in the index but the index contains all rows // (not just the ones that pass the limit and offset clauses) - const lazyFrom = - activeCollection === `main` ? joinClause.from : rawQuery.from + const lazyFrom = activeSource === `main` ? joinClause.from : rawQuery.from const limitedSubquery = lazyFrom.type === `queryRef` && (lazyFrom.query.limit || lazyFrom.query.offset) @@ -230,24 +229,25 @@ function processJoin( // based on the value of the joinKey and by looking up // matching rows in the index of the lazy collection - // Mark the lazy collection as lazy + // Mark the lazy source alias as lazy // this Set is passed by the liveQueryCollection to the compiler // such that the liveQueryCollection can check it after compilation - // to know which collections are lazy collections - lazyCollections.add(lazyCollection.id) + // to know which source aliases should load data lazily (not initially) + const lazyAlias = activeSource === `main` ? joinedSource : mainSource + lazySources.add(lazyAlias) const activePipeline = - activeCollection === `main` ? mainPipeline : joinedPipeline + activeSource === `main` ? mainPipeline : joinedPipeline - const lazyCollectionJoinExpr = - activeCollection === `main` + const lazySourceJoinExpr = + activeSource === `main` ? (joinedExpr as PropRef) : (mainExpr as PropRef) const followRefResult = followRef( rawQuery, - lazyCollectionJoinExpr, - lazyCollection + lazySourceJoinExpr, + lazySource )! const followRefCollection = followRefResult.collection @@ -268,37 +268,48 @@ function processJoin( // subscription we consult for lazy loading. The main table drives LEFT joins, // joined table drives RIGHT joins. const lazyAliasCandidate = - activeCollection === `main` ? joinedTableAlias : mainTableAlias - const lazyCollectionSubscription = - subscriptions[lazyAliasCandidate] ?? - subscriptions[`__collection:${lazyCollection.id}`] + activeSource === `main` ? joinedSource : mainSource + + // The alias candidate might be a subquery alias without a direct subscription. + // In that case, find an alias from aliasToCollectionId that maps to the lazy collection. + let lazySourceSubscription = subscriptions[lazyAliasCandidate] + if (!lazySourceSubscription) { + // Search for any alias that maps to the lazy collection ID + const matchingAlias = Object.entries(aliasToCollectionId).find( + ([_alias, collId]) => collId === lazySource.id + )?.[0] + + if (matchingAlias) { + lazySourceSubscription = subscriptions[matchingAlias] + } + } - if (!lazyCollectionSubscription) { + if (!lazySourceSubscription) { throw new Error( - `Internal error: subscription for collection is missing in join pipeline. Make sure the live query collection sets the subscription before running the pipeline.` + `Internal error: subscription for alias '${lazyAliasCandidate}' (collection '${lazySource.id}') is missing in join pipeline. Available aliases: ${Object.keys(subscriptions).join(`, `)}. This indicates a bug in alias tracking.` ) } - if (lazyCollectionSubscription.hasLoadedInitialState()) { + if (lazySourceSubscription.hasLoadedInitialState()) { // Entire state was already loaded because we deoptimized the join return } const joinKeys = data.getInner().map(([[joinKey]]) => joinKey) const lazyJoinRef = new PropRef(followRefResult.path) - const loaded = lazyCollectionSubscription.requestSnapshot({ + const loaded = lazySourceSubscription.requestSnapshot({ where: inArray(lazyJoinRef, joinKeys), optimizedOnly: true, }) if (!loaded) { // Snapshot wasn't sent because it could not be loaded from the indexes - lazyCollectionSubscription.requestSnapshot() + lazySourceSubscription.requestSnapshot() } }) ) - if (activeCollection === `main`) { + if (activeSource === `main`) { mainPipeline = activePipelineWithLoading } else { joinedPipeline = activePipelineWithLoading @@ -321,11 +332,11 @@ function analyzeJoinExpressions( left: BasicExpression, right: BasicExpression, allAvailableTableAliases: Array, - joinedTableAlias: string + joinedSource: string ): { mainExpr: BasicExpression; joinedExpr: BasicExpression } { // Filter out the joined table alias from the available table aliases - const availableTableAliases = allAvailableTableAliases.filter( - (alias) => alias !== joinedTableAlias + const availableSources = allAvailableTableAliases.filter( + (alias) => alias !== joinedSource ) const leftTableAlias = getTableAliasFromExpression(left) @@ -334,17 +345,17 @@ function analyzeJoinExpressions( // If left expression refers to an available table and right refers to joined table, keep as is if ( leftTableAlias && - availableTableAliases.includes(leftTableAlias) && - rightTableAlias === joinedTableAlias + availableSources.includes(leftTableAlias) && + rightTableAlias === joinedSource ) { return { mainExpr: left, joinedExpr: right } } // If left expression refers to joined table and right refers to an available table, swap them if ( - leftTableAlias === joinedTableAlias && + leftTableAlias === joinedSource && rightTableAlias && - availableTableAliases.includes(rightTableAlias) + availableSources.includes(rightTableAlias) ) { return { mainExpr: right, joinedExpr: left } } @@ -363,13 +374,13 @@ function analyzeJoinExpressions( // Left side must refer to an available table // This cannot happen with the query builder as there is no way to build a ref // to an unavailable table, but just in case, but could happen with the IR - if (!availableTableAliases.includes(leftTableAlias)) { + if (!availableSources.includes(leftTableAlias)) { throw new InvalidJoinConditionLeftTableError(leftTableAlias) } // Right side must refer to the joined table - if (rightTableAlias !== joinedTableAlias) { - throw new InvalidJoinConditionRightTableError(joinedTableAlias) + if (rightTableAlias !== joinedSource) { + throw new InvalidJoinConditionRightTableError(joinedSource) } // This should not be reachable given the logic above, but just in case @@ -411,7 +422,7 @@ function processJoinSource( collections: Record, subscriptions: Record, callbacks: Record, - lazyCollections: Set, + lazySources: Set, optimizableOrderByCollections: Record, cache: QueryCache, queryMapping: QueryMapping, @@ -442,7 +453,7 @@ function processJoinSource( collections, subscriptions, callbacks, - lazyCollections, + lazySources, optimizableOrderByCollections, cache, queryMapping @@ -552,34 +563,28 @@ function processJoinResults(joinType: string) { * @param rightCollection - The right collection * @returns The active and lazy collections. They are undefined if we need to loop over both collections (i.e. both are active) */ -function getActiveAndLazyCollections( +function getActiveAndLazySources( joinType: JoinClause[`type`], leftCollection: Collection, rightCollection: Collection ): - | { activeCollection: `main` | `joined`; lazyCollection: Collection } - | { activeCollection: undefined; lazyCollection: undefined } { - if (leftCollection.id === rightCollection.id) { - // We can't apply this optimization if there's only one collection - // because `liveQueryCollection` will detect that the collection is lazy - // and treat it lazily (because the collection is shared) - // and thus it will not load any keys because both sides of the join - // will be handled lazily - return { activeCollection: undefined, lazyCollection: undefined } - } + | { activeSource: `main` | `joined`; lazySource: Collection } + | { activeSource: undefined; lazySource: undefined } { + // Self-joins can now be optimized since we track lazy loading by source alias + // rather than collection ID. Each alias has its own subscription and lazy state. switch (joinType) { case `left`: - return { activeCollection: `main`, lazyCollection: rightCollection } + return { activeSource: `main`, lazySource: rightCollection } case `right`: - return { activeCollection: `joined`, lazyCollection: leftCollection } + return { activeSource: `joined`, lazySource: leftCollection } case `inner`: // The smallest collection should be the active collection // and the biggest collection should be lazy return leftCollection.size < rightCollection.size - ? { activeCollection: `main`, lazyCollection: rightCollection } - : { activeCollection: `joined`, lazyCollection: leftCollection } + ? { activeSource: `main`, lazySource: rightCollection } + : { activeSource: `joined`, lazySource: leftCollection } default: - return { activeCollection: undefined, lazyCollection: undefined } + return { activeSource: undefined, lazySource: undefined } } } diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index cf38f43d7..a81d1df2a 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -56,10 +56,10 @@ export class CollectionConfigBuilder< // Map of source alias to subscription readonly subscriptions: Record = {} - // Map of collection IDs to functions that load keys for that lazy collection - lazyCollectionsCallbacks: Record = {} - // Set of collection IDs that are lazy collections - readonly lazyCollections = new Set() + // Map of source aliases to functions that load keys for that lazy source + lazySourcesCallbacks: Record = {} + // Set of source aliases that are lazy (don't load initial state) + readonly lazySources = new Set() // Set of collection IDs that include an optimizable ORDER BY clause optimizableOrderByCollections: Record = {} @@ -121,6 +121,10 @@ export class CollectionConfigBuilder< throw new Error(`Unknown source alias "${alias}"`) } + isLazyAlias(alias: string): boolean { + return this.lazySources.has(alias) + } + // The callback function is called after the graph has run. // This gives the callback a chance to load more data if needed, // that's used to optimize orderBy operators that set a limit, @@ -211,10 +215,10 @@ export class CollectionConfigBuilder< this.pipelineCache = undefined this.sourceWhereClausesCache = undefined - // Reset lazy collection state - this.lazyCollections.clear() + // Reset lazy source alias state + this.lazySources.clear() this.optimizableOrderByCollections = {} - this.lazyCollectionsCallbacks = {} + this.lazySourcesCallbacks = {} // Clear subscription references to prevent memory leaks // Note: Individual subscriptions are already unsubscribed via unsubscribeCallbacks @@ -240,8 +244,8 @@ export class CollectionConfigBuilder< this.inputsCache as Record, this.collections, this.subscriptions, - this.lazyCollectionsCallbacks, - this.lazyCollections, + this.lazySourcesCallbacks, + this.lazySources, this.optimizableOrderByCollections ) @@ -268,8 +272,8 @@ export class CollectionConfigBuilder< this.inputsCache as Record, this.collections, this.subscriptions, - this.lazyCollectionsCallbacks, - this.lazyCollections, + this.lazySourcesCallbacks, + this.lazySources, this.optimizableOrderByCollections, new WeakMap(), new WeakMap() @@ -419,10 +423,6 @@ export class CollectionConfigBuilder< const subscription = collectionSubscriber.subscribe() this.subscriptions[alias] = subscription - // Also store under collection key for backward compatibility with join logic - // that may reference collection-level subscriptions - const collectionKey = `__collection:${collectionId}` - this.subscriptions[collectionKey] = subscription const loadMore = collectionSubscriber.loadMoreIfNeeded.bind( collectionSubscriber, diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index 3e9460bad..d196ecd77 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -52,9 +52,10 @@ export class CollectionSubscriber< orderByInfo ) } else { - // If the collection is lazy then we should not include the initial state - const includeInitialState = - !this.collectionConfigBuilder.lazyCollections.has(this.collectionId) + // If the source alias is lazy then we should not include the initial state + const includeInitialState = !this.collectionConfigBuilder.isLazyAlias( + this.alias + ) subscription = this.subscribeToMatchingChanges( whereExpression, diff --git a/packages/db/tests/query/compiler/subqueries.test.ts b/packages/db/tests/query/compiler/subqueries.test.ts index c96641164..5a6f3a686 100644 --- a/packages/db/tests/query/compiler/subqueries.test.ts +++ b/packages/db/tests/query/compiler/subqueries.test.ts @@ -282,7 +282,7 @@ describe(`Query2 Subqueries`, () => { const graph = new D2() const issuesInput = createIssueInput(graph) const usersInput = createUserInput(graph) - const lazyCollections = new Set() + const lazySources = new Set() const compilation = compileQuery( builtQuery, { @@ -292,7 +292,7 @@ describe(`Query2 Subqueries`, () => { { issues: issuesCollection, users: usersCollection }, subscriptions, { issues: dummyCallbacks, users: dummyCallbacks }, - lazyCollections, + lazySources, {} ) const { pipeline } = compilation @@ -306,15 +306,12 @@ describe(`Query2 Subqueries`, () => { ? usersSubscription : issuesSubscription } - - const collectionKey = `__collection:${collectionId}` - if (!subscriptions[collectionKey]) { - subscriptions[collectionKey] = subscriptions[alias] - } } - // Since we're doing a left join, the collection on the right should be handled lazily - expect(lazyCollections).contains(usersCollection.id) + // Since we're doing a left join, the alias on the right (from the subquery) should be handled lazily + // The subquery uses 'user' alias, but the join uses 'activeUser' - we expect the lazy alias + // to be the one that's marked (which is 'activeUser' since it's the joinedTableAlias) + expect(lazySources).contains(`activeUser`) const messages: Array> = [] pipeline.pipe( diff --git a/packages/db/tests/query/compiler/subquery-caching.test.ts b/packages/db/tests/query/compiler/subquery-caching.test.ts index 211452336..eed392e7e 100644 --- a/packages/db/tests/query/compiler/subquery-caching.test.ts +++ b/packages/db/tests/query/compiler/subquery-caching.test.ts @@ -5,12 +5,24 @@ import { CollectionRef, PropRef, QueryRef } from "../../../src/query/ir.js" import type { QueryIR } from "../../../src/query/ir.js" import type { CollectionImpl } from "../../../src/collection/index.js" +// Helper to create a minimal mock collection for compiler tests +function createMockCollection(id: string): CollectionImpl { + return { + id, + autoIndex: `off`, + config: { + autoIndex: `off`, + getKey: (item: any) => item.id, + sync: { sync: () => {} }, + }, + size: 0, + } as any +} + describe(`Subquery Caching`, () => { it(`should cache compiled subqueries and avoid duplicate compilation`, () => { // Create a mock collection - const usersCollection = { - id: `users`, - } as CollectionImpl + const usersCollection = createMockCollection(`users`) // Create a subquery that will be used in multiple places const subquery: QueryIR = { @@ -47,6 +59,7 @@ describe(`Subquery Caching`, () => { // First compilation without shared cache const cache1 = new WeakMap() + const queryMapping1 = new WeakMap() const result1 = compileQuery( mainQuery, inputs, @@ -55,7 +68,8 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, - cache1 + cache1, + queryMapping1 ) // Verify subquery is in first cache @@ -64,6 +78,7 @@ describe(`Subquery Caching`, () => { // Second compilation with different cache (should recompile everything) const cache2 = new WeakMap() + const queryMapping2 = new WeakMap() const result2 = compileQuery( mainQuery, inputs, @@ -72,7 +87,8 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, - cache2 + cache2, + queryMapping2 ) // Results should be different objects (different compilation) @@ -91,7 +107,8 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, - cache2 + cache2, + new WeakMap() ) // Result should be the same object as #2 (reused from cache) @@ -110,7 +127,8 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, - cache2 + cache2, + new WeakMap() ) const subqueryResult2 = compileQuery( subquery, @@ -120,7 +138,8 @@ describe(`Subquery Caching`, () => { {}, new Set(), {}, - cache2 + cache2, + new WeakMap() ) // Both subquery compilations should return the same cached result @@ -128,9 +147,7 @@ describe(`Subquery Caching`, () => { }) it(`should reuse cached results for the same query object`, () => { - const usersCollection = { - id: `users`, - } as CollectionImpl + const usersCollection = createMockCollection(`users`) const subquery: QueryIR = { from: new CollectionRef(usersCollection, `u`), @@ -175,9 +192,7 @@ describe(`Subquery Caching`, () => { }) it(`should compile different query objects separately even with shared cache`, () => { - const usersCollection = { - id: `users`, - } as CollectionImpl + const usersCollection = createMockCollection(`users`) // Create two structurally identical but different query objects const subquery1: QueryIR = { @@ -236,9 +251,7 @@ describe(`Subquery Caching`, () => { }) it(`should use cache to avoid recompilation in nested subqueries`, () => { - const usersCollection = { - id: `users`, - } as CollectionImpl + const usersCollection = createMockCollection(`users`) // Create a deeply nested subquery that references the same query multiple times const innerSubquery: QueryIR = { diff --git a/packages/db/tests/query/join.test.ts b/packages/db/tests/query/join.test.ts index 22c8ee738..c0d8ad0ff 100644 --- a/packages/db/tests/query/join.test.ts +++ b/packages/db/tests/query/join.test.ts @@ -4,9 +4,11 @@ import { createLiveQueryCollection, eq, gt, + isNull, isUndefined, lt, not, + or, } from "../../src/query/index.js" import { createCollection } from "../../src/collection/index.js" import { mockSyncCollectionOptions } from "../utils.js" @@ -1553,6 +1555,86 @@ function createJoinTests(autoIndex: `off` | `eager`): void { `ba224e71-a464-418d-a0a9-5959b490775d` ) }) + + test(`should handle self-join with different WHERE clauses on each alias`, () => { + // This test ensures that different aliases of the same collection + // can maintain independent WHERE filters in per-alias subscriptions + type Person = { + id: number + name: string + age: number + manager_id: number | undefined + } + + const samplePeople: Array = [ + { id: 1, name: `Alice`, age: 35, manager_id: undefined }, + { id: 2, name: `Bob`, age: 40, manager_id: 1 }, + { id: 3, name: `Charlie`, age: 28, manager_id: 2 }, + { id: 4, name: `Dave`, age: 32, manager_id: 2 }, + { id: 5, name: `Eve`, age: 45, manager_id: 1 }, + ] + + const peopleCollection = createCollection( + mockSyncCollectionOptions({ + id: `test-people-self-join-where`, + getKey: (person) => person.id, + initialData: samplePeople, + autoIndex, + }) + ) + + // Query: Find employees aged > 30 and their managers aged > 35 + const selfJoinWithFilters = createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ employee: peopleCollection }) + .where(({ employee }) => gt(employee.age, 30)) + .join( + { manager: peopleCollection }, + ({ employee, manager }) => eq(employee.manager_id, manager.id), + `left` + ) + .where(({ manager }) => or(isNull(manager.id), gt(manager.age, 35))) + .select(({ employee, manager }) => ({ + employeeId: employee.id, + employeeName: employee.name, + employeeAge: employee.age, + managerId: manager?.id, + managerName: manager?.name, + managerAge: manager?.age, + })), + }) + + const results = selfJoinWithFilters.toArray + + // Expected logic: + // - Alice (35, no manager) - employee filter passes (35 > 30), manager is null so filter passes + // - Bob (40, manager Alice 35) - employee filter passes (40 > 30), but manager filter fails (35 NOT > 35) + // - Charlie (28, manager Bob 40) - employee filter fails (28 NOT > 30) + // - Dave (32, manager Bob 40) - employee filter passes (32 > 30), manager filter passes (40 > 35) + // - Eve (45, manager Alice 35) - employee filter passes (45 > 30), but manager filter fails (35 NOT > 35) + + // The optimizer pushes WHERE clauses into subqueries, so: + // - "employee" alias gets: WHERE age > 30 + // - "manager" alias gets: WHERE age > 35 OR id IS NULL (but manager join is LEFT, so null handling is different) + + // After optimization, only Dave should match because: + // - His age (32) > 30 (employee filter) + // - His manager Bob's age (40) > 35 (manager filter) + // Alice would match if the isNull check works correctly for outer joins + + // Let's verify we get at least Dave + expect(results.length).toBeGreaterThanOrEqual(1) + + const dave = results.find((r) => r.employeeId === 4) + expect(dave).toBeDefined() + expect(dave!.employeeName).toBe(`Dave`) + expect(dave!.employeeAge).toBe(32) + expect(dave!.managerId).toBe(2) + expect(dave!.managerName).toBe(`Bob`) + expect(dave!.managerAge).toBe(40) + }) }) test(`should handle multiple joins with where clauses to the same source collection`, () => { From 04688eb0b21dd3ecaf77369043e8f83fdff5b0c4 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 2 Oct 2025 13:08:16 +0100 Subject: [PATCH 07/14] remove fallback to collecitonId --- packages/db/src/query/compiler/index.ts | 20 +++++++++++-------- packages/db/src/query/compiler/joins.ts | 2 +- .../tests/query/compiler/subqueries.test.ts | 16 +++++++-------- .../query/compiler/subquery-caching.test.ts | 10 +++++----- 4 files changed, 26 insertions(+), 22 deletions(-) diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index 3d02a5afb..ca4219bf4 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -45,12 +45,17 @@ export interface CompilationResult { } /** - * Compiles a query2 IR into a D2 pipeline + * Compiles a query IR into a D2 pipeline * @param rawQuery The query IR to compile - * @param inputs Mapping of collection names to input streams + * @param inputs Mapping of source aliases to input streams (e.g., `{ employee: input1, manager: input2 }`) + * @param collections Mapping of collection IDs to Collection instances + * @param subscriptions Mapping of source aliases to CollectionSubscription instances + * @param callbacks Mapping of source aliases to lazy loading callbacks + * @param lazySources Set of source aliases that should load data lazily + * @param optimizableOrderByCollections Map of collection IDs to order-by optimization info * @param cache Optional cache for compiled subqueries (used internally for recursion) * @param queryMapping Optional mapping from optimized queries to original queries - * @returns A CompilationResult with the pipeline and collection WHERE clauses + * @returns A CompilationResult with the pipeline, source WHERE clauses, and alias metadata */ export function compileQuery( rawQuery: QueryIR, @@ -84,10 +89,9 @@ export function compileQuery( const aliasToCollectionId: Record = {} // Create a map of source aliases to input streams. - // Note: During input resolution, alias keys take precedence over collection ID keys. - // This enables per-alias subscriptions: when looking up an input stream, we first check - // for `inputs[alias]` before falling back to `inputs[collectionId]`. This allows different - // aliases of the same collection (e.g., self-joins) to have independent filtered streams. + // Inputs MUST be keyed by alias (e.g., `{ employee: input1, manager: input2 }`), + // not by collection ID. This enables per-alias subscriptions where different aliases + // of the same collection (e.g., self-joins) maintain independent filtered streams. const sources: Record = {} // Process the FROM clause to get the main source @@ -351,7 +355,7 @@ function processFrom( ): { alias: string; input: KeyedStream; collectionId: string } { switch (from.type) { case `collectionRef`: { - const input = allInputs[from.alias] ?? allInputs[from.collection.id] + const input = allInputs[from.alias] if (!input) { throw new CollectionInputNotFoundError( from.alias, diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index c156ebf76..135946edc 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -431,7 +431,7 @@ function processJoinSource( ): { alias: string; input: KeyedStream; collectionId: string } { switch (from.type) { case `collectionRef`: { - const input = allInputs[from.alias] ?? allInputs[from.collection.id] + const input = allInputs[from.alias] if (!input) { throw new CollectionInputNotFoundError( from.alias, diff --git a/packages/db/tests/query/compiler/subqueries.test.ts b/packages/db/tests/query/compiler/subqueries.test.ts index 5a6f3a686..82539bb5b 100644 --- a/packages/db/tests/query/compiler/subqueries.test.ts +++ b/packages/db/tests/query/compiler/subqueries.test.ts @@ -172,7 +172,7 @@ describe(`Query2 Subqueries`, () => { const issuesInput = createIssueInput(graph) const { pipeline } = compileQuery( builtQuery, - { issues: issuesInput }, + { issue: issuesInput }, { issues: issuesCollection }, {}, {}, @@ -286,12 +286,12 @@ describe(`Query2 Subqueries`, () => { const compilation = compileQuery( builtQuery, { - issues: issuesInput, - users: usersInput, + issue: issuesInput, + user: usersInput, }, { issues: issuesCollection, users: usersCollection }, subscriptions, - { issues: dummyCallbacks, users: dummyCallbacks }, + { issue: dummyCallbacks, user: dummyCallbacks }, lazySources, {} ) @@ -395,12 +395,12 @@ describe(`Query2 Subqueries`, () => { const compilation = compileQuery( builtQuery, { - issues: issuesInput, - users: usersInput, + issue: issuesInput, + user: usersInput, }, { issues: issuesCollection, users: usersCollection }, subscriptions, - { issues: dummyCallbacks, users: dummyCallbacks }, + { issue: dummyCallbacks, user: dummyCallbacks }, lazyCollections, {} ) @@ -441,7 +441,7 @@ describe(`Query2 Subqueries`, () => { const issuesInput = createIssueInput(graph) const { pipeline } = compileQuery( builtQuery, - { issues: issuesInput }, + { issue: issuesInput }, { issues: issuesCollection }, {}, {}, diff --git a/packages/db/tests/query/compiler/subquery-caching.test.ts b/packages/db/tests/query/compiler/subquery-caching.test.ts index eed392e7e..3df84cbee 100644 --- a/packages/db/tests/query/compiler/subquery-caching.test.ts +++ b/packages/db/tests/query/compiler/subquery-caching.test.ts @@ -50,10 +50,10 @@ describe(`Subquery Caching`, () => { }, } - // Set up D2 inputs + // Set up D2 inputs - keyed by alias, not collection ID const graph = new D2() const userInput = graph.newInput<[number, any]>() - const inputs = { users: userInput } + const inputs = { u: userInput } // Test: Compile the main query twice - first without shared cache, then with shared cache @@ -159,7 +159,7 @@ describe(`Subquery Caching`, () => { const graph = new D2() const userInput = graph.newInput<[number, any]>() - const inputs = { users: userInput } + const inputs = { u: userInput } // Create a shared cache const sharedCache = new WeakMap() @@ -216,7 +216,7 @@ describe(`Subquery Caching`, () => { const graph = new D2() const userInput = graph.newInput<[number, any]>() - const inputs = { users: userInput } + const inputs = { u: userInput } const sharedCache = new WeakMap() @@ -287,7 +287,7 @@ describe(`Subquery Caching`, () => { const graph = new D2() const userInput = graph.newInput<[number, any]>() - const inputs = { users: userInput } + const inputs = { u: userInput } const sharedCache = new WeakMap() From 4ee665aedaea6afe8f2002a964a5b6a79606b447 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 2 Oct 2025 13:26:30 +0100 Subject: [PATCH 08/14] better mapping of subquery aliases --- packages/db/src/query/compiler/index.ts | 31 +++++++++-- packages/db/src/query/compiler/joins.ts | 53 +++++++++++-------- .../tests/query/compiler/subqueries.test.ts | 23 +++----- 3 files changed, 68 insertions(+), 39 deletions(-) diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index ca4219bf4..4f2634592 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -42,6 +42,8 @@ export interface CompilationResult { sourceWhereClauses: Map> /** Map of alias to underlying collection id used during compilation */ aliasToCollectionId: Record + /** Map of outer alias to inner alias for subquery aliasing (e.g., 'activeUser' → 'user') */ + aliasRemapping: Record } /** @@ -88,6 +90,11 @@ export function compileQuery( // the live layer can subscribe to every alias the optimizer introduces. const aliasToCollectionId: Record = {} + // Track alias remapping for subqueries (outer alias → inner alias) + // e.g., when .join({ activeUser: subquery }) where subquery uses .from({ user: collection }) + // we store: aliasRemapping['activeUser'] = 'user' + const aliasRemapping: Record = {} + // Create a map of source aliases to input streams. // Inputs MUST be keyed by alias (e.g., `{ employee: input1, manager: input2 }`), // not by collection ID. This enables per-alias subscriptions where different aliases @@ -109,7 +116,8 @@ export function compileQuery( optimizableOrderByCollections, cache, queryMapping, - aliasToCollectionId + aliasToCollectionId, + aliasRemapping ) sources[mainSource] = mainInput @@ -143,7 +151,8 @@ export function compileQuery( optimizableOrderByCollections, rawQuery, compileQuery, - aliasToCollectionId + aliasToCollectionId, + aliasRemapping ) } @@ -303,6 +312,7 @@ export function compileQuery( pipeline: result, sourceWhereClauses, aliasToCollectionId, + aliasRemapping, } cache.set(rawQuery, compilationResult) @@ -332,6 +342,7 @@ export function compileQuery( pipeline: result, sourceWhereClauses, aliasToCollectionId, + aliasRemapping, } cache.set(rawQuery, compilationResult) @@ -351,7 +362,8 @@ function processFrom( optimizableOrderByCollections: Record, cache: QueryCache, queryMapping: QueryMapping, - aliasToCollectionId: Record + aliasToCollectionId: Record, + aliasRemapping: Record ): { alias: string; input: KeyedStream; collectionId: string } { switch (from.type) { case `collectionRef`: { @@ -383,7 +395,20 @@ function processFrom( queryMapping ) + // Pull up the inner alias mappings Object.assign(aliasToCollectionId, subQueryResult.aliasToCollectionId) + Object.assign(aliasRemapping, subQueryResult.aliasRemapping) + + // For subqueries, the outer alias (from.alias) may differ from inner aliases. + // Find the inner alias that corresponds to the subquery's main collection and create a remapping. + const innerAlias = Object.keys(subQueryResult.aliasToCollectionId).find( + (alias) => + subQueryResult.aliasToCollectionId[alias] === + subQueryResult.collectionId + ) + if (innerAlias && innerAlias !== from.alias) { + aliasRemapping[from.alias] = innerAlias + } // Extract the pipeline from the compilation result const subQueryInput = subQueryResult.pipeline diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 135946edc..cbc08f657 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -64,7 +64,8 @@ export function processJoins( optimizableOrderByCollections: Record, rawQuery: QueryIR, onCompileSubquery: CompileQueryFn, - aliasToCollectionId: Record + aliasToCollectionId: Record, + aliasRemapping: Record ): NamespacedAndKeyedStream { let resultPipeline = pipeline @@ -85,7 +86,8 @@ export function processJoins( optimizableOrderByCollections, rawQuery, onCompileSubquery, - aliasToCollectionId + aliasToCollectionId, + aliasRemapping ) } @@ -111,7 +113,8 @@ function processJoin( optimizableOrderByCollections: Record, rawQuery: QueryIR, onCompileSubquery: CompileQueryFn, - aliasToCollectionId: Record + aliasToCollectionId: Record, + aliasRemapping: Record ): NamespacedAndKeyedStream { const isCollectionRef = joinClause.from.type === `collectionRef` @@ -131,7 +134,8 @@ function processJoin( cache, queryMapping, onCompileSubquery, - aliasToCollectionId + aliasToCollectionId, + aliasRemapping ) // Add the joined source to the sources map @@ -270,23 +274,18 @@ function processJoin( const lazyAliasCandidate = activeSource === `main` ? joinedSource : mainSource - // The alias candidate might be a subquery alias without a direct subscription. - // In that case, find an alias from aliasToCollectionId that maps to the lazy collection. - let lazySourceSubscription = subscriptions[lazyAliasCandidate] - if (!lazySourceSubscription) { - // Search for any alias that maps to the lazy collection ID - const matchingAlias = Object.entries(aliasToCollectionId).find( - ([_alias, collId]) => collId === lazySource.id - )?.[0] - - if (matchingAlias) { - lazySourceSubscription = subscriptions[matchingAlias] - } - } + // Find the subscription for lazy loading. + // For subqueries, the outer join alias (e.g., 'activeUser') may differ from the + // inner alias (e.g., 'user'). Use aliasRemapping to resolve outer → inner alias. + // Example: .join({ activeUser: subquery }) where subquery uses .from({ user: collection }) + // → aliasRemapping['activeUser'] = 'user' + const resolvedAlias = + aliasRemapping[lazyAliasCandidate] || lazyAliasCandidate + const lazySourceSubscription = subscriptions[resolvedAlias] if (!lazySourceSubscription) { throw new Error( - `Internal error: subscription for alias '${lazyAliasCandidate}' (collection '${lazySource.id}') is missing in join pipeline. Available aliases: ${Object.keys(subscriptions).join(`, `)}. This indicates a bug in alias tracking.` + `Internal error: subscription for alias '${resolvedAlias}' (remapped from '${lazyAliasCandidate}', collection '${lazySource.id}') is missing in join pipeline. Available aliases: ${Object.keys(subscriptions).join(`, `)}. This indicates a bug in alias tracking.` ) } @@ -427,7 +426,8 @@ function processJoinSource( cache: QueryCache, queryMapping: QueryMapping, onCompileSubquery: CompileQueryFn, - aliasToCollectionId: Record + aliasToCollectionId: Record, + aliasRemapping: Record ): { alias: string; input: KeyedStream; collectionId: string } { switch (from.type) { case `collectionRef`: { @@ -459,9 +459,20 @@ function processJoinSource( queryMapping ) - // Pull the nested alias map up so the caller can subscribe to those aliases - // and keep the current alias pointing at the subquery's collection. + // Pull up the inner alias mappings Object.assign(aliasToCollectionId, subQueryResult.aliasToCollectionId) + Object.assign(aliasRemapping, subQueryResult.aliasRemapping) + + // For subqueries, the outer alias (from.alias) may differ from inner aliases. + // Find the inner alias that corresponds to the subquery's main collection and create a remapping. + const innerAlias = Object.keys(subQueryResult.aliasToCollectionId).find( + (alias) => + subQueryResult.aliasToCollectionId[alias] === + subQueryResult.collectionId + ) + if (innerAlias && innerAlias !== from.alias) { + aliasRemapping[from.alias] = innerAlias + } // Extract the pipeline from the compilation result const subQueryInput = subQueryResult.pipeline diff --git a/packages/db/tests/query/compiler/subqueries.test.ts b/packages/db/tests/query/compiler/subqueries.test.ts index 82539bb5b..ae291ecfb 100644 --- a/packages/db/tests/query/compiler/subqueries.test.ts +++ b/packages/db/tests/query/compiler/subqueries.test.ts @@ -273,9 +273,11 @@ describe(`Query2 Subqueries`, () => { const usersSubscription = usersCollection.subscribeChanges(() => {}) const issuesSubscription = issuesCollection.subscribeChanges(() => {}) + + // Create subscriptions keyed by alias (matching production behavior) const subscriptions: Record = { - [usersCollection.id]: usersSubscription, - [issuesCollection.id]: issuesSubscription, + issue: issuesSubscription, + user: usersSubscription, } // Compile and execute the query @@ -297,17 +299,6 @@ describe(`Query2 Subqueries`, () => { ) const { pipeline } = compilation - for (const [alias, collectionId] of Object.entries( - compilation.aliasToCollectionId - )) { - if (!subscriptions[alias]) { - subscriptions[alias] = - collectionId === usersCollection.id - ? usersSubscription - : issuesSubscription - } - } - // Since we're doing a left join, the alias on the right (from the subquery) should be handled lazily // The subquery uses 'user' alias, but the join uses 'activeUser' - we expect the lazy alias // to be the one that's marked (which is 'activeUser' since it's the joinedTableAlias) @@ -377,9 +368,11 @@ describe(`Query2 Subqueries`, () => { const usersSubscription = usersCollection.subscribeChanges(() => {}) const issuesSubscription = issuesCollection.subscribeChanges(() => {}) + + // Create subscriptions keyed by alias (matching production behavior) const subscriptions: Record = { - [usersCollection.id]: usersSubscription, - [issuesCollection.id]: issuesSubscription, + issue: issuesSubscription, + user: usersSubscription, } const dummyCallbacks = { From 0f7798b78319251bcc0ad22c154f2dd05576c4d3 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 2 Oct 2025 13:40:11 +0100 Subject: [PATCH 09/14] rename stuff and tidy --- packages/db/src/errors.ts | 43 +++++++--- packages/db/src/query/compiler/joins.ts | 101 ++++++++++++----------- packages/db/src/query/compiler/select.ts | 5 +- packages/db/tests/query/join.test.ts | 20 +++-- packages/db/tests/query/order-by.test.ts | 4 +- 5 files changed, 99 insertions(+), 74 deletions(-) diff --git a/packages/db/src/errors.ts b/packages/db/src/errors.ts index 4ad8d673f..2d2dce4c4 100644 --- a/packages/db/src/errors.ts +++ b/packages/db/src/errors.ts @@ -409,32 +409,32 @@ export class UnsupportedJoinTypeError extends JoinError { } } -export class InvalidJoinConditionSameTableError extends JoinError { - constructor(tableAlias: string) { +export class InvalidJoinConditionSameSourceError extends JoinError { + constructor(sourceAlias: string) { super( - `Invalid join condition: both expressions refer to the same table "${tableAlias}"` + `Invalid join condition: both expressions refer to the same source "${sourceAlias}"` ) } } -export class InvalidJoinConditionTableMismatchError extends JoinError { +export class InvalidJoinConditionSourceMismatchError extends JoinError { constructor() { - super(`Invalid join condition: expressions must reference table aliases`) + super(`Invalid join condition: expressions must reference source aliases`) } } -export class InvalidJoinConditionLeftTableError extends JoinError { - constructor(tableAlias: string) { +export class InvalidJoinConditionLeftSourceError extends JoinError { + constructor(sourceAlias: string) { super( - `Invalid join condition: left expression refers to an unavailable table "${tableAlias}"` + `Invalid join condition: left expression refers to an unavailable source "${sourceAlias}"` ) } } -export class InvalidJoinConditionRightTableError extends JoinError { - constructor(tableAlias: string) { +export class InvalidJoinConditionRightSourceError extends JoinError { + constructor(sourceAlias: string) { super( - `Invalid join condition: right expression does not refer to the joined table "${tableAlias}"` + `Invalid join condition: right expression does not refer to the joined source "${sourceAlias}"` ) } } @@ -581,3 +581,24 @@ export class WhereClauseConversionError extends QueryOptimizerError { ) } } + +export class SubscriptionNotFoundError extends QueryCompilationError { + constructor( + resolvedAlias: string, + originalAlias: string, + collectionId: string, + availableAliases: Array + ) { + super( + `Internal error: subscription for alias '${resolvedAlias}' (remapped from '${originalAlias}', collection '${collectionId}') is missing in join pipeline. Available aliases: ${availableAliases.join(`, `)}. This indicates a bug in alias tracking.` + ) + } +} + +export class AggregateNotSupportedError extends QueryCompilationError { + constructor() { + super( + `Aggregate expressions are not supported in this context. Use GROUP BY clause for aggregates.` + ) + } +} diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index cbc08f657..e18c4e7ba 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -8,11 +8,12 @@ import { import { CollectionInputNotFoundError, InvalidJoinCondition, - InvalidJoinConditionLeftTableError, - InvalidJoinConditionRightTableError, - InvalidJoinConditionSameTableError, - InvalidJoinConditionTableMismatchError, + InvalidJoinConditionLeftSourceError, + InvalidJoinConditionRightSourceError, + InvalidJoinConditionSameSourceError, + InvalidJoinConditionSourceMismatchError, JoinCollectionNotFoundError, + SubscriptionNotFoundError, UnsupportedJoinSourceTypeError, UnsupportedJoinTypeError, } from "../../errors.js" @@ -179,7 +180,7 @@ function processJoin( // Prepare the main pipeline for joining let mainPipeline = pipeline.pipe( map(([currentKey, namespacedRow]) => { - // Extract the join key from the main table expression + // Extract the join key from the main source expression const mainKey = compiledMainExpr(namespacedRow) // Return [joinKey, [originalKey, namespacedRow]] @@ -196,7 +197,7 @@ function processJoin( // Wrap the row in a namespaced structure const namespacedRow: NamespacedRow = { [joinedSource]: row } - // Extract the join key from the joined table expression + // Extract the join key from the joined source expression const joinedKey = compiledJoinedExpr(namespacedRow) // Return [joinKey, [originalKey, namespacedRow]] @@ -269,8 +270,8 @@ function processJoin( > = activePipeline.pipe( tap((data) => { // For outer joins (LEFT/RIGHT), the driving side determines which alias's - // subscription we consult for lazy loading. The main table drives LEFT joins, - // joined table drives RIGHT joins. + // subscription we consult for lazy loading. The main source drives LEFT joins, + // joined source drives RIGHT joins. const lazyAliasCandidate = activeSource === `main` ? joinedSource : mainSource @@ -284,8 +285,11 @@ function processJoin( const lazySourceSubscription = subscriptions[resolvedAlias] if (!lazySourceSubscription) { - throw new Error( - `Internal error: subscription for alias '${resolvedAlias}' (remapped from '${lazyAliasCandidate}', collection '${lazySource.id}') is missing in join pipeline. Available aliases: ${Object.keys(subscriptions).join(`, `)}. This indicates a bug in alias tracking.` + throw new SubscriptionNotFoundError( + resolvedAlias, + lazyAliasCandidate, + lazySource.id, + Object.keys(subscriptions) ) } @@ -324,62 +328,61 @@ function processJoin( } /** - * Analyzes join expressions to determine which refers to which table - * and returns them in the correct order (available table expression first, joined table expression second) + * Analyzes join expressions to determine which refers to which source + * and returns them in the correct order (available source expression first, joined source expression second) */ function analyzeJoinExpressions( left: BasicExpression, right: BasicExpression, - allAvailableTableAliases: Array, + allAvailableSourceAliases: Array, joinedSource: string ): { mainExpr: BasicExpression; joinedExpr: BasicExpression } { - // Filter out the joined table alias from the available table aliases - const availableSources = allAvailableTableAliases.filter( + // Filter out the joined source alias from the available source aliases + const availableSources = allAvailableSourceAliases.filter( (alias) => alias !== joinedSource ) - const leftTableAlias = getTableAliasFromExpression(left) - const rightTableAlias = getTableAliasFromExpression(right) + const leftSourceAlias = getSourceAliasFromExpression(left) + const rightSourceAlias = getSourceAliasFromExpression(right) - // If left expression refers to an available table and right refers to joined table, keep as is + // If left expression refers to an available source and right refers to joined source, keep as is if ( - leftTableAlias && - availableSources.includes(leftTableAlias) && - rightTableAlias === joinedSource + leftSourceAlias && + availableSources.includes(leftSourceAlias) && + rightSourceAlias === joinedSource ) { return { mainExpr: left, joinedExpr: right } } - // If left expression refers to joined table and right refers to an available table, swap them + // If left expression refers to joined source and right refers to an available source, swap them if ( - leftTableAlias === joinedSource && - rightTableAlias && - availableSources.includes(rightTableAlias) + leftSourceAlias === joinedSource && + rightSourceAlias && + availableSources.includes(rightSourceAlias) ) { return { mainExpr: right, joinedExpr: left } } - // If one expression doesn't refer to any table, this is an invalid join - if (!leftTableAlias || !rightTableAlias) { - // For backward compatibility, use the first available table alias in error message - throw new InvalidJoinConditionTableMismatchError() + // If one expression doesn't refer to any source, this is an invalid join + if (!leftSourceAlias || !rightSourceAlias) { + throw new InvalidJoinConditionSourceMismatchError() } // If both expressions refer to the same alias, this is an invalid join - if (leftTableAlias === rightTableAlias) { - throw new InvalidJoinConditionSameTableError(leftTableAlias) + if (leftSourceAlias === rightSourceAlias) { + throw new InvalidJoinConditionSameSourceError(leftSourceAlias) } - // Left side must refer to an available table + // Left side must refer to an available source // This cannot happen with the query builder as there is no way to build a ref - // to an unavailable table, but just in case, but could happen with the IR - if (!availableSources.includes(leftTableAlias)) { - throw new InvalidJoinConditionLeftTableError(leftTableAlias) + // to an unavailable source, but just in case, but could happen with the IR + if (!availableSources.includes(leftSourceAlias)) { + throw new InvalidJoinConditionLeftSourceError(leftSourceAlias) } - // Right side must refer to the joined table - if (rightTableAlias !== joinedSource) { - throw new InvalidJoinConditionRightTableError(joinedSource) + // Right side must refer to the joined source + if (rightSourceAlias !== joinedSource) { + throw new InvalidJoinConditionRightSourceError(joinedSource) } // This should not be reachable given the logic above, but just in case @@ -387,27 +390,27 @@ function analyzeJoinExpressions( } /** - * Extracts the table alias from a join expression + * Extracts the source alias from a join expression */ -function getTableAliasFromExpression(expr: BasicExpression): string | null { +function getSourceAliasFromExpression(expr: BasicExpression): string | null { switch (expr.type) { case `ref`: - // PropRef path has the table alias as the first element + // PropRef path has the source alias as the first element return expr.path[0] || null case `func`: { - // For function expressions, we need to check if all arguments refer to the same table - const tableAliases = new Set() + // For function expressions, we need to check if all arguments refer to the same source + const sourceAliases = new Set() for (const arg of expr.args) { - const alias = getTableAliasFromExpression(arg) + const alias = getSourceAliasFromExpression(arg) if (alias) { - tableAliases.add(alias) + sourceAliases.add(alias) } } - // If all arguments refer to the same table, return that table alias - return tableAliases.size === 1 ? Array.from(tableAliases)[0]! : null + // If all arguments refer to the same source, return that source alias + return sourceAliases.size === 1 ? Array.from(sourceAliases)[0]! : null } default: - // Values (type='val') don't reference any table + // Values (type='val') don't reference any source return null } } @@ -567,7 +570,7 @@ function processJoinResults(joinType: string) { /** * Returns the active and lazy collections for a join clause. * The active collection is the one that we need to fully iterate over - * and it can be the main table (i.e. left collection) or the joined table (i.e. right collection). + * and it can be the main source (i.e. left collection) or the joined source (i.e. right collection). * The lazy collection is the one that we should join-in lazily based on matches in the active collection. * @param joinClause - The join clause to analyze * @param leftCollection - The left collection diff --git a/packages/db/src/query/compiler/select.ts b/packages/db/src/query/compiler/select.ts index 322d9155f..f036fb29c 100644 --- a/packages/db/src/query/compiler/select.ts +++ b/packages/db/src/query/compiler/select.ts @@ -1,5 +1,6 @@ import { map } from "@tanstack/db-ivm" import { PropRef, Value as ValClass, isExpressionLike } from "../ir.js" +import { AggregateNotSupportedError } from "../../errors.js" import { compileExpression } from "./evaluators.js" import type { Aggregate, BasicExpression, Select } from "../ir.js" import type { @@ -157,9 +158,7 @@ export function processArgument( namespacedRow: NamespacedRow ): any { if (isAggregateExpression(arg)) { - throw new Error( - `Aggregate expressions are not supported in this context. Use GROUP BY clause for aggregates.` - ) + throw new AggregateNotSupportedError() } // Pre-compile the expression and evaluate immediately diff --git a/packages/db/tests/query/join.test.ts b/packages/db/tests/query/join.test.ts index c0d8ad0ff..67d302293 100644 --- a/packages/db/tests/query/join.test.ts +++ b/packages/db/tests/query/join.test.ts @@ -1327,7 +1327,7 @@ function createJoinTests(autoIndex: `off` | `eager`): void { }) }) - test(`should throw error when both expressions refer to the same table`, () => { + test(`should throw error when both expressions refer to the same source`, () => { const usersCollection = createCollection( mockSyncCollectionOptions({ id: `test-users-same-table`, @@ -1349,11 +1349,11 @@ function createJoinTests(autoIndex: `off` | `eager`): void { ), }) }).toThrow( - `Invalid join condition: both expressions refer to the same table "user"` + `Invalid join condition: both expressions refer to the same source "user"` ) }) - test(`should throw error when expressions don't reference table aliases`, () => { + test(`should throw error when expressions don't reference source aliases`, () => { const usersCollection = createCollection( mockSyncCollectionOptions({ id: `test-users-no-refs`, @@ -1375,11 +1375,11 @@ function createJoinTests(autoIndex: `off` | `eager`): void { ), }) }).toThrow( - `Invalid join condition: expressions must reference table aliases` + `Invalid join condition: expressions must reference source aliases` ) }) - test(`should throw error when right side doesn't match joined table`, () => { + test(`should throw error when right side doesn't match joined source`, () => { const usersCollection = createCollection( mockSyncCollectionOptions({ id: `test-users-no-refs`, @@ -1410,11 +1410,11 @@ function createJoinTests(autoIndex: `off` | `eager`): void { ), }) }).toThrow( - `Invalid join condition: right expression does not refer to the joined table "dept2"` + `Invalid join condition: right expression does not refer to the joined source "dept2"` ) }) - test(`should throw error when function expression has mixed table references`, () => { + test(`should throw error when function expression has mixed source references`, () => { const usersCollection = createCollection( mockSyncCollectionOptions({ id: `test-users-mixed-refs`, @@ -1436,7 +1436,7 @@ function createJoinTests(autoIndex: `off` | `eager`): void { ), }) }).toThrow( - `Invalid join condition: both expressions refer to the same table "user"` + `Invalid join condition: both expressions refer to the same source "user"` ) }) @@ -1595,7 +1595,9 @@ function createJoinTests(autoIndex: `off` | `eager`): void { ({ employee, manager }) => eq(employee.manager_id, manager.id), `left` ) - .where(({ manager }) => or(isNull(manager.id), gt(manager.age, 35))) + .where(({ manager }) => + or(isNull(manager?.id), gt(manager?.age, 35)) + ) .select(({ employee, manager }) => ({ employeeId: employee.id, employeeName: employee.name, diff --git a/packages/db/tests/query/order-by.test.ts b/packages/db/tests/query/order-by.test.ts index 20bf0353e..68bce07e7 100644 --- a/packages/db/tests/query/order-by.test.ts +++ b/packages/db/tests/query/order-by.test.ts @@ -1491,12 +1491,12 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { ({ employees, departments }) => eq(employees.department_id, departments.id) ) - .orderBy(({ departments }) => departments.name, `asc`) + .orderBy(({ departments }) => departments?.name, `asc`) .limit(5) .select(({ employees, departments }) => ({ employeeId: employees.id, employeeName: employees.name, - departmentName: departments.name, + departmentName: departments?.name, })) ) From 17ffc3194d297bb3636cd6ca9c4cd86b6ea7391b Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 2 Oct 2025 14:00:27 +0100 Subject: [PATCH 10/14] changeset --- .changeset/fix-self-join-bug.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/fix-self-join-bug.md diff --git a/.changeset/fix-self-join-bug.md b/.changeset/fix-self-join-bug.md new file mode 100644 index 000000000..909d9bd29 --- /dev/null +++ b/.changeset/fix-self-join-bug.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +Fix self-join bug by implementing per-alias subscriptions in live queries From 50316cdd2c5bb521f6a4f6c60aef3ce96c32ad26 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sat, 4 Oct 2025 16:41:58 +0100 Subject: [PATCH 11/14] better comments --- packages/db/src/errors.ts | 14 +++++++++ packages/db/src/query/compiler/index.ts | 29 ++++++++++++++----- packages/db/src/query/compiler/joins.ts | 14 +++++++-- .../query/live/collection-config-builder.ts | 21 ++++++++++---- 4 files changed, 62 insertions(+), 16 deletions(-) diff --git a/packages/db/src/errors.ts b/packages/db/src/errors.ts index 2d2dce4c4..f983a73af 100644 --- a/packages/db/src/errors.ts +++ b/packages/db/src/errors.ts @@ -349,6 +349,10 @@ export class LimitOffsetRequireOrderByError extends QueryCompilationError { } } +/** + * Error thrown when a collection input stream is not found during query compilation. + * In self-joins, each alias (e.g., 'employee', 'manager') requires its own input stream. + */ export class CollectionInputNotFoundError extends QueryCompilationError { constructor( alias: string, @@ -574,6 +578,9 @@ export class CannotCombineEmptyExpressionListError extends QueryOptimizerError { } } +/** + * Internal error when the query optimizer fails to convert a WHERE clause to a collection filter. + */ export class WhereClauseConversionError extends QueryOptimizerError { constructor(collectionId: string, alias: string) { super( @@ -582,6 +589,10 @@ export class WhereClauseConversionError extends QueryOptimizerError { } } +/** + * Error when a subscription cannot be found during lazy join processing. + * For subqueries, aliases may be remapped (e.g., 'activeUser' → 'user'). + */ export class SubscriptionNotFoundError extends QueryCompilationError { constructor( resolvedAlias: string, @@ -595,6 +606,9 @@ export class SubscriptionNotFoundError extends QueryCompilationError { } } +/** + * Error thrown when aggregate expressions are used outside of a GROUP BY context. + */ export class AggregateNotSupportedError extends QueryCompilationError { constructor() { super( diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index 4f2634592..9b2c539a7 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -31,18 +31,28 @@ import type { import type { QueryCache, QueryMapping } from "./types.js" /** - * Result of query compilation including both the pipeline and collection-specific WHERE clauses + * Result of query compilation including both the pipeline and source-specific WHERE clauses */ export interface CompilationResult { /** The ID of the main collection */ collectionId: string - /** The compiled query pipeline */ + + /** The compiled query pipeline (D2 stream) */ pipeline: ResultStream + /** Map of source aliases to their WHERE clauses for index optimization */ sourceWhereClauses: Map> - /** Map of alias to underlying collection id used during compilation */ + + /** + * Maps each source alias to its collection ID. Enables per-alias subscriptions for self-joins. + * Example: `{ employee: 'employees-col-id', manager: 'employees-col-id' }` + */ aliasToCollectionId: Record - /** Map of outer alias to inner alias for subquery aliasing (e.g., 'activeUser' → 'user') */ + + /** + * Maps outer alias to inner alias for subqueries (e.g., `{ activeUser: 'user' }`). + * Used to resolve subscriptions during lazy loading when aliases differ. + */ aliasRemapping: Record } @@ -350,7 +360,8 @@ export function compileQuery( } /** - * Processes the FROM clause to extract the main table alias and input stream + * Processes the FROM clause, handling direct collection references and subqueries. + * Populates `aliasToCollectionId` and `aliasRemapping` for per-alias subscription tracking. */ function processFrom( from: CollectionRef | QueryRef, @@ -395,12 +406,14 @@ function processFrom( queryMapping ) - // Pull up the inner alias mappings + // Pull up inner alias mappings from subquery compilation Object.assign(aliasToCollectionId, subQueryResult.aliasToCollectionId) Object.assign(aliasRemapping, subQueryResult.aliasRemapping) - // For subqueries, the outer alias (from.alias) may differ from inner aliases. - // Find the inner alias that corresponds to the subquery's main collection and create a remapping. + // Create remapping when outer alias differs from inner alias. + // Example: .join({ activeUser: subquery }) where subquery uses .from({ user: ... }) + // Creates: aliasRemapping['activeUser'] = 'user' + // Needed for subscription resolution during lazy loading. const innerAlias = Object.keys(subQueryResult.aliasToCollectionId).find( (alias) => subQueryResult.aliasToCollectionId[alias] === diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index e18c4e7ba..451005310 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -40,14 +40,18 @@ import type { import type { QueryCache, QueryMapping } from "./types.js" import type { CollectionSubscription } from "../../collection/subscription.js" +/** Function type for loading specific keys into a lazy collection */ export type LoadKeysFn = (key: Set) => void + +/** Callbacks for managing lazy-loaded collections in optimized joins */ export type LazyCollectionCallbacks = { loadKeys: LoadKeysFn loadInitialState: () => void } /** - * Processes all join clauses in a query + * Processes all join clauses, applying lazy loading optimizations and maintaining + * alias tracking for per-alias subscriptions (enables self-joins). */ export function processJoins( pipeline: NamespacedAndKeyedStream, @@ -96,7 +100,8 @@ export function processJoins( } /** - * Processes a single join clause + * Processes a single join clause with lazy loading optimization. + * For LEFT/RIGHT/INNER joins, marks one side as "lazy" (loads on-demand based on join keys). */ function processJoin( pipeline: NamespacedAndKeyedStream, @@ -265,6 +270,8 @@ function processJoin( ) } + // Set up lazy loading: intercept active side's stream and dynamically load + // matching rows from lazy side based on join keys. const activePipelineWithLoading: IStreamBuilder< [key: unknown, [originalKey: string, namespacedRow: NamespacedRow]] > = activePipeline.pipe( @@ -298,6 +305,7 @@ function processJoin( return } + // Request filtered snapshot from lazy collection for matching join keys const joinKeys = data.getInner().map(([[joinKey]]) => joinKey) const lazyJoinRef = new PropRef(followRefResult.path) const loaded = lazySourceSubscription.requestSnapshot({ @@ -462,7 +470,7 @@ function processJoinSource( queryMapping ) - // Pull up the inner alias mappings + // Pull up alias mappings from subquery Object.assign(aliasToCollectionId, subQueryResult.aliasToCollectionId) Object.assign(aliasRemapping, subQueryResult.aliasRemapping) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index a81d1df2a..18bfe8fcf 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -229,6 +229,12 @@ export class CollectionConfigBuilder< } } + /** + * Compiles the query pipeline in two phases: + * 1. Initial compilation with declared aliases + * 2. Recompile if optimizer introduces new aliases (provisions missing inputs) + * Ensures all aliases have input streams for per-alias subscriptions. + */ private compileBasePipeline() { this.graphCache = new D2() this.inputsCache = Object.fromEntries( @@ -238,6 +244,7 @@ export class CollectionConfigBuilder< ]) ) + // Phase 1: Initial compilation // Compile the query and capture alias metadata produced during optimisation let compilation = compileQuery( this.query, @@ -252,6 +259,8 @@ export class CollectionConfigBuilder< this.pipelineCache = compilation.pipeline this.sourceWhereClausesCache = compilation.sourceWhereClauses this.compiledAliasToCollectionId = compilation.aliasToCollectionId + + // Phase 2: Handle optimizer-generated aliases (provision inputs and recompile) // Optimized queries can introduce aliases beyond those declared on the // builder. If that happens, provision inputs for the missing aliases and // recompile so the pipeline is fully wired before execution. @@ -394,6 +403,11 @@ export class CollectionConfigBuilder< ) } + /** + * Creates per-alias subscriptions enabling self-join support. + * Each alias gets its own subscription with independent filters, even for the same collection. + * Example: `{ employee: col, manager: col }` creates two separate subscriptions. + */ private subscribeToAllCollections( config: Parameters[`sync`]>[0], syncState: FullSyncState @@ -405,10 +419,7 @@ export class CollectionConfigBuilder< ) } - // Subscribe to each alias the compiler reported. - const aliasEntries = compiledAliases - - const loaders = aliasEntries.map(([alias, collectionId]) => { + const loaders = compiledAliases.map(([alias, collectionId]) => { const collection = this.collectionByAlias[alias] ?? this.collections[collectionId]! @@ -422,7 +433,7 @@ export class CollectionConfigBuilder< ) const subscription = collectionSubscriber.subscribe() - this.subscriptions[alias] = subscription + this.subscriptions[alias] = subscription // Keyed by alias for lazy loading lookup const loadMore = collectionSubscriber.loadMoreIfNeeded.bind( collectionSubscriber, From 2f6ca4fcc51b23664ab4f3b3e90b067e26753dad Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sat, 4 Oct 2025 16:59:52 +0100 Subject: [PATCH 12/14] remove unnecessary second pass of the compiler --- packages/db/src/errors.ts | 13 ++++++ packages/db/src/query/compiler/index.ts | 4 +- .../query/live/collection-config-builder.ts | 43 ++++--------------- 3 files changed, 23 insertions(+), 37 deletions(-) diff --git a/packages/db/src/errors.ts b/packages/db/src/errors.ts index f983a73af..209e7b7a6 100644 --- a/packages/db/src/errors.ts +++ b/packages/db/src/errors.ts @@ -616,3 +616,16 @@ export class AggregateNotSupportedError extends QueryCompilationError { ) } } + +/** + * Internal error when the compiler returns aliases that don't have corresponding input streams. + * This should never happen since all aliases come from user declarations. + */ +export class MissingAliasInputsError extends QueryCompilationError { + constructor(missingAliases: Array) { + super( + `Internal error: compiler returned aliases without inputs: ${missingAliases.join(`, `)}. ` + + `This indicates a bug in query compilation. Please report this issue.` + ) + } +} diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index 9b2c539a7..711c76aa0 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -96,8 +96,8 @@ export function compileQuery( // Create a copy of the inputs map to avoid modifying the original const allInputs = { ...inputs } - // Track alias to collection id relationships discovered during compilation so - // the live layer can subscribe to every alias the optimizer introduces. + // Track alias to collection id relationships discovered during compilation. + // This includes all user-declared aliases plus inner aliases from subqueries. const aliasToCollectionId: Record = {} // Track alias remapping for subqueries (outer alias → inner alias) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 18bfe8fcf..5473aa808 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -1,6 +1,7 @@ import { D2, output } from "@tanstack/db-ivm" import { compileQuery } from "../compiler/index.js" import { buildQuery, getQueryIR } from "../builder/index.js" +import { MissingAliasInputsError } from "../../errors.js" import { CollectionSubscriber } from "./collection-subscriber.js" import type { CollectionSubscription } from "../../collection/subscription.js" import type { RootStreamBuilder } from "@tanstack/db-ivm" @@ -33,7 +34,7 @@ export class CollectionConfigBuilder< readonly query: QueryIR private readonly collections: Record> private readonly collectionByAlias: Record> - // Populated during compilation to include optimizer-generated aliases + // Populated during compilation with all aliases (including subquery inner aliases) private compiledAliasToCollectionId: Record = {} // WeakMap to store the keys of the results @@ -230,10 +231,7 @@ export class CollectionConfigBuilder< } /** - * Compiles the query pipeline in two phases: - * 1. Initial compilation with declared aliases - * 2. Recompile if optimizer introduces new aliases (provisions missing inputs) - * Ensures all aliases have input streams for per-alias subscriptions. + * Compiles the query pipeline with all declared aliases. */ private compileBasePipeline() { this.graphCache = new D2() @@ -244,9 +242,7 @@ export class CollectionConfigBuilder< ]) ) - // Phase 1: Initial compilation - // Compile the query and capture alias metadata produced during optimisation - let compilation = compileQuery( + const compilation = compileQuery( this.query, this.inputsCache as Record, this.collections, @@ -260,37 +256,14 @@ export class CollectionConfigBuilder< this.sourceWhereClausesCache = compilation.sourceWhereClauses this.compiledAliasToCollectionId = compilation.aliasToCollectionId - // Phase 2: Handle optimizer-generated aliases (provision inputs and recompile) - // Optimized queries can introduce aliases beyond those declared on the - // builder. If that happens, provision inputs for the missing aliases and - // recompile so the pipeline is fully wired before execution. + // Defensive check: verify all compiled aliases have corresponding inputs + // This should never happen since all aliases come from user declarations, + // but catch it early if the assumption is violated in the future. const missingAliases = Object.keys(this.compiledAliasToCollectionId).filter( (alias) => !Object.hasOwn(this.inputsCache!, alias) ) - if (missingAliases.length > 0) { - for (const alias of missingAliases) { - this.inputsCache[alias] = this.graphCache.newInput() - } - - // Note: Using fresh WeakMaps here loses cached subquery results, but ensures - // clean compilation with the new alias inputs. For complex queries with many - // subqueries, this could be optimized to preserve the cache. - compilation = compileQuery( - this.query, - this.inputsCache as Record, - this.collections, - this.subscriptions, - this.lazySourcesCallbacks, - this.lazySources, - this.optimizableOrderByCollections, - new WeakMap(), - new WeakMap() - ) - - this.pipelineCache = compilation.pipeline - this.sourceWhereClausesCache = compilation.sourceWhereClauses - this.compiledAliasToCollectionId = compilation.aliasToCollectionId + throw new MissingAliasInputsError(missingAliases) } } From 88b30928f6ef1f957a999f9f21875f52c77c3aa8 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Mon, 6 Oct 2025 21:45:29 +0100 Subject: [PATCH 13/14] remove deplicate code --- packages/db/src/query/compiler/joins.ts | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 451005310..f1d672ab8 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -276,25 +276,18 @@ function processJoin( [key: unknown, [originalKey: string, namespacedRow: NamespacedRow]] > = activePipeline.pipe( tap((data) => { - // For outer joins (LEFT/RIGHT), the driving side determines which alias's - // subscription we consult for lazy loading. The main source drives LEFT joins, - // joined source drives RIGHT joins. - const lazyAliasCandidate = - activeSource === `main` ? joinedSource : mainSource - // Find the subscription for lazy loading. // For subqueries, the outer join alias (e.g., 'activeUser') may differ from the // inner alias (e.g., 'user'). Use aliasRemapping to resolve outer → inner alias. // Example: .join({ activeUser: subquery }) where subquery uses .from({ user: collection }) // → aliasRemapping['activeUser'] = 'user' - const resolvedAlias = - aliasRemapping[lazyAliasCandidate] || lazyAliasCandidate + const resolvedAlias = aliasRemapping[lazyAlias] || lazyAlias const lazySourceSubscription = subscriptions[resolvedAlias] if (!lazySourceSubscription) { throw new SubscriptionNotFoundError( resolvedAlias, - lazyAliasCandidate, + lazyAlias, lazySource.id, Object.keys(subscriptions) ) From 7eca9219e8ab0a094d0f2d3da018153f9ec71e07 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Tue, 7 Oct 2025 09:16:16 +0100 Subject: [PATCH 14/14] comments on aliasRemapping and additional tests to confirm --- packages/db/src/query/compiler/index.ts | 32 ++- packages/db/src/query/compiler/joins.ts | 24 +- packages/db/tests/query/join-subquery.test.ts | 252 ++++++++++++++++++ 3 files changed, 296 insertions(+), 12 deletions(-) diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index 711c76aa0..d143cf3bb 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -50,8 +50,18 @@ export interface CompilationResult { aliasToCollectionId: Record /** - * Maps outer alias to inner alias for subqueries (e.g., `{ activeUser: 'user' }`). - * Used to resolve subscriptions during lazy loading when aliases differ. + * Flattened mapping from outer alias to innermost alias for subqueries. + * Always provides one-hop lookups, never recursive chains. + * + * Example: `{ activeUser: 'user' }` when `.from({ activeUser: subquery })` + * where the subquery uses `.from({ user: collection })`. + * + * For deeply nested subqueries, the mapping goes directly to the innermost alias: + * `{ author: 'user' }` (not `{ author: 'activeUser' }`), so `aliasRemapping[alias]` + * always resolves in a single lookup. + * + * Used to resolve subscriptions during lazy loading when join aliases differ from + * the inner aliases where collection subscriptions were created. */ aliasRemapping: Record } @@ -406,13 +416,23 @@ function processFrom( queryMapping ) - // Pull up inner alias mappings from subquery compilation + // Pull up alias mappings from subquery to parent scope. + // This includes both the innermost alias-to-collection mappings AND + // any existing remappings from nested subquery levels. Object.assign(aliasToCollectionId, subQueryResult.aliasToCollectionId) Object.assign(aliasRemapping, subQueryResult.aliasRemapping) - // Create remapping when outer alias differs from inner alias. - // Example: .join({ activeUser: subquery }) where subquery uses .from({ user: ... }) - // Creates: aliasRemapping['activeUser'] = 'user' + // Create a FLATTENED remapping from outer alias to innermost alias. + // For nested subqueries, this ensures one-hop lookups (not recursive chains). + // + // Example with 3-level nesting: + // Inner: .from({ user: usersCollection }) + // Middle: .from({ activeUser: innerSubquery }) → creates: activeUser → user + // Outer: .from({ author: middleSubquery }) → creates: author → user (not author → activeUser) + // + // The key insight: We search through the PULLED-UP aliasToCollectionId (which contains + // the innermost 'user' alias), so we always map directly to the deepest level. + // This means aliasRemapping[alias] is always a single lookup, never recursive. // Needed for subscription resolution during lazy loading. const innerAlias = Object.keys(subQueryResult.aliasToCollectionId).find( (alias) => diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index f1d672ab8..b8e852539 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -277,10 +277,11 @@ function processJoin( > = activePipeline.pipe( tap((data) => { // Find the subscription for lazy loading. - // For subqueries, the outer join alias (e.g., 'activeUser') may differ from the - // inner alias (e.g., 'user'). Use aliasRemapping to resolve outer → inner alias. + // Subscriptions are keyed by the innermost alias (where the collection subscription + // was actually created). For subqueries, the join alias may differ from the inner alias. + // aliasRemapping provides a flattened one-hop lookup from outer → innermost alias. // Example: .join({ activeUser: subquery }) where subquery uses .from({ user: collection }) - // → aliasRemapping['activeUser'] = 'user' + // → aliasRemapping['activeUser'] = 'user' (always maps directly to innermost, never recursive) const resolvedAlias = aliasRemapping[lazyAlias] || lazyAlias const lazySourceSubscription = subscriptions[resolvedAlias] @@ -463,12 +464,23 @@ function processJoinSource( queryMapping ) - // Pull up alias mappings from subquery + // Pull up alias mappings from subquery to parent scope. + // This includes both the innermost alias-to-collection mappings AND + // any existing remappings from nested subquery levels. Object.assign(aliasToCollectionId, subQueryResult.aliasToCollectionId) Object.assign(aliasRemapping, subQueryResult.aliasRemapping) - // For subqueries, the outer alias (from.alias) may differ from inner aliases. - // Find the inner alias that corresponds to the subquery's main collection and create a remapping. + // Create a flattened remapping from outer alias to innermost alias. + // For nested subqueries, this ensures one-hop lookups (not recursive chains). + // + // Example with 3-level nesting: + // Inner: .from({ user: usersCollection }) + // Middle: .from({ activeUser: innerSubquery }) → creates: activeUser → user + // Outer: .join({ author: middleSubquery }, ...) → creates: author → user (not author → activeUser) + // + // We search through the PULLED-UP aliasToCollectionId (which contains the + // innermost 'user' alias), so we always map directly to the deepest level. + // This means aliasRemapping[lazyAlias] is always a single lookup, never recursive. const innerAlias = Object.keys(subQueryResult.aliasToCollectionId).find( (alias) => subQueryResult.aliasToCollectionId[alias] === diff --git a/packages/db/tests/query/join-subquery.test.ts b/packages/db/tests/query/join-subquery.test.ts index e2fd684e8..bb7092d63 100644 --- a/packages/db/tests/query/join-subquery.test.ts +++ b/packages/db/tests/query/join-subquery.test.ts @@ -22,6 +22,13 @@ type User = { departmentId: number | undefined } +type Profile = { + id: number + userId: number + bio: string + avatar: string +} + // Sample data const sampleIssues: Array = [ { @@ -102,6 +109,27 @@ const sampleUsers: Array = [ }, ] +const sampleProfiles: Array = [ + { + id: 1, + userId: 1, + bio: `Senior developer with 10 years experience`, + avatar: `alice.jpg`, + }, + { + id: 2, + userId: 2, + bio: `Full-stack engineer`, + avatar: `bob.jpg`, + }, + { + id: 3, + userId: 3, + bio: `Frontend specialist`, + avatar: `charlie.jpg`, + }, +] + const sampleProducts = [ { id: 1, a: `8` }, { id: 2, a: `6` }, @@ -138,6 +166,17 @@ function createUsersCollection(autoIndex: `off` | `eager` = `eager`) { ) } +function createProfilesCollection(autoIndex: `off` | `eager` = `eager`) { + return createCollection( + mockSyncCollectionOptions({ + id: `join-subquery-test-profiles`, + getKey: (profile) => profile.id, + initialData: sampleProfiles, + autoIndex, + }) + ) +} + function createProductsCollection(autoIndex: `off` | `eager` = `eager`) { return createCollection( mockSyncCollectionOptions({ @@ -602,6 +641,219 @@ function createJoinSubqueryTests(autoIndex: `off` | `eager`): void { ]) }) }) + + describe(`nested subqueries with joins (alias remapping)`, () => { + let issuesCollection: ReturnType + let usersCollection: ReturnType + let profilesCollection: ReturnType + + beforeEach(() => { + issuesCollection = createIssuesCollection(autoIndex) + usersCollection = createUsersCollection(autoIndex) + profilesCollection = createProfilesCollection(autoIndex) + }) + + test(`should handle subquery with join used in FROM clause (tests alias remapping)`, () => { + const joinQuery = createLiveQueryCollection({ + startSync: true, + query: (q) => { + // Level 1: Subquery WITH a join (user + profile) + // This creates two inner aliases: 'user' and 'profile' + // Filter for active users at the subquery level to avoid WHERE on SELECT fields bug + const activeUsersWithProfiles = q + .from({ user: usersCollection }) + .join( + { profile: profilesCollection }, + ({ user, profile }) => eq(user.id, profile.userId), + `inner` + ) + .where(({ user }) => eq(user.status, `active`)) + .select(({ user, profile }) => ({ + userId: user.id, + userName: user.name, + userEmail: user.email, + profileBio: profile.bio, + profileAvatar: profile.avatar, + })) + + // Level 2: Use the joined subquery in FROM clause + // Outer alias: 'activeUser', inner aliases: 'user', 'profile' + // This tests that aliasRemapping['activeUser'] = 'user' (flattened to innermost) + return q + .from({ activeUser: activeUsersWithProfiles }) + .join( + { issue: issuesCollection }, + ({ activeUser, issue }) => eq(issue.userId, activeUser.userId), + `inner` + ) + .select(({ activeUser, issue }) => ({ + issue_title: issue.title, + issue_status: issue.status, + user_name: activeUser.userName, + user_email: activeUser.userEmail, + profile_bio: activeUser.profileBio, + profile_avatar: activeUser.profileAvatar, + })) + }, + }) + + const results = joinQuery.toArray + // Alice (id:1) and Bob (id:2) are active with profiles + // Their issues: 1, 3 (Alice), 2, 5 (Bob) = 4 issues total + expect(results).toHaveLength(4) + + const sortedResults = results.sort((a, b) => + a.issue_title.localeCompare(b.issue_title) + ) + + // Verify structure - should have both user data AND profile data + sortedResults.forEach((result) => { + expect(result).toHaveProperty(`issue_title`) + expect(result).toHaveProperty(`user_name`) + expect(result).toHaveProperty(`user_email`) + expect(result).toHaveProperty(`profile_bio`) + expect(result).toHaveProperty(`profile_avatar`) + }) + + // Verify Alice's issue with profile data (validates alias remapping worked) + const aliceIssue = results.find((r) => r.issue_title === `Bug 1`) + expect(aliceIssue).toMatchObject({ + user_name: `Alice`, + user_email: `alice@example.com`, + profile_bio: `Senior developer with 10 years experience`, + profile_avatar: `alice.jpg`, + }) + + // Verify Bob's issue with profile data (validates alias remapping worked) + const bobIssue = results.find((r) => r.issue_title === `Bug 2`) + expect(bobIssue).toMatchObject({ + user_name: `Bob`, + user_email: `bob@example.com`, + profile_bio: `Full-stack engineer`, + profile_avatar: `bob.jpg`, + }) + + // Charlie's issue should NOT appear (inactive user was filtered in subquery) + const charlieIssue = results.find((r) => r.issue_title === `Bug 3`) + expect(charlieIssue).toBeUndefined() + }) + + test(`should handle subquery with join used in JOIN clause (tests alias remapping)`, () => { + const joinQuery = createLiveQueryCollection({ + startSync: true, + query: (q) => { + // Level 1: Subquery WITH a join (user + profile) + const usersWithProfiles = q + .from({ user: usersCollection }) + .join( + { profile: profilesCollection }, + ({ user, profile }) => eq(user.id, profile.userId), + `inner` + ) + .where(({ user }) => eq(user.status, `active`)) + .select(({ user, profile }) => ({ + userId: user.id, + userName: user.name, + profileBio: profile.bio, + })) + + // Level 2: Use the joined subquery in JOIN clause + // Outer alias: 'author', inner aliases: 'user', 'profile' + // This tests that aliasRemapping['author'] = 'user' for lazy loading + return q + .from({ issue: issuesCollection }) + .join( + { author: usersWithProfiles }, + ({ issue, author }) => eq(issue.userId, author.userId), + `left` + ) + .select(({ issue, author }) => ({ + issue_id: issue.id, + issue_title: issue.title, + author_name: author?.userName, + author_bio: author?.profileBio, + })) + }, + }) + + const results = joinQuery.toArray + expect(results).toHaveLength(5) // All issues + + // Active users with profiles should have author data + const withAuthors = results.filter((r) => r.author_name !== undefined) + expect(withAuthors).toHaveLength(4) // Issues 1, 2, 3, 5 (Alice and Bob) + + // Charlie (inactive) issue should have no author data + const charlieIssue = results.find((r) => r.issue_id === 4) + expect(charlieIssue).toMatchObject({ + issue_title: `Bug 3`, + author_name: undefined, + author_bio: undefined, + }) + }) + + test(`should handle deeply nested subqueries with joins (3 levels)`, () => { + const joinQuery = createLiveQueryCollection({ + startSync: true, + query: (q) => { + // Level 1: Base joined subquery (user + profile) + const usersWithProfiles = q + .from({ user: usersCollection }) + .join( + { profile: profilesCollection }, + ({ user, profile }) => eq(user.id, profile.userId), + `inner` + ) + .select(({ user, profile }) => ({ + userId: user.id, + userName: user.name, + userStatus: user.status, + profileBio: profile.bio, + })) + + // Level 2: Filter the joined subquery + const activeUsersWithProfiles = q + .from({ userProfile: usersWithProfiles }) + .where(({ userProfile }) => eq(userProfile.userStatus, `active`)) + .select(({ userProfile }) => ({ + id: userProfile.userId, + name: userProfile.userName, + bio: userProfile.profileBio, + })) + + // Level 3: Use the nested filtered joined subquery + // Outer alias: 'author', middle alias: 'userProfile', inner aliases: 'user', 'profile' + // Tests that aliasRemapping['author'] = 'user' (flattened to innermost, not 'userProfile') + return q + .from({ issue: issuesCollection }) + .join( + { author: activeUsersWithProfiles }, + ({ issue, author }) => eq(issue.userId, author.id), + `inner` + ) + .select(({ issue, author }) => ({ + issue_title: issue.title, + author_name: author.name, + author_bio: author.bio, + })) + }, + }) + + const results = joinQuery.toArray + // Only issues with active users (Alice: 1, 3 and Bob: 2, 5) + expect(results).toHaveLength(4) + + // All results should have complete author data from the joined profiles + results.forEach((result) => { + expect(result.author_name).toBeDefined() + expect(result.author_bio).toBeDefined() + expect([ + `Senior developer with 10 years experience`, + `Full-stack engineer`, + ]).toContain(result.author_bio) + }) + }) + }) }) }