From a264ebac6924ae7184abe6fc75fa0dfb659b7d54 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 19 Oct 2016 04:11:54 +0000 Subject: [PATCH] Pruning unnecessary IsNotNull predicates from Filter. --- .../sql/catalyst/optimizer/Optimizer.scala | 39 ++++++++++++++----- .../optimizer/PruneFiltersSuite.scala | 13 +++++++ 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index e5e2cd7d27d15..ae9cdb2d68a98 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -649,6 +649,24 @@ object EliminateSorts extends Rule[LogicalPlan] { * 3) by eliminating the always-true conditions given the constraints on the child's output. */ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper { + // A helper function used to prune predicates for [[Filter]]. It will partition conjunctive + // predicates according to the given partition function. The positive part of predicates will + // be pruned. + private def prunedPredicates(f: Filter, func: (Expression) => Boolean): LogicalPlan = { + val (prunedPredicates, remainingPredicates) = + splitConjunctivePredicates(f.condition).partition { cond => + func(cond) + } + if (prunedPredicates.isEmpty) { + f + } else if (remainingPredicates.isEmpty) { + f.child + } else { + val newCond = remainingPredicates.reduce(And) + Filter(newCond, f.child) + } + } + def apply(plan: LogicalPlan): LogicalPlan = plan transform { // If the filter condition always evaluate to true, remove the filter. case Filter(Literal(true, BooleanType), child) => child @@ -659,17 +677,18 @@ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper { // If any deterministic condition is guaranteed to be true given the constraints on the child's // output, remove the condition case f @ Filter(fc, p: LogicalPlan) => - val (prunedPredicates, remainingPredicates) = - splitConjunctivePredicates(fc).partition { cond => - cond.deterministic && p.constraints.contains(cond) - } - if (prunedPredicates.isEmpty) { - f - } else if (remainingPredicates.isEmpty) { - p + val pruned = prunedPredicates(f, (cond) => { + cond.deterministic && p.constraints.contains(cond) + }) + if (pruned.isInstanceOf[Filter]) { + prunedPredicates(pruned.asInstanceOf[Filter], (cond) => { + cond match { + case IsNotNull(a) if !a.nullable => true + case _ => false + } + }) } else { - val newCond = remainingPredicates.reduce(And) - Filter(newCond, p) + pruned } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala index d8cfec5391497..84ea79f566adc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala @@ -133,4 +133,17 @@ class PruneFiltersSuite extends PlanTest { val correctAnswer = testRelation.where(Rand(10) > 5).where(Rand(10) > 5).select('a).analyze comparePlans(optimized, correctAnswer) } + + test("Pruning useless IsNotNull") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val z = testRelation.subquery('z) + + val outerJoined = x.join(y, FullOuter).subquery('x) + val query = outerJoined.join(z, Inner).where("x.a = z.a").where("x.a".isNotNull) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = outerJoined.where("x.a = z.a").join(z, Inner).analyze + comparePlans(optimized, correctAnswer) + } }