From e430614eae53c8864b31a1dc64db83e27100d1d9 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Tue, 8 Mar 2016 15:40:45 -0800 Subject: [PATCH] [SPARK-13668][SQL] Reorder filter/join predicates to short-circuit isNotNull checks ## What changes were proposed in this pull request? If a filter predicate or a join condition contains `IsNotNull` checks, we should reorder these checks such that these non-nullability checks are evaluated before the rest of the predicates. For example, if a filter predicate is of the form `a > 5 && isNotNull(b)`, we should rewrite this as `isNotNull(b) && a > 5` during physical plan generation. ## How was this patch tested? New unit tests that verify the physical plan for both filters and joins in `ReorderedPredicateSuite`. Author: Sameer Agarwal Closes #11511 from sameeragarwal/reorder-isnotnull. --- .../sql/catalyst/planning/QueryPlanner.scala | 24 +++- .../spark/sql/execution/SparkStrategies.scala | 37 ++++--- .../execution/ReorderedPredicateSuite.scala | 103 ++++++++++++++++++ 3 files changed, 150 insertions(+), 14 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 56a3dd02f9ba3..1e4523e2d8cf9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.catalyst.planning import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions.{And, Expression, IsNotNull, PredicateHelper} +import org.apache.spark.sql.catalyst.plans import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.TreeNode @@ -26,8 +28,28 @@ import 
org.apache.spark.sql.catalyst.trees.TreeNode * be used for execution. If this strategy does not apply to the give logical operation then an * empty list should be returned. */ -abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging { +abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] + extends PredicateHelper with Logging { + def apply(plan: LogicalPlan): Seq[PhysicalPlan] + + // Attempts to re-order the individual conjunctive predicates in an expression to short circuit + // the evaluation of relatively cheaper checks (e.g., checking for nullability) before others. + protected def reorderPredicates(expr: Expression): Expression = { + splitConjunctivePredicates(expr) + .sortWith((x, _) => x.isInstanceOf[IsNotNull]) + .reduce(And) + } + + // Wrapper around reorderPredicates(expr: Expression) to reorder optional conditions in joins + protected def reorderPredicates(exprOpt: Option[Expression]): Option[Expression] = { + exprOpt match { + case Some(expr) => + Option(reorderPredicates(expr)) + case None => + exprOpt + } + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index debd04aa95b9c..36fea4d20360a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -66,11 +66,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case ExtractEquiJoinKeys( LeftSemi, leftKeys, rightKeys, condition, left, CanBroadcast(right)) => joins.BroadcastLeftSemiJoinHash( - leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil + leftKeys, rightKeys, planLater(left), planLater(right), + reorderPredicates(condition)) :: Nil // Find left semi joins where at least some predicates can be evaluated by matching join keys case 
ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) => joins.LeftSemiJoinHash( - leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil + leftKeys, rightKeys, planLater(left), planLater(right), + reorderPredicates(condition)) :: Nil case _ => Nil } } @@ -111,33 +113,39 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, CanBroadcast(right)) => Seq(joins.BroadcastHashJoin( - leftKeys, rightKeys, Inner, BuildRight, condition, planLater(left), planLater(right))) + leftKeys, rightKeys, Inner, BuildRight, reorderPredicates(condition), + planLater(left), planLater(right))) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, CanBroadcast(left), right) => Seq(joins.BroadcastHashJoin( - leftKeys, rightKeys, Inner, BuildLeft, condition, planLater(left), planLater(right))) + leftKeys, rightKeys, Inner, BuildLeft, reorderPredicates(condition), planLater(left), + planLater(right))) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) if RowOrdering.isOrderable(leftKeys) => joins.SortMergeJoin( - leftKeys, rightKeys, condition, planLater(left), planLater(right)) :: Nil + leftKeys, rightKeys, reorderPredicates(condition), planLater(left), + planLater(right)) :: Nil // --- Outer joins -------------------------------------------------------------------------- case ExtractEquiJoinKeys( LeftOuter, leftKeys, rightKeys, condition, left, CanBroadcast(right)) => Seq(joins.BroadcastHashJoin( - leftKeys, rightKeys, LeftOuter, BuildRight, condition, planLater(left), planLater(right))) + leftKeys, rightKeys, LeftOuter, BuildRight, reorderPredicates(condition), + planLater(left), planLater(right))) case ExtractEquiJoinKeys( RightOuter, leftKeys, rightKeys, condition, CanBroadcast(left), right) => Seq(joins.BroadcastHashJoin( - leftKeys, rightKeys, RightOuter, BuildLeft, condition, planLater(left), planLater(right))) + 
leftKeys, rightKeys, RightOuter, BuildLeft, reorderPredicates(condition), + planLater(left), planLater(right))) case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) if RowOrdering.isOrderable(leftKeys) => joins.SortMergeOuterJoin( - leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil + leftKeys, rightKeys, joinType, reorderPredicates(condition), planLater(left), + planLater(right)) :: Nil // --- Cases where this strategy does not apply --------------------------------------------- @@ -252,10 +260,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case j @ logical.Join(CanBroadcast(left), right, Inner | RightOuter, condition) => execution.joins.BroadcastNestedLoopJoin( - planLater(left), planLater(right), joins.BuildLeft, j.joinType, condition) :: Nil + planLater(left), planLater(right), joins.BuildLeft, j.joinType, + reorderPredicates(condition)) :: Nil case j @ logical.Join(left, CanBroadcast(right), Inner | LeftOuter | LeftSemi, condition) => execution.joins.BroadcastNestedLoopJoin( - planLater(left), planLater(right), joins.BuildRight, j.joinType, condition) :: Nil + planLater(left), planLater(right), joins.BuildRight, j.joinType, + reorderPredicates(condition)) :: Nil case _ => Nil } } @@ -265,7 +275,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Join(left, right, Inner, None) => execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil case logical.Join(left, right, Inner, Some(condition)) => - execution.Filter(condition, + execution.Filter(reorderPredicates(condition), execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil case _ => Nil } @@ -282,7 +292,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } // This join could be very slow or even hang forever joins.BroadcastNestedLoopJoin( - 
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil + planLater(left), planLater(right), buildSide, joinType, + reorderPredicates(condition)) :: Nil case _ => Nil } } @@ -341,7 +352,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Project(projectList, child) => execution.Project(projectList, planLater(child)) :: Nil case logical.Filter(condition, child) => - execution.Filter(condition, planLater(child)) :: Nil + execution.Filter(reorderPredicates(condition), planLater(child)) :: Nil case e @ logical.Expand(_, _, child) => execution.Expand(e.projections, e.output, planLater(child)) :: Nil case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala new file mode 100644 index 0000000000000..dd0e43866b80d --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.expressions.{Expression, IsNotNull, PredicateHelper} +import org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.catalyst.plans.logical.Join +import org.apache.spark.sql.execution +import org.apache.spark.sql.execution.joins.LeftSemiJoinHash +import org.apache.spark.sql.test.SharedSQLContext + + +class ReorderedPredicateSuite extends SharedSQLContext with PredicateHelper { + + setupTestData() + + // Verifies that (a) In the new condition, the IsNotNull operators precede rest of the operators + // and (b) The relative sort order of IsNotNull and !IsNotNull operators is still maintained + private def verifyStableOrder(before: Expression, after: Expression): Unit = { + val oldPredicates = splitConjunctivePredicates(before) + splitConjunctivePredicates(after).sliding(2).foreach { case Seq(x, y) => + // Verify IsNotNull operator ordering + assert(x.isInstanceOf[IsNotNull] || !y.isInstanceOf[IsNotNull]) + + // Verify stable sort order + if ((x.isInstanceOf[IsNotNull] && y.isInstanceOf[IsNotNull]) || + (!x.isInstanceOf[IsNotNull] && !y.isInstanceOf[IsNotNull])) { + assert(oldPredicates.indexOf(x) <= oldPredicates.indexOf(y)) + } + } + } + + test("null ordering in filter predicates") { + val query = sql( + """ + |SELECT * from testData + |WHERE value != '5' AND value != '4' AND value IS NOT NULL AND key != 5 + """.stripMargin) + .queryExecution + + val logicalPlan = query.optimizedPlan + val physicalPlan = query.sparkPlan + assert(logicalPlan.find(_.isInstanceOf[logical.Filter]).isDefined) + assert(physicalPlan.find(_.isInstanceOf[execution.Filter]).isDefined) + + val logicalCondition = logicalPlan.collect { + case logical.Filter(condition, _) => + condition + }.head + + val physicalCondition = physicalPlan.collect { + case Filter(condition, _) => + condition + }.head + + verifyStableOrder(logicalCondition, physicalCondition) + } + + test("null ordering in join 
predicates") { + sqlContext.cacheManager.clearCache() + val query = sql( + """ + |SELECT * FROM testData t1 + |LEFT SEMI JOIN testData t2 + |ON t1.key = t2.key + |AND t1.key + t2.key != 5 + |AND CONCAT(t1.value, t2.value) IS NOT NULL + """.stripMargin) + .queryExecution + + val logicalPlan = query.optimizedPlan + val physicalPlan = query.sparkPlan + assert(logicalPlan.find(_.isInstanceOf[Join]).isDefined) + assert(physicalPlan.find(_.isInstanceOf[LeftSemiJoinHash]).isDefined) + + val logicalCondition = logicalPlan.collect { + case Join(_, _, _, condition) => + condition.get + }.head + + val physicalCondition = physicalPlan.collect { + case LeftSemiJoinHash(_, _, _, _, conditionOpt) => + conditionOpt.get + }.head + + verifyStableOrder(logicalCondition, physicalCondition) + } +}