Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,12 +645,12 @@ impl DefaultPhysicalPlanner {
LogicalPlan::Distinct(Distinct {input}) => {
// Convert distinct to groupby with no aggregations
let group_expr = expand_wildcard(input.schema(), input)?;
let aggregate = LogicalPlan::Aggregate(Aggregate {
input: input.clone(),
let aggregate = LogicalPlan::Aggregate(Aggregate::try_new(
input.clone(),
group_expr,
aggr_expr: vec![],
schema: input.schema().clone()
}
vec![],
input.schema().clone()
)?
);
Ok(self.create_initial_plan(&aggregate, session_state).await?)
}
Expand Down
8 changes: 4 additions & 4 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,12 +701,12 @@ impl LogicalPlanBuilder {
exprlist_to_fields(all_expr, &self.plan)?,
self.plan.schema().metadata().clone(),
)?;
Ok(Self::from(LogicalPlan::Aggregate(Aggregate {
input: Arc::new(self.plan.clone()),
Ok(Self::from(LogicalPlan::Aggregate(Aggregate::try_new(
Arc::new(self.plan.clone()),
group_expr,
aggr_expr,
schema: DFSchemaRef::new(aggr_schema),
})))
DFSchemaRef::new(aggr_schema),
)?)))
}

/// Create an expression to represent the explanation of the plan
Expand Down
30 changes: 29 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
use crate::utils::exprlist_to_fields;
use crate::utils::{exprlist_to_fields, grouping_set_expr_count};
use crate::{Expr, TableProviderFilterPushDown, TableSource};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{plan_err, Column, DFSchema, DFSchemaRef, DataFusionError};
Expand Down Expand Up @@ -1314,6 +1314,34 @@ pub struct Aggregate {
}

impl Aggregate {
pub fn try_new(
input: Arc<LogicalPlan>,
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,
schema: DFSchemaRef,
) -> datafusion_common::Result<Self> {
if group_expr.is_empty() && aggr_expr.is_empty() {
return Err(DataFusionError::Plan(
"Aggregate requires at least one grouping or aggregate expression"
.to_string(),
));
}
let group_expr_count = grouping_set_expr_count(&group_expr)?;
if schema.fields().len() != group_expr_count + aggr_expr.len() {
return Err(DataFusionError::Plan(format!(
"Aggregate schema has wrong number of fields. Expected {} got {}",
group_expr_count + aggr_expr.len(),
schema.fields().len()
)));
}
Ok(Self {
input,
group_expr,
aggr_expr,
schema,
})
}

pub fn try_from_plan(plan: &LogicalPlan) -> datafusion_common::Result<&Aggregate> {
match plan {
LogicalPlan::Aggregate(it) => Ok(it),
Expand Down
28 changes: 22 additions & 6 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,22 @@ pub fn exprlist_to_columns(expr: &[Expr], accum: &mut HashSet<Column>) -> Result
Ok(())
}

/// Count the number of distinct exprs in a list of group by expressions. If the
/// first element is a `GroupingSet` expression then it must be the only expr.
pub fn grouping_set_expr_count(group_expr: &[Expr]) -> Result<usize> {
if let Some(Expr::GroupingSet(grouping_set)) = group_expr.first() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's weird that this function takes either a GroupingSet expression, or an arbitrary expression. I dealt with this a lot in the subquery decorrelation - it almost feels like dynamic typing. I think the root cause might be that many of the enum values have fields inside them, rather than a struct - so it's impossible to de-reference them and pass them between functions.

How would you feel about blanket converting all enum values like InSubquery, Exists, etc to point to structs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also found this frustrating to work with. Maybe we could introduce another enum?

pub enum Grouping {
    Expr(Vec<Expr>),
    Set(GroupingSet)
}

and then

pub struct Aggregate {
    pub input: Arc<LogicalPlan>,
    pub group_expr: Grouping,
    pub aggr_expr: Vec<Expr>,
    pub schema: DFSchemaRef,
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would you feel about blanket converting all enum values like InSubquery, Exists, etc to point to structs?

Sounds good to me. This would be consistent with how we do things in the LogicalPlan.

if group_expr.len() > 1 {
return Err(DataFusionError::Plan(
"Invalid group by expressions, GroupingSet must be the only expression"
.to_string(),
));
}
Ok(grouping_set.distinct_expr().len())
} else {
Ok(group_expr.len())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to do other validation? Should I be able to group by *?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expect that we could add extra validation here over time. I would need to research what is allowable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW the sqlparser won't allow group by *

❯ select count(*) from foo group by *;  🤔 Invalid statement: sql parser error: Expected an expression:, found: *

}
}

/// Find all distinct exprs in a list of group by expressions. If the
/// first element is a `GroupingSet` expression then it must be the only expr.
pub fn grouping_set_to_exprlist(group_expr: &[Expr]) -> Result<Vec<Expr>> {
Expand Down Expand Up @@ -395,12 +411,12 @@ pub fn from_plan(
})),
LogicalPlan::Aggregate(Aggregate {
group_expr, schema, ..
}) => Ok(LogicalPlan::Aggregate(Aggregate {
group_expr: expr[0..group_expr.len()].to_vec(),
aggr_expr: expr[group_expr.len()..].to_vec(),
input: Arc::new(inputs[0].clone()),
schema: schema.clone(),
})),
}) => Ok(LogicalPlan::Aggregate(Aggregate::try_new(
Arc::new(inputs[0].clone()),
expr[0..group_expr.len()].to_vec(),
expr[group_expr.len()..].to_vec(),
schema.clone(),
)?)),
LogicalPlan::Sort(Sort { .. }) => Ok(LogicalPlan::Sort(Sort {
expr: expr.to_vec(),
input: Arc::new(inputs[0].clone()),
Expand Down
12 changes: 6 additions & 6 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,12 @@ fn optimize(
let new_aggr_expr = new_expr.pop().unwrap();
let new_group_expr = new_expr.pop().unwrap();

Ok(LogicalPlan::Aggregate(Aggregate {
input: Arc::new(new_input),
group_expr: new_group_expr,
aggr_expr: new_aggr_expr,
schema: schema.clone(),
}))
Ok(LogicalPlan::Aggregate(Aggregate::try_new(
Arc::new(new_input),
new_group_expr,
new_aggr_expr,
schema.clone(),
)?))
}
LogicalPlan::Sort(Sort { expr, input }) => {
let arrays = to_arrays(expr, input, &mut expr_set)?;
Expand Down
12 changes: 6 additions & 6 deletions datafusion/optimizer/src/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,18 +345,18 @@ fn optimize_plan(
schema.metadata().clone(),
)?;

Ok(LogicalPlan::Aggregate(Aggregate {
group_expr: group_expr.clone(),
aggr_expr: new_aggr_expr,
input: Arc::new(optimize_plan(
Ok(LogicalPlan::Aggregate(Aggregate::try_new(
Arc::new(optimize_plan(
_optimizer,
input,
&new_required_columns,
true,
_optimizer_config,
)?),
schema: DFSchemaRef::new(new_schema),
}))
group_expr.clone(),
new_aggr_expr,
DFSchemaRef::new(new_schema),
)?))
}
// scans:
// * remove un-used columns from the scan projection
Expand Down
25 changes: 12 additions & 13 deletions datafusion/optimizer/src/single_distinct_to_groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
all_field,
input.schema().metadata().clone(),
)?;
let grouped_agg = LogicalPlan::Aggregate(Aggregate {
input: input.clone(),
group_expr: all_group_args,
aggr_expr: Vec::new(),
schema: Arc::new(grouped_schema.clone()),
});
let grouped_agg = LogicalPlan::Aggregate(Aggregate::try_new(
input.clone(),
all_group_args,
Vec::new(),
Arc::new(grouped_schema.clone()),
)?);
let grouped_agg = optimize_children(&grouped_agg);
let final_agg_schema = Arc::new(DFSchema::new_with_metadata(
base_group_expr
Expand All @@ -129,13 +129,12 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
));
});

let final_agg = LogicalPlan::Aggregate(Aggregate {
input: Arc::new(grouped_agg?),
group_expr: group_expr.clone(),
aggr_expr: new_aggr_expr,
schema: final_agg_schema,
});

let final_agg = LogicalPlan::Aggregate(Aggregate::try_new(
Arc::new(grouped_agg?),
group_expr.clone(),
new_aggr_expr,
final_agg_schema,
)?);
Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
alias_expr,
Arc::new(final_agg),
Expand Down