Skip to content

Commit

Permalink
use allConstraints
Browse files Browse the repository at this point in the history
  • Loading branch information
mgaido91 committed Mar 10, 2018
1 parent 45fbb85 commit d8a1190
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,33 +45,6 @@ trait Predicate extends Expression {
override def dataType: DataType = BooleanType
}

trait NotNullConstraintHelper {
/**
* Infer the Attribute-specific IsNotNull constraints from the null intolerant child expressions
* of constraints.
*/
protected def inferIsNotNullConstraints(constraint: Expression): Seq[Expression] =
constraint match {
// When the root is IsNotNull, we can push IsNotNull through the child null intolerant
// expressions
case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_))
// Constraints always return true for all the inputs. That means, null will never be returned.
// Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child
// null intolerant expressions.
case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
}

/**
* Recursively explores the expressions which are null intolerant and returns all attributes
* in these expressions.
*/
protected def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = expr match {
case a: Attribute => Seq(a)
case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantAttribute)
case _ => Seq.empty[Attribute]
}
}

trait PredicateHelper {
protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
condition match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,8 +638,7 @@ object CollapseWindow extends Rule[LogicalPlan] {
* Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
* LeftSemi joins.
*/
object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper
with NotNullConstraintHelper {
object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {

def apply(plan: LogicalPlan): LogicalPlan = {
if (SQLConf.get.constraintPropagationEnabled) {
Expand All @@ -664,7 +663,7 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
// right child
val constraints = join.allConstraints.filter { c =>
c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)
} ++ extraJoinConstraints(join).toSet
}
// Remove those constraints that are already enforced by either the left or the right child
val additionalConstraints = constraints -- (left.constraints ++ right.constraints)
val newConditionOpt = conditionOpt match {
Expand All @@ -676,22 +675,6 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
}
if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
}

/**
* Returns additional constraints which are not enforced on the result of join operations, but
* which can be enforced either on the left or the right side
*/
def extraJoinConstraints(join: Join): Seq[Expression] = {
join match {
case Join(_, right, LeftAnti | LeftOuter, condition) if condition.isDefined =>
splitConjunctivePredicates(condition.get).flatMap(inferIsNotNullConstraints).filter(
_.references.subsetOf(right.outputSet))
case Join(left, _, RightOuter, condition) if condition.isDefined =>
splitConjunctivePredicates(condition.get).flatMap(inferIsNotNullConstraints).filter(
_.references.subsetOf(left.outputSet))
case _ => Seq.empty[Expression]
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,13 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.expressions._


trait QueryPlanConstraints extends NotNullConstraintHelper { self: LogicalPlan =>
trait QueryPlanConstraints { self: LogicalPlan =>

/**
* An [[ExpressionSet]] that contains an additional set of constraints, such as equality
* constraints and `isNotNull` constraints, etc.
*/
lazy val allConstraints: ExpressionSet = {
if (conf.constraintPropagationEnabled) {
ExpressionSet(validConstraints
.union(inferAdditionalConstraints(validConstraints))
.union(constructIsNotNullConstraints(validConstraints)))
} else {
ExpressionSet(Set.empty)
}
}
lazy val allConstraints: ExpressionSet = ExpressionSet(constructAllConstraints)

/**
* An [[ExpressionSet]] that contains invariants about the rows output by this operator. For
Expand All @@ -55,6 +47,20 @@ trait QueryPlanConstraints extends NotNullConstraintHelper { self: LogicalPlan =
*/
protected def validConstraints: Set[Expression] = Set.empty

/**
* Returns the [[Expression]]s representing all the constraints which can be enforced on the
* current operator.
*/
protected def constructAllConstraints: Set[Expression] = {
if (conf.constraintPropagationEnabled) {
validConstraints
.union(inferAdditionalConstraints(validConstraints))
.union(constructIsNotNullConstraints(validConstraints))
} else {
Set.empty
}
}

/**
* Infers a set of `isNotNull` constraints from null intolerant expressions as well as
* non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this
Expand All @@ -72,6 +78,31 @@ trait QueryPlanConstraints extends NotNullConstraintHelper { self: LogicalPlan =
isNotNullConstraints -- constraints
}

/**
* Infer the Attribute-specific IsNotNull constraints from the null intolerant child expressions
* of constraints.
*/
protected def inferIsNotNullConstraints(constraint: Expression): Seq[Expression] =
constraint match {
// When the root is IsNotNull, we can push IsNotNull thro0ugh the child null intolerant
// expressions
case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_))
// Constraints always return true for all the inputs. That means, null will never be returned.
// Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child
// null intolerant expressions.
case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
}

/**
* Recursively explores the expressions which are null intolerant and returns all attributes
* in these expressions.
*/
protected def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = expr match {
case a: Attribute => Seq(a)
case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantAttribute)
case _ => Seq.empty[Attribute]
}

/**
* Infers an additional set of constraints from a given set of equality constraints.
* For e.g., if an operator has constraints of the form (`a = 5`, `a = b`), this returns an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,28 @@ case class Join(
case UsingJoin(_, _) => false
case _ => resolvedExceptNatural
}

/**
* Returns additional constraints which are not enforced on the result of join operations, but
* which can be enforced either on the left or the right side
*/
override protected def constructAllConstraints: Set[Expression] = {
val additionalConstraints = joinType match {
case LeftAnti | LeftOuter if condition.isDefined =>
splitConjunctivePredicates(condition.get).flatMap(inferIsNotNullConstraints).filter(
_.references.subsetOf(right.outputSet))
case RightOuter if condition.isDefined =>
splitConjunctivePredicates(condition.get).flatMap(inferIsNotNullConstraints).filter(
_.references.subsetOf(left.outputSet))
case _ => Seq.empty[Expression]
}
super.constructAllConstraints ++ additionalConstraints
}

override lazy val constraints: ExpressionSet = ExpressionSet(
super.constructAllConstraints.filter { c =>
c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
})
}

/**
Expand Down

0 comments on commit d8a1190

Please sign in to comment.