Skip to content

Commit

Permalink
Update OuterJoinEliminationSuite
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin committed Jun 15, 2017
1 parent a032106 commit cec78b5
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
// Operator push down
PushProjectionThroughUnion,
ReorderJoin(conf),
EliminateOuterJoin(conf),
EliminateOuterJoin,
PushPredicateThroughJoin,
PushDownPredicate,
LimitPushDown(conf),
ColumnPruning,
InferFiltersFromConstraints(conf),
InferFiltersFromConstraints,
// Operator combine
CollapseRepartition,
CollapseProject,
Expand Down Expand Up @@ -619,14 +619,15 @@ object CollapseWindow extends Rule[LogicalPlan] {
* Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
* LeftSemi joins.
*/
case class InferFiltersFromConstraints(conf: SQLConf)
extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = if (conf.constraintPropagationEnabled) {
inferFilters(plan)
} else {
plan
}
object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {

def apply(plan: LogicalPlan): LogicalPlan = {
if (SQLConf.get.constraintPropagationEnabled) {
inferFilters(plan)
} else {
plan
}
}

private def inferFilters(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter(condition, child) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ case class ReorderJoin(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHe
*
* This rule should be executed before pushing down the Filter
*/
case class EliminateOuterJoin(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper {
object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {

/**
* Returns whether the expression returns null or false when all inputs are nulls.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.internal.SQLConf.CONSTRAINT_PROPAGATION_ENABLED
import org.apache.spark.sql.internal.SQLConf

class InferFiltersFromConstraintsSuite extends PlanTest {

Expand All @@ -32,20 +32,11 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
Batch("InferAndPushDownFilters", FixedPoint(100),
PushPredicateThroughJoin,
PushDownPredicate,
InferFiltersFromConstraints(conf),
InferFiltersFromConstraints,
CombineFilters,
BooleanSimplification) :: Nil
}

object OptimizeWithConstraintPropagationDisabled extends RuleExecutor[LogicalPlan] {
val batches =
Batch("InferAndPushDownFilters", FixedPoint(100),
PushPredicateThroughJoin,
PushDownPredicate,
InferFiltersFromConstraints(conf.copy(CONSTRAINT_PROPAGATION_ENABLED -> false)),
CombineFilters) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

test("filter: filter out constraints in condition") {
Expand Down Expand Up @@ -215,8 +206,13 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
}

test("No inferred filter when constraint propagation is disabled") {
val originalQuery = testRelation.where('a === 1 && 'a === 'b).analyze
val optimized = OptimizeWithConstraintPropagationDisabled.execute(originalQuery)
comparePlans(optimized, originalQuery)
try {
SQLConf.get.setConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED, false)
val originalQuery = testRelation.where('a === 1 && 'a === 'b).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, originalQuery)
} finally {
SQLConf.get.unsetConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,15 @@ import org.apache.spark.sql.catalyst.expressions.{Coalesce, IsNotNull}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.internal.SQLConf.CONSTRAINT_PROPAGATION_ENABLED
import org.apache.spark.sql.internal.SQLConf

class OuterJoinEliminationSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
EliminateSubqueryAliases) ::
Batch("Outer Join Elimination", Once,
EliminateOuterJoin(conf),
PushPredicateThroughJoin) :: Nil
}

object OptimizeWithConstraintPropagationDisabled extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
EliminateSubqueryAliases) ::
Batch("Outer Join Elimination", Once,
EliminateOuterJoin(conf.copy(CONSTRAINT_PROPAGATION_ENABLED -> false)),
EliminateOuterJoin,
PushPredicateThroughJoin) :: Nil
}

Expand Down Expand Up @@ -243,19 +234,25 @@ class OuterJoinEliminationSuite extends PlanTest {
}

test("no outer join elimination if constraint propagation is disabled") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)
try {
SQLConf.get.setConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED, false)

// The predicate "x.b + y.d >= 3" will be inferred constraints like:
// "x.b != null" and "y.d != null", if constraint propagation is enabled.
// When we disable it, the predicate can't be evaluated on left or right plan and used to
// filter out nulls. So the Outer Join will not be eliminated.
val originalQuery =
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

// The predicate "x.b + y.d >= 3" will be inferred constraints like:
// "x.b != null" and "y.d != null", if constraint propagation is enabled.
// When we disable it, the predicate can't be evaluated on left or right plan and used to
// filter out nulls. So the Outer Join will not be eliminated.
val originalQuery =
x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
.where("x.b".attr + "y.d".attr >= 3)

val optimized = OptimizeWithConstraintPropagationDisabled.execute(originalQuery.analyze)
val optimized = Optimize.execute(originalQuery.analyze)

comparePlans(optimized, originalQuery.analyze)
comparePlans(optimized, originalQuery.analyze)
} finally {
SQLConf.get.unsetConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED)
}
}
}

0 comments on commit cec78b5

Please sign in to comment.