From e3c01e64bb7a48ea97e0c68e9aa9ae07c6842030 Mon Sep 17 00:00:00 2001 From: jackwener Date: Fri, 2 Dec 2022 01:25:19 +0800 Subject: [PATCH 1/3] remove Object-oriented get methods of Filter. --- datafusion-examples/examples/rewrite_expr.rs | 4 +-- datafusion/core/src/physical_plan/planner.rs | 6 ++-- datafusion/expr/src/logical_plan/plan.rs | 4 +-- .../optimizer/src/common_subexpr_eliminate.rs | 6 ++-- .../optimizer/src/decorrelate_where_exists.rs | 13 ++++----- .../optimizer/src/decorrelate_where_in.rs | 12 ++++---- .../optimizer/src/eliminate_cross_join.rs | 4 +-- datafusion/optimizer/src/eliminate_filter.rs | 6 ++-- .../optimizer/src/eliminate_outer_join.rs | 4 +-- datafusion/optimizer/src/push_down_filter.rs | 29 +++++++++---------- .../src/rewrite_disjunctive_predicate.rs | 4 +-- .../optimizer/src/scalar_subquery_to_join.rs | 14 ++++----- .../optimizer/src/subquery_filter_to_join.rs | 8 ++--- datafusion/proto/src/logical_plan.rs | 4 +-- 14 files changed, 56 insertions(+), 62 deletions(-) diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs index 7a2ee6d93cd1..ded069957107 100644 --- a/datafusion-examples/examples/rewrite_expr.rs +++ b/datafusion-examples/examples/rewrite_expr.rs @@ -84,11 +84,11 @@ impl OptimizerRule for MyRule { match plan { LogicalPlan::Filter(filter) => { let mut expr_rewriter = MyExprRewriter {}; - let predicate = filter.predicate().clone(); + let predicate = filter.predicate.clone(); let predicate = predicate.rewrite(&mut expr_rewriter)?; Ok(LogicalPlan::Filter(Filter::try_new( predicate, - filter.input().clone(), + filter.input.clone(), )?)) } _ => Ok(plan.clone()), diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index d11b27670bf8..cdd7cb9af262 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -756,12 +756,12 @@ impl DefaultPhysicalPlanner { )?)) } LogicalPlan::Filter(filter) => { - let physical_input = self.create_initial_plan(filter.input(), session_state).await?; + let physical_input = self.create_initial_plan(&filter.input, session_state).await?; let input_schema = physical_input.as_ref().schema(); - let input_dfschema = filter.input().schema(); + let input_dfschema = filter.input.schema(); let runtime_expr = self.create_physical_expr( - filter.predicate(), + &filter.predicate, input_dfschema, &input_schema, session_state, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index ed5711a7fb57..b2e8417b0029 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1193,9 +1193,9 @@ pub struct SubqueryAlias { #[derive(Clone)] pub struct Filter { /// The predicate expression, which must have Boolean type. - predicate: Expr, + pub predicate: Expr, /// The incoming logical plan - input: Arc, + pub input: Arc, } impl Filter { diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 11ed5cdbebb0..06446812cfa6 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -120,8 +120,8 @@ impl OptimizerRule for CommonSubexprEliminate { )?)) } LogicalPlan::Filter(filter) => { - let input = filter.input(); - let predicate = filter.predicate(); + let input = &filter.input; + let predicate = &filter.predicate; let input_schema = Arc::clone(input.schema()); let mut id_array = vec![]; expr_to_identifier( @@ -134,7 +134,7 @@ impl OptimizerRule for CommonSubexprEliminate { let (mut new_expr, new_input) = self.rewrite_expr( &[&[predicate.clone()]], &[&[id_array]], - filter.input(), + &filter.input, &mut expr_set, optimizer_config, )?; diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs index 9cf9138bc062..81392fecd342 100644 --- a/datafusion/optimizer/src/decorrelate_where_exists.rs +++ b/datafusion/optimizer/src/decorrelate_where_exists.rs @@ -89,8 +89,8 @@ impl OptimizerRule for DecorrelateWhereExists { ) -> Result> { match plan { LogicalPlan::Filter(filter) => { - let predicate = filter.predicate(); - let filter_input = filter.input(); + let predicate = &filter.predicate; + let filter_input = &filter.input; // Apply optimizer rule to current input let optimized_input = self.optimize(filter_input, optimizer_config)?; @@ -173,22 +173,21 @@ fn optimize_exists( .map_err(|e| context!("cannot optimize non-correlated subquery", e))?; // split into filters - let subqry_filter_exprs = split_conjunction(subqry_filter.predicate()); + let subqry_filter_exprs = split_conjunction(&subqry_filter.predicate); verify_not_disjunction(&subqry_filter_exprs)?; // Grab column names to join on let (col_exprs, other_subqry_exprs) = - find_join_exprs(subqry_filter_exprs, subqry_filter.input().schema())?; + find_join_exprs(subqry_filter_exprs, subqry_filter.input.schema())?; let (outer_cols, subqry_cols, join_filters) = - exprs_to_join_cols(&col_exprs, subqry_filter.input().schema(), false)?; + exprs_to_join_cols(&col_exprs, subqry_filter.input.schema(), false)?; if subqry_cols.is_empty() || outer_cols.is_empty() { // cannot optimize non-correlated subquery return Ok(None); } // build subquery side of join - the thing the subquery was querying - let mut subqry_plan = - LogicalPlanBuilder::from(subqry_filter.input().as_ref().clone()); + let mut subqry_plan = LogicalPlanBuilder::from(subqry_filter.input.as_ref().clone()); if let Some(expr) = conjunction(other_subqry_exprs) { subqry_plan = subqry_plan.filter(expr)? // if the subquery had additional expressions, restore them } diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs index ba8fba543102..1491d2e6c932 100644 --- a/datafusion/optimizer/src/decorrelate_where_in.rs +++ b/datafusion/optimizer/src/decorrelate_where_in.rs @@ -83,8 +83,8 @@ impl OptimizerRule for DecorrelateWhereIn { ) -> datafusion_common::Result { match plan { LogicalPlan::Filter(filter) => { - let predicate = filter.predicate(); - let filter_input = filter.input(); + let predicate = &filter.predicate; + let filter_input = &filter.input; // Apply optimizer rule to current input let optimized_input = self.optimize(filter_input, optimizer_config)?; @@ -150,18 +150,18 @@ fn optimize_where_in( let mut other_subqry_exprs = vec![]; if let LogicalPlan::Filter(subqry_filter) = (*subqry_input).clone() { // split into filters - let subqry_filter_exprs = split_conjunction(subqry_filter.predicate()); + let subqry_filter_exprs = split_conjunction(&subqry_filter.predicate); verify_not_disjunction(&subqry_filter_exprs)?; // Grab column names to join on let (col_exprs, other_exprs) = - find_join_exprs(subqry_filter_exprs, subqry_filter.input().schema()) + find_join_exprs(subqry_filter_exprs, subqry_filter.input.schema()) .map_err(|e| context!("column correlation not found", e))?; if !col_exprs.is_empty() { // it's correlated - subqry_input = subqry_filter.input().clone(); + subqry_input = subqry_filter.input.clone(); (outer_cols, subqry_cols, join_filters) = - exprs_to_join_cols(&col_exprs, subqry_filter.input().schema(), false) + exprs_to_join_cols(&col_exprs, subqry_filter.input.schema(), false) .map_err(|e| context!("column correlation not found", e))?; other_subqry_exprs = other_exprs; } diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index 83fc9e164beb..a2951c4602ac 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -62,7 +62,7 @@ impl OptimizerRule for EliminateCrossJoin { ) -> Result { match plan { LogicalPlan::Filter(filter) => { - let input = (**filter.input()).clone(); + let input = filter.input.as_ref().clone(); let mut possible_join_keys: Vec<(Column, Column)> = vec![]; let mut all_inputs: Vec = vec![]; @@ -86,7 +86,7 @@ impl OptimizerRule for EliminateCrossJoin { } } - let predicate = filter.predicate(); + let predicate = &filter.predicate; // join keys are handled locally let mut all_join_keys: HashSet<(Column, Column)> = HashSet::new(); diff --git a/datafusion/optimizer/src/eliminate_filter.rs b/datafusion/optimizer/src/eliminate_filter.rs index d5cbcf3949c9..2f557cf8d8b0 100644 --- a/datafusion/optimizer/src/eliminate_filter.rs +++ b/datafusion/optimizer/src/eliminate_filter.rs @@ -43,10 +43,8 @@ impl OptimizerRule for EliminateFilter { _optimizer_config: &mut OptimizerConfig, ) -> Result { let predicate_and_input = match plan { - LogicalPlan::Filter(filter) => match filter.predicate() { - Expr::Literal(ScalarValue::Boolean(Some(v))) => { - Some((*v, filter.input())) - } + LogicalPlan::Filter(filter) => match filter.predicate { + Expr::Literal(ScalarValue::Boolean(Some(v))) => Some((v, &filter.input)), _ => None, }, _ => None, diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs index 9be661624652..0164b3cae00a 100644 --- a/datafusion/optimizer/src/eliminate_outer_join.rs +++ b/datafusion/optimizer/src/eliminate_outer_join.rs @@ -68,12 +68,12 @@ impl OptimizerRule for EliminateOuterJoin { optimizer_config: &mut OptimizerConfig, ) -> Result { match plan { - LogicalPlan::Filter(filter) => match filter.input().as_ref() { + LogicalPlan::Filter(filter) => match filter.input.as_ref() { LogicalPlan::Join(join) => { let mut non_nullable_cols: Vec = vec![]; extract_non_nullable_columns( - filter.predicate(), + &filter.predicate, &mut non_nullable_cols, join.left.schema(), join.right.schema(), diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index e59590df5515..b241dbcc670b 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -524,14 +524,14 @@ impl OptimizerRule for PushDownFilter { _ => return utils::optimize_children(self, plan, optimizer_config), }; - let child_plan = &**filter.input(); + let child_plan = filter.input.as_ref(); let new_plan = match child_plan { LogicalPlan::Filter(child_filter) => { let new_predicate = - and(filter.predicate().clone(), child_filter.predicate().clone()); + and(filter.predicate.clone(), child_filter.predicate.clone()); let new_plan = LogicalPlan::Filter(Filter::try_new( new_predicate, - child_filter.input().clone(), + child_filter.input.clone(), )?); return self.optimize(&new_plan, optimizer_config); } @@ -561,7 +561,7 @@ impl OptimizerRule for PushDownFilter { ); } let new_predicate = - replace_cols_by_name(filter.predicate().clone(), &replace_map)?; + replace_cols_by_name(filter.predicate.clone(), &replace_map)?; let new_filter = LogicalPlan::Filter(Filter::try_new( new_predicate, subquery_alias.input.clone(), @@ -590,7 +590,7 @@ impl OptimizerRule for PushDownFilter { // re-write all filters based on this projection // E.g. in `Filter: b\n Projection: a > 1 as b`, we can swap them, but the filter must be "a > 1" let new_filter = LogicalPlan::Filter(Filter::try_new( - replace_cols_by_name(filter.predicate().clone(), &replace_map)?, + replace_cols_by_name(filter.predicate.clone(), &replace_map)?, projection.input.clone(), )?); @@ -608,7 +608,7 @@ impl OptimizerRule for PushDownFilter { } let push_predicate = - replace_cols_by_name(filter.predicate().clone(), &replace_map)?; + replace_cols_by_name(filter.predicate.clone(), &replace_map)?; inputs.push(Arc::new(LogicalPlan::Filter(Filter::try_new( push_predicate, input.clone(), @@ -635,7 +635,7 @@ impl OptimizerRule for PushDownFilter { used_columns.extend(agg_columns); let predicates = utils::split_conjunction_owned(utils::cnf_rewrite( - filter.predicate().clone(), + filter.predicate.clone(), )); let mut keep_predicates = vec![]; @@ -661,11 +661,8 @@ impl OptimizerRule for PushDownFilter { )?), None => (*agg.input).clone(), }; - let new_agg = from_plan( - filter.input(), - &filter.input().expressions(), - &vec![child], - )?; + let new_agg = + from_plan(&filter.input, &filter.input.expressions(), &vec![child])?; match conjunction(keep_predicates) { Some(predicate) => LogicalPlan::Filter(Filter::try_new( predicate, @@ -675,24 +672,24 @@ impl OptimizerRule for PushDownFilter { } } LogicalPlan::Join(join) => { - match push_down_join(filter.input(), join, Some(filter.predicate()))? { + match push_down_join(&filter.input, join, Some(&filter.predicate))? { Some(optimized_plan) => optimized_plan, None => plan.clone(), } } LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => { let predicates = utils::split_conjunction_owned(utils::cnf_rewrite( - filter.predicate().clone(), + filter.predicate.clone(), )); - push_down_all_join(predicates, filter.input(), left, right, vec![])? + push_down_all_join(predicates, &filter.input, left, right, vec![])? } LogicalPlan::TableScan(scan) => { let mut new_scan_filters = scan.filters.clone(); let mut new_predicate = vec![]; let filter_predicates = utils::split_conjunction_owned( - utils::cnf_rewrite(filter.predicate().clone()), + utils::cnf_rewrite(filter.predicate.clone()), ); for filter_expr in &filter_predicates { diff --git a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs index 6a2ba2bde68a..a0b8ac2cd03e 100644 --- a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs +++ b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs @@ -131,12 +131,12 @@ impl OptimizerRule for RewriteDisjunctivePredicate { ) -> Result { match plan { LogicalPlan::Filter(filter) => { - let predicate = predicate(filter.predicate())?; + let predicate = predicate(&filter.predicate)?; let rewritten_predicate = rewrite_predicate(predicate); let rewritten_expr = normalize_predicate(rewritten_predicate); Ok(LogicalPlan::Filter(Filter::try_new( rewritten_expr, - Arc::new(Self::optimize(self, filter.input(), _optimizer_config)?), + Arc::new(Self::optimize(self, &filter.input, _optimizer_config)?), )?)) } _ => utils::optimize_children(self, plan, _optimizer_config), diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 5e5167ab4e16..b81df61482ef 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -97,21 +97,21 @@ impl OptimizerRule for ScalarSubqueryToJoin { match plan { LogicalPlan::Filter(filter) => { // Apply optimizer rule to current input - let optimized_input = self.optimize(filter.input(), optimizer_config)?; + let optimized_input = self.optimize(&filter.input, optimizer_config)?; let (subqueries, other_exprs) = - self.extract_subquery_exprs(filter.predicate(), optimizer_config)?; + self.extract_subquery_exprs(&filter.predicate, optimizer_config)?; if subqueries.is_empty() { // regular filter, no subquery exists clause here return Ok(LogicalPlan::Filter(Filter::try_new( - filter.predicate().clone(), + filter.predicate.clone(), Arc::new(optimized_input), )?)); } // iterate through all subqueries in predicate, turning each into a join - let mut cur_input = filter.input().as_ref().clone(); + let mut cur_input = filter.input.as_ref().clone(); for subquery in subqueries { if let Some(optimized_subquery) = optimize_scalar( &subquery, @@ -123,7 +123,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { } else { // if we can't handle all of the subqueries then bail for now return Ok(LogicalPlan::Filter(Filter::try_new( - filter.predicate().clone(), + filter.predicate.clone(), Arc::new(optimized_input), )?)); } @@ -228,14 +228,14 @@ fn optimize_scalar( // if there were filters, we use that logical plan, otherwise the plan from the aggregate let input = if let Some(filter) = filter { - filter.input() + &filter.input } else { &aggr.input }; // if there were filters, split and capture them let subqry_filter_exprs = if let Some(filter) = filter { - split_conjunction(filter.predicate()) + split_conjunction(&filter.predicate) } else { vec![] }; diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs index 504167d21fe2..ec4aa2f727fb 100644 --- a/datafusion/optimizer/src/subquery_filter_to_join.rs +++ b/datafusion/optimizer/src/subquery_filter_to_join.rs @@ -57,10 +57,10 @@ impl OptimizerRule for SubqueryFilterToJoin { match plan { LogicalPlan::Filter(filter) => { // Apply optimizer rule to current input - let optimized_input = self.optimize(filter.input(), optimizer_config)?; + let optimized_input = self.optimize(&filter.input, optimizer_config)?; // Splitting filter expression into components by AND - let filters = utils::split_conjunction(filter.predicate()); + let filters = utils::split_conjunction(&filter.predicate); // Searching for subquery-based filters let (subquery_filters, regular_filters): (Vec<&Expr>, Vec<&Expr>) = @@ -79,7 +79,7 @@ impl OptimizerRule for SubqueryFilterToJoin { if !subqueries_in_regular.is_empty() { return Ok(LogicalPlan::Filter(Filter::try_new( - filter.predicate().clone(), + filter.predicate.clone(), Arc::new(optimized_input), )?)); }; @@ -151,7 +151,7 @@ impl OptimizerRule for SubqueryFilterToJoin { Ok(plan) => plan, Err(_) => { return Ok(LogicalPlan::Filter(Filter::try_new( - filter.predicate().clone(), + filter.predicate.clone(), Arc::new(optimized_input), )?)) } diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs index 78db01313b36..fbfecb03f94e 100644 --- a/datafusion/proto/src/logical_plan.rs +++ b/datafusion/proto/src/logical_plan.rs @@ -975,14 +975,14 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlan::Filter(filter) => { let input: protobuf::LogicalPlanNode = protobuf::LogicalPlanNode::try_from_logical_plan( - filter.input().as_ref(), + filter.input.as_ref(), extension_codec, )?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Selection(Box::new( protobuf::SelectionNode { input: Some(Box::new(input)), - expr: Some(filter.predicate().try_into()?), + expr: Some((&(filter.predicate)).try_into()?), }, ))), }) From 409ca0b8a0f4f1a256b79d2ac4062db081dfcada Mon Sep 17 00:00:00 2001 From: jackwener Date: Fri, 2 Dec 2022 01:27:54 +0800 Subject: [PATCH 2/3] remove access method --- datafusion/expr/src/logical_plan/plan.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index b2e8417b0029..a45ddd3cb105 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1236,16 +1236,6 @@ impl Filter { _ => plan_err!("Could not coerce into Filter!"), } } - - /// Access the filter predicate expression - pub fn predicate(&self) -> &Expr { - &self.predicate - } - - /// Access the filter input plan - pub fn input(&self) -> &Arc { - &self.input - } } /// Window its input based on a set of window spec and window function (e.g. SUM or RANK) From 7a49b266143fa3be3e03e253f905311021160be4 Mon Sep 17 00:00:00 2001 From: jackwener Date: Fri, 2 Dec 2022 02:02:02 +0800 Subject: [PATCH 3/3] fix --- datafusion-examples/examples/rewrite_expr.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs index ded069957107..b5006e7b2626 100644 --- a/datafusion-examples/examples/rewrite_expr.rs +++ b/datafusion-examples/examples/rewrite_expr.rs @@ -88,7 +88,7 @@ impl OptimizerRule for MyRule { let predicate = predicate.rewrite(&mut expr_rewriter)?; Ok(LogicalPlan::Filter(Filter::try_new( predicate, - filter.input.clone(), + filter.input, )?)) } _ => Ok(plan.clone()),