[SPARK-13668][SQL] Reorder filter/join predicates to short-circuit isNotNull checks

## What changes were proposed in this pull request?

If a filter predicate or a join condition contains `IsNotNull` checks, we should reorder these checks so that the non-nullability checks are evaluated before the rest of the predicates.

For example, if a filter predicate is of the form `a > 5 && isNotNull(b)`, we rewrite it as `isNotNull(b) && a > 5` during physical plan generation.
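To illustrate the mechanics, here is a minimal, self-contained sketch of the idea (not the Catalyst implementation): it uses a toy `Expr` ADT in place of Catalyst's `Expression` hierarchy, and a stable `partition` in place of the patch's `sortWith`, so the non-`IsNotNull` conjuncts keep their relative order:

```scala
// Toy stand-ins for Catalyst expressions; these names are illustrative only.
sealed trait Expr
case class IsNotNull(attr: String) extends Expr
case class GreaterThan(attr: String, value: Int) extends Expr
case class And(left: Expr, right: Expr) extends Expr

// Flatten a tree of Ands into its conjuncts (mirrors splitConjunctivePredicates).
def splitConjuncts(e: Expr): Seq[Expr] = e match {
  case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
  case other     => Seq(other)
}

// Move the cheap IsNotNull checks to the front. partition is stable, so the
// relative order within each group is preserved.
def reorderPredicates(e: Expr): Expr = {
  val (notNullChecks, rest) = splitConjuncts(e).partition(_.isInstanceOf[IsNotNull])
  (notNullChecks ++ rest).reduce(And)
}

// reorderPredicates(And(GreaterThan("a", 5), IsNotNull("b")))
//   == And(IsNotNull("b"), GreaterThan("a", 5))
```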

## How was this patch tested?

New unit tests in `ReorderedPredicateSuite` verify the predicate order in the physical plan for both filters and joins.
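For a quick manual sanity check outside the suite (assuming the same `testData` table with `key` and `value` columns that the tests below register), one could also inspect the plans from a REPL:

```scala
sqlContext.sql(
  "SELECT * FROM testData WHERE value != '5' AND value IS NOT NULL").explain(true)
// In the extended output, the physical Filter's isnotnull(value) check
// should now appear before the value != '5' comparison.
```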

Author: Sameer Agarwal <sameer@databricks.com>

Closes #11511 from sameeragarwal/reorder-isnotnull.
sameeragarwal authored and yhuai committed Mar 8, 2016
1 parent 1e28840 commit e430614
Showing 3 changed files with 150 additions and 14 deletions.
File 1: GenericStrategy (org.apache.spark.sql.catalyst.planning)

@@ -18,6 +18,8 @@
 package org.apache.spark.sql.catalyst.planning
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, IsNotNull, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.TreeNode
 
@@ -26,8 +28,28 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
  * be used for execution. If this strategy does not apply to the give logical operation then an
  * empty list should be returned.
  */
-abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging {
+abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]]
+  extends PredicateHelper with Logging {
 
   def apply(plan: LogicalPlan): Seq[PhysicalPlan]
 
+  // Attempts to re-order the individual conjunctive predicates in an expression to short circuit
+  // the evaluation of relatively cheaper checks (e.g., checking for nullability) before others.
+  protected def reorderPredicates(expr: Expression): Expression = {
+    splitConjunctivePredicates(expr)
+      .sortWith((x, _) => x.isInstanceOf[IsNotNull])
+      .reduce(And)
+  }
+
+  // Wrapper around reorderPredicates(expr: Expression) to reorder optional conditions in joins
+  protected def reorderPredicates(exprOpt: Option[Expression]): Option[Expression] = {
+    exprOpt match {
+      case Some(expr) =>
+        Option(reorderPredicates(expr))
+      case None =>
+        exprOpt
+    }
+  }
 }

 /**

File 2: SparkStrategies

@@ -66,11 +66,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case ExtractEquiJoinKeys(
           LeftSemi, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
         joins.BroadcastLeftSemiJoinHash(
-          leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil
+          leftKeys, rightKeys, planLater(left), planLater(right),
+          reorderPredicates(condition)) :: Nil
       // Find left semi joins where at least some predicates can be evaluated by matching join keys
       case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) =>
         joins.LeftSemiJoinHash(
-          leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil
+          leftKeys, rightKeys, planLater(left), planLater(right),
+          reorderPredicates(condition)) :: Nil
       case _ => Nil
     }
   }
@@ -111,33 +113,39 @@
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
         Seq(joins.BroadcastHashJoin(
-          leftKeys, rightKeys, Inner, BuildRight, condition, planLater(left), planLater(right)))
+          leftKeys, rightKeys, Inner, BuildRight, reorderPredicates(condition),
+          planLater(left), planLater(right)))
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, CanBroadcast(left), right) =>
         Seq(joins.BroadcastHashJoin(
-          leftKeys, rightKeys, Inner, BuildLeft, condition, planLater(left), planLater(right)))
+          leftKeys, rightKeys, Inner, BuildLeft, reorderPredicates(condition), planLater(left),
+          planLater(right)))
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
           if RowOrdering.isOrderable(leftKeys) =>
         joins.SortMergeJoin(
-          leftKeys, rightKeys, condition, planLater(left), planLater(right)) :: Nil
+          leftKeys, rightKeys, reorderPredicates(condition), planLater(left),
+          planLater(right)) :: Nil
 
       // --- Outer joins --------------------------------------------------------------------------
 
       case ExtractEquiJoinKeys(
           LeftOuter, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
         Seq(joins.BroadcastHashJoin(
-          leftKeys, rightKeys, LeftOuter, BuildRight, condition, planLater(left), planLater(right)))
+          leftKeys, rightKeys, LeftOuter, BuildRight, reorderPredicates(condition),
+          planLater(left), planLater(right)))
 
       case ExtractEquiJoinKeys(
           RightOuter, leftKeys, rightKeys, condition, CanBroadcast(left), right) =>
         Seq(joins.BroadcastHashJoin(
-          leftKeys, rightKeys, RightOuter, BuildLeft, condition, planLater(left), planLater(right)))
+          leftKeys, rightKeys, RightOuter, BuildLeft, reorderPredicates(condition),
+          planLater(left), planLater(right)))
 
       case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
           if RowOrdering.isOrderable(leftKeys) =>
         joins.SortMergeOuterJoin(
-          leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
+          leftKeys, rightKeys, joinType, reorderPredicates(condition), planLater(left),
+          planLater(right)) :: Nil
 
       // --- Cases where this strategy does not apply ---------------------------------------------
@@ -252,10 +260,12 @@
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case j @ logical.Join(CanBroadcast(left), right, Inner | RightOuter, condition) =>
         execution.joins.BroadcastNestedLoopJoin(
-          planLater(left), planLater(right), joins.BuildLeft, j.joinType, condition) :: Nil
+          planLater(left), planLater(right), joins.BuildLeft, j.joinType,
+          reorderPredicates(condition)) :: Nil
       case j @ logical.Join(left, CanBroadcast(right), Inner | LeftOuter | LeftSemi, condition) =>
         execution.joins.BroadcastNestedLoopJoin(
-          planLater(left), planLater(right), joins.BuildRight, j.joinType, condition) :: Nil
+          planLater(left), planLater(right), joins.BuildRight, j.joinType,
+          reorderPredicates(condition)) :: Nil
       case _ => Nil
     }
   }
@@ -265,7 +275,7 @@
       case logical.Join(left, right, Inner, None) =>
         execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil
       case logical.Join(left, right, Inner, Some(condition)) =>
-        execution.Filter(condition,
+        execution.Filter(reorderPredicates(condition),
           execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil
       case _ => Nil
     }
@@ -282,7 +292,8 @@
           }
         // This join could be very slow or even hang forever
         joins.BroadcastNestedLoopJoin(
-          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
+          planLater(left), planLater(right), buildSide, joinType,
+          reorderPredicates(condition)) :: Nil
       case _ => Nil
     }
   }
@@ -341,7 +352,7 @@
       case logical.Project(projectList, child) =>
         execution.Project(projectList, planLater(child)) :: Nil
       case logical.Filter(condition, child) =>
-        execution.Filter(condition, planLater(child)) :: Nil
+        execution.Filter(reorderPredicates(condition), planLater(child)) :: Nil
       case e @ logical.Expand(_, _, child) =>
         execution.Expand(e.projections, e.output, planLater(child)) :: Nil
       case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) =>

File 3: ReorderedPredicateSuite (new file)

@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, IsNotNull, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.Join
+import org.apache.spark.sql.execution
+import org.apache.spark.sql.execution.joins.LeftSemiJoinHash
+import org.apache.spark.sql.test.SharedSQLContext
+
+
+class ReorderedPredicateSuite extends SharedSQLContext with PredicateHelper {
+
+  setupTestData()
+
+  // Verifies that (a) In the new condition, the IsNotNull operators precede rest of the operators
+  // and (b) The relative sort order of IsNotNull and !IsNotNull operators is still maintained
+  private def verifyStableOrder(before: Expression, after: Expression): Unit = {
+    val oldPredicates = splitConjunctivePredicates(before)
+    splitConjunctivePredicates(after).sliding(2).foreach { case Seq(x, y) =>
+      // Verify IsNotNull operator ordering
+      assert(x.isInstanceOf[IsNotNull] || !y.isInstanceOf[IsNotNull])
+
+      // Verify stable sort order
+      if ((x.isInstanceOf[IsNotNull] && y.isInstanceOf[IsNotNull]) ||
+          (!x.isInstanceOf[IsNotNull] && !y.isInstanceOf[IsNotNull])) {
+        assert(oldPredicates.indexOf(x) <= oldPredicates.indexOf(y))
+      }
+    }
+  }
+
+  test("null ordering in filter predicates") {
+    val query = sql(
+      """
+        |SELECT * from testData
+        |WHERE value != '5' AND value != '4' AND value IS NOT NULL AND key != 5
+      """.stripMargin)
+      .queryExecution
+
+    val logicalPlan = query.optimizedPlan
+    val physicalPlan = query.sparkPlan
+    assert(logicalPlan.find(_.isInstanceOf[logical.Filter]).isDefined)
+    assert(physicalPlan.find(_.isInstanceOf[execution.Filter]).isDefined)
+
+    val logicalCondition = logicalPlan.collect {
+      case logical.Filter(condition, _) =>
+        condition
+    }.head
+
+    val physicalCondition = physicalPlan.collect {
+      case Filter(condition, _) =>
+        condition
+    }.head
+
+    verifyStableOrder(logicalCondition, physicalCondition)
+  }
+
+  test("null ordering in join predicates") {
+    sqlContext.cacheManager.clearCache()
+    val query = sql(
+      """
+        |SELECT * FROM testData t1
+        |LEFT SEMI JOIN testData t2
+        |ON t1.key = t2.key
+        |AND t1.key + t2.key != 5
+        |AND CONCAT(t1.value, t2.value) IS NOT NULL
+      """.stripMargin)
+      .queryExecution
+
+    val logicalPlan = query.optimizedPlan
+    val physicalPlan = query.sparkPlan
+    assert(logicalPlan.find(_.isInstanceOf[Join]).isDefined)
+    assert(physicalPlan.find(_.isInstanceOf[LeftSemiJoinHash]).isDefined)
+
+    val logicalCondition = logicalPlan.collect {
+      case Join(_, _, _, condition) =>
+        condition.get
+    }.head
+
+    val physicalCondition = physicalPlan.collect {
+      case LeftSemiJoinHash(_, _, _, _, conditionOpt) =>
+        conditionOpt.get
+    }.head
+
+    verifyStableOrder(logicalCondition, physicalCondition)
+  }
+}
