From 38088f6dd02a33f76cde8d0b68ead27d281b853f Mon Sep 17 00:00:00 2001 From: jackwener Date: Tue, 22 Nov 2022 12:24:16 +0800 Subject: [PATCH] reimplment eliminate_limit to remove `global-state`. --- datafusion/optimizer/src/eliminate_limit.rs | 307 +++++++------------- 1 file changed, 108 insertions(+), 199 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs index 2deea4eb9f7e..1cf8b6ad281b 100644 --- a/datafusion/optimizer/src/eliminate_limit.rs +++ b/datafusion/optimizer/src/eliminate_limit.rs @@ -20,17 +20,12 @@ //! on a plan with an empty relation. //! This rule also removes OFFSET 0 from the [LogicalPlan] //! This saves time in planning and executing the query. -use crate::{OptimizerConfig, OptimizerRule}; +use crate::{utils, OptimizerConfig, OptimizerRule}; use datafusion_common::Result; -use datafusion_expr::{ - logical_plan::{EmptyRelation, Limit, LogicalPlan}, - utils::from_plan, -}; +use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan}; -/// Optimization rule that replaces LIMIT 0 or -/// LIMIT whose ancestor LIMIT's skip is greater than or equal to current's fetch -/// with an [LogicalPlan::EmptyRelation]. -/// This rule also removes OFFSET 0 from the [LogicalPlan] +/// Optimization rule that eliminate LIMIT 0 or useless LIMIT(skip:0, fetch:None). +/// It can cooperate with `propagate_empty_relation` and `limit_push_down`. #[derive(Default)] pub struct EliminateLimit; @@ -41,104 +36,31 @@ impl EliminateLimit { } } -/// Ancestor indicates the current ancestor in the LogicalPlan tree -/// when traversing down related to "eliminate limit". -enum Ancestor { - /// Limit - FromLimit { skip: usize }, - /// Other nodes that don't affect the adjustment of "Limit" - NotRelevant, -} - -/// replaces LIMIT 0 with an [LogicalPlan::EmptyRelation] -/// replaces LIMIT node whose ancestor LIMIT's skip is greater than or equal to current's fetch -/// with an [LogicalPlan::EmptyRelation] -/// removes OFFSET 0 from the [LogicalPlan] -fn eliminate_limit( - _optimizer: &EliminateLimit, - ancestor: &Ancestor, - plan: &LogicalPlan, - _optimizer_config: &OptimizerConfig, -) -> Result { - match plan { - LogicalPlan::Limit(Limit { - skip, fetch, input, .. - }) => { - let ancestor_skip = match ancestor { - Ancestor::FromLimit { skip } => *skip, - _ => 0, - }; - // If ancestor's skip is equal or greater than current's fetch, - // replaces with an [LogicalPlan::EmptyRelation]. - // For such query, the inner query(select * from xxx limit 5) should be optimized as an EmptyRelation: - // select * from (select * from xxx limit 5) a limit 2 offset 5; - match fetch { +impl OptimizerRule for EliminateLimit { + fn optimize( + &self, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result { + if let LogicalPlan::Limit(limit) = plan { + match limit.fetch { Some(fetch) => { - if *fetch == 0 || ancestor_skip >= *fetch { + if fetch == 0 { return Ok(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, - schema: input.schema().clone(), + schema: limit.input.schema().clone(), })); } } None => { - if *skip == 0 { - // If there is no LIMIT and OFFSET is zero, LIMIT/OFFSET can be removed - return Ok(input.as_ref().clone()); + if limit.skip == 0 { + let input = &*limit.input; + return utils::optimize_children(self, input, optimizer_config); } } } - - let expr = plan.expressions(); - - // apply the optimization to all inputs of the plan - let inputs = plan.inputs(); - let new_inputs = inputs - .iter() - .map(|plan| { - eliminate_limit( - _optimizer, - &Ancestor::FromLimit { skip: *skip }, - plan, - _optimizer_config, - ) - }) - .collect::>>()?; - - from_plan(plan, &expr, &new_inputs) - } - // Rest: recurse and find possible LIMIT 0/Multi LIMIT OFFSET nodes - _ => { - // For those plans(projection/sort/..) which do not affect the output rows of sub-plans, we still use ancestor; - // otherwise, use NotRelevant instead. - let ancestor = match plan { - LogicalPlan::Projection { .. } | LogicalPlan::Sort { .. } => ancestor, - _ => &Ancestor::NotRelevant, - }; - - let expr = plan.expressions(); - - // apply the optimization to all inputs of the plan - let inputs = plan.inputs(); - let new_inputs = inputs - .iter() - .map(|plan| { - eliminate_limit(_optimizer, ancestor, plan, _optimizer_config) - }) - .collect::>>()?; - - from_plan(plan, &expr, &new_inputs) } - } -} - -impl OptimizerRule for EliminateLimit { - fn optimize( - &self, - plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, - ) -> Result { - eliminate_limit(self, &Ancestor::NotRelevant, plan, optimizer_config) + utils::optimize_children(self, plan, optimizer_config) } fn name(&self) -> &str { @@ -149,6 +71,7 @@ impl OptimizerRule for EliminateLimit { #[cfg(test)] mod tests { use super::*; + use crate::limit_push_down::LimitPushDown; use crate::test::*; use datafusion_common::Column; use datafusion_expr::{ @@ -157,180 +80,166 @@ mod tests { sum, }; - fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { - let rule = EliminateLimit::new(); - let optimized_plan = rule + fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { + let optimized_plan = EliminateLimit::new() .optimize(plan, &mut OptimizerConfig::new()) .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); assert_eq!(plan.schema(), optimized_plan.schema()); + Ok(()) + } + + fn assert_optimized_plan_eq_with_pushdown( + plan: &LogicalPlan, + expected: &str, + ) -> Result<()> { + let optimized_plan = LimitPushDown::new() + .optimize(plan, &mut OptimizerConfig::new()) + .expect("failed to optimize plan"); + let optimized_plan = EliminateLimit::new() + .optimize(&optimized_plan, &mut OptimizerConfig::new()) + .expect("failed to optimize plan"); + let formatted_plan = format!("{:?}", optimized_plan); + assert_eq!(formatted_plan, expected); + assert_eq!(plan.schema(), optimized_plan.schema()); + Ok(()) } #[test] - fn limit_0_root() { + fn limit_0_root() -> Result<()> { let table_scan = test_table_scan().unwrap(); let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a")], vec![sum(col("b"))]) - .unwrap() - .limit(0, Some(0)) - .unwrap() - .build() - .unwrap(); + .aggregate(vec![col("a")], vec![sum(col("b"))])? + .limit(0, Some(0))? + .build()?; // No aggregate / scan / limit let expected = "EmptyRelation"; - assert_optimized_plan_eq(&plan, expected); + assert_optimized_plan_eq(&plan, expected) } #[test] - fn limit_0_nested() { - let table_scan = test_table_scan().unwrap(); + fn limit_0_nested() -> Result<()> { + let table_scan = test_table_scan()?; let plan1 = LogicalPlanBuilder::from(table_scan.clone()) - .aggregate(vec![col("a")], vec![sum(col("b"))]) - .unwrap() - .build() - .unwrap(); + .aggregate(vec![col("a")], vec![sum(col("b"))])? + .build()?; let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a")], vec![sum(col("b"))]) - .unwrap() - .limit(0, Some(0)) - .unwrap() - .union(plan1) - .unwrap() - .build() - .unwrap(); + .aggregate(vec![col("a")], vec![sum(col("b"))])? + .limit(0, Some(0))? + .union(plan1)? + .build()?; // Left side is removed let expected = "Union\ \n EmptyRelation\ \n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected); + assert_optimized_plan_eq(&plan, expected) } #[test] - fn limit_fetch_with_ancestor_limit_skip() { - let table_scan = test_table_scan().unwrap(); + fn limit_fetch_with_ancestor_limit_skip() -> Result<()> { + let table_scan = test_table_scan()?; let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a")], vec![sum(col("b"))]) - .unwrap() - .limit(0, Some(2)) - .unwrap() - .limit(2, None) - .unwrap() - .build() - .unwrap(); + .aggregate(vec![col("a")], vec![sum(col("b"))])? + .limit(0, Some(2))? + .limit(2, None)? + .build()?; // No aggregate / scan / limit - let expected = "Limit: skip=2, fetch=None\ - \n EmptyRelation"; - assert_optimized_plan_eq(&plan, expected); + let expected = "EmptyRelation"; + assert_optimized_plan_eq_with_pushdown(&plan, expected) } #[test] - fn multi_limit_offset_sort_eliminate() { - let table_scan = test_table_scan().unwrap(); + fn multi_limit_offset_sort_eliminate() -> Result<()> { + let table_scan = test_table_scan()?; let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a")], vec![sum(col("b"))]) - .unwrap() - .limit(0, Some(2)) - .unwrap() - .sort(vec![col("a")]) - .unwrap() - .limit(2, Some(1)) - .unwrap() - .build() - .unwrap(); - + .aggregate(vec![col("a")], vec![sum(col("b"))])? + .limit(0, Some(2))? + .sort(vec![col("a")])? + .limit(2, Some(1))? + .build()?; + + // After remove global-state, we don't record the parent + // So, bottom don't know parent info, so can't eliminate. let expected = "Limit: skip=2, fetch=1\ - \n Sort: test.a\ - \n EmptyRelation"; - assert_optimized_plan_eq(&plan, expected); + \n Sort: test.a, fetch=3\ + \n Limit: skip=0, fetch=2\ + \n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\ + \n TableScan: test"; + assert_optimized_plan_eq_with_pushdown(&plan, expected) } #[test] - fn limit_fetch_with_ancestor_limit_fetch() { - let table_scan = test_table_scan().unwrap(); + fn limit_fetch_with_ancestor_limit_fetch() -> Result<()> { + let table_scan = test_table_scan()?; let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a")], vec![sum(col("b"))]) - .unwrap() - .limit(0, Some(2)) - .unwrap() - .sort(vec![col("a")]) - .unwrap() - .limit(0, Some(1)) - .unwrap() - .build() - .unwrap(); + .aggregate(vec![col("a")], vec![sum(col("b"))])? + .limit(0, Some(2))? + .sort(vec![col("a")])? + .limit(0, Some(1))? + .build()?; let expected = "Limit: skip=0, fetch=1\ \n Sort: test.a\ \n Limit: skip=0, fetch=2\ \n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected); + assert_optimized_plan_eq(&plan, expected) } #[test] - fn limit_with_ancestor_limit() { + fn limit_with_ancestor_limit() -> Result<()> { let table_scan = test_table_scan().unwrap(); let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a")], vec![sum(col("b"))]) - .unwrap() - .limit(2, Some(1)) - .unwrap() - .sort(vec![col("a")]) - .unwrap() - .limit(3, Some(1)) - .unwrap() - .build() - .unwrap(); + .aggregate(vec![col("a")], vec![sum(col("b"))])? + .limit(2, Some(1))? + .sort(vec![col("a")])? + .limit(3, Some(1))? + .build()?; let expected = "Limit: skip=3, fetch=1\ - \n Sort: test.a\ - \n EmptyRelation"; - assert_optimized_plan_eq(&plan, expected); + \n Sort: test.a\ + \n Limit: skip=2, fetch=1\ + \n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\ + \n TableScan: test"; + assert_optimized_plan_eq(&plan, expected) } #[test] - fn limit_join_with_ancestor_limit() { - let table_scan = test_table_scan().unwrap(); - let table_scan_inner = test_table_scan_with_name("test1").unwrap(); + fn limit_join_with_ancestor_limit() -> Result<()> { + let table_scan = test_table_scan()?; + let table_scan_inner = test_table_scan_with_name("test1")?; let plan = LogicalPlanBuilder::from(table_scan) - .limit(2, Some(1)) - .unwrap() + .limit(2, Some(1))? .join_using( &table_scan_inner, JoinType::Inner, vec![Column::from_name("a".to_string())], - ) - .unwrap() - .limit(3, Some(1)) - .unwrap() - .build() - .unwrap(); + )? + .limit(3, Some(1))? + .build()?; let expected = "Limit: skip=3, fetch=1\ \n Inner Join: Using test.a = test1.a\ \n Limit: skip=2, fetch=1\ \n TableScan: test\ \n TableScan: test1"; - assert_optimized_plan_eq(&plan, expected); + assert_optimized_plan_eq(&plan, expected) } #[test] - fn remove_zero_offset() { - let table_scan = test_table_scan().unwrap(); + fn remove_zero_offset() -> Result<()> { + let table_scan = test_table_scan()?; let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a")], vec![sum(col("b"))]) - .unwrap() - .limit(0, None) - .unwrap() - .build() - .unwrap(); + .aggregate(vec![col("a")], vec![sum(col("b"))])? + .limit(0, None)? + .build()?; let expected = "Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected); + assert_optimized_plan_eq(&plan, expected) } }