[SPARK-35455][SQL] Unify empty relation optimization between normal and AQE optimizer #32602

Closed
wants to merge 23 commits into from
@@ -26,59 +26,46 @@ import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, TRUE_OR_FALSE_LITERAL}

/**
* Collapse plans consisting empty local relations generated by [[PruneFilters]].
* 1. Binary(or Higher)-node Logical Plans
* - Union with all empty children.
* - Join with one or two empty children (including Intersect/Except).
* 2. Unary-node Logical Plans
* - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
* - Join with false condition.
* - Aggregate with all empty children and at least one grouping expression.
* - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
* The base class of two rules in the normal and AQE Optimizer. It simplifies query plans with
* empty or non-empty relations:
* 1. Binary-node Logical Plans
* - Join with one or two empty children (including Intersect/Except).
* - Left semi Join
* Right side is non-empty and condition is empty. Eliminate join to its left side.
* - Left anti join
* Right side is non-empty and condition is empty. Eliminate join to an empty
* [[LocalRelation]].
* 2. Unary-node Logical Plans
* - Limit/Repartition with all empty children.
* - Aggregate with all empty children and at least one grouping expression.
* - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
*/
object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper with CastSupport {
private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSupport {
protected def isEmpty(plan: LogicalPlan): Boolean = plan match {
case p: LocalRelation => p.data.isEmpty
case _ => false
}

private def empty(plan: LogicalPlan) =
protected def nonEmpty(plan: LogicalPlan): Boolean = plan match {
case p: LocalRelation => p.data.nonEmpty
case _ => false
}

protected def empty(plan: LogicalPlan): LocalRelation =
LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming)

// Construct a project list from plan's output, while the value is always NULL.
private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) }

def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
_.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) {
case p: Union if p.children.exists(isEmptyLocalRelation) =>
val newChildren = p.children.filterNot(isEmptyLocalRelation)
if (newChildren.isEmpty) {
empty(p)
} else {
val newPlan = if (newChildren.size > 1) Union(newChildren) else newChildren.head
val outputs = newPlan.output.zip(p.output)
// the original Union may produce different output attributes than the new one so we alias
// them if needed
if (outputs.forall { case (newAttr, oldAttr) => newAttr.exprId == oldAttr.exprId }) {
newPlan
} else {
val outputAliases = outputs.map { case (newAttr, oldAttr) =>
val newExplicitMetadata =
if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None
Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata = newExplicitMetadata)
}
Project(outputAliases, newPlan)
}
}

protected def commonApplyFunc: PartialFunction[LogicalPlan, LogicalPlan] = {
// Joins on empty LocalRelations generated from streaming sources are not eliminated
// as stateful streaming joins need to perform other state management operations other than
// just processing the input data.
case p @ Join(_, _, joinType, conditionOpt, _)
if !p.children.exists(_.isStreaming) =>
val isLeftEmpty = isEmptyLocalRelation(p.left)
val isRightEmpty = isEmptyLocalRelation(p.right)
val isLeftEmpty = isEmpty(p.left)
val isRightEmpty = isEmpty(p.right)
val isFalseCondition = conditionOpt match {
case Some(FalseLiteral) => true
case _ => false
@@ -103,14 +90,15 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
case _ => p
}
} else if (joinType == LeftSemi && conditionOpt.isEmpty && nonEmpty(p.right)) {
p.left
} else if (joinType == LeftAnti && conditionOpt.isEmpty && nonEmpty(p.right)) {
empty(p)
} else {
p
}

case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) => p match {
case _: Project => empty(p)
case _: Filter => empty(p)
case _: Sample => empty(p)
case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmpty) => p match {
case _: Sort => empty(p)
case _: GlobalLimit if !p.isStreaming => empty(p)
case _: LocalLimit if !p.isStreaming => empty(p)
@@ -137,3 +125,55 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
}
}
}

/**
* This rule runs in the normal optimizer and optimizes more cases
* compared to [[PropagateEmptyRelationBase]]:
* 1. Higher-node Logical Plans
* - Union with all empty children.
* 2. Unary-node Logical Plans
* - Project/Filter/Sample with all empty children.
*
* The reason why we don't apply this rule at AQE optimizer side is: the benefit is not big enough
* and it may introduce extra exchanges.
Contributor commented:

After more thought, I think this is a big performance issue if we can't propagate empty relations through project/filter which are quite common. The risk of introducing new shuffles is relatively small compared to this.

@ulysses-you can we move all the logic to PropagateEmptyRelationBase? PropagateEmptyRelation should not have any extra logic.

*/
object PropagateEmptyRelation extends PropagateEmptyRelationBase {
private def applyFunc: PartialFunction[LogicalPlan, LogicalPlan] = {
case p: Union if p.children.exists(isEmpty) =>
val newChildren = p.children.filterNot(isEmpty)
if (newChildren.isEmpty) {
empty(p)
} else {
val newPlan = if (newChildren.size > 1) Union(newChildren) else newChildren.head
val outputs = newPlan.output.zip(p.output)
// the original Union may produce different output attributes than the new one so we alias
// them if needed
if (outputs.forall { case (newAttr, oldAttr) => newAttr.exprId == oldAttr.exprId }) {
newPlan
} else {
val outputAliases = outputs.map { case (newAttr, oldAttr) =>
val newExplicitMetadata =
if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None
Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata = newExplicitMetadata)
}
Project(outputAliases, newPlan)
}
}

case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmpty) && canPropagate(p) =>
empty(p)
}

// extract the pattern to avoid conflict with propagateEmptyRelationAdvanced
private def canPropagate(plan: LogicalPlan): Boolean = plan match {
case _: Project => true
case _: Filter => true
case _: Sample => true
case _ => false
}

override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
_.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) {
applyFunc.orElse(commonApplyFunc)
}
}
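
The composition `applyFunc.orElse(commonApplyFunc)` and the left semi / left anti join cases can be illustrated on a toy plan model. This is a minimal sketch, not Spark code: `ToyPlan`, `ToyRelation`, `ToyFilter`, `ToyLimit`, `ToyJoin`, and `ToyPropagate` are hypothetical stand-ins for the Catalyst classes, `hasCondition` replaces `conditionOpt`, and only a few of the cases described in the scaladoc are modeled.

```scala
// Toy model only: hypothetical stand-ins, not Spark's Catalyst classes.
sealed trait ToyPlan
final case class ToyRelation(rows: Seq[Int]) extends ToyPlan
final case class ToyFilter(child: ToyPlan) extends ToyPlan
final case class ToyLimit(child: ToyPlan) extends ToyPlan

sealed trait ToyJoinType
case object ToyInner extends ToyJoinType
case object ToyLeftSemi extends ToyJoinType
case object ToyLeftAnti extends ToyJoinType

final case class ToyJoin(
    left: ToyPlan,
    right: ToyPlan,
    joinType: ToyJoinType,
    hasCondition: Boolean) extends ToyPlan

object ToyPropagate {
  val Empty: ToyPlan = ToyRelation(Seq.empty)

  def isEmpty(plan: ToyPlan): Boolean = plan match {
    case ToyRelation(rows) => rows.isEmpty
    case _ => false
  }

  def nonEmpty(plan: ToyPlan): Boolean = plan match {
    case ToyRelation(rows) => rows.nonEmpty
    case _ => false
  }

  // Cases shared by both rules (plays the role of commonApplyFunc).
  val common: PartialFunction[ToyPlan, ToyPlan] = {
    case ToyJoin(l, r, ToyInner, _) if isEmpty(l) || isEmpty(r) => Empty
    // Unconditional semi join against a non-empty right side keeps every left row.
    case ToyJoin(l, r, ToyLeftSemi, false) if nonEmpty(r) => l
    // Unconditional anti join against a non-empty right side keeps no left row.
    case ToyJoin(_, r, ToyLeftAnti, false) if nonEmpty(r) => Empty
    case ToyLimit(child) if isEmpty(child) => Empty
  }

  // Extra cases handled only by the normal-optimizer rule (plays the role of applyFunc).
  val extra: PartialFunction[ToyPlan, ToyPlan] = {
    case ToyFilter(child) if isEmpty(child) => Empty
  }

  // Bottom-up traversal: rewrite the children first, then try the rule on the node.
  def transformUp(plan: ToyPlan)(rule: PartialFunction[ToyPlan, ToyPlan]): ToyPlan = {
    val withNewChildren = plan match {
      case ToyFilter(c) => ToyFilter(transformUp(c)(rule))
      case ToyLimit(c) => ToyLimit(transformUp(c)(rule))
      case j: ToyJoin =>
        j.copy(left = transformUp(j.left)(rule), right = transformUp(j.right)(rule))
      case r: ToyRelation => r
    }
    rule.applyOrElse(withNewChildren, (p: ToyPlan) => p)
  }

  // The extra cases are tried first; unmatched nodes fall through to the common cases.
  def optimize(plan: ToyPlan): ToyPlan = transformUp(plan)(extra.orElse(common))
}
```

Because the traversal is bottom-up, an empty relation at a leaf can collapse several parent operators in a single pass, which is the behaviour the real rule relies on.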
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LogicalPlanIntegrity, PlanHelper}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.internal.SQLConf
@@ -27,7 +28,9 @@ import org.apache.spark.util.Utils
*/
class AQEOptimizer(conf: SQLConf) extends RuleExecutor[LogicalPlan] {
private val defaultBatches = Seq(
Batch("Eliminate Unnecessary Join", Once, EliminateUnnecessaryJoin),
Batch("Propagate Empty Relations", Once,
AQEPropagateEmptyRelation,
UpdateAttributeNullability),
Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin)
)
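
Each batch above uses the `Once` strategy and lists its rules in execution order. The difference between running a batch once and running it to a fixed point can be sketched with a toy executor. This is an illustration under simplifying assumptions, not Spark's `RuleExecutor`: `ToyExecutor`, its `Batch`, `Once`, and `FixedPoint` types are hypothetical, and a rule is modeled as a plain function.

```scala
// Toy executor only: hypothetical types that mimic the shape of batches, not Spark's RuleExecutor.
object ToyExecutor {
  type Rule[T] = T => T

  sealed trait Strategy { def maxIterations: Int }
  case object Once extends Strategy { val maxIterations = 1 }
  final case class FixedPoint(maxIterations: Int) extends Strategy

  final case class Batch[T](name: String, strategy: Strategy, rules: Rule[T]*)

  def execute[T](plan: T, batches: Seq[Batch[T]]): T =
    batches.foldLeft(plan) { (current, batch) =>
      var result = current
      var iteration = 0
      var changed = true
      // Re-run the batch until nothing changes or the iteration budget is used up.
      while (changed && iteration < batch.strategy.maxIterations) {
        // Within one iteration, rules run in the order they are listed.
        val next = batch.rules.foldLeft(result)((p, rule) => rule(p))
        changed = next != result
        result = next
        iteration += 1
      }
      result
    }
}
```

With a rule that halves even numbers, a `Once` batch turns 8 into 4, while a `FixedPoint(10)` batch keeps going until it reaches 1. In this model, a `Once` batch calls each rule a single time per execution.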

@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBase
import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys

/**
* This rule runs in the AQE optimizer and optimizes more cases
* compared to [[PropagateEmptyRelationBase]]:
* 1. Join is single column NULL-aware anti join (NAAJ)
* Broadcasted [[HashedRelation]] is [[HashedRelationWithAllNullKeys]]. Eliminate join to an
* empty [[LocalRelation]].
*/
object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase {
override protected def isEmpty(plan: LogicalPlan): Boolean =
super.isEmpty(plan) || getRowCount(plan).contains(0)

override protected def nonEmpty(plan: LogicalPlan): Boolean =
super.nonEmpty(plan) || getRowCount(plan).exists(_ > 0)

private def getRowCount(plan: LogicalPlan): Option[BigInt] = plan match {
case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined =>
stage.getRuntimeStatistics.rowCount
case _ => None
}

private def isRelationWithAllNullKeys(plan: LogicalPlan): Boolean = plan match {
case LogicalQueryStage(_, stage: BroadcastQueryStageExec)
if stage.resultOption.get().isDefined =>
stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys
case _ => false
}

private def eliminateSingleColumnNullAwareAntiJoin: PartialFunction[LogicalPlan, LogicalPlan] = {
case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) if isRelationWithAllNullKeys(j.right) =>
empty(j)
}

// TODO: we need to use transformUpWithPruning instead of transformUp
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
Contributor Author commented:

@cloud-fan, if we want to use transformUpWithPruning on the AQE optimizer side, we need to do some more work, such as adding a pattern to LogicalQueryStage. So this PR does not make that change and just uses transformUp. Do you think that's OK?

Contributor commented:

Yes, it's OK as it's not a regression. cc @sigmod

Contributor Author commented:

@cloud-fan, it seems we don't need transformUpWithPruning here, since the AQE optimizer always runs once rather than to a fixed point?

eliminateSingleColumnNullAwareAntiJoin.orElse(commonApplyFunc)
}
}
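
The way `AQEPropagateEmptyRelation` widens `isEmpty` and `nonEmpty` with runtime row counts, while reusing the shared cases from the base class, can be sketched on a toy model. All names here (`SketchPlan`, `Rows`, `StageResult`, `SemiJoin`, `SketchRuleBase`, `StaticSketchRule`, `RuntimeSketchRule`) are hypothetical; `StageResult.rowCount` stands in for the statistics of a materialized query stage and is `None` while the stage is still pending.

```scala
// Toy model only: hypothetical stand-ins, not Spark's LogicalPlan or query stage classes.
sealed trait SketchPlan
final case class Rows(data: Seq[Int]) extends SketchPlan
// Stand-in for a query stage; rowCount is None until the stage has materialized.
final case class StageResult(rowCount: Option[BigInt]) extends SketchPlan
// An unconditional left semi join.
final case class SemiJoin(left: SketchPlan, right: SketchPlan) extends SketchPlan

abstract class SketchRuleBase {
  protected def isEmpty(plan: SketchPlan): Boolean = plan match {
    case Rows(data) => data.isEmpty
    case _ => false
  }

  protected def nonEmpty(plan: SketchPlan): Boolean = plan match {
    case Rows(data) => data.nonEmpty
    case _ => false
  }

  // Shared rewrite; isEmpty/nonEmpty are dispatched virtually, so subclasses can widen them.
  protected def common: PartialFunction[SketchPlan, SketchPlan] = {
    case SemiJoin(l, r) if isEmpty(l) || isEmpty(r) => Rows(Seq.empty)
    case SemiJoin(l, r) if nonEmpty(r) => l
  }

  def apply(plan: SketchPlan): SketchPlan = common.applyOrElse(plan, (p: SketchPlan) => p)
}

// Only sees statically known data.
object StaticSketchRule extends SketchRuleBase

// Additionally trusts row counts reported by finished stages.
object RuntimeSketchRule extends SketchRuleBase {
  private def rowCount(plan: SketchPlan): Option[BigInt] = plan match {
    case StageResult(count) => count
    case _ => None
  }

  override protected def isEmpty(plan: SketchPlan): Boolean =
    super.isEmpty(plan) || rowCount(plan).exists(_ == BigInt(0))

  override protected def nonEmpty(plan: SketchPlan): Boolean =
    super.nonEmpty(plan) || rowCount(plan).exists(_ > 0)
}
```

In this sketch a pending stage is neither empty nor non-empty, so the runtime rule leaves the join untouched until a row count is available. That three-way distinction is why two separate predicates are used rather than one negated predicate.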

This file was deleted.

@@ -1382,7 +1382,7 @@ abstract class DynamicPartitionPruningSuiteBase
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> EliminateUnnecessaryJoin.ruleName) {
SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName) {
val df = sql(
"""
|SELECT * FROM fact_sk f