Skip to content

Commit

Permalink
use partition in PushPredicateThroughProject
Browse files Browse the repository at this point in the history
  • Loading branch information
cloud-fan committed Jul 23, 2015
1 parent 2f5cbd8 commit 7106989
Showing 1 changed file with 8 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -553,33 +553,27 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelpe
// Split the condition into small conditions by `And`, so that we can push down part of this
// condition without nondeterministic expressions.
val andConditions = splitConjunctivePredicates(condition)
val nondeterministicConditions = andConditions.filter(hasNondeterministic(_, aliasMap))

val (deterministic, nondeterministic) = andConditions.partition(_.collect {
case a: Attribute if aliasMap.contains(a) => aliasMap(a)
}.forall(_.deterministic))

// If there is no nondeterministic conditions, push down the whole condition.
if (nondeterministicConditions.isEmpty) {
if (nondeterministic.isEmpty) {
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
} else {
// If they are all nondeterministic conditions, leave it un-changed.
if (nondeterministicConditions.length == andConditions.length) {
if (deterministic.isEmpty) {
filter
} else {
val deterministicConditions = andConditions.filterNot(hasNondeterministic(_, aliasMap))
// Push down the small conditions without nondeterministic expressions.
val pushedCondition = deterministicConditions.map(replaceAlias(_, aliasMap)).reduce(And)
Filter(nondeterministicConditions.reduce(And),
val pushedCondition = deterministic.map(replaceAlias(_, aliasMap)).reduce(And)
Filter(nondeterministic.reduce(And),
project.copy(child = Filter(pushedCondition, grandChild)))
}
}
}

private def hasNondeterministic(
condition: Expression,
sourceAliases: AttributeMap[Expression]) = {
condition.collect {
case a: Attribute if sourceAliases.contains(a) => sourceAliases(a)
}.exists(!_.deterministic)
}

// Substitute any attributes that are produced by the child projection, so that we safely
// eliminate it.
private def replaceAlias(condition: Expression, sourceAliases: AttributeMap[Expression]) = {
Expand Down

0 comments on commit 7106989

Please sign in to comment.