diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f61de59baf79e..1a5de15c61f86 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -571,14 +571,20 @@ class Analyzer( val resolvedOperator: Aggregate = execute(aggregatedOrdering).asInstanceOf[Aggregate] def resolvedAggregateOrdering = resolvedOperator.aggregateExpressions + // Expressions that have an aggregate can be pushed down. val needsAggregate = resolvedAggregateOrdering.exists(containsAggregate) + + // Attribute references that are missing from the ordering but are present in the grouping + // expressions can also be pushed down. val requiredAttributes = resolvedAggregateOrdering.map(_.references).reduce(_ ++ _) - val missingAttributes = (requiredAttributes -- aggregate.outputSet).nonEmpty + val missingAttributes = requiredAttributes -- aggregate.outputSet + val validPushdownAttributes = + missingAttributes.filter(a => grouping.exists(a.semanticEquals)) // If resolution was successful and we see the ordering either has an aggregate in it or // it is missing something that is projected away by the aggregate, add the ordering // the original aggregate operator. - if (resolvedOperator.resolved && (needsAggregate || missingAttributes)) { + if (resolvedOperator.resolved && (needsAggregate || validPushdownAttributes.nonEmpty)) { val evaluatedOrderings: Seq[SortOrder] = sortOrder.zip(resolvedAggregateOrdering).map { case (order, evaluated) => order.copy(child = evaluated.toAttribute) }