From 71069896833e56fa6b053a72a208d73bddb278d2 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 23 Jul 2015 14:33:50 +0800 Subject: [PATCH] use `partition` in `PushPredicateThroughProject` --- .../sql/catalyst/optimizer/Optimizer.scala | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d2db3dd3d078e..b59f800e7cc0f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -553,33 +553,27 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelpe // Split the condition into small conditions by `And`, so that we can push down part of this // condition without nondeterministic expressions. val andConditions = splitConjunctivePredicates(condition) - val nondeterministicConditions = andConditions.filter(hasNondeterministic(_, aliasMap)) + + val (deterministic, nondeterministic) = andConditions.partition(_.collect { + case a: Attribute if aliasMap.contains(a) => aliasMap(a) + }.forall(_.deterministic)) // If there is no nondeterministic conditions, push down the whole condition. - if (nondeterministicConditions.isEmpty) { + if (nondeterministic.isEmpty) { project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild)) } else { // If they are all nondeterministic conditions, leave it un-changed. - if (nondeterministicConditions.length == andConditions.length) { + if (deterministic.isEmpty) { filter } else { - val deterministicConditions = andConditions.filterNot(hasNondeterministic(_, aliasMap)) // Push down the small conditions without nondeterministic expressions. - val pushedCondition = deterministicConditions.map(replaceAlias(_, aliasMap)).reduce(And) - Filter(nondeterministicConditions.reduce(And), + val pushedCondition = deterministic.map(replaceAlias(_, aliasMap)).reduce(And) + Filter(nondeterministic.reduce(And), project.copy(child = Filter(pushedCondition, grandChild))) } } } - private def hasNondeterministic( - condition: Expression, - sourceAliases: AttributeMap[Expression]) = { - condition.collect { - case a: Attribute if sourceAliases.contains(a) => sourceAliases(a) - }.exists(!_.deterministic) - } - // Substitute any attributes that are produced by the child projection, so that we safely // eliminate it. private def replaceAlias(condition: Expression, sourceAliases: AttributeMap[Expression]) = {