Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeMap, Expression, If, Literal, NamedExpression, Or}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.{Cross, Inner, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -85,12 +86,24 @@ object PlanMerger {
* When `filterPropagationEnabled` is true, non-grouping [[Aggregate]]s over the same base plan
* with different [[Filter]] conditions can also be merged. The filter conditions are exposed as
* boolean [[Project]] attributes and consumed at the [[Aggregate]] as FILTER clauses.
* When both sides carry a [[Filter]] (the symmetric case), merging broadens the scan to
* OR(f1, f2), which may reduce IO pruning. This path is separately gated by
* When both sides carry a [[Filter]] (the symmetric case), merging broadens the scan to OR(f1, f2),
* which may reduce IO pruning. This path is separately gated by
* `symmetricFilterPropagationEnabled`.
* When plans also differ in intermediate [[Project]] expressions, those are wrapped with
* `If(filterAttr, expr, null)` to avoid computing the expression for rows that do not
* match that side's filter condition.
* `If(filterAttr, expr, null)` to avoid computing the expression for rows that do not match that
* side's filter condition.
* Filter propagation also works through [[Join]] nodes: a filter on one child of the join produces
* a boolean attribute that flows through the join output to the enclosing [[Aggregate]].
* Propagation is only safe when the filter originates from the non-nullable side of the join, as
* enforced by `filterSafeForJoin`. When the filter is on the nullable side, the merged base plan
* restores rows that were filtered out of the nullable child, turning what were unmatched
* NULL-padded rows in the original plan into matched rows with real column values. This changes the
* result of expressions like `coalesce(col, default)` in the aggregate: an originally unmatched row
* would have contributed `default` via `coalesce(NULL, default)`, but in the merged plan it is
* matched, its real column value fails the filter, and `FILTER (WHERE false)` discards it entirely.
* Propagation is also skipped when both the left and right children simultaneously produce filter
* attributes, as combining them would require an additional AND alias above the join (not yet
* supported).
*
* {{{
* // Input plans
Expand Down Expand Up @@ -120,7 +133,9 @@ class PlanMerger(
filterPropagationEnabled: Boolean =
SQLConf.get.getConf(SQLConf.MERGE_SUBPLANS_FILTER_PROPAGATION_ENABLED),
symmetricFilterPropagationEnabled: Boolean =
SQLConf.get.getConf(SQLConf.MERGE_SUBPLANS_SYMMETRIC_FILTER_PROPAGATION_ENABLED)) {
SQLConf.get.getConf(SQLConf.MERGE_SUBPLANS_SYMMETRIC_FILTER_PROPAGATION_ENABLED),
filterPropagationThroughJoinEnabled: Boolean =
SQLConf.get.getConf(SQLConf.MERGE_SUBPLANS_FILTER_PROPAGATION_THROUGH_JOIN_ENABLED)) {
val cache = mutable.ArrayBuffer.empty[MergedPlan]

/**
Expand Down Expand Up @@ -224,7 +239,8 @@ class PlanMerger(
* - Aggregate nodes: Combines aggregate expressions if grouping is identical and both
* support the same aggregate implementation (hash/object-hash/sort-based)
* - Filter nodes: Only if filter conditions are identical
* - Join nodes: Only if join type, hints, and conditions are identical
* - Join nodes: Requires identical join type, hints, and conditions; filter propagation is
* forwarded into the join's children so a filter difference on one child can still be merged
*
* @param newPlan The plan to merge into the cached plan.
* @param cachedPlan The cached plan to merge with.
Expand Down Expand Up @@ -416,18 +432,37 @@ class PlanMerger(
}

case (np: Join, cp: Join) if np.joinType == cp.joinType && np.hint == cp.hint =>
// Filter propagation across joins is not yet supported.
tryMergePlans(np.left, cp.left, false).flatMap {
case TryMergeResult(mergedLeft, leftNPMapping, None, None) =>
tryMergePlans(np.right, cp.right, false).flatMap {
case TryMergeResult(mergedRight, rightNPMapping, None, None) =>
tryMergePlans(np.left, cp.left, filterPropagationSupported).flatMap {
case TryMergeResult(mergedLeft, leftNPMapping, leftNPFilter, leftCPFilter) =>
tryMergePlans(np.right, cp.right, filterPropagationSupported).flatMap {
case TryMergeResult(mergedRight, rightNPMapping, rightNPFilter, rightCPFilter)
// If both children independently propagate filter attributes we would need to
// AND them into a new alias above the join, which is not yet supported.
if !(leftNPFilter.isDefined && rightNPFilter.isDefined) &&
!(leftCPFilter.isDefined && rightCPFilter.isDefined) &&
// Gate join-crossing filter propagation behind its own config flag.
// When no filter attributes are in play the merge is unconditionally safe.
(leftNPFilter.isEmpty && leftCPFilter.isEmpty &&
rightNPFilter.isEmpty && rightCPFilter.isEmpty ||
filterPropagationThroughJoinEnabled) &&
// A filter attribute is only safe to propagate through a join if it comes
// from the "preserved" (non-nullable) side. On the nullable side, unmatched
// rows are NULL-padded so f=NULL, causing FILTER (WHERE f) to incorrectly
// exclude rows that should contribute to the aggregate. Right-side
// attributes are also absent from semi/anti join output.
(leftNPFilter.isEmpty && leftCPFilter.isEmpty ||
filterSafeForJoin(fromLeft = true, cp.joinType)) &&
(rightNPFilter.isEmpty && rightCPFilter.isEmpty ||
filterSafeForJoin(fromLeft = false, cp.joinType)) =>
val npMapping = leftNPMapping ++ rightNPMapping
val mappedNPCondition = np.condition.map(mapAttributes(_, npMapping))
// Comparing the canonicalized form is required to ignore different forms of the
// same expression and `AttributeReference.qualifier`s in `cp.condition`.
if (mappedNPCondition.map(_.canonicalized) == cp.condition.map(_.canonicalized)) {
val mergedPlan = cp.withNewChildren(Seq(mergedLeft, mergedRight))
Some(TryMergeResult(mergedPlan, npMapping))
val npFilter = leftNPFilter.orElse(rightNPFilter)
val cpFilter = leftCPFilter.orElse(rightCPFilter)
Some(TryMergeResult(cp.withNewChildren(Seq(mergedLeft, mergedRight)), npMapping,
npFilter, cpFilter))
} else {
None
}
Expand All @@ -441,6 +476,35 @@ class PlanMerger(
})
}

// Decides whether a filter attribute originating from one child of a join (`fromLeft` selects
// which child) may be safely propagated through a join of the given `joinType` up to a parent
// Aggregate.
//
// The originating side must be "preserved" by the join (never NULL-padded) and its attributes
// must survive into the join's output:
//  - NULL-padding: if the filter's side can be NULL-padded (the left side of RightOuter /
//    FullOuter, the right side of LeftOuter / FullOuter), unmatched rows carry f = NULL and
//    `FILTER (WHERE f)` would wrongly exclude rows that should contribute to the aggregate.
//    Worse, the merged base plan no longer applies the filter to the nullable child's scan, so
//    rows previously absent from that child reappear as matched join rows instead of unmatched
//    NULL-padded ones. That changes expressions over the NULL-padded column: for
//    `sum(coalesce(col, default))` an originally unmatched row contributed `default` via
//    `coalesce(NULL, default)`, but after merging the row is matched, its real column value
//    fails the filter, and `FILTER (WHERE false)` discards it — losing the `default`
//    contribution entirely.
//  - Output visibility: LeftSemi / LeftAnti joins emit only left-side attributes, so a
//    right-side filter attribute cannot flow through them at all.
private def filterSafeForJoin(fromLeft: Boolean, joinType: JoinType): Boolean =
  joinType match {
    // Inner and cross joins preserve both sides and keep both in the output.
    case Inner | Cross => true
    // Left-directed joins preserve (and output) only the left side.
    case LeftOuter | LeftSemi | LeftAnti => fromLeft
    // RightOuter preserves the right side; the left side may be NULL-padded.
    case RightOuter => !fromLeft
    // FullOuter (and any other join type) can NULL-pad either side.
    case _ => false
  }

private def mapAttributes[T <: Expression](expr: T, outputMap: AttributeMap[Attribute]) = {
expr.transform {
case a: Attribute => outputMap.getOrElse(a, a)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6608,6 +6608,20 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

// Gates filter-attribute propagation through Join nodes during subplan merging (see
// PlanMerger). Disabled by default: merging across a join broadens the shared scan to
// OR of the filter conditions, which can reduce IO pruning, and propagation is only
// legal from the preserved (non-NULL-padded) side of a join. Consulted only when
// MERGE_SUBPLANS_FILTER_PROPAGATION_ENABLED is already true.
val MERGE_SUBPLANS_FILTER_PROPAGATION_THROUGH_JOIN_ENABLED =
buildConf("spark.sql.optimizer.mergeSubplans.filterPropagation.throughJoin.enabled")
.doc("When set to true, filter attributes can propagate through Join nodes during subplan " +
"merging, allowing subplans that differ only in their filter conditions and share a " +
"common join to be merged into a single scan. A filter attribute is only propagated " +
"through a join when it originates from the non-nullable (preserved) side: the left side " +
"of LeftOuter/LeftSemi/LeftAnti, the right side of RightOuter, or either side of " +
"Inner/Cross. FullOuter joins are never eligible. " +
s"Has no effect when ${MERGE_SUBPLANS_FILTER_PROPAGATION_ENABLED.key} is false.")
.version("4.2.0")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
.createWithDefault(false)

val ERROR_MESSAGE_FORMAT = buildConf("spark.sql.error.messageFormat")
.doc("When PRETTY, the error message consists of textual representation of error class, " +
"message and query context. Stack traces are only shown for internal errors " +
Expand Down
Loading