From b2fcb763e31822f993ba6a027598470180aa46e3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 14 Nov 2022 14:58:59 -0700 Subject: [PATCH 1/3] Add try_optimize method --- .../optimizer/src/decorrelate_where_exists.rs | 64 +++++++++++-------- datafusion/optimizer/src/optimizer.rs | 21 +++++- datafusion/optimizer/src/test/mod.rs | 8 +++ datafusion/optimizer/src/utils.rs | 11 ++-- 4 files changed, 69 insertions(+), 35 deletions(-) diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs index b52c174b5921..8fc88b349782 100644 --- a/datafusion/optimizer/src/decorrelate_where_exists.rs +++ b/datafusion/optimizer/src/decorrelate_where_exists.rs @@ -20,7 +20,7 @@ use crate::utils::{ verify_not_disjunction, }; use crate::{utils, OptimizerConfig, OptimizerRule}; -use datafusion_common::{context, plan_err, DataFusionError}; +use datafusion_common::context; use datafusion_expr::logical_plan::{Filter, JoinType, Subquery}; use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder}; use std::sync::Arc; @@ -75,6 +75,14 @@ impl OptimizerRule for DecorrelateWhereExists { plan: &LogicalPlan, optimizer_config: &mut OptimizerConfig, ) -> datafusion_common::Result { + Ok(self.try_optimize(plan, optimizer_config)?.unwrap_or(plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> datafusion_common::Result> { match plan { LogicalPlan::Filter(filter) => { let predicate = filter.predicate(); @@ -91,19 +99,28 @@ impl OptimizerRule for DecorrelateWhereExists { )?); if subqueries.is_empty() { // regular filter, no subquery exists clause here - return Ok(optimized_plan); + return Ok(Some(optimized_plan)); } // iterate through all exists clauses in predicate, turning each into a join let mut cur_input = (**filter_input).clone(); for subquery in subqueries { - cur_input = optimize_exists(&subquery, &cur_input, &other_exprs)?; + if let Some(x) = 
optimize_exists(&subquery, &cur_input, &other_exprs)? + { + cur_input = x; + } else { + return Ok(None); + } } - Ok(cur_input) + Ok(Some(cur_input)) } _ => { // Apply the optimization to all inputs of the plan - utils::optimize_children(self, plan, optimizer_config) + Ok(Some(utils::optimize_children( + self, + plan, + optimizer_config, + )?)) } } } @@ -132,20 +149,22 @@ fn optimize_exists( query_info: &SubqueryInfo, outer_input: &LogicalPlan, outer_other_exprs: &[Expr], -) -> datafusion_common::Result { +) -> datafusion_common::Result> { let subqry_filter = match query_info.query.subquery.as_ref() { LogicalPlan::Distinct(subqry_distinct) => match subqry_distinct.input.as_ref() { LogicalPlan::Projection(subqry_proj) => { Filter::try_from_plan(&subqry_proj.input) } - _ => Err(DataFusionError::NotImplemented( - "Subquery currently only supports distinct or projection".to_string(), - )), + _ => { + // Subquery currently only supports distinct or projection + return Ok(None); + } }, LogicalPlan::Projection(subqry_proj) => Filter::try_from_plan(&subqry_proj.input), - _ => Err(DataFusionError::NotImplemented( - "Subquery currently only supports distinct or projection".to_string(), - )), + _ => { + // Subquery currently only supports distinct or projection + return Ok(None); + } } .map_err(|e| context!("cannot optimize non-correlated subquery", e))?; @@ -159,7 +178,8 @@ fn optimize_exists( let (outer_cols, subqry_cols, join_filters) = exprs_to_join_cols(&col_exprs, subqry_filter.input().schema(), false)?; if subqry_cols.is_empty() || outer_cols.is_empty() { - plan_err!("cannot optimize non-correlated subquery")?; + // cannot optimize non-correlated subquery + return Ok(None); } // build subquery side of join - the thing the subquery was querying @@ -188,7 +208,7 @@ fn optimize_exists( } let result = new_plan.build()?; - Ok(result) + Ok(Some(result)) } struct SubqueryInfo { @@ -318,9 +338,7 @@ mod tests { .project(vec![col("customer.c_custkey")])? 
.build()?; - let expected = r#"cannot optimize non-correlated subquery"#; - - assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected); + assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan); Ok(()) } @@ -339,9 +357,7 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = r#"cannot optimize non-correlated subquery"#; - - assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected); + assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan); Ok(()) } @@ -360,9 +376,7 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = r#"cannot optimize non-correlated subquery"#; - - assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected); + assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan); Ok(()) } @@ -426,9 +440,7 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = r#"cannot optimize non-correlated subquery"#; - - assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected); + assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan); Ok(()) } diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 315f47499f78..d4323e456668 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -48,7 +48,18 @@ use std::time::Instant; /// way. If there are no suitable transformations for the input plan, /// the optimizer can simply return it as is. pub trait OptimizerRule { - /// Rewrite `plan` to an optimized form + /// Try and rewrite `plan` to an optimized form, returning None if the plan cannot be + /// optimized by this rule. + fn try_optimize( + &self, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result> { + self.optimize(plan, optimizer_config).map(|plan| Some(plan)) + } + + /// Rewrite `plan` to an optimized form. 
This method will eventually be deprecated and + /// replaced by `try_optimize`. fn optimize( &self, plan: &LogicalPlan, @@ -209,13 +220,17 @@ impl Optimizer { log_plan(&format!("Optimizer input (pass {})", i), &new_plan); for rule in &self.rules { - let result = rule.optimize(&new_plan, optimizer_config); + let result = rule.try_optimize(&new_plan, optimizer_config); match result { - Ok(plan) => { + Ok(Some(plan)) => { new_plan = plan; observer(&new_plan, rule.as_ref()); log_plan(rule.name(), &new_plan); } + Ok(None) => { + observer(&new_plan, rule.as_ref()); + log_plan(rule.name(), &new_plan); + } Err(ref e) => { if optimizer_config.skip_failing_rules { // Note to future readers: if you see this warning it signals a diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs index fc7d0bd8a1d5..504c4b2ba659 100644 --- a/datafusion/optimizer/src/test/mod.rs +++ b/datafusion/optimizer/src/test/mod.rs @@ -129,3 +129,11 @@ pub fn assert_optimizer_err( } } } + +pub fn assert_optimization_skipped( + rule: &dyn OptimizerRule, + plan: &LogicalPlan, +) { + let new_plan = rule.optimize(plan, &mut OptimizerConfig::new()).unwrap(); + assert_eq!(format!("{}", plan.display_indent()), format!("{}", new_plan.display_indent())); +} diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index c5496b5237f4..bd802b7c8490 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -43,12 +43,11 @@ pub fn optimize_children( optimizer_config: &mut OptimizerConfig, ) -> Result { let new_exprs = plan.expressions(); - let new_inputs = plan - .inputs() - .into_iter() - .map(|plan| optimizer.optimize(plan, optimizer_config)) - .collect::>>()?; - + let mut new_inputs = Vec::with_capacity(plan.inputs().len()); + for input in plan.inputs() { + let new_input = optimizer.try_optimize(input, optimizer_config)?; + new_inputs.push(new_input.unwrap_or(input.clone())) + } from_plan(plan, &new_exprs, &new_inputs) } 
From 88dc6be07716a59ccc5cd161d0bfb06718abcdf0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 14 Nov 2022 15:09:59 -0700 Subject: [PATCH 2/3] lint --- datafusion/optimizer/src/decorrelate_where_exists.rs | 4 +++- datafusion/optimizer/src/optimizer.rs | 2 +- datafusion/optimizer/src/test/mod.rs | 10 +++++----- datafusion/optimizer/src/utils.rs | 2 +- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs index 8fc88b349782..a7669456da02 100644 --- a/datafusion/optimizer/src/decorrelate_where_exists.rs +++ b/datafusion/optimizer/src/decorrelate_where_exists.rs @@ -75,7 +75,9 @@ impl OptimizerRule for DecorrelateWhereExists { plan: &LogicalPlan, optimizer_config: &mut OptimizerConfig, ) -> datafusion_common::Result { - Ok(self.try_optimize(plan, optimizer_config)?.unwrap_or(plan.clone())) + Ok(self + .try_optimize(plan, optimizer_config)? + .unwrap_or_else(|| plan.clone())) } fn try_optimize( diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index d4323e456668..d18e21b9f419 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -55,7 +55,7 @@ pub trait OptimizerRule { plan: &LogicalPlan, optimizer_config: &mut OptimizerConfig, ) -> Result> { - self.optimize(plan, optimizer_config).map(|plan| Some(plan)) + self.optimize(plan, optimizer_config).map(Some) } /// Rewrite `plan` to an optimized form. 
This method will eventually be deprecated and diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs index 504c4b2ba659..f5f93517e492 100644 --- a/datafusion/optimizer/src/test/mod.rs +++ b/datafusion/optimizer/src/test/mod.rs @@ -130,10 +130,10 @@ pub fn assert_optimizer_err( } } -pub fn assert_optimization_skipped( - rule: &dyn OptimizerRule, - plan: &LogicalPlan, -) { +pub fn assert_optimization_skipped(rule: &dyn OptimizerRule, plan: &LogicalPlan) { let new_plan = rule.optimize(plan, &mut OptimizerConfig::new()).unwrap(); - assert_eq!(format!("{}", plan.display_indent()), format!("{}", new_plan.display_indent())); + assert_eq!( + format!("{}", plan.display_indent()), + format!("{}", new_plan.display_indent()) + ); } diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index bd802b7c8490..e2d326c16cc3 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -46,7 +46,7 @@ pub fn optimize_children( let mut new_inputs = Vec::with_capacity(plan.inputs().len()); for input in plan.inputs() { let new_input = optimizer.try_optimize(input, optimizer_config)?; - new_inputs.push(new_input.unwrap_or(input.clone())) + new_inputs.push(new_input.unwrap_or_else(|| input.clone())) } from_plan(plan, &new_exprs, &new_inputs) } From 50148ca945ec8f029b54c3fdab789b3c07fffccc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 15 Nov 2022 08:43:28 -0700 Subject: [PATCH 3/3] address feedback --- .../optimizer/src/decorrelate_where_exists.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs index a7669456da02..9cf9138bc062 100644 --- a/datafusion/optimizer/src/decorrelate_where_exists.rs +++ b/datafusion/optimizer/src/decorrelate_where_exists.rs @@ -20,9 +20,11 @@ use crate::utils::{ verify_not_disjunction, }; use crate::{utils, OptimizerConfig, 
OptimizerRule}; -use datafusion_common::context; -use datafusion_expr::logical_plan::{Filter, JoinType, Subquery}; -use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder}; +use datafusion_common::{context, Result}; +use datafusion_expr::{ + logical_plan::{Filter, JoinType, Subquery}, + Expr, LogicalPlan, LogicalPlanBuilder, +}; use std::sync::Arc; /// Optimizer rule for rewriting subquery filters to joins @@ -47,7 +49,7 @@ impl DecorrelateWhereExists { &self, predicate: &Expr, optimizer_config: &mut OptimizerConfig, - ) -> datafusion_common::Result<(Vec, Vec)> { + ) -> Result<(Vec, Vec)> { let filters = split_conjunction(predicate); let mut subqueries = vec![]; @@ -74,7 +76,7 @@ impl OptimizerRule for DecorrelateWhereExists { &self, plan: &LogicalPlan, optimizer_config: &mut OptimizerConfig, - ) -> datafusion_common::Result { + ) -> Result { Ok(self .try_optimize(plan, optimizer_config)? .unwrap_or_else(|| plan.clone())) @@ -84,7 +86,7 @@ impl OptimizerRule for DecorrelateWhereExists { &self, plan: &LogicalPlan, optimizer_config: &mut OptimizerConfig, - ) -> datafusion_common::Result> { + ) -> Result> { match plan { LogicalPlan::Filter(filter) => { let predicate = filter.predicate(); @@ -151,7 +153,7 @@ fn optimize_exists( query_info: &SubqueryInfo, outer_input: &LogicalPlan, outer_other_exprs: &[Expr], -) -> datafusion_common::Result> { +) -> Result> { let subqry_filter = match query_info.query.subquery.as_ref() { LogicalPlan::Distinct(subqry_distinct) => match subqry_distinct.input.as_ref() { LogicalPlan::Projection(subqry_proj) => {