Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
cloud-fan committed Apr 18, 2018
1 parent 5e41208 commit 787cddf
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,8 @@ object CollapseWindow extends Rule[LogicalPlan] {
* Note: While this optimization is applicable to a lot of types of join, it primarily benefits
* Inner and LeftSemi joins.
*/
object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
object InferFiltersFromConstraints extends Rule[LogicalPlan]
with PredicateHelper with ConstraintHelper {

def apply(plan: LogicalPlan): LogicalPlan = {
if (SQLConf.get.constraintPropagationEnabled) {
Expand Down Expand Up @@ -692,12 +693,12 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
conditionOpt: Option[Expression]): Set[Expression] = {
val baseConstraints = left.constraints.union(right.constraints)
.union(conditionOpt.map(splitConjunctivePredicates).getOrElse(Nil).toSet)
baseConstraints.union(ConstraintsUtils.inferAdditionalConstraints(baseConstraints))
baseConstraints.union(inferAdditionalConstraints(baseConstraints))
}

private def inferNewFilter(plan: LogicalPlan, constraints: Set[Expression]): LogicalPlan = {
val newPredicates = constraints
.union(ConstraintsUtils.constructIsNotNullConstraints(constraints, plan.output))
.union(constructIsNotNullConstraints(constraints, plan.output))
.filter { c =>
c.references.nonEmpty && c.references.subsetOf(plan.outputSet) && c.deterministic
} -- plan.constraints
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.expressions._


trait QueryPlanConstraints { self: LogicalPlan =>
trait QueryPlanConstraints extends ConstraintHelper { self: LogicalPlan =>

/**
* An [[ExpressionSet]] that contains invariants about the rows output by this operator. For
Expand All @@ -31,8 +31,8 @@ trait QueryPlanConstraints { self: LogicalPlan =>
if (conf.constraintPropagationEnabled) {
ExpressionSet(
validConstraints
.union(ConstraintsUtils.inferAdditionalConstraints(validConstraints))
.union(ConstraintsUtils.constructIsNotNullConstraints(validConstraints, output))
.union(inferAdditionalConstraints(validConstraints))
.union(constructIsNotNullConstraints(validConstraints, output))
.filter { c =>
c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
}
Expand All @@ -53,7 +53,7 @@ trait QueryPlanConstraints { self: LogicalPlan =>
protected def validConstraints: Set[Expression] = Set.empty
}

object ConstraintsUtils {
trait ConstraintHelper {

/**
* Infers an additional set of constraints from a given set of equality constraints.
Expand Down

0 comments on commit 787cddf

Please sign in to comment.