From 72f89cdb7c8f631339a7a9f29019571c280611d0 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 11 Apr 2023 21:15:47 +0800 Subject: [PATCH 1/5] feat: allow the customization of analyzer rules Signed-off-by: Ruihang Xia --- datafusion-examples/examples/rewrite_expr.rs | 2 +- datafusion/core/src/execution/context.rs | 5 +- datafusion/optimizer/src/analyzer/mod.rs | 6 +-- datafusion/optimizer/src/eliminate_limit.rs | 14 ++++-- datafusion/optimizer/src/optimizer.rs | 49 ++++++++++++++----- .../optimizer/src/propagate_empty_relation.rs | 11 +++-- datafusion/optimizer/src/push_down_filter.rs | 11 +++-- .../optimizer/src/push_down_projection.rs | 11 +++-- datafusion/optimizer/src/test/mod.rs | 8 +-- 9 files changed, 78 insertions(+), 39 deletions(-) diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs index 7a752e5c003c..93aa976db250 100644 --- a/datafusion-examples/examples/rewrite_expr.rs +++ b/datafusion-examples/examples/rewrite_expr.rs @@ -47,7 +47,7 @@ pub fn main() -> Result<()> { ); // now run the optimizer with our custom rule - let optimizer = Optimizer::with_rules(vec![Arc::new(MyRule {})]); + let optimizer = Optimizer::with_rules(vec![], vec![Arc::new(MyRule {})]); let config = OptimizerContext::default().with_skip_failing_rules(false); let optimized_plan = optimizer.optimize(&logical_plan, &config, observe)?; println!( diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index c3adb4cc74dd..1706f6d17b6c 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -100,7 +100,7 @@ use crate::physical_optimizer::global_sort_selection::GlobalSortSelection; use crate::physical_optimizer::pipeline_checker::PipelineChecker; use crate::physical_optimizer::pipeline_fixer::PipelineFixer; use crate::physical_optimizer::sort_enforcement::EnforceSorting; -use datafusion_optimizer::OptimizerConfig; +use datafusion_optimizer::{analyzer::AnalyzerRule, OptimizerConfig}; use datafusion_sql::planner::object_name_to_table_reference; use uuid::Uuid; @@ -1451,9 +1451,10 @@ impl SessionState { /// Replace the optimizer rules pub fn with_optimizer_rules( mut self, + analyzer_rules: Vec>, rules: Vec>, ) -> Self { - self.optimizer = Optimizer::with_rules(rules); + self.optimizer = Optimizer::with_rules(analyzer_rules, rules); self } diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index bb9b01c8593e..b5a29a287694 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -mod count_wildcard_rule; -mod inline_table_scan; -pub(crate) mod type_coercion; +pub mod count_wildcard_rule; +pub mod inline_table_scan; +pub mod type_coercion; use crate::analyzer::count_wildcard_rule::CountWildcardRule; use crate::analyzer::inline_table_scan::InlineTableScan; diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs index 7844ca7909fc..d706916c2e9e 100644 --- a/datafusion/optimizer/src/eliminate_limit.rs +++ b/datafusion/optimizer/src/eliminate_limit.rs @@ -94,7 +94,8 @@ mod tests { use crate::push_down_limit::PushDownLimit; fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { - let optimizer = Optimizer::with_rules(vec![Arc::new(EliminateLimit::new())]); + let optimizer = + Optimizer::with_rules(vec![], vec![Arc::new(EliminateLimit::new())]); let optimized_plan = optimizer .optimize_recursively( optimizer.rules.get(0).unwrap(), @@ -115,10 +116,13 @@ mod tests { ) -> Result<()> { fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} let config = OptimizerContext::new().with_max_passes(1); - let optimizer = Optimizer::with_rules(vec![ - Arc::new(PushDownLimit::new()), - Arc::new(EliminateLimit::new()), - ]); + let optimizer = Optimizer::with_rules( + vec![], + vec![ + Arc::new(PushDownLimit::new()), + Arc::new(EliminateLimit::new()), + ], + ); let optimized_plan = optimizer .optimize(plan, &config, observe) .expect("failed to optimize plan"); diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 6d02c46cc0f5..fc40ea576b8a 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -17,7 +17,10 @@ //! Query optimizer traits -use crate::analyzer::Analyzer; +use crate::analyzer::count_wildcard_rule::CountWildcardRule; +use crate::analyzer::inline_table_scan::InlineTableScan; +use crate::analyzer::type_coercion::TypeCoercion; +use crate::analyzer::{Analyzer, AnalyzerRule}; use crate::common_subexpr_eliminate::CommonSubexprEliminate; use crate::decorrelate_where_exists::DecorrelateWhereExists; use crate::decorrelate_where_in::DecorrelateWhereIn; @@ -156,7 +159,10 @@ impl OptimizerConfig for OptimizerContext { /// A rule-based optimizer. #[derive(Clone)] pub struct Optimizer { - /// All rules to apply + /// All analyzer rules to apply + pub analyzer_rules: Vec>, + + /// All optimizer rules to apply pub rules: Vec>, } @@ -207,6 +213,12 @@ impl Default for Optimizer { impl Optimizer { /// Create a new optimizer using the recommended list of rules pub fn new() -> Self { + let analyzer_rules: Vec> = vec![ + Arc::new(InlineTableScan::new()), + Arc::new(TypeCoercion::new()), + Arc::new(CountWildcardRule::new()), + ]; + let rules: Vec> = vec![ Arc::new(SimplifyExpressions::new()), Arc::new(UnwrapCastInComparison::new()), @@ -244,12 +256,18 @@ impl Optimizer { Arc::new(PushDownLimit::new()), ]; - Self::with_rules(rules) + Self::with_rules(analyzer_rules, rules) } /// Create a new optimizer with the given rules - pub fn with_rules(rules: Vec>) -> Self { - Self { rules } + pub fn with_rules( + analyzer_rules: Vec>, + rules: Vec>, + ) -> Self { + Self { + analyzer_rules, + rules, + } } /// Optimizes the logical plan by applying optimizer rules, and @@ -265,7 +283,8 @@ impl Optimizer { { let options = config.options(); // execute_and_check has it's own timer - let mut new_plan = Analyzer::default().execute_and_check(plan, options)?; + let mut new_plan = Analyzer::with_rules(self.analyzer_rules.clone()) + .execute_and_check(plan, options)?; let start_time = Instant::now(); @@ -456,7 +475,7 @@ mod tests { #[test] fn skip_failing_rule() { - let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]); + let opt = Optimizer::with_rules(vec![], vec![Arc::new(BadRule {})]); let config = OptimizerContext::new().with_skip_failing_rules(true); let plan = LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, @@ -467,7 +486,7 @@ mod tests { #[test] fn no_skip_failing_rule() { - let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]); + let opt = Optimizer::with_rules(vec![], vec![Arc::new(BadRule {})]); let config = OptimizerContext::new().with_skip_failing_rules(false); let plan = LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, @@ -485,7 +504,7 @@ mod tests { #[test] fn generate_different_schema() { - let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]); + let opt = Optimizer::with_rules(vec![], vec![Arc::new(GetTableScanRule {})]); let config = OptimizerContext::new().with_skip_failing_rules(false); let plan = LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, @@ -511,7 +530,7 @@ mod tests { fn generate_same_schema_different_metadata() -> Result<()> { // if the plan creates more metadata than previously (because // some wrapping functions are removed, etc) do not error - let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]); + let opt = Optimizer::with_rules(vec![], vec![Arc::new(GetTableScanRule {})]); let config = OptimizerContext::new().with_skip_failing_rules(false); let input = Arc::new(test_table_scan()?); @@ -536,7 +555,10 @@ mod tests { // Run a goofy optimizer, which rotates projection columns // [1, 2, 3] -> [2, 3, 1] -> [3, 1, 2] -> [1, 2, 3] - let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]); + let opt = Optimizer::with_rules( + vec![], + vec![Arc::new(RotateProjectionRule::new(false))], + ); let config = OptimizerContext::new().with_max_passes(16); let initial_plan = LogicalPlanBuilder::empty(false) @@ -562,7 +584,10 @@ mod tests { // Run a goofy optimizer, which reverses and rotates projection columns // [1, 2, 3] -> [3, 2, 1] -> [2, 1, 3] -> [1, 3, 2] -> [3, 2, 1] - let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]); + let opt = Optimizer::with_rules( + vec![], + vec![Arc::new(RotateProjectionRule::new(true))], + ); let config = OptimizerContext::new().with_max_passes(16); let initial_plan = LogicalPlanBuilder::empty(false) diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index 01e16058ec32..c7e94a10d933 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -211,10 +211,13 @@ mod tests { expected: &str, ) -> Result<()> { fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} - let optimizer = Optimizer::with_rules(vec![ - Arc::new(EliminateFilter::new()), - Arc::new(PropagateEmptyRelation::new()), - ]); + let optimizer = Optimizer::with_rules( + vec![], + vec![ + Arc::new(EliminateFilter::new()), + Arc::new(PropagateEmptyRelation::new()), + ], + ); let config = &mut OptimizerContext::new() .with_max_passes(1) .with_skip_failing_rules(false); diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 1c0b4b07e57b..157bd4b0ebb2 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -919,10 +919,13 @@ mod tests { plan: &LogicalPlan, expected: &str, ) -> Result<()> { - let optimizer = Optimizer::with_rules(vec![ - Arc::new(RewriteDisjunctivePredicate::new()), - Arc::new(PushDownFilter::new()), - ]); + let optimizer = Optimizer::with_rules( + vec![], + vec![ + Arc::new(RewriteDisjunctivePredicate::new()), + Arc::new(PushDownFilter::new()), + ], + ); let mut optimized_plan = optimizer .optimize_recursively( optimizer.rules.get(0).unwrap(), diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs index fd8f4c011a17..ee6a98cbdd0d 100644 --- a/datafusion/optimizer/src/push_down_projection.rs +++ b/datafusion/optimizer/src/push_down_projection.rs @@ -1061,10 +1061,13 @@ mod tests { } fn optimize(plan: &LogicalPlan) -> Result { - let optimizer = Optimizer::with_rules(vec![ - Arc::new(PushDownProjection::new()), - Arc::new(EliminateProjection::new()), - ]); + let optimizer = Optimizer::with_rules( + vec![], + vec![ + Arc::new(PushDownProjection::new()), + Arc::new(EliminateProjection::new()), + ], + ); let mut optimized_plan = optimizer .optimize_recursively( optimizer.rules.get(0).unwrap(), diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs index 439f44151ed7..5cd4436a7534 100644 --- a/datafusion/optimizer/src/test/mod.rs +++ b/datafusion/optimizer/src/test/mod.rs @@ -127,7 +127,7 @@ pub fn assert_optimized_plan_eq( plan: &LogicalPlan, expected: &str, ) -> Result<()> { - let optimizer = Optimizer::with_rules(vec![rule]); + let optimizer = Optimizer::with_rules(vec![], vec![rule]); let optimized_plan = optimizer .optimize_recursively( optimizer.rules.get(0).unwrap(), @@ -146,7 +146,7 @@ pub fn assert_optimized_plan_eq_display_indent( plan: &LogicalPlan, expected: &str, ) { - let optimizer = Optimizer::with_rules(vec![rule]); + let optimizer = Optimizer::with_rules(vec![], vec![rule]); let optimized_plan = optimizer .optimize_recursively( optimizer.rules.get(0).unwrap(), @@ -164,7 +164,7 @@ pub fn assert_optimizer_err( plan: &LogicalPlan, expected: &str, ) { - let optimizer = Optimizer::with_rules(vec![rule]); + let optimizer = Optimizer::with_rules(vec![], vec![rule]); let res = optimizer.optimize_recursively( optimizer.rules.get(0).unwrap(), plan, @@ -185,7 +185,7 @@ pub fn assert_optimization_skipped( rule: Arc, plan: &LogicalPlan, ) -> Result<()> { - let optimizer = Optimizer::with_rules(vec![rule]); + let optimizer = Optimizer::with_rules(vec![], vec![rule]); let new_plan = optimizer .optimize_recursively( optimizer.rules.get(0).unwrap(), From 3c58741045adcfd8d79ba0190ee3369c3eeebe15 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 11 Apr 2023 22:13:26 +0800 Subject: [PATCH 2/5] move analyzer out of optimizer Signed-off-by: Ruihang Xia --- datafusion-examples/examples/rewrite_expr.rs | 2 +- datafusion/core/src/execution/context.rs | 37 ++++++++++++-- .../src/analyzer/count_wildcard_rule.rs | 6 +++ datafusion/optimizer/src/eliminate_limit.rs | 14 ++---- datafusion/optimizer/src/optimizer.rs | 49 +++++-------------- .../optimizer/src/propagate_empty_relation.rs | 11 ++--- datafusion/optimizer/src/push_down_filter.rs | 11 ++--- .../optimizer/src/push_down_projection.rs | 11 ++--- datafusion/optimizer/src/test/mod.rs | 8 +-- .../optimizer/tests/integration-test.rs | 7 ++- 10 files changed, 76 insertions(+), 80 deletions(-) diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs index 93aa976db250..7a752e5c003c 100644 --- a/datafusion-examples/examples/rewrite_expr.rs +++ b/datafusion-examples/examples/rewrite_expr.rs @@ -47,7 +47,7 @@ pub fn main() -> Result<()> { ); // now run the optimizer with our custom rule - let optimizer = Optimizer::with_rules(vec![], vec![Arc::new(MyRule {})]); + let optimizer = Optimizer::with_rules(vec![Arc::new(MyRule {})]); let config = OptimizerContext::default().with_skip_failing_rules(false); let optimized_plan = optimizer.optimize(&logical_plan, &config, observe)?; println!( diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 1706f6d17b6c..2aa0ca95db71 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -100,7 +100,10 @@ use crate::physical_optimizer::global_sort_selection::GlobalSortSelection; use crate::physical_optimizer::pipeline_checker::PipelineChecker; use crate::physical_optimizer::pipeline_fixer::PipelineFixer; use crate::physical_optimizer::sort_enforcement::EnforceSorting; -use datafusion_optimizer::{analyzer::AnalyzerRule, OptimizerConfig}; +use datafusion_optimizer::{ + analyzer::{Analyzer, AnalyzerRule}, + OptimizerConfig, +}; use datafusion_sql::planner::object_name_to_table_reference; use uuid::Uuid; @@ -1198,6 +1201,8 @@ impl QueryPlanner for DefaultQueryPlanner { pub struct SessionState { /// UUID for the session session_id: String, + /// Responsible for analyzing and rewrite a logical plan before optimization + analyzer: Analyzer, /// Responsible for optimizing a logical plan optimizer: Optimizer, /// Responsible for optimizing a physical execution plan @@ -1336,6 +1341,7 @@ impl SessionState { SessionState { session_id, + analyzer: Analyzer::new(), optimizer: Optimizer::new(), physical_optimizers, query_planner: Arc::new(DefaultQueryPlanner {}), @@ -1448,13 +1454,21 @@ impl SessionState { self } + /// Replace the analyzer rules + pub fn with_analyzer_rules( + mut self, + rules: Vec>, + ) -> Self { + self.analyzer = Analyzer::with_rules(rules); + self + } + /// Replace the optimizer rules pub fn with_optimizer_rules( mut self, - analyzer_rules: Vec>, rules: Vec>, ) -> Self { - self.optimizer = Optimizer::with_rules(analyzer_rules, rules); + self.optimizer = Optimizer::with_rules(rules); self } @@ -1467,6 +1481,15 @@ impl SessionState { self } + /// Adds a new [`AnalyzerRule`] + pub fn add_analyzer_rule( + mut self, + analyzer_rule: Arc, + ) -> Self { + self.analyzer.rules.push(analyzer_rule); + self + } + /// Adds a new [`OptimizerRule`] pub fn add_optimizer_rule( mut self, @@ -1640,9 +1663,12 @@ impl SessionState { if let LogicalPlan::Explain(e) = plan { let mut stringified_plans = e.stringified_plans.clone(); + let analyzed_plan = self + .analyzer + .execute_and_check(e.plan.as_ref(), self.options())?; // optimize the child plan, capturing the output of each optimizer let (plan, logical_optimization_succeeded) = match self.optimizer.optimize( - e.plan.as_ref(), + &analyzed_plan, self, |optimized_plan, optimizer| { let optimizer_name = optimizer.name().to_string(); @@ -1668,7 +1694,8 @@ impl SessionState { logical_optimization_succeeded, })) } else { - self.optimizer.optimize(plan, self, |_, _| {}) + let analyzed_plan = self.analyzer.execute_and_check(plan, self.options())?; + self.optimizer.optimize(&analyzed_plan, self, |_, _| {}) } } diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs index ba19108ceb5e..5dbabd8289e4 100644 --- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs +++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs @@ -28,6 +28,12 @@ use crate::analyzer::AnalyzerRule; /// Resolve issue: https://github.com/apache/arrow-datafusion/issues/5473. pub struct CountWildcardRule {} +impl Default for CountWildcardRule { + fn default() -> Self { + Self::new() + } +} + impl CountWildcardRule { pub fn new() -> Self { CountWildcardRule {} diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs index d706916c2e9e..7844ca7909fc 100644 --- a/datafusion/optimizer/src/eliminate_limit.rs +++ b/datafusion/optimizer/src/eliminate_limit.rs @@ -94,8 +94,7 @@ mod tests { use crate::push_down_limit::PushDownLimit; fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { - let optimizer = - Optimizer::with_rules(vec![], vec![Arc::new(EliminateLimit::new())]); + let optimizer = Optimizer::with_rules(vec![Arc::new(EliminateLimit::new())]); let optimized_plan = optimizer .optimize_recursively( optimizer.rules.get(0).unwrap(), @@ -116,13 +115,10 @@ mod tests { ) -> Result<()> { fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} let config = OptimizerContext::new().with_max_passes(1); - let optimizer = Optimizer::with_rules( - vec![], - vec![ - Arc::new(PushDownLimit::new()), - Arc::new(EliminateLimit::new()), - ], - ); + let optimizer = Optimizer::with_rules(vec![ + Arc::new(PushDownLimit::new()), + Arc::new(EliminateLimit::new()), + ]); let optimized_plan = optimizer .optimize(plan, &config, observe) .expect("failed to optimize plan"); diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index fc40ea576b8a..8bc616bf095f 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -17,10 +17,6 @@ //! Query optimizer traits -use crate::analyzer::count_wildcard_rule::CountWildcardRule; -use crate::analyzer::inline_table_scan::InlineTableScan; -use crate::analyzer::type_coercion::TypeCoercion; -use crate::analyzer::{Analyzer, AnalyzerRule}; use crate::common_subexpr_eliminate::CommonSubexprEliminate; use crate::decorrelate_where_exists::DecorrelateWhereExists; use crate::decorrelate_where_in::DecorrelateWhereIn; @@ -159,9 +155,6 @@ impl OptimizerConfig for OptimizerContext { /// A rule-based optimizer. #[derive(Clone)] pub struct Optimizer { - /// All analyzer rules to apply - pub analyzer_rules: Vec>, - /// All optimizer rules to apply pub rules: Vec>, } @@ -213,12 +206,6 @@ impl Default for Optimizer { impl Optimizer { /// Create a new optimizer using the recommended list of rules pub fn new() -> Self { - let analyzer_rules: Vec> = vec![ - Arc::new(InlineTableScan::new()), - Arc::new(TypeCoercion::new()), - Arc::new(CountWildcardRule::new()), - ]; - let rules: Vec> = vec![ Arc::new(SimplifyExpressions::new()), Arc::new(UnwrapCastInComparison::new()), @@ -256,18 +243,12 @@ impl Optimizer { Arc::new(PushDownLimit::new()), ]; - Self::with_rules(analyzer_rules, rules) + Self::with_rules(rules) } /// Create a new optimizer with the given rules - pub fn with_rules( - analyzer_rules: Vec>, - rules: Vec>, - ) -> Self { - Self { - analyzer_rules, - rules, - } + pub fn with_rules(rules: Vec>) -> Self { + Self { rules } } /// Optimizes the logical plan by applying optimizer rules, and @@ -282,9 +263,7 @@ impl Optimizer { F: FnMut(&LogicalPlan, &dyn OptimizerRule), { let options = config.options(); - // execute_and_check has it's own timer - let mut new_plan = Analyzer::with_rules(self.analyzer_rules.clone()) - .execute_and_check(plan, options)?; + let mut new_plan = plan.clone(); let start_time = Instant::now(); @@ -351,7 +330,7 @@ impl Optimizer { } log_plan("Final optimized plan", &new_plan); debug!("Optimizer took {} ms", start_time.elapsed().as_millis()); - Ok(new_plan) + Ok(new_plan.clone()) } fn optimize_node( @@ -475,7 +454,7 @@ mod tests { #[test] fn skip_failing_rule() { - let opt = Optimizer::with_rules(vec![], vec![Arc::new(BadRule {})]); + let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]); let config = OptimizerContext::new().with_skip_failing_rules(true); let plan = LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, @@ -486,7 +465,7 @@ mod tests { #[test] fn no_skip_failing_rule() { - let opt = Optimizer::with_rules(vec![], vec![Arc::new(BadRule {})]); + let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]); let config = OptimizerContext::new().with_skip_failing_rules(false); let plan = LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, @@ -504,7 +483,7 @@ mod tests { #[test] fn generate_different_schema() { - let opt = Optimizer::with_rules(vec![], vec![Arc::new(GetTableScanRule {})]); + let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]); let config = OptimizerContext::new().with_skip_failing_rules(false); let plan = LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, @@ -530,7 +509,7 @@ mod tests { fn generate_same_schema_different_metadata() -> Result<()> { // if the plan creates more metadata than previously (because // some wrapping functions are removed, etc) do not error - let opt = Optimizer::with_rules(vec![], vec![Arc::new(GetTableScanRule {})]); + let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]); let config = OptimizerContext::new().with_skip_failing_rules(false); let input = Arc::new(test_table_scan()?); @@ -555,10 +534,7 @@ mod tests { // Run a goofy optimizer, which rotates projection columns // [1, 2, 3] -> [2, 3, 1] -> [3, 1, 2] -> [1, 2, 3] - let opt = Optimizer::with_rules( - vec![], - vec![Arc::new(RotateProjectionRule::new(false))], - ); + let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]); let config = OptimizerContext::new().with_max_passes(16); let initial_plan = LogicalPlanBuilder::empty(false) @@ -584,10 +560,7 @@ mod tests { // Run a goofy optimizer, which reverses and rotates projection columns // [1, 2, 3] -> [3, 2, 1] -> [2, 1, 3] -> [1, 3, 2] -> [3, 2, 1] - let opt = Optimizer::with_rules( - vec![], - vec![Arc::new(RotateProjectionRule::new(true))], - ); + let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]); let config = OptimizerContext::new().with_max_passes(16); let initial_plan = LogicalPlanBuilder::empty(false) diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index c7e94a10d933..01e16058ec32 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -211,13 +211,10 @@ mod tests { expected: &str, ) -> Result<()> { fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} - let optimizer = Optimizer::with_rules( - vec![], - vec![ - Arc::new(EliminateFilter::new()), - Arc::new(PropagateEmptyRelation::new()), - ], - ); + let optimizer = Optimizer::with_rules(vec![ + Arc::new(EliminateFilter::new()), + Arc::new(PropagateEmptyRelation::new()), + ]); let config = &mut OptimizerContext::new() .with_max_passes(1) .with_skip_failing_rules(false); diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 157bd4b0ebb2..1c0b4b07e57b 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -919,13 +919,10 @@ mod tests { plan: &LogicalPlan, expected: &str, ) -> Result<()> { - let optimizer = Optimizer::with_rules( - vec![], - vec![ - Arc::new(RewriteDisjunctivePredicate::new()), - Arc::new(PushDownFilter::new()), - ], - ); + let optimizer = Optimizer::with_rules(vec![ + Arc::new(RewriteDisjunctivePredicate::new()), + Arc::new(PushDownFilter::new()), + ]); let mut optimized_plan = optimizer .optimize_recursively( optimizer.rules.get(0).unwrap(), diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs index ee6a98cbdd0d..fd8f4c011a17 100644 --- a/datafusion/optimizer/src/push_down_projection.rs +++ b/datafusion/optimizer/src/push_down_projection.rs @@ -1061,13 +1061,10 @@ mod tests { } fn optimize(plan: &LogicalPlan) -> Result { - let optimizer = Optimizer::with_rules( - vec![], - vec![ - Arc::new(PushDownProjection::new()), - Arc::new(EliminateProjection::new()), - ], - ); + let optimizer = Optimizer::with_rules(vec![ + Arc::new(PushDownProjection::new()), + Arc::new(EliminateProjection::new()), + ]); let mut optimized_plan = optimizer .optimize_recursively( optimizer.rules.get(0).unwrap(), diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs index 5cd4436a7534..439f44151ed7 100644 --- a/datafusion/optimizer/src/test/mod.rs +++ b/datafusion/optimizer/src/test/mod.rs @@ -127,7 +127,7 @@ pub fn assert_optimized_plan_eq( plan: &LogicalPlan, expected: &str, ) -> Result<()> { - let optimizer = Optimizer::with_rules(vec![], vec![rule]); + let optimizer = Optimizer::with_rules(vec![rule]); let optimized_plan = optimizer .optimize_recursively( optimizer.rules.get(0).unwrap(), @@ -146,7 +146,7 @@ pub fn assert_optimized_plan_eq_display_indent( plan: &LogicalPlan, expected: &str, ) { - let optimizer = Optimizer::with_rules(vec![], vec![rule]); + let optimizer = Optimizer::with_rules(vec![rule]); let optimized_plan = optimizer .optimize_recursively( optimizer.rules.get(0).unwrap(), @@ -164,7 +164,7 @@ pub fn assert_optimizer_err( plan: &LogicalPlan, expected: &str, ) { - let optimizer = Optimizer::with_rules(vec![], vec![rule]); + let optimizer = Optimizer::with_rules(vec![rule]); let res = optimizer.optimize_recursively( optimizer.rules.get(0).unwrap(), plan, @@ -185,7 +185,7 @@ pub fn assert_optimization_skipped( rule: Arc, plan: &LogicalPlan, ) -> Result<()> { - let optimizer = Optimizer::with_rules(vec![], vec![rule]); + let optimizer = Optimizer::with_rules(vec![rule]); let new_plan = optimizer .optimize_recursively( optimizer.rules.get(0).unwrap(), diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs index 0b9134c8b84f..e58a2aaa00c9 100644 --- a/datafusion/optimizer/tests/integration-test.rs +++ b/datafusion/optimizer/tests/integration-test.rs @@ -20,8 +20,9 @@ use chrono::{DateTime, NaiveDateTime, Utc}; use datafusion_common::config::ConfigOptions; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource}; +use datafusion_optimizer::analyzer::Analyzer; use datafusion_optimizer::optimizer::Optimizer; -use datafusion_optimizer::{OptimizerContext, OptimizerRule}; +use datafusion_optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule}; use datafusion_sql::planner::{ContextProvider, SqlToRel}; use datafusion_sql::sqlparser::ast::Statement; use datafusion_sql::sqlparser::dialect::GenericDialect; @@ -347,8 +348,10 @@ fn test_sql(sql: &str) -> Result { let config = OptimizerContext::new() .with_skip_failing_rules(false) .with_query_execution_start_time(now_time); + let analyzer = Analyzer::new(); let optimizer = Optimizer::new(); - // optimize the logical plan + // analyze and optimize the logical plan + let plan = analyzer.execute_and_check(&plan, config.options())?; optimizer.optimize(&plan, &config, &observe) } From c7696e29ee08fc8de1ec34f5bdb083144c816881 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 12 Apr 2023 12:03:17 +0800 Subject: [PATCH 3/5] add example analyzer rule Signed-off-by: Ruihang Xia --- datafusion-examples/examples/rewrite_expr.rs | 73 ++++++++++++++++++-- 1 file changed, 66 insertions(+), 7 deletions(-) diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs index 7a752e5c003c..16c82bef6afb 100644 --- a/datafusion-examples/examples/rewrite_expr.rs +++ b/datafusion-examples/examples/rewrite_expr.rs @@ -18,10 +18,11 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_expr::{ AggregateUDF, Between, Expr, Filter, LogicalPlan, ScalarUDF, TableSource, }; +use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule}; use datafusion_optimizer::optimizer::Optimizer; use datafusion_optimizer::{utils, OptimizerConfig, OptimizerContext, OptimizerRule}; use datafusion_sql::planner::{ContextProvider, SqlToRel}; @@ -46,10 +47,18 @@ pub fn main() -> Result<()> { logical_plan.display_indent() ); - // now run the optimizer with our custom rule - let optimizer = Optimizer::with_rules(vec![Arc::new(MyRule {})]); + // run the analyzer with our custom rule let config = OptimizerContext::default().with_skip_failing_rules(false); - let optimized_plan = optimizer.optimize(&logical_plan, &config, observe)?; + let analyzer = Analyzer::with_rules(vec![Arc::new(MyAnalyzerRule {})]); + let analyzed_plan = analyzer.execute_and_check(&logical_plan, config.options())?; + println!( + "Analyzed Logical Plan:\n\n{}\n", + analyzed_plan.display_indent() + ); + + // then run the optimizer with our custom rule + let optimizer = Optimizer::with_rules(vec![Arc::new(MyOptimizerRule {})]); + let optimized_plan = optimizer.optimize(&analyzed_plan, &config, observe)?; println!( "Optimized Logical Plan:\n\n{}\n", optimized_plan.display_indent() @@ -66,11 +75,61 @@ fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) { ) } -struct MyRule {} +/// An example analyzer rule that changes Int64 literals to UInt64 +struct MyAnalyzerRule {} + +impl AnalyzerRule for MyAnalyzerRule { + fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result { + Self::analyze_internal(&plan) + } + + fn name(&self) -> &str { + "my_analyzer_rule" + } +} + +impl MyAnalyzerRule { + fn analyze_internal(plan: &LogicalPlan) -> Result { + // optimize child plans first + let mut new_inputs = plan + .inputs() + .iter() + .map(|p| Self::analyze_internal(p)) + .collect::>>()?; + + if let LogicalPlan::Filter(filter) = plan { + let predicate = Self::analyze_expr(filter.predicate.clone())?; + Ok(LogicalPlan::Filter(Filter::try_new( + predicate, + Arc::new(new_inputs.pop().unwrap()), + )?)) + } else { + Ok(plan.with_new_inputs(&new_inputs)?) + } + } + + fn analyze_expr(expr: Expr) -> Result { + expr.transform(&|expr| { + // closure is invoked for all sub expressions + Ok(match expr { + Expr::Literal(ScalarValue::Int64(i)) => { + // transform to UInt64 + Transformed::Yes(Expr::Literal(ScalarValue::UInt64( + i.map(|i| i as u64), + ))) + } + _ => Transformed::No(expr), + }) + }) + } +} + +/// An example optimizer rule that rewrite BETWEEN expression to binary compare expressions +struct MyOptimizerRule {} -impl OptimizerRule for MyRule { +impl OptimizerRule for MyOptimizerRule { fn name(&self) -> &str { - "my_rule" + "my_optimizer_rule" } fn try_optimize( From 33526392ed4aad79a1a1b8a4e72c1515fdf46e56 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 12 Apr 2023 21:46:05 +0800 Subject: [PATCH 4/5] Update datafusion/optimizer/src/optimizer.rs Co-authored-by: jakevin --- datafusion/optimizer/src/optimizer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 8bc616bf095f..77b2312bc74a 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -330,7 +330,7 @@ impl Optimizer { } log_plan("Final optimized plan", &new_plan); debug!("Optimizer took {} ms", start_time.elapsed().as_millis()); - Ok(new_plan.clone()) + Ok(new_plan) } fn optimize_node( From 09e43ae8438d4fe2ae86452e7f6b72bb1812fa20 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 12 Apr 2023 21:59:35 +0800 Subject: [PATCH 5/5] apply CR sugg. Signed-off-by: Ruihang Xia --- datafusion-examples/examples/rewrite_expr.rs | 32 ++++++++----------- .../src/analyzer/count_wildcard_rule.rs | 7 +--- 2 files changed, 15 insertions(+), 24 deletions(-) diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs index 16c82bef6afb..451205e4cb39 100644 --- a/datafusion-examples/examples/rewrite_expr.rs +++ b/datafusion-examples/examples/rewrite_expr.rs @@ -80,7 +80,7 @@ struct MyAnalyzerRule {} impl AnalyzerRule for MyAnalyzerRule { fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result { - Self::analyze_internal(&plan) + Self::analyze_plan(plan) } fn name(&self) -> &str { @@ -89,23 +89,19 @@ impl AnalyzerRule for MyAnalyzerRule { } impl MyAnalyzerRule { - fn analyze_internal(plan: &LogicalPlan) -> Result { - // optimize child plans first - let mut new_inputs = plan - .inputs() - .iter() - .map(|p| Self::analyze_internal(p)) - .collect::>>()?; - - if let LogicalPlan::Filter(filter) = plan { - let predicate = Self::analyze_expr(filter.predicate.clone())?; - Ok(LogicalPlan::Filter(Filter::try_new( - predicate, - Arc::new(new_inputs.pop().unwrap()), - )?)) - } else { - Ok(plan.with_new_inputs(&new_inputs)?) - } + fn analyze_plan(plan: LogicalPlan) -> Result { + plan.transform(&|plan| { + Ok(match plan { + LogicalPlan::Filter(filter) => { + let predicate = Self::analyze_expr(filter.predicate.clone())?; + Transformed::Yes(LogicalPlan::Filter(Filter::try_new( + predicate, + filter.input, + )?)) + } + _ => Transformed::No(plan), + }) + }) } fn analyze_expr(expr: Expr) -> Result { diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs index 5dbabd8289e4..ecd00d7ac15c 100644 --- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs +++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs @@ -26,14 +26,9 @@ use crate::analyzer::AnalyzerRule; /// Rewrite `Count(Expr:Wildcard)` to `Count(Expr:Literal)`. /// Resolve issue: https://github.com/apache/arrow-datafusion/issues/5473. +#[derive(Default)] pub struct CountWildcardRule {} -impl Default for CountWildcardRule { - fn default() -> Self { - Self::new() - } -} - impl CountWildcardRule { pub fn new() -> Self { CountWildcardRule {}