From 76a745cd167e847fc10be86935f4eff004c88b6b Mon Sep 17 00:00:00 2001
From: Mert Akkaya
Date: Mon, 5 Aug 2024 11:45:53 +0300
Subject: [PATCH] add valid distinct case for aggregate.slt

---
 .../src/replace_distinct_aggregate.rs         |  9 +++---
 .../sqllogictest/test_files/aggregate.slt     | 28 +++++++++++++++++++
 2 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/datafusion/optimizer/src/replace_distinct_aggregate.rs b/datafusion/optimizer/src/replace_distinct_aggregate.rs
index f73eeacfbf0e..c887192f6370 100644
--- a/datafusion/optimizer/src/replace_distinct_aggregate.rs
+++ b/datafusion/optimizer/src/replace_distinct_aggregate.rs
@@ -82,10 +82,11 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
             for dep in input.schema().functional_dependencies().iter() {
                 // If distinct is exactly the same with a previous GROUP BY, we can
                 // simply remove it:
-                if dep.source_indices[..field_count]
-                    .iter()
-                    .enumerate()
-                    .all(|(idx, f_idx)| idx == *f_idx)
+                if dep.source_indices.len() >= field_count
+                    && dep.source_indices[..field_count]
+                        .iter()
+                        .enumerate()
+                        .all(|(idx, f_idx)| idx == *f_idx)
                 {
                     return Ok(Transformed::yes(input.as_ref().clone()));
                 }
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index abeeb767b948..6513258f879e 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -4521,6 +4521,34 @@ false
 true
 NULL
 
+#
+# Add valid distinct case as aggregation plan test
+#
+
+query TT
+EXPLAIN SELECT DISTINCT c3, min(c1) FROM aggregate_test_100 group by c3 limit 5;
+----
+logical_plan
+01)Limit: skip=0, fetch=5
+02)--Aggregate: groupBy=[[aggregate_test_100.c3, MIN(aggregate_test_100.c1)]], aggr=[[]]
+03)----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[MIN(aggregate_test_100.c1)]]
+04)------TableScan: aggregate_test_100 projection=[c1, c3]
+physical_plan
+01)GlobalLimitExec: skip=0, fetch=5
+02)--CoalescePartitionsExec
+03)----LocalLimitExec: fetch=5
+04)------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, MIN(aggregate_test_100.c1)@1 as MIN(aggregate_test_100.c1)], aggr=[], lim=[5]
+05)--------CoalesceBatchesExec: target_batch_size=8192
+06)----------RepartitionExec: partitioning=Hash([c3@0, MIN(aggregate_test_100.c1)@1], 4), input_partitions=4
+07)------------AggregateExec: mode=Partial, gby=[c3@0 as c3, MIN(aggregate_test_100.c1)@1 as MIN(aggregate_test_100.c1)], aggr=[], lim=[5]
+08)--------------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], aggr=[MIN(aggregate_test_100.c1)]
+09)----------------CoalesceBatchesExec: target_batch_size=8192
+10)------------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
+11)--------------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[MIN(aggregate_test_100.c1)]
+12)----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+13)------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true
+
+
 #
 # Push limit into distinct group-by aggregation tests
 #
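
Reviewer note on why the new `dep.source_indices.len() >= field_count` guard matters: in Rust, taking the slice `dep.source_indices[..field_count]` panics when the vector holds fewer than `field_count` elements, and because `&&` short-circuits, the guard ensures the slice is never taken in that case. A minimal, self-contained sketch of the pattern follows; the concrete values and the `main` harness are illustrative, not DataFusion code:

```rust
fn main() {
    // Mirrors the patched condition: `source_indices` comes from a
    // functional dependency, `field_count` from the DISTINCT schema.
    // (Both values here are made up for illustration.)
    let source_indices: Vec<usize> = vec![0]; // shorter than field_count
    let field_count = 2;

    // Without the length guard, `source_indices[..field_count]` would
    // panic: "range end index 2 out of range for slice of length 1".
    let matches_group_by = source_indices.len() >= field_count
        && source_indices[..field_count]
            .iter()
            .enumerate()
            .all(|(idx, f_idx)| idx == *f_idx);

    // `&&` short-circuits, so the slice was never taken here.
    assert!(!matches_group_by);
    println!("guard prevented an out-of-bounds slice");
}
```

An alternative with the same effect would be `dep.source_indices.get(..field_count)`, which returns `None` rather than panicking for an out-of-range end index; the explicit length check used in the patch keeps the original slice-and-compare shape intact.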