From 8139308a6f235c76388b6e9856d1548a711843e9 Mon Sep 17 00:00:00 2001 From: jackwener Date: Sun, 6 Nov 2022 22:45:09 +0800 Subject: [PATCH] reuse code `utils::optimize_children` instead of redundant implementation. --- datafusion/optimizer/src/eliminate_filter.rs | 11 +- datafusion/optimizer/src/inline_table_scan.rs | 80 +++--- .../src/rewrite_disjunctive_predicate.rs | 45 ++-- .../src/single_distinct_to_groupby.rs | 246 +++++++++--------- 4 files changed, 170 insertions(+), 212 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_filter.rs b/datafusion/optimizer/src/eliminate_filter.rs index a5d92f55f3b6..04f6f00f8be5 100644 --- a/datafusion/optimizer/src/eliminate_filter.rs +++ b/datafusion/optimizer/src/eliminate_filter.rs @@ -18,11 +18,10 @@ //! Optimizer rule to replace `where false` on a plan with an empty relation. //! This saves time in planning and executing the query. //! Note that this rule should be applied after simplify expressions optimizer rule. -use crate::{OptimizerConfig, OptimizerRule}; +use crate::{utils, OptimizerConfig, OptimizerRule}; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{ logical_plan::{EmptyRelation, LogicalPlan}, - utils::from_plan, Expr, }; @@ -61,13 +60,7 @@ impl OptimizerRule for EliminateFilter { })), None => { // Apply the optimization to all inputs of the plan - let inputs = plan.inputs(); - let new_inputs = inputs - .iter() - .map(|plan| self.optimize(plan, _optimizer_config)) - .collect::>>()?; - - from_plan(plan, &plan.expressions(), &new_inputs) + utils::optimize_children(self, plan, _optimizer_config) } } } diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index 89c78405ae67..3b58c9643f69 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -18,11 +18,9 @@ //! Optimizer rule to replace TableScan references //! such as DataFrames and Views and inlines the LogicalPlan //! to support further optimization -use crate::{OptimizerConfig, OptimizerRule}; +use crate::{utils, OptimizerConfig, OptimizerRule}; use datafusion_common::Result; -use datafusion_expr::{ - logical_plan::LogicalPlan, utils::from_plan, Expr, LogicalPlanBuilder, TableScan, -}; +use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder, TableScan}; /// Optimization rule that inlines TableScan that provide a [LogicalPlan] /// ([DataFrame] / [ViewTable]) @@ -36,54 +34,44 @@ impl InlineTableScan { } } -/// Inline -fn inline_table_scan(plan: &LogicalPlan) -> Result { - match plan { - // Match only on scans without filter / projection / fetch - // Views and DataFrames won't have those added - // during the early stage of planning - LogicalPlan::TableScan(TableScan { - source, - table_name, - filters, - fetch: None, - .. - }) if filters.is_empty() => { - if let Some(sub_plan) = source.get_logical_plan() { - // Recursively apply optimization - let plan = inline_table_scan(sub_plan)?; - let plan = LogicalPlanBuilder::from(plan).project_with_alias( - vec![Expr::Wildcard], - Some(table_name.to_string()), - )?; - plan.build() - } else { - // No plan available, return with table scan as is - Ok(plan.clone()) - } - } - - // Rest: Recurse - _ => { - // apply the optimization to all inputs of the plan - let inputs = plan.inputs(); - let new_inputs = inputs - .iter() - .map(|plan| inline_table_scan(plan)) - .collect::>>()?; - - from_plan(plan, &plan.expressions(), &new_inputs) - } - } -} - impl OptimizerRule for InlineTableScan { fn optimize( &self, plan: &LogicalPlan, _optimizer_config: &mut OptimizerConfig, ) -> Result { - inline_table_scan(plan) + match plan { + // Match only on scans without filter / projection / fetch + // Views and DataFrames won't have those added + // during the early stage of planning + LogicalPlan::TableScan(TableScan { + source, + table_name, + filters, + fetch: None, + .. + }) if filters.is_empty() => { + if let Some(sub_plan) = source.get_logical_plan() { + // Recursively apply optimization + let plan = + utils::optimize_children(self, sub_plan, _optimizer_config)?; + let plan = LogicalPlanBuilder::from(plan).project_with_alias( + vec![Expr::Wildcard], + Some(table_name.to_string()), + )?; + plan.build() + } else { + // No plan available, return with table scan as is + Ok(plan.clone()) + } + } + + // Rest: Recurse + _ => { + // apply the optimization to all inputs of the plan + utils::optimize_children(self, plan, _optimizer_config) + } + } } fn name(&self) -> &str { diff --git a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs index 65b35abb229a..7d7184b064d8 100644 --- a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs +++ b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs @@ -15,11 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::{OptimizerConfig, OptimizerRule}; +use crate::{utils, OptimizerConfig, OptimizerRule}; use datafusion_common::Result; use datafusion_expr::expr::BinaryExpr; use datafusion_expr::logical_plan::Filter; -use datafusion_expr::utils::from_plan; use datafusion_expr::{Expr, LogicalPlan, Operator}; use std::sync::Arc; @@ -122,7 +121,14 @@ impl RewriteDisjunctivePredicate { pub fn new() -> Self { Self::default() } - fn rewrite_disjunctive_predicate(plan: &LogicalPlan) -> Result { +} + +impl OptimizerRule for RewriteDisjunctivePredicate { + fn optimize( + &self, + plan: &LogicalPlan, + _optimizer_config: &mut OptimizerConfig, + ) -> Result { match plan { LogicalPlan::Filter(filter) => { let predicate = predicate(filter.predicate())?; @@ -130,30 +136,12 @@ impl RewriteDisjunctivePredicate { let rewritten_expr = normalize_predicate(rewritten_predicate); Ok(LogicalPlan::Filter(Filter::try_new( rewritten_expr, - Arc::new(Self::rewrite_disjunctive_predicate(filter.input())?), + Arc::new(Self::optimize(self, filter.input(), _optimizer_config)?), )?)) } - _ => { - let expr = plan.expressions(); - let inputs = plan.inputs(); - let new_inputs = inputs - .iter() - .map(|input| Self::rewrite_disjunctive_predicate(input)) - .collect::>>()?; - from_plan(plan, &expr, &new_inputs) - } + _ => utils::optimize_children(self, plan, _optimizer_config), } } -} - -impl OptimizerRule for RewriteDisjunctivePredicate { - fn optimize( - &self, - plan: &LogicalPlan, - _optimizer_config: &mut OptimizerConfig, - ) -> Result { - Self::rewrite_disjunctive_predicate(plan) - } fn name(&self) -> &str { "rewrite_disjunctive_predicate" @@ -362,7 +350,6 @@ fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate { } #[cfg(test)] - mod tests { use crate::rewrite_disjunctive_predicate::{ normalize_predicate, predicate, rewrite_predicate, Predicate, @@ -392,7 +379,7 @@ mod tests { }, Predicate::Other { expr: Box::new(gt_expr.clone()) - } + }, ] }, Predicate::And { @@ -402,9 +389,9 @@ mod tests { }, Predicate::Other { expr: Box::new(lt_expr.clone()) - } + }, ] - } + }, ] } ); @@ -423,9 +410,9 @@ mod tests { }, Predicate::Other { expr: Box::new(lt_expr.clone()) - } + }, ] - } + }, ] } ); diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 35d9c13d63c0..0360c569ace4 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -17,12 +17,12 @@ //! single distinct to group by optimizer rule -use crate::{OptimizerConfig, OptimizerRule}; +use crate::{utils, OptimizerConfig, OptimizerRule}; use datafusion_common::{DFSchema, Result}; use datafusion_expr::{ col, logical_plan::{Aggregate, LogicalPlan, Projection}, - utils::{columnize_expr, from_plan}, + utils::columnize_expr, Expr, ExprSchemable, }; use hashbrown::HashSet; @@ -54,131 +54,6 @@ impl SingleDistinctToGroupBy { } } -fn optimize(plan: &LogicalPlan) -> Result { - match plan { - LogicalPlan::Aggregate(Aggregate { - input, - aggr_expr, - schema, - group_expr, - }) => { - if is_single_distinct_agg(plan)? && !contains_grouping_set(group_expr) { - // alias all original group_by exprs - let mut group_expr_alias = Vec::with_capacity(group_expr.len()); - let mut inner_group_exprs = group_expr - .iter() - .enumerate() - .map(|(i, group_expr)| { - let alias_str = format!("group_alias_{}", i); - let alias_expr = group_expr.clone().alias(&alias_str); - group_expr_alias.push((alias_str, schema.fields()[i].clone())); - alias_expr - }) - .collect::>(); - - // and they can be referenced by the alias in the outer aggr plan - let outer_group_exprs = group_expr_alias - .iter() - .map(|(alias, _)| col(alias)) - .collect::>(); - - // replace the distinct arg with alias - let mut group_fields_set = HashSet::new(); - let new_aggr_exprs = aggr_expr - .iter() - .map(|aggr_expr| match aggr_expr { - Expr::AggregateFunction { - fun, args, filter, .. - } => { - // is_single_distinct_agg ensure args.len=1 - if group_fields_set.insert(args[0].display_name()?) { - inner_group_exprs - .push(args[0].clone().alias(SINGLE_DISTINCT_ALIAS)); - } - Ok(Expr::AggregateFunction { - fun: fun.clone(), - args: vec![col(SINGLE_DISTINCT_ALIAS)], - distinct: false, // intentional to remove distinct here - filter: filter.clone(), - }) - } - _ => Ok(aggr_expr.clone()), - }) - .collect::>>()?; - - // construct the inner AggrPlan - let inner_fields = inner_group_exprs - .iter() - .map(|expr| expr.to_field(input.schema())) - .collect::>>()?; - let inner_schema = DFSchema::new_with_metadata( - inner_fields, - input.schema().metadata().clone(), - )?; - let grouped_aggr = LogicalPlan::Aggregate(Aggregate::try_new( - input.clone(), - inner_group_exprs, - Vec::new(), - )?); - let inner_agg = optimize_children(&grouped_aggr)?; - - let outer_aggr_schema = Arc::new(DFSchema::new_with_metadata( - outer_group_exprs - .iter() - .chain(new_aggr_exprs.iter()) - .map(|expr| expr.to_field(&inner_schema)) - .collect::>>()?, - input.schema().metadata().clone(), - )?); - - // so the aggregates are displayed in the same way even after the rewrite - // this optimizer has two kinds of alias: - // - group_by aggr - // - aggr expr - let mut alias_expr: Vec = Vec::new(); - for (alias, original_field) in group_expr_alias { - alias_expr.push(col(&alias).alias(original_field.qualified_name())); - } - for (i, expr) in new_aggr_exprs.iter().enumerate() { - alias_expr.push(columnize_expr( - expr.clone().alias( - schema.clone().fields()[i + group_expr.len()] - .qualified_name(), - ), - &outer_aggr_schema, - )); - } - - let outer_aggr = LogicalPlan::Aggregate(Aggregate::try_new( - Arc::new(inner_agg), - outer_group_exprs, - new_aggr_exprs, - )?); - - Ok(LogicalPlan::Projection(Projection::try_new_with_schema( - alias_expr, - Arc::new(outer_aggr), - schema.clone(), - None, - )?)) - } else { - optimize_children(plan) - } - } - _ => optimize_children(plan), - } -} - -fn optimize_children(plan: &LogicalPlan) -> Result { - let expr = plan.expressions(); - let inputs = plan.inputs(); - let new_inputs = inputs - .iter() - .map(|plan| optimize(plan)) - .collect::>>()?; - from_plan(plan, &expr, &new_inputs) -} - /// Check whether all aggregate exprs are distinct on a single field. fn is_single_distinct_agg(plan: &LogicalPlan) -> Result { match plan { @@ -213,7 +88,122 @@ impl OptimizerRule for SingleDistinctToGroupBy { plan: &LogicalPlan, _optimizer_config: &mut OptimizerConfig, ) -> Result { - optimize(plan) + match plan { + LogicalPlan::Aggregate(Aggregate { + input, + aggr_expr, + schema, + group_expr, + }) => { + if is_single_distinct_agg(plan)? && !contains_grouping_set(group_expr) { + // alias all original group_by exprs + let mut group_expr_alias = Vec::with_capacity(group_expr.len()); + let mut inner_group_exprs = group_expr + .iter() + .enumerate() + .map(|(i, group_expr)| { + let alias_str = format!("group_alias_{}", i); + let alias_expr = group_expr.clone().alias(&alias_str); + group_expr_alias + .push((alias_str, schema.fields()[i].clone())); + alias_expr + }) + .collect::>(); + + // and they can be referenced by the alias in the outer aggr plan + let outer_group_exprs = group_expr_alias + .iter() + .map(|(alias, _)| col(alias)) + .collect::>(); + + // replace the distinct arg with alias + let mut group_fields_set = HashSet::new(); + let new_aggr_exprs = aggr_expr + .iter() + .map(|aggr_expr| match aggr_expr { + Expr::AggregateFunction { + fun, args, filter, .. + } => { + // is_single_distinct_agg ensure args.len=1 + if group_fields_set.insert(args[0].display_name()?) { + inner_group_exprs.push( + args[0].clone().alias(SINGLE_DISTINCT_ALIAS), + ); + } + Ok(Expr::AggregateFunction { + fun: fun.clone(), + args: vec![col(SINGLE_DISTINCT_ALIAS)], + distinct: false, // intentional to remove distinct here + filter: filter.clone(), + }) + } + _ => Ok(aggr_expr.clone()), + }) + .collect::>>()?; + + // construct the inner AggrPlan + let inner_fields = inner_group_exprs + .iter() + .map(|expr| expr.to_field(input.schema())) + .collect::>>()?; + let inner_schema = DFSchema::new_with_metadata( + inner_fields, + input.schema().metadata().clone(), + )?; + let grouped_aggr = LogicalPlan::Aggregate(Aggregate::try_new( + input.clone(), + inner_group_exprs, + Vec::new(), + )?); + let inner_agg = + utils::optimize_children(self, &grouped_aggr, _optimizer_config)?; + + let outer_aggr_schema = Arc::new(DFSchema::new_with_metadata( + outer_group_exprs + .iter() + .chain(new_aggr_exprs.iter()) + .map(|expr| expr.to_field(&inner_schema)) + .collect::>>()?, + input.schema().metadata().clone(), + )?); + + // so the aggregates are displayed in the same way even after the rewrite + // this optimizer has two kinds of alias: + // - group_by aggr + // - aggr expr + let mut alias_expr: Vec = Vec::new(); + for (alias, original_field) in group_expr_alias { + alias_expr + .push(col(&alias).alias(original_field.qualified_name())); + } + for (i, expr) in new_aggr_exprs.iter().enumerate() { + alias_expr.push(columnize_expr( + expr.clone().alias( + schema.clone().fields()[i + group_expr.len()] + .qualified_name(), + ), + &outer_aggr_schema, + )); + } + + let outer_aggr = LogicalPlan::Aggregate(Aggregate::try_new( + Arc::new(inner_agg), + outer_group_exprs, + new_aggr_exprs, + )?); + + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + alias_expr, + Arc::new(outer_aggr), + schema.clone(), + None, + )?)) + } else { + utils::optimize_children(self, plan, _optimizer_config) + } + } + _ => utils::optimize_children(self, plan, _optimizer_config), + } } fn name(&self) -> &str { "single_distinct_aggregation_to_group_by"