uros-db
Contributor

@uros-db uros-db commented Aug 1, 2024

What changes were proposed in this pull request?

Fix the RewriteDistinctAggregates rule to properly handle aggregation on DISTINCT literals. The physical plan for select count(distinct 1) from t:

-- count(distinct 1)
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(distinct 1)], output=[count(DISTINCT 1)#2L])
   +- HashAggregate(keys=[], functions=[partial_count(distinct 1)], output=[count#6L])
      +- HashAggregate(keys=[], functions=[], output=[])
         +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=20]
            +- HashAggregate(keys=[], functions=[], output=[])
               +- FileScan parquet spark_catalog.default.t[] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/nikola.mandic/oss-spark/spark-warehouse/org.apache.spark.s..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

The problem occurs when the HashAggregate(keys=[], functions=[], output=[]) node, which has no grouping keys, emits one row even though its input is empty. The parent partial_count node then counts that row. This four-node structure is constructed by AggUtils.planAggregateWithOneDistinct.

To fix the problem, we add an Expand node, which forces non-empty grouping expressions in the HashAggregateExec nodes. With grouping keys present, those nodes stream zero rows to the parent partial_count node when the input is empty, yielding the correct final result.
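
The mechanism can be illustrated with a toy model in plain Python. This is only a sketch of the row-count behavior described above, not Spark code: the function names (`dedup_aggregate`, `count_rows`, and the two `count_distinct_literal_*` helpers) are hypothetical, and the "Expand" step here is a simplified stand-in that does not attempt to mirror the output of Spark's actual Expand operator.

```python
from typing import Iterable, List, Tuple

Row = Tuple  # a row is modeled as a tuple of column values


def dedup_aggregate(rows: Iterable[Row], num_keys: int) -> List[Row]:
    """Toy HashAggregate with functions=[]: it only de-duplicates on its keys.

    With no keys (num_keys == 0) it acts as a global aggregate and always
    emits exactly one empty row, even for empty input. With keys it emits
    one row per distinct key, so empty input produces zero rows.
    """
    if num_keys == 0:
        return [()]
    seen: List[Row] = []
    for row in rows:
        key = row[:num_keys]
        if key not in seen:
            seen.append(key)
    return seen


def count_rows(rows: Iterable[Row]) -> int:
    """Toy partial_count plus final count over a literal: counts input rows."""
    return sum(1 for _ in rows)


def count_distinct_literal_buggy(table: List[Row]) -> int:
    # keys=[]: the literal is not a grouping key, so the de-duplicating
    # aggregate is a global one and hands one row to the count.
    return count_rows(dedup_aggregate(table, num_keys=0))


def count_distinct_literal_fixed(table: List[Row]) -> int:
    # Simplified stand-in for Expand (an assumption of this sketch): project
    # every input row to a row carrying the literal 1, so the aggregate has
    # a non-empty grouping key.
    expanded = [(1,) for _ in table]
    return count_rows(dedup_aggregate(expanded, num_keys=1))


empty_table: List[Row] = []
two_rows: List[Row] = [("a",), ("b",)]

print(count_distinct_literal_buggy(empty_table))  # wrong count on empty input
print(count_distinct_literal_fixed(empty_table))  # correct count on empty input
print(count_distinct_literal_fixed(two_rows))     # non-empty input still works
```

In this model both variants agree on non-empty input; they differ only when the table is empty, which matches the scope of the bug described above.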

Why are the changes needed?

Aggregation on a DISTINCT literal gives wrong results. For example, on an empty table t:
select count(distinct 1) from t returns 1, while the correct result is 0.
For reference:
select count(1) from t returns 0, which is the correct and expected result.
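
As a cross-check of the expected semantics, the same two queries can be run against a different SQL engine. The sketch below uses SQLite through Python's standard `sqlite3` module purely as a reference point. It does not exercise Spark, and the column name `c` and the helper `scalar` are made up for illustration.

```python
import sqlite3

# In-memory database with an empty table named t, as in the example above.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (c INTEGER)")


def scalar(sql: str) -> int:
    """Run a query that returns a single value and return that value."""
    return conn.execute(sql).fetchone()[0]


# Both counts over the empty table.
distinct_on_empty = scalar("SELECT count(DISTINCT 1) FROM t")
plain_on_empty = scalar("SELECT count(1) FROM t")

# Add two rows and count again.
conn.executemany("INSERT INTO t VALUES (?)", [(10,), (20,)])
distinct_on_rows = scalar("SELECT count(DISTINCT 1) FROM t")
plain_on_rows = scalar("SELECT count(1) FROM t")

print(distinct_on_empty, plain_on_empty)  # 0 0
print(distinct_on_rows, plain_on_rows)    # 1 2
```

On the empty table both counts are 0, which is the result the fix restores in Spark. Once rows exist, the distinct count over a literal is 1 regardless of how many rows there are.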

Does this PR introduce any user-facing change?

Yes, this fixes a critical correctness bug in Spark: aggregation on a DISTINCT literal over an empty table now returns the correct result.

How was this patch tested?

New e2e SQL tests for aggregates with DISTINCT literals.

Was this patch authored or co-authored using generative AI tooling?

No.

@uros-db
Contributor Author

uros-db commented Aug 1, 2024

@cloud-fan @yaooqinn @viirya follow-up for SPARK-49000 is ready

back-ports are in separate PRs

@dongjoon-hyun
Member

In addition, please revise the PR title to something more specific instead of reusing the title of the original PR. The current PR has an identical one, [SPARK-49000][SQL][FOLLOWUP] Fix "select count(distinct 1) from t" where t is empty table by expanding RewriteDistinctAggregates.

$ git log --oneline | grep SPARK-49000
dfa21332f20 [SPARK-49000][SQL] Fix "select count(distinct 1) from t" where t is empty table by expanding RewriteDistinctAggregates

@uros-db uros-db changed the title [SPARK-49000][SQL][FOLLOWUP] Fix "select count(distinct 1) from t" where t is empty table by expanding RewriteDistinctAggregates [SPARK-49000][SQL][FOLLOWUP] Improve code style and update comments Aug 1, 2024
@uros-db uros-db requested a review from cloud-fan August 1, 2024 20:33
@yaooqinn yaooqinn closed this in 080e7eb Aug 2, 2024
@yaooqinn
Member

yaooqinn commented Aug 2, 2024

Thank you @uros-db @dongjoon-hyun @cloud-fan

Merged to master

fusheng9399 pushed a commit to fusheng9399/spark that referenced this pull request Aug 6, 2024
### What changes were proposed in this pull request?
Fix the `RewriteDistinctAggregates` rule to properly handle aggregation on DISTINCT literals. The physical plan for `select count(distinct 1) from t`:

```
-- count(distinct 1)
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(distinct 1)], output=[count(DISTINCT 1)#2L])
   +- HashAggregate(keys=[], functions=[partial_count(distinct 1)], output=[count#6L])
      +- HashAggregate(keys=[], functions=[], output=[])
         +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=20]
            +- HashAggregate(keys=[], functions=[], output=[])
               +- FileScan parquet spark_catalog.default.t[] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/nikola.mandic/oss-spark/spark-warehouse/org.apache.spark.s..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>
```

The problem occurs when the `HashAggregate(keys=[], functions=[], output=[])` node, which has no grouping keys, emits one row even though its input is empty. The parent `partial_count` node then counts that row. This four-node structure is constructed by `AggUtils.planAggregateWithOneDistinct`.

To fix the problem, we add an `Expand` node, which forces non-empty grouping expressions in the `HashAggregateExec` nodes. With grouping keys present, those nodes stream zero rows to the parent `partial_count` node when the input is empty, yielding the correct final result.

### Why are the changes needed?
Aggregation on a DISTINCT literal gives wrong results. For example, on an empty table `t`:
`select count(distinct 1) from t` returns 1, while the correct result is 0.
For reference:
`select count(1) from t` returns 0, which is the correct and expected result.

### Does this PR introduce _any_ user-facing change?
Yes, this fixes a critical correctness bug in Spark: aggregation on a DISTINCT literal over an empty table now returns the correct result.

### How was this patch tested?
New e2e SQL tests for aggregates with DISTINCT literals.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#47565 from uros-db/SPARK-49000-followup.

Authored-by: Uros Bojanic <157381213+uros-db@users.noreply.github.com>
Signed-off-by: Kent Yao <yao@apache.org>
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
himadripal pushed a commit to himadripal/spark that referenced this pull request Oct 19, 2024