Skip to content

perf: default multi COUNT(DISTINCT) logical optimizer rewrite#21088

Open
ydgandhi wants to merge 5 commits intoapache:mainfrom
ydgandhi:feat/multi-distinct-count-rewrite
Open

perf: default multi COUNT(DISTINCT) logical optimizer rewrite#21088
ydgandhi wants to merge 5 commits intoapache:mainfrom
ydgandhi:feat/multi-distinct-count-rewrite

Conversation

@ydgandhi
Copy link

Add MultiDistinctCountRewrite in datafusion-optimizer and register it in Optimizer::new() after SingleDistinctToGroupBy. Rewrites 2+ simple COUNT(DISTINCT) on different args into a join of two-phase aggregates; filter distinct_arg IS NOT NULL on each branch for correct NULL semantics.

✅ Unit tests in datafusion-optimizer; ✅ SQL integration test (NULLs) in core_integration.

Which issue does this PR close?

Rationale for this change

Queries with multiple COUNT(DISTINCT col_i) in the same GROUP BY can force independent distinct state per aggregate (e.g. separate hash sets), which scales poorly in memory when several high-cardinality distinct columns appear together.

DataFusion already optimizes the single shared distinct field case via SingleDistinctToGroupBy. This PR adds a conservative logical rewrite for multiple distinct COUNT(DISTINCT …) arguments by splitting work into per-distinct branches joined on the group keys, which reduces peak memory for eligible plans.

COUNT(DISTINCT x) must ignore NULL x; the rewrite applies x IS NOT NULL on each distinct branch before inner grouping so semantics stay aligned with count_distinct behavior.

What changes are included in this PR?

New module:

  • datafusion/optimizer/src/multi_distinct_count_rewrite.rs — MultiDistinctCountRewrite (OptimizerRule, bottom-up).
  • Register rule in datafusion/optimizer/src/optimizer.rs immediately after SingleDistinctToGroupBy.
  • Export module from datafusion/optimizer/src/lib.rs.

Tests:

  • datafusion-optimizer: rewrite vs no-op cases (single distinct, mixed aggs, two distincts).
  • datafusion core_integration: datafusion/core/tests/sql/aggregates/multi_distinct_count_rewrite.rs — SQL over MemTable with NULLs.

Are these changes tested?

Yes.

  • cargo test -p datafusion-optimizer (includes new unit tests).
  • cargo test -p datafusion --test core_integration multi_count_distinct (SQL + NULLs).

Are there any user-facing changes?

  • Behavior / plans: For queries that match the rule, logical plans (and thus EXPLAIN output) can differ: eligible multi–COUNT(DISTINCT) aggregates may appear as joins of sub-aggregates instead of a single Aggregate with multiple distinct counts.

  • Results: Intended to be semantics-preserving for supported patterns (including NULL handling via filters).

  • Public API: No intentional breaking changes to public Rust APIs; this is an internal optimizer rule enabled by default.

  • Docs: No user guide update required unless maintainers want an “optimizer behavior” note; can add if requested.

@github-actions github-actions bot added optimizer Optimizer rules core Core DataFusion crate labels Mar 21, 2026
@Dandandan
Copy link
Contributor

Hi @ydgandhi !

This does seem the same to me as #20940 ?

Can you maybe help review that one?

@ydgandhi
Copy link
Author

ydgandhi commented Mar 21, 2026

Hi @Dandandan - thanks for this work; the cross-join split for multiple distinct aggregates with no GROUP BY is a strong fit for workloads like ClickBench extended.

I’ve been working on a related but different pattern: GROUP BY + several COUNT(DISTINCT …) in the same aggregate (typical BI). In that situation, your rule is not quite applicable, because MultiDistinctToCrossJoin needs an empty GROUP BY and all aggregates to be distinct on different columns.

Here is a concrete example from our internal benchmark suite on an ecommerce_orders table:

SELECT
    seller_name,
    COUNT(*) as total_orders,
    COUNT(DISTINCT delivery_city) as cities_served,
    COUNT(DISTINCT state) as states_served,
    SUM(CASE WHEN order_status = 'Completed' THEN 1 ELSE 0 END) as completed_orders,
    SUM(CASE WHEN order_status = 'Cancelled' THEN 1 ELSE 0 END) as cancelled_orders,
    ROUND(100.0 * SUM(CASE WHEN order_status = 'Completed' THEN 1 ELSE 0 END) / COUNT(*), 2) as success_rate
FROM ecommerce_orders
GROUP BY seller_name
HAVING COUNT(*) > 100
ORDER BY total_orders DESC
LIMIT 100;

This is not “global” multi-distinct: it’s per seller_name, with multiple COUNT(DISTINCT …) plus other aggregates. That’s the class my optimizer rule (MultiDistinctCountRewrite) targets — rewriting the COUNT(DISTINCT …) pieces into joinable sub-aggregates aligned on the same GROUP BY keys, with correct NULL handling where needed. For this table with 20m rows, on my M4 machine the times are 0.42s vs ~17s for the implementation in #20940

In my opinion, they’re complementary: different predicates, different plans, and they can coexist in the optimizer pipeline (we’d want to sanity-check rule order so we don’t double-rewrite the same node).

Happy to align naming, tests, and placement with you and the maintainers.

@xiedeyantu
Copy link
Member

I personally think this PR is a more generalized rewrite approach. #20940 might cover a limited range of scenarios, but it is more rigorous. Could the current PR add some test cases?

@ydgandhi
Copy link
Author

Thanks for the review. I asked cursor to add a few tests


Tests for MultiDistinctCountRewrite (what they cover)

Optimizer unit tests — datafusion/optimizer/src/multi_distinct_count_rewrite.rs

