Skip to content

Commit

Permalink
[SPARK-9082] [SQL] Filter using non-deterministic expressions should …
Browse files Browse the repository at this point in the history
…not be pushed down

Author: Wenchen Fan <cloud0fan@outlook.com>

Closes #7446 from cloud-fan/filter and squashes the following commits:

330021e [Wenchen Fan] add exists to tree node
2cab68c [Wenchen Fan] more enhance
949be07 [Wenchen Fan] push down part of predicate if possible
3912f84 [Wenchen Fan] address comments
8ce15ca [Wenchen Fan] fix bug
557158e [Wenchen Fan] Filter using non-deterministic expressions should not be pushed down
  • Loading branch information
cloud-fan authored and yhuai committed Jul 22, 2015
1 parent b55a36b commit 7652095
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -541,20 +541,50 @@ object SimplifyFilters extends Rule[LogicalPlan] {
*
* This heuristic is valid assuming the expression evaluation cost is minimal.
*/
object PushPredicateThroughProject extends Rule[LogicalPlan] {
object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter(condition, project @ Project(fields, grandChild)) =>
val sourceAliases = fields.collect { case a @ Alias(c, _) =>
(a.toAttribute: Attribute) -> c
}.toMap
project.copy(child = filter.copy(
replaceAlias(condition, sourceAliases),
grandChild))
// Create a map of Aliases to their values from the child projection.
// e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
val aliasMap = AttributeMap(fields.collect {
case a: Alias => (a.toAttribute, a.child)
})

// Split the condition into small conditions by `And`, so that we can push down part of this
// condition without nondeterministic expressions.
val andConditions = splitConjunctivePredicates(condition)
val nondeterministicConditions = andConditions.filter(hasNondeterministic(_, aliasMap))

// If there is no nondeterministic conditions, push down the whole condition.
if (nondeterministicConditions.isEmpty) {
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
} else {
// If they are all nondeterministic conditions, leave it un-changed.
if (nondeterministicConditions.length == andConditions.length) {
filter
} else {
val deterministicConditions = andConditions.filterNot(hasNondeterministic(_, aliasMap))
// Push down the small conditions without nondeterministic expressions.
val pushedCondition = deterministicConditions.map(replaceAlias(_, aliasMap)).reduce(And)
Filter(nondeterministicConditions.reduce(And),
project.copy(child = Filter(pushedCondition, grandChild)))
}
}
}

private def hasNondeterministic(
condition: Expression,
sourceAliases: AttributeMap[Expression]) = {
condition.collect {
case a: Attribute if sourceAliases.contains(a) => sourceAliases(a)
}.exists(!_.deterministic)
}

private def replaceAlias(condition: Expression, sourceAliases: Map[Attribute, Expression]) = {
condition transform {
case a: AttributeReference => sourceAliases.getOrElse(a, a)
// Substitute any attributes that are produced by the child projection, so that we safely
// eliminate it.
private def replaceAlias(condition: Expression, sourceAliases: AttributeMap[Expression]) = {
condition.transform {
case a: Attribute => sourceAliases.getOrElse(a, a)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.expressions.{SortOrder, Ascending, Count, Explode}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.rules._
Expand Down Expand Up @@ -146,6 +146,49 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("nondeterministic: can't push down filter through project") {
val originalQuery = testRelation
.select(Rand(10).as('rand), 'a)
.where('rand > 5 || 'a > 5)
.analyze

val optimized = Optimize.execute(originalQuery)

comparePlans(optimized, originalQuery)
}

test("nondeterministic: push down part of filter through project") {
val originalQuery = testRelation
.select(Rand(10).as('rand), 'a)
.where('rand > 5 && 'a > 5)
.analyze

val optimized = Optimize.execute(originalQuery)

val correctAnswer = testRelation
.where('a > 5)
.select(Rand(10).as('rand), 'a)
.where('rand > 5)
.analyze

comparePlans(optimized, correctAnswer)
}

test("nondeterministic: push down filter through project") {
val originalQuery = testRelation
.select(Rand(10).as('rand), 'a)
.where('a > 5 && 'a < 10)
.analyze

val optimized = Optimize.execute(originalQuery)
val correctAnswer = testRelation
.where('a > 5 && 'a < 10)
.select(Rand(10).as('rand), 'a)
.analyze

comparePlans(optimized, correctAnswer)
}

test("filters: combines filters") {
val originalQuery = testRelation
.select('a)
Expand Down

0 comments on commit 7652095

Please sign in to comment.