Improve SingleDistinctToGroupBy to get the same plan as the group by query #11360

@jayzhan211

Is your feature request related to a problem or challenge?

While working on #11299, I ran into the issue that the single distinct plan is different from the group by plan.
https://github.com/apache/datafusion/pull/11299/files#r1667248774
I worked around it by handling the different values I receive in update_batch, but I don't think that addresses the root cause of the problem.

SingleDistinctToGroupBy converts a distinct aggregate into a group by expression. Ideally the optimized plan should be the same as the group by version, but the following plan is not what I expect.

statement ok
create table t(a int) as values (1);

query TT
explain select array_agg(distinct a) from t where a > 3;
----
logical_plan
01)Projection: ARRAY_AGG(alias1) AS ARRAY_AGG(DISTINCT t.a)
02)--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(alias1)]]
03)----Aggregate: groupBy=[[t.a AS alias1]], aggr=[[]]
04)------Filter: t.a > Int32(3)
05)--------TableScan: t projection=[a]
physical_plan
01)ProjectionExec: expr=[ARRAY_AGG(alias1)@0 as ARRAY_AGG(DISTINCT t.a)]
02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(alias1)]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(alias1)]
05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=4
08)--------------AggregateExec: mode=Partial, gby=[a@0 as alias1], aggr=[]
09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
10)------------------CoalesceBatchesExec: target_batch_size=8192
11)--------------------FilterExec: a@0 > 3
12)----------------------MemoryExec: partitions=1, partition_sizes=[1]

query TT
explain select array_agg(a) from t where a > 3 group by a;
----
logical_plan
01)Projection: ARRAY_AGG(t.a)
02)--Aggregate: groupBy=[[t.a]], aggr=[[ARRAY_AGG(t.a)]]
03)----Filter: t.a > Int32(3)
04)------TableScan: t projection=[a]
physical_plan
01)ProjectionExec: expr=[ARRAY_AGG(t.a)@1 as ARRAY_AGG(t.a)]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[ARRAY_AGG(t.a)]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[ARRAY_AGG(t.a)]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------FilterExec: a@0 > 3
09)----------------MemoryExec: partitions=1, partition_sizes=[1]
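
To make the two-level shape of the first plan concrete, here is a small sketch that models it with plain Rust collections instead of DataFusion operators. The function names are hypothetical and not part of DataFusion. It only illustrates that deduplicating through an inner group by and then aggregating yields the same set of values as a direct distinct aggregate; it says nothing about how the optimizer rule is actually implemented.

```rust
use std::collections::BTreeSet;

/// Direct form: ARRAY_AGG(DISTINCT a) over rows with a > min_exclusive.
/// (Hypothetical helper for illustration; not a DataFusion API.)
fn array_agg_distinct(rows: &[i32], min_exclusive: i32) -> Vec<i32> {
    let mut seen = BTreeSet::new();
    let mut out = Vec::new();
    for &a in rows.iter().filter(|&&a| a > min_exclusive) {
        // Keep only the first occurrence of each value.
        if seen.insert(a) {
            out.push(a);
        }
    }
    out
}

/// Rewritten form: the inner aggregate groups by `a` (one row per distinct
/// value, the "alias1" column), the outer aggregate collects those rows.
fn group_by_then_array_agg(rows: &[i32], min_exclusive: i32) -> Vec<i32> {
    // Inner: Aggregate groupBy=[a AS alias1], aggr=[]
    let alias1: BTreeSet<i32> = rows
        .iter()
        .copied()
        .filter(|&a| a > min_exclusive)
        .collect();
    // Outer: Aggregate groupBy=[], aggr=[ARRAY_AGG(alias1)]
    alias1.into_iter().collect()
}

/// Sorts a result so the two forms can be compared regardless of order.
fn sorted(mut v: Vec<i32>) -> Vec<i32> {
    v.sort();
    v
}
```

Comparing `sorted(array_agg_distinct(&rows, 3))` with `sorted(group_by_then_array_agg(&rows, 3))` for any input slice should give equal vectors, since both forms keep each qualifying value exactly once.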

Describe the solution you'd like

Rewrite SingleDistinctToGroupBy so that the optimized plan is the same as the group by version.

02)--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(alias1)]] // outer
03)----Aggregate: groupBy=[[t.a AS alias1]], aggr=[[]] // inner

I think it is possible to have just one Aggregate when the outer Aggregate's group by expressions are empty and the inner Aggregate's aggregate expressions are empty.
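
The condition above can be sketched as a pattern check on a toy plan type. Everything here (`Plan`, `is_collapse_candidate`) is hypothetical and only stands in for DataFusion's real logical plan; the sketch detects the outer/inner shape and does not attempt to say what the merged Aggregate should look like.

```rust
/// Toy stand-in for a logical plan node (hypothetical, not DataFusion's
/// `LogicalPlan`). Expressions are kept as display strings.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Aggregate {
        group_by: Vec<String>,
        aggr: Vec<String>,
        input: Box<Plan>,
    },
    Other(String),
}

/// Returns true when `plan` is an outer Aggregate with no group by
/// expressions whose direct input is an Aggregate with no aggregate
/// expressions, i.e. the outer/inner pair shown in the plan above.
fn is_collapse_candidate(plan: &Plan) -> bool {
    match plan {
        Plan::Aggregate { group_by, input, .. } if group_by.is_empty() => {
            matches!(&**input, Plan::Aggregate { aggr, .. } if aggr.is_empty())
        }
        _ => false,
    }
}
```

Building the pair from the plan above (an outer node with `aggr = ["ARRAY_AGG(alias1)"]` over an inner node with `group_by = ["t.a AS alias1"]`) makes the check return true for the outer node and false for the inner node or a plain scan.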

Describe alternatives you've considered

Do nothing, but add docs explaining why the plans can't be the same.

Additional context

No response

Labels

enhancement (New feature or request)
