Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,15 @@ import org.apache.spark.util.collection.Utils
*/
object RewriteDistinctAggregates extends Rule[LogicalPlan] {
private def mustRewrite(
aggregateExpressions: Seq[AggregateExpression],
distinctAggs: Seq[AggregateExpression],
groupingExpressions: Seq[Expression]): Boolean = {
// If there are any AggregateExpressions with filter, we need to rewrite the query.
// Also, if there are no grouping expressions and all aggregate expressions are foldable,
// we need to rewrite the query, e.g. SELECT COUNT(DISTINCT 1).
aggregateExpressions.exists(_.filter.isDefined) || (groupingExpressions.isEmpty &&
aggregateExpressions.exists(_.aggregateFunction.children.forall(_.foldable)))
// If there are any distinct AggregateExpressions with filter, we need to rewrite the query.
// Also, if there are no grouping expressions and all distinct aggregate expressions are
// foldable, we need to rewrite the query, e.g. SELECT COUNT(DISTINCT 1). Without this case,
// non-grouping aggregation queries with distinct aggregate expressions will be incorrectly
// handled by the aggregation strategy, causing wrong results when working with empty tables.
distinctAggs.exists(_.filter.isDefined) || (groupingExpressions.isEmpty &&
distinctAggs.exists(_.aggregateFunction.children.forall(_.foldable)))
}

private def mayNeedtoRewrite(a: Aggregate): Boolean = {
Expand All @@ -213,7 +215,6 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
// This rule applies only when there are at least two distinct aggregates, or when the single
// distinct aggregate group has a filter clause, because the aggregation strategy can already
// handle a single distinct aggregate group without a filter clause.
// This check can produce false-positives, e.g., SUM(DISTINCT a) & COUNT(DISTINCT a).
distinctAggs.size > 1 || mustRewrite(distinctAggs, a.groupingExpressions)
}

Expand Down