Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .changeset/cool-beers-attend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
"@tanstack/db": minor
---

Add support for compound join conditions using `and()`

Joins can now use multiple equality conditions combined with `and()`:

```
.join(
{ inventory: inventoriesCollection },
({ product, inventory }) =>
and(
eq(product.region, inventory.region),
eq(product.sku, inventory.sku)
)
)
```
5 changes: 4 additions & 1 deletion packages/db/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,10 @@ export class InvalidSourceError extends QueryBuilderError {

export class JoinConditionMustBeEqualityError extends QueryBuilderError {
constructor() {
super(`Join condition must be an equality expression`)
super(
`Join condition must be an equality expression (eq) or compound equality (and(eq, eq, ...)). ` +
`Only eq() expressions are allowed within and().`
)
}
}

Expand Down
99 changes: 82 additions & 17 deletions packages/db/src/query/builder/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,31 @@ export class BaseQueryBuilder<TContext extends Context = Context> {
* query
* .from({ u: usersCollection })
* .join({ p: postsCollection }, ({u, p}) => eq(u.id, p.userId), 'inner')
*
* // Compound join on multiple fields
* query
* .from({ product: productsCollection })
* .join(
* { inventory: inventoryCollection },
* ({ product, inventory }) =>
* and(
* eq(product.region, inventory.region),
* eq(product.sku, inventory.sku)
* )
* )
*
* // Left join with compound condition
* query
* .from({ item: itemsCollection })
* .join(
* { details: detailsCollection },
* ({ item, details }) =>
* and(
* eq(item.category, details.category),
* eq(item.subcategory, details.subcategory)
* ),
* 'left'
* )
* ```
*
* // Join with a subquery
Expand Down Expand Up @@ -167,27 +192,15 @@ export class BaseQueryBuilder<TContext extends Context = Context> {
// Get the join condition expression
const onExpression = onCallback(refProxy)

// Extract left and right from the expression
// For now, we'll assume it's an eq function with two arguments
let left: BasicExpression
let right: BasicExpression

if (
onExpression.type === `func` &&
onExpression.name === `eq` &&
onExpression.args.length === 2
) {
left = onExpression.args[0]!
right = onExpression.args[1]!
} else {
throw new JoinConditionMustBeEqualityError()
}
// Extract join conditions (supports both eq() and and(eq(), ...))
const { primary, additional } = extractJoinConditions(onExpression)

const joinClause: JoinClause = {
from,
type,
left,
right,
left: primary.left,
right: primary.right,
additionalConditions: additional.length > 0 ? additional : undefined,
}

const existingJoins = this.query.join || []
Expand Down Expand Up @@ -763,6 +776,58 @@ export class BaseQueryBuilder<TContext extends Context = Context> {
}
}

/**
* Extracts join conditions from an expression.
* Accepts either:
* - eq(left, right) - single condition
* - and(eq(l1, r1), eq(l2, r2), ...) - compound condition
*
* Returns primary condition (first eq) and additional conditions (remaining eqs).
*/
function extractJoinConditions(expr: BasicExpression): {
primary: { left: BasicExpression; right: BasicExpression }
additional: Array<{ left: BasicExpression; right: BasicExpression }>
} {
// Case 1: Single eq() expression
if (expr.type === `func` && expr.name === `eq` && expr.args.length === 2) {
return {
primary: {
left: expr.args[0]!,
right: expr.args[1]!,
},
additional: [],
}
}

// Case 2: and(eq(), eq(), ...) expression
if (expr.type === `func` && expr.name === `and`) {
const conditions: Array<{ left: BasicExpression; right: BasicExpression }> =
[]

for (const arg of expr.args) {
if (arg.type !== `func` || arg.name !== `eq` || arg.args.length !== 2) {
throw new JoinConditionMustBeEqualityError()
}
conditions.push({
left: arg.args[0]!,
right: arg.args[1]!,
})
}

if (conditions.length === 0) {
throw new JoinConditionMustBeEqualityError()
}

return {
primary: conditions[0]!,
additional: conditions.slice(1),
}
}

// Case 3: Invalid expression
throw new JoinConditionMustBeEqualityError()
}

// Helper to ensure we have a BasicExpression/Aggregate for a value
function toExpr(value: any): BasicExpression | Aggregate {
if (value === undefined) return toExpression(null)
Expand Down
99 changes: 83 additions & 16 deletions packages/db/src/query/compiler/joins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,40 @@ export function processJoins(
return resultPipeline
}

/**
* Creates a join key extractor function that handles both single and composite conditions.
*
* Fast path: Single condition returns primitive value (no serialization overhead)
* Composite path: Multiple conditions serialize to JSON string for consistent hashing
*/
function createJoinKeyExtractor(
compiledConditions: Array<(namespacedRow: NamespacedRow) => any>
): (namespacedRow: NamespacedRow) => any {
// Fast path: single condition, return primitive value directly
if (compiledConditions.length === 1) {
return compiledConditions[0]!
}

// Composite path: extract all values and serialize
return (namespacedRow: NamespacedRow) => {
const parts: Array<any> = []

for (const extractor of compiledConditions) {
const value = extractor(namespacedRow)

// If any value is null/undefined, entire composite key is null
if (value == null) {
return null
}

parts.push(value)
}

// Serialize to string for consistent hashing in IVM operator
return JSON.stringify(parts)
}
}

/**
* Processes a single join clause with lazy loading optimization.
* For LEFT/RIGHT/INNER joins, marks one side as "lazy" (loads on-demand based on join keys).
Expand Down Expand Up @@ -167,24 +201,52 @@ function processJoin(
joinedCollection
)

// Analyze which source each expression refers to and swap if necessary
// Collect all condition pairs (primary + additional)
const conditionPairs: Array<{
left: BasicExpression
right: BasicExpression
}> = [
{ left: joinClause.left, right: joinClause.right },
...(joinClause.additionalConditions || []),
]

// Analyze and compile each condition pair
const availableSources = Object.keys(sources)
const { mainExpr, joinedExpr } = analyzeJoinExpressions(
joinClause.left,
joinClause.right,
availableSources,
joinedSource
)
const compiledMainExprs: Array<(row: NamespacedRow) => any> = []
const compiledJoinedExprs: Array<(row: NamespacedRow) => any> = []

// Store analyzed expressions for primary condition (used for lazy loading check)
let primaryMainExpr: BasicExpression | null = null
let primaryJoinedExpr: BasicExpression | null = null

for (let i = 0; i < conditionPairs.length; i++) {
const { left, right } = conditionPairs[i]!
const { mainExpr, joinedExpr } = analyzeJoinExpressions(
left,
right,
availableSources,
joinedSource
)

// Pre-compile the join expressions
const compiledMainExpr = compileExpression(mainExpr)
const compiledJoinedExpr = compileExpression(joinedExpr)
// Save the analyzed primary expressions for lazy loading optimization
if (i === 0) {
primaryMainExpr = mainExpr
primaryJoinedExpr = joinedExpr
}

compiledMainExprs.push(compileExpression(mainExpr))
compiledJoinedExprs.push(compileExpression(joinedExpr))
}

// Create composite key extractors (fast path for single condition)
const mainKeyExtractor = createJoinKeyExtractor(compiledMainExprs)
const joinedKeyExtractor = createJoinKeyExtractor(compiledJoinedExprs)

// Prepare the main pipeline for joining
let mainPipeline = pipeline.pipe(
map(([currentKey, namespacedRow]) => {
// Extract the join key from the main source expression
const mainKey = compiledMainExpr(namespacedRow)
const mainKey = mainKeyExtractor(namespacedRow)

// Return [joinKey, [originalKey, namespacedRow]]
return [mainKey, [currentKey, namespacedRow]] as [
Expand All @@ -201,7 +263,7 @@ function processJoin(
const namespacedRow: NamespacedRow = { [joinedSource]: row }

// Extract the join key from the joined source expression
const joinedKey = compiledJoinedExpr(namespacedRow)
const joinedKey = joinedKeyExtractor(namespacedRow)

// Return [joinKey, [originalKey, namespacedRow]]
return [joinedKey, [currentKey, namespacedRow]] as [
Expand All @@ -226,12 +288,16 @@ function processJoin(
lazyFrom.type === `queryRef` &&
(lazyFrom.query.limit || lazyFrom.query.offset)

// Use analyzed primary expressions (potentially swapped by analyzeJoinExpressions)
// If join expressions contain computed values (like concat functions)
// we don't optimize the join because we don't have an index over the computed values
const hasComputedJoinExpr =
mainExpr.type === `func` || joinedExpr.type === `func`
primaryMainExpr!.type === `func` || primaryJoinedExpr!.type === `func`

// Disable lazy loading for compound joins (multiple conditions)
const hasCompoundJoin = conditionPairs.length > 1

if (!limitedSubquery && !hasComputedJoinExpr) {
if (!limitedSubquery && !hasComputedJoinExpr && !hasCompoundJoin) {
// This join can be optimized by having the active collection
// dynamically load keys into the lazy collection
// based on the value of the joinKey and by looking up
Expand All @@ -247,10 +313,11 @@ function processJoin(
const activePipeline =
activeSource === `main` ? mainPipeline : joinedPipeline

// Use primaryJoinedExpr for lazy loading index lookup
const lazySourceJoinExpr =
activeSource === `main`
? (joinedExpr as PropRef)
: (mainExpr as PropRef)
? (primaryJoinedExpr as PropRef)
: (primaryMainExpr as PropRef)

const followRefResult = followRef(
rawQuery,
Expand Down
6 changes: 5 additions & 1 deletion packages/db/src/query/ir.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ export type Join = Array<JoinClause>
export interface JoinClause {
from: CollectionRef | QueryRef
type: `left` | `right` | `inner` | `outer` | `full` | `cross`
left: BasicExpression
left: BasicExpression // Primary join condition (always present)
right: BasicExpression
additionalConditions?: Array<{
left: BasicExpression
right: BasicExpression
}>
}

export type Where =
Expand Down
6 changes: 6 additions & 0 deletions packages/db/src/query/optimizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,12 @@ function deepCopyQuery(query: QueryIR): QueryIR {
type: joinClause.type,
left: joinClause.left,
right: joinClause.right,
additionalConditions: joinClause.additionalConditions
? joinClause.additionalConditions.map((cond) => ({
left: cond.left,
right: cond.right,
}))
: undefined,
from:
joinClause.from.type === `collectionRef`
? new CollectionRefClass(
Expand Down
Loading