From 8f2ec6d82cb75ea021538fb6d3626a1056c9469a Mon Sep 17 00:00:00 2001 From: jackwener Date: Sun, 19 Feb 2023 23:26:26 +0800 Subject: [PATCH] refactor `push_down_filter` to fix dead-loop and use optimizer_recurse. --- datafusion/optimizer/src/push_down_filter.rs | 75 ++++++++++---------- 1 file changed, 38 insertions(+), 37 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 8e90a683f730..4369b025fcb0 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -14,6 +14,7 @@ //! Push Down Filter optimizer rule ensures that filters are applied as early as possible in the plan +use crate::optimizer::ApplyOrder; use crate::utils::{conjunction, split_conjunction}; use crate::{utils, OptimizerConfig, OptimizerRule}; use datafusion_common::{Column, DFSchema, DataFusionError, Result}; @@ -29,7 +30,6 @@ use datafusion_expr::{ use std::collections::{HashMap, HashSet}; use std::iter::once; use std::sync::Arc; -use utils::optimize_children; /// Push Down Filter optimizer rule pushes filter clauses down the plan /// # Introduction @@ -511,25 +511,20 @@ impl OptimizerRule for PushDownFilter { "push_down_filter" } + fn apply_order(&self) -> Option<ApplyOrder> { + Some(ApplyOrder::TopDown) + } + fn try_optimize( &self, plan: &LogicalPlan, - config: &dyn OptimizerConfig, + _config: &dyn OptimizerConfig, ) -> Result<Option<LogicalPlan>> { let filter = match plan { LogicalPlan::Filter(filter) => filter, // we also need to pushdown filter in Join. - LogicalPlan::Join(join) => { - let optimized_plan = push_down_join(plan, join, None)?; - return match optimized_plan { - Some(optimized_plan) => Ok(Some( - optimize_children(self, &optimized_plan, config)?
- .unwrap_or(optimized_plan), - )), - None => optimize_children(self, plan, config), - }; - } - _ => return optimize_children(self, plan, config), + LogicalPlan::Join(join) => return push_down_join(plan, join, None), + _ => return Ok(None), }; let child_plan = filter.input.as_ref(); @@ -550,11 +545,12 @@ impl OptimizerRule for PushDownFilter { let new_predicate = conjunction(new_predicates).ok_or_else(|| { DataFusionError::Plan("at least one expression exists".to_string()) })?; - let new_plan = LogicalPlan::Filter(Filter::try_new( + let new_filter = LogicalPlan::Filter(Filter::try_new( new_predicate, child_filter.input.clone(), )?); - return self.try_optimize(&new_plan, config); + self.try_optimize(&new_filter, _config)? + .unwrap_or(new_filter) } LogicalPlan::Repartition(_) | LogicalPlan::Distinct(_) @@ -691,7 +687,7 @@ impl OptimizerRule for PushDownFilter { LogicalPlan::Join(join) => { match push_down_join(&filter.input, join, Some(&filter.predicate))? { Some(optimized_plan) => optimized_plan, - None => plan.clone(), + None => return Ok(None), } } LogicalPlan::CrossJoin(CrossJoin { left, right, .. 
}) => { @@ -741,12 +737,9 @@ impl OptimizerRule for PushDownFilter { None => new_scan, } } - _ => plan.clone(), + _ => return Ok(None), }; - - Ok(Some( - optimize_children(self, &new_plan, config)?.unwrap_or(new_plan), - )) + Ok(Some(new_plan)) } } @@ -777,6 +770,7 @@ pub fn replace_cols_by_name( #[cfg(test)] mod tests { use super::*; + use crate::optimizer::Optimizer; use crate::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate; use crate::test::*; use crate::OptimizerContext; @@ -791,28 +785,35 @@ mod tests { use std::sync::Arc; fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { - let optimized_plan = PushDownFilter::new() - .try_optimize(plan, &OptimizerContext::new()) - .unwrap() - .expect("failed to optimize plan"); - let formatted_plan = format!("{optimized_plan:?}"); - assert_eq!(plan.schema(), optimized_plan.schema()); - assert_eq!(expected, formatted_plan); - Ok(()) + crate::test::assert_optimized_plan_eq( + Arc::new(PushDownFilter::new()), + plan, + expected, + ) } fn assert_optimized_plan_eq_with_rewrite_predicate( plan: &LogicalPlan, expected: &str, ) -> Result<()> { - let mut optimized_plan = RewriteDisjunctivePredicate::new() - .try_optimize(plan, &OptimizerContext::new()) - .unwrap() - .expect("failed to optimize plan"); - optimized_plan = PushDownFilter::new() - .try_optimize(&optimized_plan, &OptimizerContext::new()) - .unwrap() - .expect("failed to optimize plan"); + let optimizer = Optimizer::with_rules(vec![ + Arc::new(RewriteDisjunctivePredicate::new()), + Arc::new(PushDownFilter::new()), + ]); + let mut optimized_plan = optimizer + .optimize_recursively( + optimizer.rules.get(0).unwrap(), + plan, + &OptimizerContext::new(), + )? + .unwrap_or_else(|| plan.clone()); + optimized_plan = optimizer + .optimize_recursively( + optimizer.rules.get(1).unwrap(), + &optimized_plan, + &OptimizerContext::new(), + )? 
+ .unwrap_or_else(|| plan.clone()); let formatted_plan = format!("{optimized_plan:?}"); assert_eq!(plan.schema(), optimized_plan.schema()); assert_eq!(expected, formatted_plan);