From f1029aa0bbd5409bfd00cc02cc03609f73b883a6 Mon Sep 17 00:00:00 2001 From: Uros Bojanic <157381213+uros-db@users.noreply.github.com> Date: Thu, 1 Aug 2024 09:11:07 +0200 Subject: [PATCH 1/3] Small fixes --- .../catalyst/optimizer/RewriteDistinctAggregates.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala index e91493188873e..cc2405f6d40c0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala @@ -198,13 +198,13 @@ import org.apache.spark.util.collection.Utils */ object RewriteDistinctAggregates extends Rule[LogicalPlan] { private def mustRewrite( - aggregateExpressions: Seq[AggregateExpression], + distinctAggs: Seq[AggregateExpression], groupingExpressions: Seq[Expression]): Boolean = { - // If there are any AggregateExpressions with filter, we need to rewrite the query. + // If there are any distinct AggregateExpressions with filter, we need to rewrite the query. // Also, if there are no grouping expressions and all aggregate expressions are foldable, // we need to rewrite the query, e.g. SELECT COUNT(DISTINCT 1). - aggregateExpressions.exists(_.filter.isDefined) || (groupingExpressions.isEmpty && - aggregateExpressions.exists(_.aggregateFunction.children.forall(_.foldable))) + distinctAggs.exists(_.filter.isDefined) || (groupingExpressions.isEmpty && + distinctAggs.exists(_.aggregateFunction.children.forall(_.foldable))) } private def mayNeedtoRewrite(a: Aggregate): Boolean = { @@ -213,7 +213,6 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { // We need at least two distinct aggregates or the single distinct aggregate group exists filter // clause for this rule because aggregation strategy can handle a single distinct aggregate // group without filter clause. - // This check can produce false-positives, e.g., SUM(DISTINCT a) & COUNT(DISTINCT a). distinctAggs.size > 1 || mustRewrite(distinctAggs, a.groupingExpressions) } From 4de8c159aa6762eced20b421a7eb8055586e6a4a Mon Sep 17 00:00:00 2001 From: Uros Bojanic <157381213+uros-db@users.noreply.github.com> Date: Thu, 1 Aug 2024 09:17:45 +0200 Subject: [PATCH 2/3] Update comment --- .../sql/catalyst/optimizer/RewriteDistinctAggregates.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala index cc2405f6d40c0..24947e5fe40e4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala @@ -202,7 +202,9 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { groupingExpressions: Seq[Expression]): Boolean = { // If there are any distinct AggregateExpressions with filter, we need to rewrite the query. // Also, if there are no grouping expressions and all aggregate expressions are foldable, - // we need to rewrite the query, e.g. SELECT COUNT(DISTINCT 1). + // we need to rewrite the query, e.g. SELECT COUNT(DISTINCT 1). Without this condition, + // non-grouping aggregation queries with distinct aggregate expressions will be incorrectly + // handled by the aggregation strategy, causing wrong results when working with empty tables. distinctAggs.exists(_.filter.isDefined) || (groupingExpressions.isEmpty && distinctAggs.exists(_.aggregateFunction.children.forall(_.foldable))) } From 9fb64290d4283aa2d060a739949ef35d396223f8 Mon Sep 17 00:00:00 2001 From: Uros Bojanic <157381213+uros-db@users.noreply.github.com> Date: Thu, 1 Aug 2024 16:39:52 +0200 Subject: [PATCH 3/3] Update RewriteDistinctAggregates.scala --- .../sql/catalyst/optimizer/RewriteDistinctAggregates.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala index 24947e5fe40e4..801bd2693af42 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala @@ -201,8 +201,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { distinctAggs: Seq[AggregateExpression], groupingExpressions: Seq[Expression]): Boolean = { // If there are any distinct AggregateExpressions with filter, we need to rewrite the query. - // Also, if there are no grouping expressions and all aggregate expressions are foldable, - // we need to rewrite the query, e.g. SELECT COUNT(DISTINCT 1). Without this condition, + // Also, if there are no grouping expressions and all distinct aggregate expressions are + // foldable, we need to rewrite the query, e.g. SELECT COUNT(DISTINCT 1). Without this case, // non-grouping aggregation queries with distinct aggregate expressions will be incorrectly // handled by the aggregation strategy, causing wrong results when working with empty tables. distinctAggs.exists(_.filter.isDefined) || (groupingExpressions.isEmpty &&