Skip to content

Commit

Permalink
Address comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Mar 8, 2016
1 parent 2c92f90 commit 312cb32
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
ReplaceDistinctWithAggregate) ::
Batch("Aggregate", FixedPoint(100),
RemoveLiteralFromGroupExpressions) ::
Batch("Join", FixedPoint(100),
AddFilterOfNullForInnerJoin) ::
Batch("Operator Optimizations", FixedPoint(100),
// Operator push down
SetOperationPushDown,
Expand All @@ -72,6 +70,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
PushPredicateThroughAggregate,
LimitPushDown,
ColumnPruning,
AddNullFilterForEquiJoin,
// Operator combine
CollapseRepartition,
CollapseProject,
Expand Down Expand Up @@ -148,11 +147,11 @@ object EliminateSerialization extends Rule[LogicalPlan] {
}

/**
* Add Filter to left and right of an inner Join to filter out rows with null keys.
* Add Filter to left and right of an EquiJoin to filter out rows with null keys.
* So we may not need to check the nullability of keys while joining. Besides, by filtering
* out rows with null keys, we can also reduce the amount of data fed into the Join.
*/
object AddFilterOfNullForInnerJoin extends Rule[LogicalPlan] with PredicateHelper {
object AddNullFilterForEquiJoin extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
val leftConditions = leftKeys.distinct.map { l =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types.IntegerType

class AddFilterOfNullForInnerJoinSuite extends PlanTest {
class AddNullFilterForEquiJoinSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
EliminateSubqueryAliases) ::
Batch("Add Filter", FixedPoint(100),
AddFilterOfNullForInnerJoin) :: Nil
AddNullFilterForEquiJoin) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
Expand Down

0 comments on commit 312cb32

Please sign in to comment.