Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
AggregateMode::Partial
) && can_combine(
(
agg_exec.group_by(),
agg_exec.group_expr(),
agg_exec.aggr_expr(),
agg_exec.filter_expr(),
),
(
input_agg_exec.group_by(),
input_agg_exec.group_expr(),
input_agg_exec.aggr_expr(),
input_agg_exec.filter_expr(),
),
Expand All @@ -88,7 +88,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
};
AggregateExec::try_new(
mode,
input_agg_exec.group_by().clone(),
input_agg_exec.group_expr().clone(),
input_agg_exec.aggr_expr().to_vec(),
input_agg_exec.filter_expr().to_vec(),
input_agg_exec.input().clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ fn get_common_requirement_of_aggregate_input(
if let Some(aggr_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
let input = aggr_exec.input();
let mut aggr_expr = try_get_updated_aggr_expr_from_child(aggr_exec);
let group_by = aggr_exec.group_by();
let group_by = aggr_exec.group_expr();
let mode = aggr_exec.mode();

let input_eq_properties = input.equivalence_properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ fn reorder_aggregate_keys(
) -> Result<PlanWithKeyRequirements> {
let parent_required = &agg_node.data;
let output_columns = agg_exec
.group_by()
.group_expr()
.expr()
.iter()
.enumerate()
Expand All @@ -474,15 +474,15 @@ fn reorder_aggregate_keys(
.collect::<Vec<_>>();

if parent_required.len() == output_exprs.len()
&& agg_exec.group_by().null_expr().is_empty()
&& agg_exec.group_expr().null_expr().is_empty()
&& !physical_exprs_equal(&output_exprs, parent_required)
{
if let Some(positions) = expected_expr_positions(&output_exprs, parent_required) {
if let Some(agg_exec) =
agg_exec.input().as_any().downcast_ref::<AggregateExec>()
{
if matches!(agg_exec.mode(), &AggregateMode::Partial) {
let group_exprs = agg_exec.group_by().expr();
let group_exprs = agg_exec.group_expr().expr();
let new_group_exprs = positions
.into_iter()
.map(|idx| group_exprs[idx].clone())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl LimitedDistinctAggregation {
// We found what we want: clone, copy the limit down, and return modified node
let new_aggr = AggregateExec::try_new(
*aggr.mode(),
aggr.group_by().clone(),
aggr.group_expr().clone(),
aggr.aggr_expr().to_vec(),
aggr.filter_expr().to_vec(),
aggr.input().clone(),
Expand Down Expand Up @@ -116,7 +116,7 @@ impl LimitedDistinctAggregation {
if let Some(parent_aggr) =
match_aggr.as_any().downcast_ref::<AggregateExec>()
{
if !parent_aggr.group_by().eq(aggr.group_by()) {
if !parent_aggr.group_expr().eq(aggr.group_expr()) {
// a partial and final aggregation with different groupings disqualifies
// rewriting the child aggregation
rewrite_applicable = false;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/topk_aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl TopKAggregation {
// We found what we want: clone, copy the limit down, and return modified node
let new_aggr = AggregateExec::try_new(
*aggr.mode(),
aggr.group_by().clone(),
aggr.group_expr().clone(),
aggr.aggr_expr().to_vec(),
aggr.filter_expr().to_vec(),
aggr.input().clone(),
Expand Down
6 changes: 1 addition & 5 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,17 +502,13 @@ impl AggregateExec {
}
}

pub fn group_by(&self) -> &PhysicalGroupBy {
&self.group_by
}

/// true, if this Aggregate has a group-by with no required or explicit ordering,
/// no filtering and no aggregate expressions
/// This method qualifies the use of the LimitedDistinctAggregation rewrite rule
/// on an AggregateExec.
pub fn is_unordered_unfiltered_group_by_distinct(&self) -> bool {
// ensure there is a group by
if self.group_by().is_empty() {
if self.group_expr().is_empty() {
return false;
}
// ensure there are no aggregate expressions
Expand Down