From 312cb326922624e95528b7f2dc92129c59b3b524 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 8 Mar 2016 09:13:35 +0800 Subject: [PATCH] Address comments. --- .../apache/spark/sql/catalyst/optimizer/Optimizer.scala | 7 +++---- ...JoinSuite.scala => AddNullFilterForEquiJoinSuite.scala} | 4 ++-- 2 files changed, 5 insertions(+), 6 deletions(-) rename sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/{AddFilterOfNullForInnerJoinSuite.scala => AddNullFilterForEquiJoinSuite.scala} (96%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0c81ea45aea3e..8d2156a32b031 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -58,8 +58,6 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { ReplaceDistinctWithAggregate) :: Batch("Aggregate", FixedPoint(100), RemoveLiteralFromGroupExpressions) :: - Batch("Join", FixedPoint(100), - AddFilterOfNullForInnerJoin) :: Batch("Operator Optimizations", FixedPoint(100), // Operator push down SetOperationPushDown, @@ -72,6 +70,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { PushPredicateThroughAggregate, LimitPushDown, ColumnPruning, + AddNullFilterForEquiJoin, // Operator combine CollapseRepartition, CollapseProject, @@ -148,11 +147,11 @@ object EliminateSerialization extends Rule[LogicalPlan] { } /** - * Add Filter to left and right of an inner Join to filter out rows with null keys. + * Add Filter to left and right of a EquiJoin to filter out rows with null keys. * So we may not need to check nullability of keys while joining. Besides, by filtering * out keys with null, we can also reduce data size in Join. */ -object AddFilterOfNullForInnerJoin extends Rule[LogicalPlan] with PredicateHelper { +object AddNullFilterForEquiJoin extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) => val leftConditions = leftKeys.distinct.map { l => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AddFilterOfNullForInnerJoinSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AddNullFilterForEquiJoinSuite.scala similarity index 96% rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AddFilterOfNullForInnerJoinSuite.scala rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AddNullFilterForEquiJoinSuite.scala index 2bd0c46cdc448..78b0d4ab6d44e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AddFilterOfNullForInnerJoinSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AddNullFilterForEquiJoinSuite.scala @@ -27,14 +27,14 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.types.IntegerType -class AddFilterOfNullForInnerJoinSuite extends PlanTest { +class AddNullFilterForEquiJoinSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("Subqueries", Once, EliminateSubqueryAliases) :: Batch("Add Filter", FixedPoint(100), - AddFilterOfNullForInnerJoin) :: Nil + AddNullFilterForEquiJoin) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int)