From 76520955fddbda87a5c53d0a394dedc91dce67e8 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 22 Jul 2015 11:45:51 -0700
Subject: [PATCH] [SPARK-9082] [SQL] Filter using non-deterministic expressions
 should not be pushed down

Author: Wenchen Fan

Closes #7446 from cloud-fan/filter and squashes the following commits:

330021e [Wenchen Fan] add exists to tree node
2cab68c [Wenchen Fan] more enhance
949be07 [Wenchen Fan] push down part of predicate if possible
3912f84 [Wenchen Fan] address comments
8ce15ca [Wenchen Fan] fix bug
557158e [Wenchen Fan] Filter using non-deterministic expressions should not be pushed down
---
 .../sql/catalyst/optimizer/Optimizer.scala      | 50 +++++++++++++++----
 .../optimizer/FilterPushdownSuite.scala         | 45 ++++++++++++++++-
 2 files changed, 84 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index e42f0b9a247e3..d2db3dd3d078e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -541,20 +541,50 @@ object SimplifyFilters extends Rule[LogicalPlan] {
  *
  * This heuristic is valid assuming the expression evaluation cost is minimal.
  */
-object PushPredicateThroughProject extends Rule[LogicalPlan] {
+object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case filter @ Filter(condition, project @ Project(fields, grandChild)) =>
-      val sourceAliases = fields.collect { case a @ Alias(c, _) =>
-        (a.toAttribute: Attribute) -> c
-      }.toMap
-      project.copy(child = filter.copy(
-        replaceAlias(condition, sourceAliases),
-        grandChild))
+      // Create a map of Aliases to their values from the child projection.
+      // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
+      val aliasMap = AttributeMap(fields.collect {
+        case a: Alias => (a.toAttribute, a.child)
+      })
+
+      // Split the condition into conjuncts by `And`, so that the part of the
+      // condition that is free of nondeterministic expressions can still be pushed down.
+      val andConditions = splitConjunctivePredicates(condition)
+      val nondeterministicConditions = andConditions.filter(hasNondeterministic(_, aliasMap))
+
+      // If there are no nondeterministic conditions, push down the whole condition.
+      if (nondeterministicConditions.isEmpty) {
+        project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
+      } else {
+        // If all conditions are nondeterministic, leave the filter unchanged.
+        if (nondeterministicConditions.length == andConditions.length) {
+          filter
+        } else {
+          val deterministicConditions = andConditions.filterNot(hasNondeterministic(_, aliasMap))
+          // Push down only the conjuncts without nondeterministic expressions.
+          val pushedCondition = deterministicConditions.map(replaceAlias(_, aliasMap)).reduce(And)
+          Filter(nondeterministicConditions.reduce(And),
+            project.copy(child = Filter(pushedCondition, grandChild)))
+        }
+      }
+  }
+
+  private def hasNondeterministic(
+      condition: Expression,
+      sourceAliases: AttributeMap[Expression]) = {
+    condition.collect {
+      case a: Attribute if sourceAliases.contains(a) => sourceAliases(a)
+    }.exists(!_.deterministic)
   }
 
-  private def replaceAlias(condition: Expression, sourceAliases: Map[Attribute, Expression]) = {
-    condition transform {
-      case a: AttributeReference => sourceAliases.getOrElse(a, a)
+  // Substitute any attributes that are produced by the child projection, so that we can
+  // safely eliminate it.
+  private def replaceAlias(condition: Expression, sourceAliases: AttributeMap[Expression]) = {
+    condition.transform {
+      case a: Attribute => sourceAliases.getOrElse(a, a)
     }
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index dc28b3ffb59ee..0f1fde2fb0f67 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
-import org.apache.spark.sql.catalyst.expressions.{SortOrder, Ascending, Count, Explode}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.rules._
@@ -146,6 +146,49 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("nondeterministic: can't push down filter through project") {
+    val originalQuery = testRelation
+      .select(Rand(10).as('rand), 'a)
+      .where('rand > 5 || 'a > 5)
+      .analyze
+
+    val optimized = Optimize.execute(originalQuery)
+
+    comparePlans(optimized, originalQuery)
+  }
+
+  test("nondeterministic: push down part of filter through project") {
+    val originalQuery = testRelation
+      .select(Rand(10).as('rand), 'a)
+      .where('rand > 5 && 'a > 5)
+      .analyze
+
+    val optimized = Optimize.execute(originalQuery)
+
+    val correctAnswer = testRelation
+      .where('a > 5)
+      .select(Rand(10).as('rand), 'a)
+      .where('rand > 5)
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("nondeterministic: push down filter through project") {
+    val originalQuery = testRelation
+      .select(Rand(10).as('rand), 'a)
+      .where('a > 5 && 'a < 10)
+      .analyze
+
+    val optimized = Optimize.execute(originalQuery)
+    val correctAnswer = testRelation
+      .where('a > 5 && 'a < 10)
+      .select(Rand(10).as('rand), 'a)
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("filters: combines filters") {
     val originalQuery = testRelation
       .select('a)
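
For readers outside the Catalyst codebase, here is a minimal, self-contained Scala
sketch of the technique this patch applies: split a Filter's condition into its
And-ed conjuncts, then partition them by determinism so only the deterministic part
moves below the Project. The `Expr`, `Pred`, and `splitConjuncts` names below are
simplified stand-ins for illustration, not Spark's actual classes (in Catalyst the
splitting is PredicateHelper.splitConjunctivePredicates and determinism comes from
Expression.deterministic).

    // Simplified expression tree: a conjunction node plus opaque leaf predicates.
    sealed trait Expr { def deterministic: Boolean }

    case class And(left: Expr, right: Expr) extends Expr {
      // A conjunction is deterministic only if both sides are.
      def deterministic: Boolean = left.deterministic && right.deterministic
    }

    // A leaf predicate such as `a > 5` (deterministic) or `rand > 5` (not).
    case class Pred(sql: String, deterministic: Boolean) extends Expr

    object Demo extends App {
      // Flatten a tree of Ands into its conjuncts: (a AND b) AND c => Seq(a, b, c).
      def splitConjuncts(e: Expr): Seq[Expr] = e match {
        case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
        case leaf      => Seq(leaf)
      }

      val condition = And(Pred("rand > 5", deterministic = false),
                          Pred("a > 5", deterministic = true))

      // A conjunct that references the nondeterministic alias must stay above the
      // Project; pushing it below would re-evaluate Rand and filter on a different
      // value than the one the projection outputs. Deterministic conjuncts are safe
      // to push down.
      val (keepAbove, pushDown) = splitConjuncts(condition).partition(!_.deterministic)
      println(s"kept above Project:   $keepAbove") // List(Pred(rand > 5,false))
      println(s"pushed below Project: $pushDown")  // List(Pred(a > 5,true))
    }

This mirrors the "push down part of filter through project" test above: 'a > 5 is
rewritten against the grandchild and pushed below the Project, while 'rand > 5
stays in a Filter on top of it.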