Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyum committed Jun 5, 2020
1 parent 073efe3 commit a5f52a8
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -886,9 +886,10 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan]
left: LogicalPlan,
right: LogicalPlan,
conditionOpt: Option[Expression]): Set[Expression] = {
val baseConstraints = left.constraints.union(right.constraints)
.union(conditionOpt.map(splitConjunctivePredicates).getOrElse(Nil).toSet)
val condition = conditionOpt.map(splitConjunctivePredicates).getOrElse(Nil).toSet
val baseConstraints = left.constraints.union(right.constraints).union(condition)
baseConstraints.union(inferAdditionalConstraints(baseConstraints))
.union(inferIsNotNullConstraintsForJoinCondition(condition))
}

private def inferNewFilter(plan: LogicalPlan, constraints: Set[Expression]): LogicalPlan = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,18 @@ trait ConstraintHelper {
isNotNullConstraints -- constraints
}

/**
* Infers a set of `isNotNull` constraints for non null intolerant child from null intolerant
* expressions. For e.g., if an expression is of the form (`coalesce(t1.a, t1.b) = t2.a`),
* this returns a constraint of the form `isNotNull(coalesce(t1.a, t1.b))`
*/
def inferIsNotNullConstraintsForJoinCondition(constraints: Set[Expression]): Set[Expression] = {
constraints.filter(_.isInstanceOf[NullIntolerant])
.flatMap { e =>
e.children.filter(_.references.nonEmpty).filter(c => inferIsNotNullConstraints(c).isEmpty)
}.map(IsNotNull)
}

/**
* Infer the Attribute-specific IsNotNull constraints from the null intolerant child expressions
* of constraints.
Expand All @@ -113,13 +125,6 @@ trait ConstraintHelper {
// When the root is IsNotNull, we can push IsNotNull through the child null intolerant
// expressions
case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_))
// For join condition: CAST(coalesce(t1.a, t1.b) as DECIMAL) = CAST(t2.c AS DECIMAL).
// We can infer an additional constraint: CAST(coalesce(t1.a, t1.b) as DECIMAL) IS NOT NULL
// to avoid data skew.
case e: BinaryComparison if e.isInstanceOf[NullIntolerant] =>
e.children.filter(_.references.nonEmpty).flatMap { c =>
Option(scanNullIntolerantAttribute(c)).filter(_.nonEmpty).getOrElse(Seq(c))
}.map(IsNotNull(_))
// Constraints always return true for all the inputs. That means, null will never be returned.
// Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child
// null intolerant expressions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
}
}

test("Infer IsNotNull for all children of binary comparison children") {
test("Infer IsNotNull for non null-intolerant child of null intolerant join condition") {
testConstraintsAfterJoin(
testRelation.subquery('left),
testRelation.subquery('right),
Expand All @@ -327,9 +327,8 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
Some(Coalesce(Seq("left.a".attr, "left.b".attr)) === "right.c".attr))
}

test("Should not infer IsNotNull for non-binary comparison children") {
val query = testRelation.where(Not('b.in(ListQuery(testRelation.select('a))))).analyze
val optimized = Optimize.execute(query)
comparePlans(optimized, query)
test("Should not infer IsNotNull for non null-intolerant child from same table") {
comparePlans(Optimize.execute(testRelation.where(Coalesce(Seq('a, 'b)) === 'c).analyze),
testRelation.where(Coalesce(Seq('a, 'b)) === 'c && IsNotNull('c)).analyze)
}
}

0 comments on commit a5f52a8

Please sign in to comment.