Commit 080e7eb

uros-db authored and yaooqinn committed
[SPARK-49000][SQL][FOLLOWUP] Improve code style and update comments
### What changes were proposed in this pull request?

Fix `RewriteDistinctAggregates` rule to deal properly with aggregation on DISTINCT literals. Physical plan for `select count(distinct 1) from t`:

```
-- count(distinct 1)
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(distinct 1)], output=[count(DISTINCT 1)#2L])
   +- HashAggregate(keys=[], functions=[partial_count(distinct 1)], output=[count#6L])
      +- HashAggregate(keys=[], functions=[], output=[])
         +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=20]
            +- HashAggregate(keys=[], functions=[], output=[])
               +- FileScan parquet spark_catalog.default.t[] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/nikola.mandic/oss-spark/spark-warehouse/org.apache.spark.s..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>
```

Problem is happening when `HashAggregate(keys=[], functions=[], output=[])` node yields one row to `partial_count` node, which then captures one row. This four-node structure is constructed by `AggUtils.planAggregateWithOneDistinct`.

To fix the problem, we're adding `Expand` node which will force non-empty grouping expressions in `HashAggregateExec` nodes. This will in turn enable streaming zero rows to parent `partial_count` node, yielding correct final result.

### Why are the changes needed?

Aggregation with DISTINCT literal gives wrong results. For example, when running on empty table `t`: `select count(distinct 1) from t` returns 1, while the correct result should be 0. For reference: `select count(1) from t` returns 0, which is the correct and expected result.

### Does this PR introduce _any_ user-facing change?

Yes, this fixes a critical bug in Spark.

### How was this patch tested?

New e2e SQL tests for aggregates with DISTINCT literals.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47565 from uros-db/SPARK-49000-followup.

Authored-by: Uros Bojanic <157381213+uros-db@users.noreply.github.com>
Signed-off-by: Kent Yao <yao@apache.org>
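
For illustration, the failure mode described in the commit message can be modeled without Spark. The sketch below is a toy model, not Spark code: the object `DistinctLiteralModel`, the `Row` alias, and every function in it are hypothetical names introduced here. It assumes only what the message states, namely that an aggregate with empty grouping keys emits one row even for empty input, and that forcing a non-empty grouping key lets zero rows flow to the counting step.

```scala
// Toy model of the plan shapes discussed above. All names are hypothetical.
object DistinctLiteralModel {
  type Row = Seq[Any]

  // Models an aggregate with no aggregate functions that only deduplicates on keys.
  // Assumption taken from the commit message: with no grouping keys it always
  // emits exactly one (empty) row, even when the input is empty.
  def dedupe(input: Seq[Row], keyIndices: Seq[Int]): Seq[Row] =
    if (keyIndices.isEmpty) Seq(Seq.empty)
    else input.map(row => keyIndices.map(i => row(i))).distinct

  // Models counting a non-null literal over already-deduplicated rows:
  // every incoming row contributes 1.
  def countRows(input: Seq[Row]): Long = input.size.toLong

  // Shape before the fix: the dedupe step has keys=[].
  def countDistinctLiteralBefore(table: Seq[Row]): Long =
    countRows(dedupe(table, keyIndices = Seq.empty))

  // Shape after the fix, roughly: the literal is projected into a column
  // (standing in for the Expand node), so the dedupe step has a non-empty key.
  def countDistinctLiteralAfter(table: Seq[Row], literal: Any): Long = {
    val expanded = table.map(_ => Seq(literal))
    countRows(dedupe(expanded, keyIndices = Seq(0)))
  }
}
```

In this model, an empty table gives 1 for the "before" shape and 0 for the "after" shape, while a non-empty table gives 1 in both, which matches the behavior the message reports for `select count(distinct 1) from t`.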
1 parent c248b06 commit 080e7eb

1 file changed: +8 -7 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

Lines changed: 8 additions & 7 deletions
@@ -198,13 +198,15 @@ import org.apache.spark.util.collection.Utils
  */
 object RewriteDistinctAggregates extends Rule[LogicalPlan] {
   private def mustRewrite(
-      aggregateExpressions: Seq[AggregateExpression],
+      distinctAggs: Seq[AggregateExpression],
       groupingExpressions: Seq[Expression]): Boolean = {
-    // If there are any AggregateExpressions with filter, we need to rewrite the query.
-    // Also, if there are no grouping expressions and all aggregate expressions are foldable,
-    // we need to rewrite the query, e.g. SELECT COUNT(DISTINCT 1).
-    aggregateExpressions.exists(_.filter.isDefined) || (groupingExpressions.isEmpty &&
-      aggregateExpressions.exists(_.aggregateFunction.children.forall(_.foldable)))
+    // If there are any distinct AggregateExpressions with filter, we need to rewrite the query.
+    // Also, if there are no grouping expressions and all distinct aggregate expressions are
+    // foldable, we need to rewrite the query, e.g. SELECT COUNT(DISTINCT 1). Without this case,
+    // non-grouping aggregation queries with distinct aggregate expressions will be incorrectly
+    // handled by the aggregation strategy, causing wrong results when working with empty tables.
+    distinctAggs.exists(_.filter.isDefined) || (groupingExpressions.isEmpty &&
+      distinctAggs.exists(_.aggregateFunction.children.forall(_.foldable)))
   }

   private def mayNeedtoRewrite(a: Aggregate): Boolean = {
@@ -213,7 +215,6 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
     // We need at least two distinct aggregates or the single distinct aggregate group exists filter
     // clause for this rule because aggregation strategy can handle a single distinct aggregate
     // group without filter clause.
-    // This check can produce false-positives, e.g., SUM(DISTINCT a) & COUNT(DISTINCT a).
     distinctAggs.size > 1 || mustRewrite(distinctAggs, a.groupingExpressions)
   }
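
To make the predicate in the first hunk easier to experiment with, here is a minimal standalone sketch of its boolean shape. It does not use Catalyst: `Expr` and `DistinctAgg` are hypothetical, simplified stand-ins for `Expression` and `AggregateExpression`, and `children` stands in for `aggregateFunction.children`. Only the structure of the condition is taken from the diff.

```scala
// Simplified model of the mustRewrite condition. Types here are hypothetical.
object MustRewriteSketch {
  // Stand-in for a Catalyst expression; only the foldable flag matters here.
  final case class Expr(name: String, foldable: Boolean)

  // Stand-in for a distinct aggregate expression with an optional FILTER clause.
  final case class DistinctAgg(children: Seq[Expr], filter: Option[Expr] = None)

  // Same boolean shape as the predicate in the diff: rewrite if any distinct
  // aggregate has a filter, or if there is no grouping and some distinct
  // aggregate has only foldable children (e.g. COUNT(DISTINCT 1)).
  def mustRewrite(distinctAggs: Seq[DistinctAgg], groupingExpressions: Seq[Expr]): Boolean =
    distinctAggs.exists(_.filter.isDefined) ||
      (groupingExpressions.isEmpty && distinctAggs.exists(_.children.forall(_.foldable)))
}
```

Note that, as written in the diff, the foldable case fires when at least one distinct aggregate has only foldable children (`exists` over the aggregates, `forall` over each one's children), and only when there are no grouping expressions.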