Test What it asserts
rewrites_two_count_distinct GROUP BY a + COUNT(DISTINCT b), COUNT(DISTINCT c) → inner joins, per-branch null filters on b/c, mdc_base + two mdc_d aliases.
rewrites_global_three_count_distinct No GROUP BY, three COUNT(DISTINCT …) → cross/inner join rewrite; no mdc_base (global-only path).
rewrites_two_count_distinct_with_non_distinct_count Grouped BI-style: two distincts + COUNT(a) → join rewrite with mdc_base holding the non-distinct agg.
does_not_rewrite_two_count_distinct_same_column Two COUNT(DISTINCT b) with different aliases → no rewrite (duplicate distinct key).
does_not_rewrite_single_count_distinct Only one COUNT(DISTINCT …)no rewrite (rule needs ≥2 distincts).
rewrites_three_count_distinct_grouped Three grouped COUNT(DISTINCT …) on b, c, atwo inner joins + mdc_base.
rewrites_interleaved_non_distinct_between_distincts Order COUNT(DISTINCT b), COUNT(a), COUNT(DISTINCT c) → rewrite + mdc_base for the middle non-distinct agg (projection order / interleaving).
rewrites_count_distinct_on_cast_exprs COUNT(DISTINCT CAST(b AS Int64)), same for c → rewrite + null filters on the cast expressions.
does_not_rewrite_grouping_sets_multi_distinct GROUPING SETS aggregate with two COUNT(DISTINCT …)no rewrite (rule bails on grouping sets).
does_not_rewrite_mixed_agg COUNT(DISTINCT b) + COUNT(c)no rewrite (only one COUNT(DISTINCT …); rule requires at least two).

SQL integration — datafusion/core/tests/sql/aggregates/multi_distinct_count_rewrite.rs

Test What it asserts
multi_count_distinct_matches_expected_with_nulls End-to-end grouped two COUNT(DISTINCT …) with NULLs in distinct columns; exact sorted batch string vs expected counts.
multi_count_distinct_with_count_star_matches_expected COUNT(*) plus two COUNT(DISTINCT …) per group (BI-style); exact result table.
multi_count_distinct_two_group_keys_matches_expected GROUP BY g1, g2 + two distincts; verifies joins line up on all group keys and numerics match.

@github-actions github-actions bot added the documentation Improvements or additions to documentation label Mar 21, 2026
yash added 4 commits March 21, 2026 20:24
Add MultiDistinctCountRewrite in datafusion-optimizer and register it in Optimizer::new() after SingleDistinctToGroupBy. Rewrites 2+ simple COUNT(DISTINCT) on different args into a join of two-phase aggregates; filter distinct_arg IS NOT NULL on each branch for correct NULL semantics.

✅ Unit tests in datafusion-optimizer; ✅ SQL integration test (NULLs) in core_integration.
…gate

✅ Omit base Aggregate when GROUP BY is empty and only COUNT(DISTINCT) branches exist (matches clickbench extended global queries).
✅ First distinct branch seeds the plan; subsequent branches join (empty keys → Cross Join in plan).
✅ Add rewrites_global_three_count_distinct unit test.

❌ Previous shape could error: Aggregate with no grouping and no aggregate expressions.
… tests

✅ Fix projection after join so output columns match the original aggregate list when COUNT(DISTINCT …) and non-distinct aggs are interleaved (schema-compatible with mixed BI-style queries).

✅ Add internal_err guard for inconsistent aggregate index mapping.

✅ Optimizer tests: three grouped COUNT(DISTINCT), non-distinct between distincts, CAST(distinct) args, no rewrite for GROUPING SETS.

✅ SQL integration: COUNT(*) + two COUNT(DISTINCT); two GROUP BY keys with expected results.

❌ Grouping-set / filtered-distinct cases remain explicitly out of scope for this rule (covered by unchanged-plan tests where applicable).

Made-with: Cursor
… rewrite

✅ End-to-end: COUNT(DISTINCT lower(b)) with 'Abc'/'aBC' plus second distinct on c (case collapse = 1).

✅ End-to-end: COUNT(DISTINCT CAST(x AS INT)) with 1.2/1.3 vs second CAST distinct on y (int collision = 1).

✅ docs: REPLY_PR_20940_DANDANDAN.md — integration table rows + note on safe distinct args (lower/upper/CAST).

❌ No optimizer or engine behavior change; asserts semantics match non-rewritten aggregation.

Made-with: Cursor
@ydgandhi ydgandhi force-pushed the feat/multi-distinct-count-rewrite branch from a7348f6 to 4321c50 Compare March 21, 2026 14:54
@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Mar 21, 2026
@Dandandan
Copy link
Contributor

I think a rewrite like this might be useful, but I think it can also hurt performance because of the join on grouping keys.
So I think it needs to have a config value (off by default) or when enabled some benchmarks showing that it is better in large majority of the cases.

I am also wondering if mostly for memory usage a GroupsAccumulator for distinct count / sum might give similar/more improvements.

@xiedeyantu
Copy link
Member

I think a rewrite like this might be useful, but I think it can also hurt performance because of the join on grouping keys. So I think it needs to have a config value (off by default) or when enabled some benchmarks showing that it is better in large majority of the cases.

I am also wondering if mostly for memory usage a GroupsAccumulator for distinct count / sum might give similar/more improvements.

@Dandandan Thank you for the explanation. It’s true that this would add a hash join, but if aggregation can be performed in parallel, there might be advantages in scenarios with two or more COUNT(DISTINCT) operations. I agree to run performance tests across multiple scenarios to evaluate the actual results.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate optimizer Optimizer rules

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Optimize multiple COUNT(DISTINCT) memory via logical plan rewrite

3 participants