Skip to content

Commit

Permalink
Filter using non-deterministic expressions should not be pushed down
Browse files Browse the repository at this point in the history
  • Loading branch information
cloud-fan committed Jul 21, 2015
1 parent 936a96c commit 557158e
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -515,18 +515,28 @@ object SimplifyFilters extends Rule[LogicalPlan] {
// Optimizer rule that matches a Filter sitting directly on top of a Project (see the pattern in
// `apply`) and, when it rewrites, returns the Project with the Filter moved underneath it.
// NOTE(review): this listing appears to interleave the pre-change and post-change bodies of the
// rule without +/- markers (the hunk header reports 18 old and 28 new lines), so it does not
// compile as shown. The `sourceAliases` / `replaceAlias` lines look like the removed
// implementation and the `aliasMap` / `hasNondeterministic` lines like the added one -- confirm
// against the repository before relying on either reading.
object PushPredicateThroughProject extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter(condition, project @ Project(fields, grandChild)) =>
// (presumably removed) Map each aliased output attribute to the expression it aliases.
val sourceAliases = fields.collect { case a @ Alias(c, _) =>
(a.toAttribute: Attribute) -> c
}.toMap
// (presumably removed) Rebuild as Project-over-Filter; the condition is rewritten through
// `replaceAlias` and the Filter is re-attached to the Project's former child.
project.copy(child = filter.copy(
replaceAlias(condition, sourceAliases),
grandChild))
}
// Create a map of Aliases to their values from the child projection.
// e.g., 'SELECT a + b AS c, d ...' produces Map(c -> Alias(a + b, c)).
val aliasMap = AttributeMap(fields.collect {
case a: Alias => (a.toAttribute, a)
})

// (presumably removed) Replaces every AttributeReference that has an entry in `sourceAliases`
// with the mapped expression; other nodes are left as they are.
private def replaceAlias(condition: Expression, sourceAliases: Map[Attribute, Expression]) = {
condition transform {
case a: AttributeReference => sourceAliases.getOrElse(a, a)
}
// We only push down filter if their overlapped expressions are all
// deterministic.
// For each attribute of the condition that the projection defines through an alias, take the
// aliased expression and look for any node in it whose `deterministic` flag is false.
val hasNondeterministic = condition.collect {
case a: Attribute if aliasMap.contains(a) => aliasMap(a).child
}.exists(_.find(!_.deterministic).isDefined)

if (hasNondeterministic) {
// Return the matched Filter node unchanged, i.e. do not push it below the Project.
filter
} else {
// Substitute any attributes that are produced by the child projection, so that we safely
// eliminate it.
// NOTE(review): `aliasMap` values are the Alias nodes themselves (see its construction above),
// so this places the whole Alias -- not `.child`, which the determinism check uses -- into the
// condition. The `sourceAliases` variant substituted the aliased expression `c` instead.
// Confirm that keeping the Alias wrapper inside a Filter condition is intended.
val substitutedCondition = condition.transform {
case a: Attribute => aliasMap.getOrElse(a, a)
}
project.copy(child = filter.copy(substitutedCondition, grandChild))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.expressions.{SortOrder, Ascending, Count, Explode}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.rules._
Expand Down Expand Up @@ -146,6 +146,32 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("nondeterministic: can't push down filter through project") {
  // The filter reads `rand`, the alias the projection gives to Rand(10).
  val projected = testRelation.select(Rand(10).as('rand))
  val analyzedPlan = projected.where('rand > 5).analyze

  // The optimizer output is expected to match the plan it was given.
  comparePlans(Optimize.execute(analyzedPlan), analyzedPlan)
}

test("nondeterministic: push down filter through project") {
  // The filter only reads column `a`; Rand(10) is projected alongside it as `rand`.
  val inputPlan = testRelation.select(Rand(10).as('rand), 'a).where('a > 5).analyze
  val rewritten = Optimize.execute(inputPlan)

  // Expected shape: the same filter applied first, then the same projection on top of it.
  val expectedPlan = testRelation.where('a > 5).select(Rand(10).as('rand), 'a).analyze

  comparePlans(rewritten, expectedPlan)
}

test("filters: combines filters") {
val originalQuery = testRelation
.select('a)
Expand Down

0 comments on commit 557158e

Please sign in to comment.