From 44e494bd1725645a2ce13321ec846098d8da6ae4 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 7 Mar 2017 04:59:32 +0000 Subject: [PATCH 1/6] Add a flag to disable constraint propagation. --- .../spark/sql/catalyst/CatalystConf.scala | 5 ++++- .../sql/catalyst/optimizer/Optimizer.scala | 17 +++++++++-------- .../spark/sql/catalyst/optimizer/joins.scala | 6 ++++-- .../BinaryComparisonSimplificationSuite.scala | 5 +++-- .../optimizer/BooleanSimplificationSuite.scala | 5 +++-- .../InferFiltersFromConstraintsSuite.scala | 3 ++- .../optimizer/OuterJoinEliminationSuite.scala | 3 ++- .../optimizer/PropagateEmptyRelationSuite.scala | 5 +++-- .../catalyst/optimizer/PruneFiltersSuite.scala | 3 ++- .../catalyst/optimizer/SetOperationSuite.scala | 3 ++- .../org/apache/spark/sql/internal/SQLConf.scala | 11 +++++++++++ 11 files changed, 45 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala index 5f50ce1ba68ff..93ba4277fa538 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala @@ -42,6 +42,8 @@ trait CatalystConf { def sessionLocalTimeZone: String + def constraintPropagationEnabled: Boolean + /** If true, cartesian products between relations will be allowed for all * join types(inner, (left|right|full) outer). * If false, cartesian products will require explicit CROSS JOIN syntax. @@ -76,5 +78,6 @@ case class SimpleCatalystConf( crossJoinEnabled: Boolean = false, cboEnabled: Boolean = false, warehousePath: String = "/user/hive/warehouse", - sessionLocalTimeZone: String = TimeZone.getDefault().getID) + sessionLocalTimeZone: String = TimeZone.getDefault().getID, + constraintPropagationEnabled: Boolean = true) extends CatalystConf diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 036da3ad2062f..291e6f5d6b60a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -81,12 +81,12 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) // Operator push down PushProjectionThroughUnion, ReorderJoin, - EliminateOuterJoin, + EliminateOuterJoin(conf), PushPredicateThroughJoin, PushDownPredicate, LimitPushDown(conf), ColumnPruning, - InferFiltersFromConstraints, + InferFiltersFromConstraints(conf), // Operator combine CollapseRepartition, CollapseProject, @@ -105,7 +105,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) SimplifyConditionals, RemoveDispensableExpressions, SimplifyBinaryComparison, - PruneFilters, + PruneFilters(conf), EliminateSorts, SimplifyCasts, SimplifyCaseConversionExpressions, @@ -606,9 +606,10 @@ object CollapseWindow extends Rule[LogicalPlan] { * Note: While this optimization is applicable to all types of join, it primarily benefits Inner and * LeftSemi joins. 
*/ -object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper { +case class InferFiltersFromConstraints(conf: CatalystConf) + extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case filter @ Filter(condition, child) => + case filter @ Filter(condition, child) if conf.constraintPropagationEnabled => val newFilters = filter.constraints -- (child.constraints ++ splitConjunctivePredicates(condition)) if (newFilters.nonEmpty) { @@ -617,7 +618,7 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe filter } - case join @ Join(left, right, joinType, conditionOpt) => + case join @ Join(left, right, joinType, conditionOpt) if conf.constraintPropagationEnabled => // Only consider constraints that can be pushed down completely to either the left or the // right child val constraints = join.constraints.filter { c => @@ -696,7 +697,7 @@ object EliminateSorts extends Rule[LogicalPlan] { * 2) by substituting a dummy empty relation when the filter will always evaluate to `false`. * 3) by eliminating the always-true conditions given the constraints on the child's output. */ -object PruneFilters extends Rule[LogicalPlan] with PredicateHelper { +case class PruneFilters(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { // If the filter condition always evaluate to true, remove the filter. case Filter(Literal(true, BooleanType), child) => child @@ -706,7 +707,7 @@ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper { case Filter(Literal(false, BooleanType), child) => LocalRelation(child.output, data = Seq.empty) // If any deterministic condition is guaranteed to be true given the constraints on the child's // output, remove the condition - case f @ Filter(fc, p: LogicalPlan) => + case f @ Filter(fc, p: LogicalPlan) if conf.constraintPropagationEnabled => val (prunedPredicates, remainingPredicates) = splitConjunctivePredicates(fc).partition { cond => cond.deterministic && p.constraints.contains(cond) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index bfe529e21e9ad..0585950369f61 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer import scala.annotation.tailrec +import org.apache.spark.sql.catalyst.CatalystConf import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins import org.apache.spark.sql.catalyst.plans._ @@ -101,7 +102,7 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { * * This rule should be executed before pushing down the Filter */ -object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { +case class EliminateOuterJoin(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper { /** * Returns whether the expression returns null or false when all inputs are nulls. 
@@ -135,7 +136,8 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { } def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) => + case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) + if conf.constraintPropagationEnabled => val newJoinType = buildNewJoinType(f, j) if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala index a0d489681fd9f..2bfddb7bc2f35 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala @@ -30,15 +30,16 @@ import org.apache.spark.sql.catalyst.rules._ class BinaryComparisonSimplificationSuite extends PlanTest with PredicateHelper { object Optimize extends RuleExecutor[LogicalPlan] { + val conf = SimpleCatalystConf(caseSensitiveAnalysis = true) val batches = Batch("AnalysisNodes", Once, EliminateSubqueryAliases) :: Batch("Constant Folding", FixedPoint(50), - NullPropagation(SimpleCatalystConf(caseSensitiveAnalysis = true)), + NullPropagation(conf), ConstantFolding, BooleanSimplification, SimplifyBinaryComparison, - PruneFilters) :: Nil + PruneFilters(conf)) :: Nil } val nullableRelation = LocalRelation('a.int.withNullability(true)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index 1b9db06014921..4d404f55aa570 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -30,14 +30,15 @@ import org.apache.spark.sql.catalyst.rules._ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { object Optimize extends RuleExecutor[LogicalPlan] { + val conf = SimpleCatalystConf(caseSensitiveAnalysis = true) val batches = Batch("AnalysisNodes", Once, EliminateSubqueryAliases) :: Batch("Constant Folding", FixedPoint(50), - NullPropagation(SimpleCatalystConf(caseSensitiveAnalysis = true)), + NullPropagation(conf), ConstantFolding, BooleanSimplification, - PruneFilters) :: Nil + PruneFilters(conf)) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.string) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index 9f57f66a2ea20..0b94b37e53fd6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer +import org.apache.spark.sql.catalyst.SimpleCatalystConf import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ @@ -31,7 +32,7 @@ 
class InferFiltersFromConstraintsSuite extends PlanTest { Batch("InferAndPushDownFilters", FixedPoint(100), PushPredicateThroughJoin, PushDownPredicate, - InferFiltersFromConstraints, + InferFiltersFromConstraints(SimpleCatalystConf(caseSensitiveAnalysis = true)), CombineFilters) :: Nil } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala index c168a55e40c54..b3f3e587d1afb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer +import org.apache.spark.sql.catalyst.SimpleCatalystConf import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ @@ -31,7 +32,7 @@ class OuterJoinEliminationSuite extends PlanTest { Batch("Subqueries", Once, EliminateSubqueryAliases) :: Batch("Outer Join Elimination", Once, - EliminateOuterJoin, + EliminateOuterJoin(SimpleCatalystConf(caseSensitiveAnalysis = true)), PushPredicateThroughJoin) :: Nil } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala index 908dde7a66988..f771e3e9eba65 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.SimpleCatalystConf import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.plans._ @@ -33,7 +34,7 @@ class PropagateEmptyRelationSuite extends PlanTest { ReplaceExceptWithAntiJoin, ReplaceIntersectWithSemiJoin, PushDownPredicate, - PruneFilters, + PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true)), PropagateEmptyRelation) :: Nil } @@ -45,7 +46,7 @@ class PropagateEmptyRelationSuite extends PlanTest { ReplaceExceptWithAntiJoin, ReplaceIntersectWithSemiJoin, PushDownPredicate, - PruneFilters) :: Nil + PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil } val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1))) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala index d8cfec5391497..9165fd30803fd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer +import org.apache.spark.sql.catalyst.SimpleCatalystConf import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ @@ -33,7 +34,7 @@ class PruneFiltersSuite extends PlanTest { EliminateSubqueryAliases) :: 
Batch("Filter Pushdown and Pruning", Once, CombineFilters, - PruneFilters, + PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true)), PushDownPredicate, PushPredicateThroughJoin) :: Nil } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala index 21b7f49e14bd5..ca4976f0d6db0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer +import org.apache.spark.sql.catalyst.SimpleCatalystConf import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ @@ -34,7 +35,7 @@ class SetOperationSuite extends PlanTest { CombineUnions, PushProjectionThroughUnion, PushDownPredicate, - PruneFilters) :: Nil + PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 461dfe3a66e1b..d30ff531fe753 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -190,6 +190,15 @@ object SQLConf { .booleanConf .createWithDefault(false) + val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled") + .internal() + .doc("When true, the query optimizer will use constraint propagation in query plans to " + + "perform optimization. Constraint propagation can be computation expensive for long " + + "query plans. For such queries, disable this flag to get around this issue. Default " + + "is enabled") + .booleanConf + .createWithDefault(true) + val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema") .doc("When true, the Parquet data source merges schemas collected from all data files, " + "otherwise the schema is picked from the summary file or a random data file " + @@ -795,6 +804,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE) + def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED) + def subexpressionEliminationEnabled: Boolean = getConf(SUBEXPRESSION_ELIMINATION_ENABLED) From ae9f0370f20dfb90e3abeed22258ce224f43b175 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 7 Mar 2017 14:04:18 +0000 Subject: [PATCH 2/6] Address comments. 
--- .../spark/sql/catalyst/optimizer/Optimizer.scala | 12 +++++++++--- .../apache/spark/sql/catalyst/optimizer/joins.scala | 9 ++++++--- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 291e6f5d6b60a..8a5c1d36e011c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -608,8 +608,14 @@ object CollapseWindow extends Rule[LogicalPlan] { */ case class InferFiltersFromConstraints(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper { - def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case filter @ Filter(condition, child) if conf.constraintPropagationEnabled => + def apply(plan: LogicalPlan): LogicalPlan = if (conf.constraintPropagationEnabled) { + doInferFilters(plan) + } else { + plan + } + + private def doInferFilters(plan: LogicalPlan): LogicalPlan = plan transform { + case filter @ Filter(condition, child) => val newFilters = filter.constraints -- (child.constraints ++ splitConjunctivePredicates(condition)) if (newFilters.nonEmpty) { @@ -618,7 +624,7 @@ case class InferFiltersFromConstraints(conf: CatalystConf) filter } - case join @ Join(left, right, joinType, conditionOpt) if conf.constraintPropagationEnabled => + case join @ Join(left, right, joinType, conditionOpt) => // Only consider constraints that can be pushed down completely to either the left or the // right child val constraints = join.constraints.filter { c => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 0585950369f61..b8b9846198c5a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -118,7 +118,11 @@ case class EliminateOuterJoin(conf: CatalystConf) extends Rule[LogicalPlan] with } private def buildNewJoinType(filter: Filter, join: Join): JoinType = { - val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints + val conditions = if (conf.constraintPropagationEnabled) { + splitConjunctivePredicates(filter.condition) ++ filter.constraints + } else { + splitConjunctivePredicates(filter.condition) + } val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet)) val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet)) @@ -136,8 +140,7 @@ case class EliminateOuterJoin(conf: CatalystConf) extends Rule[LogicalPlan] with } def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) - if conf.constraintPropagationEnabled => + case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) => val newJoinType = buildNewJoinType(f, j) if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) } From 8318152521371744cfc91c3ff810aa28acb5760e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 8 Mar 2017 02:46:59 +0000 Subject: [PATCH 3/6] Let QueryPlan return empty constraints when the flag is disabled. 
--- .../sql/catalyst/optimizer/Optimizer.scala | 21 +++++++--------- .../spark/sql/catalyst/optimizer/joins.scala | 6 +---- .../spark/sql/catalyst/plans/QueryPlan.scala | 12 ++++++++++ .../plans/ConstraintPropagationSuite.scala | 24 +++++++++++++++++++ 4 files changed, 45 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 8a5c1d36e011c..ee1daf4aa69c1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -608,16 +608,10 @@ object CollapseWindow extends Rule[LogicalPlan] { */ case class InferFiltersFromConstraints(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper { - def apply(plan: LogicalPlan): LogicalPlan = if (conf.constraintPropagationEnabled) { - doInferFilters(plan) - } else { - plan - } - - private def doInferFilters(plan: LogicalPlan): LogicalPlan = plan transform { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { case filter @ Filter(condition, child) => - val newFilters = filter.constraints -- - (child.constraints ++ splitConjunctivePredicates(condition)) + val newFilters = filter.getConstraints(conf) -- + (child.getConstraints(conf) ++ splitConjunctivePredicates(condition)) if (newFilters.nonEmpty) { Filter(And(newFilters.reduce(And), condition), child) } else { @@ -627,11 +621,12 @@ case class InferFiltersFromConstraints(conf: CatalystConf) case join @ Join(left, right, joinType, conditionOpt) => // Only consider constraints that can be pushed down completely to either the left or the // right child - val constraints = join.constraints.filter { c => + val constraints = join.getConstraints(conf).filter { c => c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet) } // Remove those constraints that are already enforced by either the left or the right child - val additionalConstraints = constraints -- (left.constraints ++ right.constraints) + val additionalConstraints = constraints -- + (left.getConstraints(conf) ++ right.getConstraints(conf)) val newConditionOpt = conditionOpt match { case Some(condition) => val newFilters = additionalConstraints -- splitConjunctivePredicates(condition) @@ -713,10 +708,10 @@ case class PruneFilters(conf: CatalystConf) extends Rule[LogicalPlan] with Predi case Filter(Literal(false, BooleanType), child) => LocalRelation(child.output, data = Seq.empty) // If any deterministic condition is guaranteed to be true given the constraints on the child's // output, remove the condition - case f @ Filter(fc, p: LogicalPlan) if conf.constraintPropagationEnabled => + case f @ Filter(fc, p: LogicalPlan) => val (prunedPredicates, remainingPredicates) = splitConjunctivePredicates(fc).partition { cond => - cond.deterministic && p.constraints.contains(cond) + cond.deterministic && p.getConstraints(conf).contains(cond) } if (prunedPredicates.isEmpty) { f diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index b8b9846198c5a..89d9438160512 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -118,11 +118,7 @@ case class EliminateOuterJoin(conf: CatalystConf) extends 
Rule[LogicalPlan] with } private def buildNewJoinType(filter: Filter, join: Join): JoinType = { - val conditions = if (conf.constraintPropagationEnabled) { - splitConjunctivePredicates(filter.condition) ++ filter.constraints - } else { - splitConjunctivePredicates(filter.condition) - } + val conditions = splitConjunctivePredicates(filter.condition) ++ filter.getConstraints(conf) val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet)) val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index a5761703fd655..d91022f1aceb1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.plans +import org.apache.spark.sql.catalyst.CatalystConf import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.types.{DataType, StructType} @@ -186,6 +187,17 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT */ lazy val constraints: ExpressionSet = ExpressionSet(getRelevantConstraints(validConstraints)) + /** + * Returns [[constraints]] depending on the config of enabling constraint propagation. If the + * flag is disabled, simply returning an empty constraints. + */ + private[spark] def getConstraints(conf: CatalystConf): ExpressionSet = + if (conf.constraintPropagationEnabled) { + constraints + } else { + ExpressionSet(Set.empty) + } + /** * This method can be overridden by any child class of QueryPlan to specify a set of constraints * based on the given operator's constraint propagation logic. 
These constraints are then diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala index 908b370408280..faa1c12afffc8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.plans import java.util.TimeZone import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.SimpleCatalystConf import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ @@ -397,4 +398,27 @@ class ConstraintPropagationSuite extends SparkFunSuite { IsNotNull(resolveColumn(tr, "a")), IsNotNull(resolveColumn(tr, "c"))))) } + + test("enable/disable constraint propagation") { + val enabledConf = SimpleCatalystConf(caseSensitiveAnalysis = true, + constraintPropagationEnabled = true) + val disabledConf = SimpleCatalystConf(caseSensitiveAnalysis = true, + constraintPropagationEnabled = false) + + val tr = LocalRelation('a.int, 'b.string, 'c.int) + val filterRelation = tr.where('a.attr > 10) + + verifyConstraints( + filterRelation.analyze.getConstraints(enabledConf), + filterRelation.analyze.constraints) + + assert(filterRelation.analyze.getConstraints(disabledConf).isEmpty) + + val aliasedRelation = tr.where('c.attr > 10 && 'a.attr < 5) + .groupBy('a, 'c, 'b)('a, 'c.as("c1"), count('a).as("a3")).select('c1, 'a, 'a3) + + verifyConstraints(aliasedRelation.analyze.getConstraints(enabledConf), + aliasedRelation.analyze.constraints) + assert(aliasedRelation.analyze.getConstraints(disabledConf).isEmpty) + } } From 3eda726e5694d46b1aee20d88cf4a601fd87c379 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 8 Mar 2017 03:46:48 +0000 Subject: [PATCH 4/6] Add more tests. 
--- .../sql/catalyst/optimizer/Optimizer.scala | 12 ++++--- .../spark/sql/catalyst/optimizer/joins.scala | 3 +- .../spark/sql/catalyst/plans/QueryPlan.scala | 5 ++- .../InferFiltersFromConstraintsSuite.scala | 16 +++++++++ .../optimizer/PruneFiltersSuite.scala | 36 +++++++++++++++++++ .../plans/ConstraintPropagationSuite.scala | 14 +++----- 6 files changed, 67 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index ee1daf4aa69c1..01ab5412685a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -610,8 +610,9 @@ case class InferFiltersFromConstraints(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case filter @ Filter(condition, child) => - val newFilters = filter.getConstraints(conf) -- - (child.getConstraints(conf) ++ splitConjunctivePredicates(condition)) + val constraintEnabled = conf.constraintPropagationEnabled + val newFilters = filter.getConstraints(constraintEnabled) -- + (child.getConstraints(constraintEnabled) ++ splitConjunctivePredicates(condition)) if (newFilters.nonEmpty) { Filter(And(newFilters.reduce(And), condition), child) } else { @@ -621,12 +622,13 @@ case class InferFiltersFromConstraints(conf: CatalystConf) case join @ Join(left, right, joinType, conditionOpt) => // Only consider constraints that can be pushed down completely to either the left or the // right child - val constraints = join.getConstraints(conf).filter { c => + val constraintEnabled = conf.constraintPropagationEnabled + val constraints = join.getConstraints(constraintEnabled).filter { c => c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet) } // Remove those constraints that are already enforced by either the left or the right child val additionalConstraints = constraints -- - (left.getConstraints(conf) ++ right.getConstraints(conf)) + (left.getConstraints(constraintEnabled) ++ right.getConstraints(constraintEnabled)) val newConditionOpt = conditionOpt match { case Some(condition) => val newFilters = additionalConstraints -- splitConjunctivePredicates(condition) @@ -711,7 +713,7 @@ case class PruneFilters(conf: CatalystConf) extends Rule[LogicalPlan] with Predi case f @ Filter(fc, p: LogicalPlan) => val (prunedPredicates, remainingPredicates) = splitConjunctivePredicates(fc).partition { cond => - cond.deterministic && p.getConstraints(conf).contains(cond) + cond.deterministic && p.getConstraints(conf.constraintPropagationEnabled).contains(cond) } if (prunedPredicates.isEmpty) { f diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 89d9438160512..f7d3c1d4dbdc2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -118,7 +118,8 @@ case class EliminateOuterJoin(conf: CatalystConf) extends Rule[LogicalPlan] with } private def buildNewJoinType(filter: Filter, join: Join): JoinType = { - val conditions = splitConjunctivePredicates(filter.condition) ++ filter.getConstraints(conf) + val conditions = splitConjunctivePredicates(filter.condition) ++ + 
filter.getConstraints(conf.constraintPropagationEnabled) val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet)) val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index d91022f1aceb1..9fd95a4b368ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.plans -import org.apache.spark.sql.catalyst.CatalystConf import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.types.{DataType, StructType} @@ -191,8 +190,8 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT * Returns [[constraints]] depending on the config of enabling constraint propagation. If the * flag is disabled, simply returning an empty constraints. */ - private[spark] def getConstraints(conf: CatalystConf): ExpressionSet = - if (conf.constraintPropagationEnabled) { + private[spark] def getConstraints(constraintPropagationEnabled: Boolean): ExpressionSet = + if (constraintPropagationEnabled) { constraints } else { ExpressionSet(Set.empty) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index 0b94b37e53fd6..83cb075ffb66e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -36,6 +36,16 @@ class InferFiltersFromConstraintsSuite extends PlanTest { CombineFilters) :: Nil } + object OptimizeDisableConstraintPropagation extends RuleExecutor[LogicalPlan] { + val batches = + Batch("InferAndPushDownFilters", FixedPoint(100), + PushPredicateThroughJoin, + PushDownPredicate, + InferFiltersFromConstraints(SimpleCatalystConf(caseSensitiveAnalysis = true, + constraintPropagationEnabled = false)), + CombineFilters) :: Nil + } + val testRelation = LocalRelation('a.int, 'b.int, 'c.int) test("filter: filter out constraints in condition") { @@ -202,4 +212,10 @@ class InferFiltersFromConstraintsSuite extends PlanTest { val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) } + + test("No inferred filter when constraint propagation is disabled") { + val originalQuery = testRelation.where('a === 1 && 'a === 'b).analyze + val optimized = OptimizeDisableConstraintPropagation.execute(originalQuery) + comparePlans(optimized, originalQuery) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala index 9165fd30803fd..f1fd0758bf07e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala @@ -39,6 +39,18 @@ class PruneFiltersSuite extends PlanTest { PushPredicateThroughJoin) :: Nil } + object OptimizeDisableConstraintPropagation extends RuleExecutor[LogicalPlan] { + 
val batches = + Batch("Subqueries", Once, + EliminateSubqueryAliases) :: + Batch("Filter Pushdown and Pruning", Once, + CombineFilters, + PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true, + constraintPropagationEnabled = false)), + PushDownPredicate, + PushPredicateThroughJoin) :: Nil + } + val testRelation = LocalRelation('a.int, 'b.int, 'c.int) test("Constraints of isNull + LeftOuter") { @@ -134,4 +146,28 @@ class PruneFiltersSuite extends PlanTest { val correctAnswer = testRelation.where(Rand(10) > 5).where(Rand(10) > 5).select('a).analyze comparePlans(optimized, correctAnswer) } + + test("No pruning when constraint propagation is disabled") { + val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1) + val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2) + + val query = tr1 + .where("tr1.a".attr > 10 || "tr1.c".attr < 10) + .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr)) + + val queryWithUselessFilter = + query.where( + ("tr1.a".attr > 10 || "tr1.c".attr < 10) && + 'd.attr < 100) + + val optimized = OptimizeDisableConstraintPropagation.execute(queryWithUselessFilter.analyze) + // When constraint propagation is disabled, the useless filter won't be pruned. + // It gets pushed down. Because the rule `CombineFilters` runs only once, there are redundant + // and duplicate filters. + val correctAnswer = tr1 + .where("tr1.a".attr > 10 || "tr1.c".attr < 10).where("tr1.a".attr > 10 || "tr1.c".attr < 10) + .join(tr2.where('d.attr < 100).where('d.attr < 100), + Inner, Some("tr1.a".attr === "tr2.a".attr)).analyze + comparePlans(optimized, correctAnswer) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala index faa1c12afffc8..4061394b862a6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.plans import java.util.TimeZone import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.SimpleCatalystConf import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ @@ -400,25 +399,20 @@ class ConstraintPropagationSuite extends SparkFunSuite { } test("enable/disable constraint propagation") { - val enabledConf = SimpleCatalystConf(caseSensitiveAnalysis = true, - constraintPropagationEnabled = true) - val disabledConf = SimpleCatalystConf(caseSensitiveAnalysis = true, - constraintPropagationEnabled = false) - val tr = LocalRelation('a.int, 'b.string, 'c.int) val filterRelation = tr.where('a.attr > 10) verifyConstraints( - filterRelation.analyze.getConstraints(enabledConf), + filterRelation.analyze.getConstraints(constraintPropagationEnabled = true), filterRelation.analyze.constraints) - assert(filterRelation.analyze.getConstraints(disabledConf).isEmpty) + assert(filterRelation.analyze.getConstraints(constraintPropagationEnabled = false).isEmpty) val aliasedRelation = tr.where('c.attr > 10 && 'a.attr < 5) .groupBy('a, 'c, 'b)('a, 'c.as("c1"), count('a).as("a3")).select('c1, 'a, 'a3) - verifyConstraints(aliasedRelation.analyze.getConstraints(enabledConf), + verifyConstraints(aliasedRelation.analyze.getConstraints(constraintPropagationEnabled = true), 
aliasedRelation.analyze.constraints) - assert(aliasedRelation.analyze.getConstraints(disabledConf).isEmpty) + assert(aliasedRelation.analyze.getConstraints(constraintPropagationEnabled = false).isEmpty) } } From 0e204bc226cfa520fe76a83e233790153c776522 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 13 Mar 2017 14:30:57 +0000 Subject: [PATCH 5/6] Address comments. --- .../sql/catalyst/optimizer/Optimizer.scala | 19 ++++++++++++------- .../apache/spark/sql/internal/SQLConf.scala | 2 +- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index de6fa194240c8..09bfa8bd4575a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -606,11 +606,18 @@ object CollapseWindow extends Rule[LogicalPlan] { */ case class InferFiltersFromConstraints(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper { - def apply(plan: LogicalPlan): LogicalPlan = plan transform { + def apply(plan: LogicalPlan): LogicalPlan = if (conf.constraintPropagationEnabled) { + inferFilters(plan) + } else { + plan + } + + + private def inferFilters(plan: LogicalPlan): LogicalPlan = plan transform { case filter @ Filter(condition, child) => val constraintEnabled = conf.constraintPropagationEnabled - val newFilters = filter.getConstraints(constraintEnabled) -- - (child.getConstraints(constraintEnabled) ++ splitConjunctivePredicates(condition)) + val newFilters = filter.constraints -- + (child.constraints ++ splitConjunctivePredicates(condition)) if (newFilters.nonEmpty) { Filter(And(newFilters.reduce(And), condition), child) } else { @@ -620,13 +627,11 @@ case class InferFiltersFromConstraints(conf: CatalystConf) case join @ Join(left, right, joinType, conditionOpt) => // Only consider constraints that can be pushed down completely to either the left or the // right child - val constraintEnabled = conf.constraintPropagationEnabled - val constraints = join.getConstraints(constraintEnabled).filter { c => + val constraints = join.constraints.filter { c => c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet) } // Remove those constraints that are already enforced by either the left or the right child - val additionalConstraints = constraints -- - (left.getConstraints(constraintEnabled) ++ right.getConstraints(constraintEnabled)) + val additionalConstraints = constraints -- (left.constraints ++ right.constraints) val newConditionOpt = conditionOpt match { case Some(condition) => val newFilters = additionalConstraints -- splitConjunctivePredicates(condition) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index cab58ea21f693..3c964f662e88b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -193,7 +193,7 @@ object SQLConf { val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled") .internal() .doc("When true, the query optimizer will use constraint propagation in query plans to " + - "perform optimization. Constraint propagation can be computation expensive for long " + + "perform optimization. 
Constraint propagation can be computation expensive for large " + "query plans. For such queries, disable this flag to get around this issue. Default " + "is enabled") .booleanConf From a02c8cbf80a8977e80ce21e0db9b4c0b61753331 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 23 Mar 2017 12:01:07 +0000 Subject: [PATCH 6/6] Address comments. --- .../sql/catalyst/optimizer/Optimizer.scala | 1 - .../apache/spark/sql/internal/SQLConf.scala | 8 +++--- .../InferFiltersFromConstraintsSuite.scala | 4 +-- .../optimizer/OuterJoinEliminationSuite.scala | 27 +++++++++++++++++++ .../optimizer/PruneFiltersSuite.scala | 5 ++-- 5 files changed, 36 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 415a685618c72..ee7de86921496 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -626,7 +626,6 @@ case class InferFiltersFromConstraints(conf: CatalystConf) private def inferFilters(plan: LogicalPlan): LogicalPlan = plan transform { case filter @ Filter(condition, child) => - val constraintEnabled = conf.constraintPropagationEnabled val newFilters = filter.constraints -- (child.constraints ++ splitConjunctivePredicates(condition)) if (newFilters.nonEmpty) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 32efa4acf8a43..5566b06aa3553 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -189,10 +189,10 @@ object SQLConf { val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled") .internal() - .doc("When true, the query optimizer will use constraint propagation in query plans to " + - "perform optimization. Constraint propagation can be computation expensive for large " + - "query plans. For such queries, disable this flag to get around this issue. Default " + - "is enabled") + .doc("When true, the query optimizer will infer and propagate data constraints in the query " + + "plan to optimize them. 
Constraint propagation can sometimes be computationally expensive" + + "for certain kinds of query plans (such as those with a large number of predicates and " + + "aliases) which might negatively impact overall runtime.") .booleanConf .createWithDefault(true) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index 83cb075ffb66e..98d8b897a9165 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -36,7 +36,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest { CombineFilters) :: Nil } - object OptimizeDisableConstraintPropagation extends RuleExecutor[LogicalPlan] { + object OptimizeWithConstraintPropagationDisabled extends RuleExecutor[LogicalPlan] { val batches = Batch("InferAndPushDownFilters", FixedPoint(100), PushPredicateThroughJoin, @@ -215,7 +215,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest { test("No inferred filter when constraint propagation is disabled") { val originalQuery = testRelation.where('a === 1 && 'a === 'b).analyze - val optimized = OptimizeDisableConstraintPropagation.execute(originalQuery) + val optimized = OptimizeWithConstraintPropagationDisabled.execute(originalQuery) comparePlans(optimized, originalQuery) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala index b3f3e587d1afb..cbabc1fa6d929 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala @@ -36,6 +36,16 @@ class OuterJoinEliminationSuite extends PlanTest { PushPredicateThroughJoin) :: Nil } + object OptimizeWithConstraintPropagationDisabled extends RuleExecutor[LogicalPlan] { + val batches = + Batch("Subqueries", Once, + EliminateSubqueryAliases) :: + Batch("Outer Join Elimination", Once, + EliminateOuterJoin(SimpleCatalystConf(caseSensitiveAnalysis = true, + constraintPropagationEnabled = false)), + PushPredicateThroughJoin) :: Nil + } + val testRelation = LocalRelation('a.int, 'b.int, 'c.int) val testRelation1 = LocalRelation('d.int, 'e.int, 'f.int) @@ -232,4 +242,21 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + + test("no outer join elimination if constraint propagation is disabled") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + // The predicate "x.b + y.d >= 3" will be inferred constraints like: + // "x.b != null" and "y.d != null", if constraint propagation is enabled. + // When we disable it, the predicate can't be evaluated on left or right plan and used to + // filter out nulls. So the Outer Join will not be eliminated. 
+ val originalQuery = + x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)) + .where("x.b".attr + "y.d".attr >= 3) + + val optimized = OptimizeWithConstraintPropagationDisabled.execute(originalQuery.analyze) + + comparePlans(optimized, originalQuery.analyze) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala index f1fd0758bf07e..20f7f69e86c05 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala @@ -39,7 +39,7 @@ class PruneFiltersSuite extends PlanTest { PushPredicateThroughJoin) :: Nil } - object OptimizeDisableConstraintPropagation extends RuleExecutor[LogicalPlan] { + object OptimizeWithConstraintPropagationDisabled extends RuleExecutor[LogicalPlan] { val batches = Batch("Subqueries", Once, EliminateSubqueryAliases) :: @@ -160,7 +160,8 @@ class PruneFiltersSuite extends PlanTest { ("tr1.a".attr > 10 || "tr1.c".attr < 10) && 'd.attr < 100) - val optimized = OptimizeDisableConstraintPropagation.execute(queryWithUselessFilter.analyze) + val optimized = + OptimizeWithConstraintPropagationDisabled.execute(queryWithUselessFilter.analyze) // When constraint propagation is disabled, the useless filter won't be pruned. // It gets pushed down. Because the rule `CombineFilters` runs only once, there are redundant // and duplicate filters.