From cdcf6f3b8816b026cba64e28f6240060a993b1ed Mon Sep 17 00:00:00 2001 From: jackwener Date: Sun, 13 Nov 2022 13:52:00 +0800 Subject: [PATCH 01/10] add propagate_empty_relation --- datafusion/expr/src/logical_plan/plan.rs | 2 +- datafusion/optimizer/src/lib.rs | 1 + datafusion/optimizer/src/optimizer.rs | 2 + .../optimizer/src/propagate_empty_relation.rs | 149 ++++++++++++++++++ 4 files changed, 153 insertions(+), 1 deletion(-) create mode 100644 datafusion/optimizer/src/propagate_empty_relation.rs diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 6ad643fd1ae3..17e3466f69fc 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1177,7 +1177,7 @@ pub struct Filter { /// The predicate expression, which must have Boolean type. predicate: Expr, /// The incoming logical plan - input: Arc, + pub input: Arc, } impl Filter { diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index 13d4cf4a328a..e62cbbd73103 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -26,6 +26,7 @@ pub mod inline_table_scan; pub mod limit_push_down; pub mod optimizer; pub mod projection_push_down; +pub mod propagate_empty_relation; pub mod reduce_cross_join; pub mod reduce_outer_join; pub mod scalar_subquery_to_join; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 315f47499f78..978cbb1f68b0 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -27,6 +27,7 @@ use crate::filter_push_down::FilterPushDown; use crate::inline_table_scan::InlineTableScan; use crate::limit_push_down::LimitPushDown; use crate::projection_push_down::ProjectionPushDown; +use crate::propagate_empty_relation::PropagateEmptyRelation; use crate::reduce_cross_join::ReduceCrossJoin; use crate::reduce_outer_join::ReduceOuterJoin; use crate::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate; @@ -162,6 +163,7 @@ impl Optimizer { // subqueries to joins Arc::new(SimplifyExpressions::new()), Arc::new(EliminateFilter::new()), + Arc::new(PropagateEmptyRelation::new()), Arc::new(ReduceCrossJoin::new()), Arc::new(CommonSubexprEliminate::new()), Arc::new(EliminateLimit::new()), diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs new file mode 100644 index 000000000000..0791cdb9aeab --- /dev/null +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_common::Result; +use datafusion_expr::logical_plan::LogicalPlan; + +use crate::{utils, OptimizerConfig, OptimizerRule}; + +/// Optimization rule that bottom-up to eliminate plan by propagating empty_relation. +#[derive(Default)] +pub struct PropagateEmptyRelation; + +impl PropagateEmptyRelation { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +impl OptimizerRule for PropagateEmptyRelation { + fn optimize( + &self, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result { + // optimize child plans first + let optimized_children_plan = + utils::optimize_children(self, plan, optimizer_config)?; + let optimized_plan_opt = match optimized_children_plan { + LogicalPlan::Projection(_) + | LogicalPlan::Filter(_) + | LogicalPlan::Window(_) + | LogicalPlan::Aggregate(_) + | LogicalPlan::Sort(_) + | LogicalPlan::Join(_) + | LogicalPlan::CrossJoin(_) + | LogicalPlan::EmptyRelation(_) + | LogicalPlan::SubqueryAlias(_) + | LogicalPlan::Limit(_) => empty_child(&optimized_children_plan), + _ => None, + }; + + match optimized_plan_opt { + Some(optimized_plan) => Ok(optimized_plan), + None => Ok(optimized_children_plan), + } + } + + fn name(&self) -> &str { + "eliminate_limit" + } +} + +fn empty_child(plan: &LogicalPlan) -> Option { + let inputs = plan.inputs(); + + if inputs.len() == 1 { + let input = inputs.get(0)?; + match input { + LogicalPlan::EmptyRelation(empty) => { + if !empty.produce_one_row { + Some((*input).clone()) + } else { + None + } + } + _ => None, + } + } else if inputs.len() == 2 { + let left = inputs.get(0)?; + let right = inputs.get(1)?; + let left_opt = match left { + LogicalPlan::EmptyRelation(empty) => { + if !empty.produce_one_row { + Some((*left).clone()) + } else { + None + } + } + _ => None, + }; + if left_opt.is_some() { + return left_opt; + } + match right { + LogicalPlan::EmptyRelation(empty) => { + if !empty.produce_one_row { + Some((*right).clone()) + } else { + None + } + } + _ => None, + } + } else { + None + } +} + +#[cfg(test)] +mod tests { + use datafusion_common::ScalarValue; + use datafusion_expr::{logical_plan::builder::LogicalPlanBuilder, Expr}; + + use super::*; + + fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { + let rule = PropagateEmptyRelation::new(); + let optimized_plan = rule + .optimize(plan, &mut OptimizerConfig::new()) + .expect("failed to optimize plan"); + let formatted_plan = format!("{:?}", optimized_plan); + assert_eq!(formatted_plan, expected); + assert_eq!(plan.schema(), optimized_plan.schema()); + } + + #[test] + fn propagate_empty() { + let filter_true = Expr::Literal(ScalarValue::Boolean(Some(true))); + + let plan = LogicalPlanBuilder::empty(false) + .filter(filter_true.clone()) + .unwrap() + .filter(filter_true.clone()) + .unwrap() + .filter(filter_true) + .unwrap() + .build() + .unwrap(); + + // No aggregate / scan / limit + let expected = "EmptyRelation"; + assert_optimized_plan_eq(&plan, expected); + } +} From e72f9a9a1e1b2f8daa15b7bbfc04771886d772d4 Mon Sep 17 00:00:00 2001 From: jackwener Date: Sun, 13 Nov 2022 14:34:43 +0800 Subject: [PATCH 02/10] refactor and add more UT. --- .../optimizer/src/propagate_empty_relation.rs | 130 +++++++++++------- 1 file changed, 80 insertions(+), 50 deletions(-) diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index 0791cdb9aeab..2a5422fa2fd7 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -17,6 +17,7 @@ use datafusion_common::Result; use datafusion_expr::logical_plan::LogicalPlan; +use datafusion_expr::EmptyRelation; use crate::{utils, OptimizerConfig, OptimizerRule}; @@ -68,44 +69,36 @@ impl OptimizerRule for PropagateEmptyRelation { fn empty_child(plan: &LogicalPlan) -> Option { let inputs = plan.inputs(); - if inputs.len() == 1 { - let input = inputs.get(0)?; - match input { - LogicalPlan::EmptyRelation(empty) => { - if !empty.produce_one_row { - Some((*input).clone()) - } else { - None - } + let contains_empty = match inputs.len() { + 1 => { + let input = inputs.get(0)?; + match input { + LogicalPlan::EmptyRelation(empty) => !empty.produce_one_row, + _ => false, } - _ => None, } - } else if inputs.len() == 2 { - let left = inputs.get(0)?; - let right = inputs.get(1)?; - let left_opt = match left { - LogicalPlan::EmptyRelation(empty) => { - if !empty.produce_one_row { - Some((*left).clone()) - } else { - None - } - } - _ => None, - }; - if left_opt.is_some() { - return left_opt; - } - match right { - LogicalPlan::EmptyRelation(empty) => { - if !empty.produce_one_row { - Some((*right).clone()) - } else { - None - } - } - _ => None, + 2 => { + let left = inputs.get(0)?; + let right = inputs.get(1)?; + + let left_empty = match left { + LogicalPlan::EmptyRelation(empty) => !empty.produce_one_row, + _ => false, + }; + let right_empty = match right { + LogicalPlan::EmptyRelation(empty) => !empty.produce_one_row, + _ => false, + }; + left_empty || right_empty } + _ => false, + }; + + if contains_empty { + Some(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: plan.schema().clone(), + })) } else { None } @@ -113,8 +106,13 @@ fn empty_child(plan: &LogicalPlan) -> Option { #[cfg(test)] mod tests { - use datafusion_common::ScalarValue; - use datafusion_expr::{logical_plan::builder::LogicalPlanBuilder, Expr}; + use crate::eliminate_filter::EliminateFilter; + use crate::test::{test_table_scan, test_table_scan_with_name}; + use datafusion_common::{Column, ScalarValue}; + use datafusion_expr::{ + binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, Expr, JoinType, + Operator, + }; use super::*; @@ -128,22 +126,54 @@ mod tests { assert_eq!(plan.schema(), optimized_plan.schema()); } - #[test] - fn propagate_empty() { - let filter_true = Expr::Literal(ScalarValue::Boolean(Some(true))); + fn assert_together_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { + let optimize_one = EliminateFilter::new() + .optimize(plan, &mut OptimizerConfig::new()) + .expect("failed to optimize plan"); + let optimize_two = PropagateEmptyRelation::new() + .optimize(&optimize_one, &mut OptimizerConfig::new()) + .expect("failed to optimize plan"); + let formatted_plan = format!("{:?}", optimize_two); + assert_eq!(formatted_plan, expected); + assert_eq!(plan.schema(), optimize_two.schema()); + } + #[test] + fn propagate_empty() -> Result<()> { let plan = LogicalPlanBuilder::empty(false) - .filter(filter_true.clone()) - .unwrap() - .filter(filter_true.clone()) - .unwrap() - .filter(filter_true) - .unwrap() - .build() - .unwrap(); - - // No aggregate / scan / limit + .filter(Expr::Literal(ScalarValue::Boolean(Some(true))))? + .limit(10, None)? + .project(vec![binary_expr(lit(1), Operator::Plus, lit(1))])? + .build()?; + let expected = "EmptyRelation"; assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn cooperate_with_eliminate_filter() -> Result<()> { + let table_scan = test_table_scan()?; + let left = LogicalPlanBuilder::from(table_scan).build()?; + let right_table_scan = test_table_scan_with_name("test2")?; + let right = LogicalPlanBuilder::from(right_table_scan) + .project(vec![col("a")])? + .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .join_using( + &right, + JoinType::Inner, + vec![Column::from_name("a".to_string())], + )? + .filter(col("a").lt_eq(lit(1i64)))? + .build()?; + + let expected = "EmptyRelation"; + assert_together_optimized_plan_eq(&plan, expected); + + Ok(()) } } From 72375ca4cf1f83255c1bfc6e90ac0b2f62377dd2 Mon Sep 17 00:00:00 2001 From: jackwener Date: Sun, 13 Nov 2022 14:56:57 +0800 Subject: [PATCH 03/10] fix --- datafusion/optimizer/src/propagate_empty_relation.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index 2a5422fa2fd7..3c340c634079 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -41,11 +41,10 @@ impl OptimizerRule for PropagateEmptyRelation { // optimize child plans first let optimized_children_plan = utils::optimize_children(self, plan, optimizer_config)?; - let optimized_plan_opt = match optimized_children_plan { + let optimized_plan_opt = match &optimized_children_plan { LogicalPlan::Projection(_) | LogicalPlan::Filter(_) | LogicalPlan::Window(_) - | LogicalPlan::Aggregate(_) | LogicalPlan::Sort(_) | LogicalPlan::Join(_) | LogicalPlan::CrossJoin(_) From f90514423cff89b05ec192888b526a84947f4ce7 Mon Sep 17 00:00:00 2001 From: jackwener Date: Sun, 13 Nov 2022 15:28:28 +0800 Subject: [PATCH 04/10] remove pub --- datafusion/expr/src/logical_plan/plan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 17e3466f69fc..6ad643fd1ae3 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1177,7 +1177,7 @@ pub struct Filter { /// The predicate expression, which must have Boolean type. predicate: Expr, /// The incoming logical plan - pub input: Arc, + input: Arc, } impl Filter { From 704dabfe82398189b3cbcf4cb73e088df6f20dee Mon Sep 17 00:00:00 2001 From: jackwener Date: Sun, 13 Nov 2022 17:47:45 +0800 Subject: [PATCH 05/10] fix review --- datafusion/optimizer/src/optimizer.rs | 2 +- datafusion/optimizer/src/propagate_empty_relation.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 978cbb1f68b0..c508d8508e01 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -163,10 +163,10 @@ impl Optimizer { // subqueries to joins Arc::new(SimplifyExpressions::new()), Arc::new(EliminateFilter::new()), - Arc::new(PropagateEmptyRelation::new()), Arc::new(ReduceCrossJoin::new()), Arc::new(CommonSubexprEliminate::new()), Arc::new(EliminateLimit::new()), + Arc::new(PropagateEmptyRelation::new()), Arc::new(RewriteDisjunctivePredicate::new()), ]; if config.filter_null_keys { diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index 3c340c634079..de40236e6d8e 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -61,7 +61,7 @@ impl OptimizerRule for PropagateEmptyRelation { } fn name(&self) -> &str { - "eliminate_limit" + "propagate_empty_relation" } } From b9d35fc9d8a38178790aa7b4ec34adef22c19446 Mon Sep 17 00:00:00 2001 From: jackwener Date: Mon, 14 Nov 2022 12:47:57 +0800 Subject: [PATCH 06/10] resolve agg, union, repartition, innerjoin. --- datafusion/expr/src/logical_plan/plan.rs | 16 ++ .../optimizer/src/propagate_empty_relation.rs | 221 +++++++++++++++--- 2 files changed, 203 insertions(+), 34 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 6ad643fd1ae3..5ab00f1ae21b 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1145,6 +1145,22 @@ impl Projection { }) } + /// Create a new Projection using the specified output schema + pub fn new_from_schema(input: Arc, schema: DFSchemaRef) -> Self { + let expr: Vec = schema + .fields() + .iter() + .map(|field| field.qualified_column()) + .map(Expr::Column) + .collect(); + Self { + expr, + input, + schema, + alias: None, + } + } + pub fn try_from_plan(plan: &LogicalPlan) -> datafusion_common::Result<&Projection> { match plan { LogicalPlan::Projection(it) => Ok(it), diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index de40236e6d8e..a81f294dad95 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. -use datafusion_common::Result; +use datafusion_common::{DataFusionError, Result}; use datafusion_expr::logical_plan::LogicalPlan; -use datafusion_expr::EmptyRelation; +use datafusion_expr::{EmptyRelation, JoinType, Projection}; +use std::sync::Arc; use crate::{utils, OptimizerConfig, OptimizerRule}; @@ -41,22 +42,88 @@ impl OptimizerRule for PropagateEmptyRelation { // optimize child plans first let optimized_children_plan = utils::optimize_children(self, plan, optimizer_config)?; - let optimized_plan_opt = match &optimized_children_plan { + match &optimized_children_plan { + LogicalPlan::EmptyRelation(_) => Ok(optimized_children_plan), LogicalPlan::Projection(_) | LogicalPlan::Filter(_) | LogicalPlan::Window(_) | LogicalPlan::Sort(_) - | LogicalPlan::Join(_) - | LogicalPlan::CrossJoin(_) - | LogicalPlan::EmptyRelation(_) | LogicalPlan::SubqueryAlias(_) - | LogicalPlan::Limit(_) => empty_child(&optimized_children_plan), - _ => None, - }; - - match optimized_plan_opt { - Some(optimized_plan) => Ok(optimized_plan), - None => Ok(optimized_children_plan), + | LogicalPlan::Repartition(_) + | LogicalPlan::Limit(_) => match empty_child(&optimized_children_plan)? { + Some(empty) => Ok(empty), + None => Ok(optimized_children_plan), + }, + LogicalPlan::CrossJoin(_) => { + let (left_empty, right_empty) = binary_plan_children_is_empty(plan)?; + if left_empty || right_empty { + Ok(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: plan.schema().clone(), + })) + } else { + Ok(optimized_children_plan) + } + } + LogicalPlan::Join(join) => { + // TODO: For Join, more join type need to be careful: + // For LeftOuter/LeftSemi/LeftAnti Join, only the left side is empty, the Join result is empty. + // For LeftSemi Join, if the right side is empty, the Join result is empty. + // For LeftAnti Join, if the right side is empty, the Join result is left side(should exclude null ??). + // For RightOuter/RightSemi/RightAnti Join, only the right side is empty, the Join result is empty. + // For RightSemi Join, if the left side is empty, the Join result is empty. + // For RightAnti Join, if the left side is empty, the Join result is right side(should exclude null ??). + // For Full Join, only both sides are empty, the Join result is empty. + // For LeftOut/Full Join, if the right side is empty, the Join can be eliminated with a Projection with left side + // columns + right side columns replaced with null values. + // For RightOut/Full Join, if the left side is empty, the Join can be eliminated with a Projection with right side + // columns + left side columns replaced with null values. + if join.join_type == JoinType::Inner { + let (left_empty, right_empty) = binary_plan_children_is_empty(plan)?; + if left_empty || right_empty { + Ok(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: plan.schema().clone(), + })) + } else { + Ok(optimized_children_plan) + } + } else { + Ok(optimized_children_plan) + } + } + LogicalPlan::Union(union) => { + let (left_empty, right_empty) = binary_plan_children_is_empty(plan)?; + if left_empty && right_empty { + Ok(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: plan.schema().clone(), + })) + } else if !left_empty && right_empty { + Ok(LogicalPlan::Projection(Projection::new_from_schema( + Arc::new((**(union.inputs.get(0).unwrap())).clone()), + plan.schema().clone(), + ))) + } else if left_empty && !right_empty { + Ok(LogicalPlan::Projection(Projection::new_from_schema( + Arc::new((**(union.inputs.get(1).unwrap())).clone()), + plan.schema().clone(), + ))) + } else { + Ok(optimized_children_plan) + } + } + LogicalPlan::Aggregate(agg) => { + if !agg.group_expr.is_empty() { + match empty_child(&optimized_children_plan)? { + Some(empty) => Ok(empty), + None => Ok(optimized_children_plan), + } + } else { + Ok(optimized_children_plan) + } + } + _ => Ok(optimized_children_plan), } } @@ -65,20 +132,14 @@ impl OptimizerRule for PropagateEmptyRelation { } } -fn empty_child(plan: &LogicalPlan) -> Option { +fn binary_plan_children_is_empty(plan: &LogicalPlan) -> Result<(bool, bool)> { let inputs = plan.inputs(); - let contains_empty = match inputs.len() { - 1 => { - let input = inputs.get(0)?; - match input { - LogicalPlan::EmptyRelation(empty) => !empty.produce_one_row, - _ => false, - } - } + // all binary-plan need to deal with separately. + match inputs.len() { 2 => { - let left = inputs.get(0)?; - let right = inputs.get(1)?; + let left = inputs.get(0).unwrap(); + let right = inputs.get(1).unwrap(); let left_empty = match left { LogicalPlan::EmptyRelation(empty) => !empty.produce_one_row, @@ -88,18 +149,38 @@ fn empty_child(plan: &LogicalPlan) -> Option { LogicalPlan::EmptyRelation(empty) => !empty.produce_one_row, _ => false, }; - left_empty || right_empty + Ok((left_empty, right_empty)) } - _ => false, - }; + _ => Err(DataFusionError::Plan( + "plan just can have two child".to_string(), + )), + } +} - if contains_empty { - Some(LogicalPlan::EmptyRelation(EmptyRelation { - produce_one_row: false, - schema: plan.schema().clone(), - })) - } else { - None +fn empty_child(plan: &LogicalPlan) -> Result> { + let inputs = plan.inputs(); + + // all binary-plan need to deal with separately. + match inputs.len() { + 1 => { + let input = inputs.get(0).unwrap(); + match input { + LogicalPlan::EmptyRelation(empty) => { + if !empty.produce_one_row { + Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: plan.schema().clone(), + }))) + } else { + Ok(None) + } + } + _ => Ok(None), + } + } + _ => Err(DataFusionError::Plan( + "plan just can have one child".to_string(), + )), } } @@ -151,6 +232,78 @@ mod tests { Ok(()) } + #[test] + fn propagate_union_right_empty() -> Result<()> { + let table_scan = test_table_scan()?; + let left = LogicalPlanBuilder::from(table_scan).build()?; + let right_table_scan = test_table_scan_with_name("test2")?; + let right = LogicalPlanBuilder::from(right_table_scan) + .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))? + .build()?; + + let plan = LogicalPlanBuilder::from(left).union(right)?.build()?; + + let expected = "Projection: a, b, c\ + \n TableScan: test"; + assert_together_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn propagate_union_left_empty() -> Result<()> { + let table_scan = test_table_scan()?; + let left = LogicalPlanBuilder::from(table_scan) + .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))? + .build()?; + let right_table_scan = test_table_scan_with_name("test2")?; + let right = LogicalPlanBuilder::from(right_table_scan).build()?; + + let plan = LogicalPlanBuilder::from(left).union(right)?.build()?; + + let expected = "Projection: a, b, c\ + \n TableScan: test2"; + assert_together_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn propagate_union_all_empty() -> Result<()> { + let table_scan = test_table_scan()?; + let left = LogicalPlanBuilder::from(table_scan) + .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))? + .build()?; + let right_table_scan = test_table_scan_with_name("test2")?; + let right = LogicalPlanBuilder::from(right_table_scan) + .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))? + .build()?; + + let plan = LogicalPlanBuilder::from(left).union(right)?.build()?; + + let expected = "EmptyRelation"; + assert_together_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn cross_join_empty() -> Result<()> { + let table_scan = test_table_scan()?; + let left = LogicalPlanBuilder::from(table_scan).build()?; + let right = LogicalPlanBuilder::empty(false).build()?; + + let plan = LogicalPlanBuilder::from(left) + .cross_join(&right)? + .filter(col("a").lt_eq(lit(1i64)))? + .build()?; + + let expected = "EmptyRelation"; + assert_together_optimized_plan_eq(&plan, expected); + + Ok(()) + } + #[test] fn cooperate_with_eliminate_filter() -> Result<()> { let table_scan = test_table_scan()?; From 0f8bdf38845890404d8be78a945d36c90d22af0e Mon Sep 17 00:00:00 2001 From: jackwener Date: Mon, 14 Nov 2022 14:35:06 +0800 Subject: [PATCH 07/10] fix multi-union --- datafusion/expr/src/logical_plan/builder.rs | 12 ++ datafusion/expr/src/logical_plan/plan.rs | 8 +- .../optimizer/src/propagate_empty_relation.rs | 160 +++++++++++++----- 3 files changed, 135 insertions(+), 45 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 0782302d736e..afa5592636ab 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -419,6 +419,18 @@ impl LogicalPlanBuilder { Ok(Self::from(union_with_alias(self.plan.clone(), plan, None)?)) } + pub fn union_with_alias( + &self, + plan: LogicalPlan, + alias: Option, + ) -> Result { + Ok(Self::from(union_with_alias( + self.plan.clone(), + plan, + alias, + )?)) + } + /// Apply a union, removing duplicate rows pub fn union_distinct(&self, plan: LogicalPlan) -> Result { // unwrap top-level Distincts, to avoid duplication diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 5ab00f1ae21b..0880916b5fc7 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1146,7 +1146,11 @@ impl Projection { } /// Create a new Projection using the specified output schema - pub fn new_from_schema(input: Arc, schema: DFSchemaRef) -> Self { + pub fn new_from_schema( + input: Arc, + schema: DFSchemaRef, + alias: Option, + ) -> Self { let expr: Vec = schema .fields() .iter() @@ -1157,7 +1161,7 @@ impl Projection { expr, input, schema, - alias: None, + alias, } } diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index a81f294dad95..015f2e340edb 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -17,7 +17,7 @@ use datafusion_common::{DataFusionError, Result}; use datafusion_expr::logical_plan::LogicalPlan; -use datafusion_expr::{EmptyRelation, JoinType, Projection}; +use datafusion_expr::{EmptyRelation, JoinType, Projection, Union}; use std::sync::Arc; use crate::{utils, OptimizerConfig, OptimizerRule}; @@ -93,24 +93,35 @@ impl OptimizerRule for PropagateEmptyRelation { } } LogicalPlan::Union(union) => { - let (left_empty, right_empty) = binary_plan_children_is_empty(plan)?; - if left_empty && right_empty { + let new_inputs = union + .inputs + .iter() + .filter(|input| match &***input { + LogicalPlan::EmptyRelation(empty) => empty.produce_one_row, + _ => true, + }) + .cloned() + .collect::>(); + + if new_inputs.len() == union.inputs.len() { + Ok(optimized_children_plan) + } else if new_inputs.is_empty() { Ok(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: plan.schema().clone(), })) - } else if !left_empty && right_empty { + } else if new_inputs.len() == 1 { Ok(LogicalPlan::Projection(Projection::new_from_schema( Arc::new((**(union.inputs.get(0).unwrap())).clone()), plan.schema().clone(), - ))) - } else if left_empty && !right_empty { - Ok(LogicalPlan::Projection(Projection::new_from_schema( - Arc::new((**(union.inputs.get(1).unwrap())).clone()), - plan.schema().clone(), + union.alias.clone(), ))) } else { - Ok(optimized_children_plan) + Ok(LogicalPlan::Union(Union { + inputs: new_inputs, + schema: union.schema.clone(), + alias: union.alias.clone(), + })) } } LogicalPlan::Aggregate(agg) => { @@ -188,7 +199,9 @@ fn empty_child(plan: &LogicalPlan) -> Result> { mod tests { use crate::eliminate_filter::EliminateFilter; use crate::test::{test_table_scan, test_table_scan_with_name}; + use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::{Column, ScalarValue}; + use datafusion_expr::logical_plan::table_scan; use datafusion_expr::{ binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, Expr, JoinType, Operator, @@ -233,11 +246,34 @@ mod tests { } #[test] - fn propagate_union_right_empty() -> Result<()> { + fn cooperate_with_eliminate_filter() -> Result<()> { let table_scan = test_table_scan()?; let left = LogicalPlanBuilder::from(table_scan).build()?; let right_table_scan = test_table_scan_with_name("test2")?; let right = LogicalPlanBuilder::from(right_table_scan) + .project(vec![col("a")])? + .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .join_using( + &right, + JoinType::Inner, + vec![Column::from_name("a".to_string())], + )? + .filter(col("a").lt_eq(lit(1i64)))? + .build()?; + + let expected = "EmptyRelation"; + assert_together_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn propagate_union_empty() -> Result<()> { + let left = LogicalPlanBuilder::from(test_table_scan()?).build()?; + let right = LogicalPlanBuilder::from(test_table_scan_with_name("test2")?) .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))? .build()?; @@ -251,18 +287,27 @@ mod tests { } #[test] - fn propagate_union_left_empty() -> Result<()> { - let table_scan = test_table_scan()?; - let left = LogicalPlanBuilder::from(table_scan) + fn propagate_union_multi_empty() -> Result<()> { + let one = + LogicalPlanBuilder::from(test_table_scan_with_name("test1")?).build()?; + let two = LogicalPlanBuilder::from(test_table_scan_with_name("test2")?) .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))? .build()?; - let right_table_scan = test_table_scan_with_name("test2")?; - let right = LogicalPlanBuilder::from(right_table_scan).build()?; + let three = LogicalPlanBuilder::from(test_table_scan_with_name("test3")?) + .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))? + .build()?; + let four = + LogicalPlanBuilder::from(test_table_scan_with_name("test4")?).build()?; - let plan = LogicalPlanBuilder::from(left).union(right)?.build()?; + let plan = LogicalPlanBuilder::from(one) + .union(two)? + .union(three)? + .union(four)? + .build()?; - let expected = "Projection: a, b, c\ - \n TableScan: test2"; + let expected = "Union\ + \n TableScan: test1\ + \n TableScan: test4"; assert_together_optimized_plan_eq(&plan, expected); Ok(()) @@ -270,16 +315,24 @@ mod tests { #[test] fn propagate_union_all_empty() -> Result<()> { - let table_scan = test_table_scan()?; - let left = LogicalPlanBuilder::from(table_scan) + let one = LogicalPlanBuilder::from(test_table_scan_with_name("test1")?) .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))? .build()?; - let right_table_scan = test_table_scan_with_name("test2")?; - let right = LogicalPlanBuilder::from(right_table_scan) + let two = LogicalPlanBuilder::from(test_table_scan_with_name("test2")?) + .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))? + .build()?; + let three = LogicalPlanBuilder::from(test_table_scan_with_name("test3")?) + .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))? + .build()?; + let four = LogicalPlanBuilder::from(test_table_scan_with_name("test4")?) .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))? .build()?; - let plan = LogicalPlanBuilder::from(left).union(right)?.build()?; + let plan = LogicalPlanBuilder::from(one) + .union(two)? + .union(three)? + .union(four)? + .build()?; let expected = "EmptyRelation"; assert_together_optimized_plan_eq(&plan, expected); @@ -288,38 +341,59 @@ mod tests { } #[test] - fn cross_join_empty() -> Result<()> { - let table_scan = test_table_scan()?; - let left = LogicalPlanBuilder::from(table_scan).build()?; - let right = LogicalPlanBuilder::empty(false).build()?; + fn propagate_union_children_different_schema() -> Result<()> { + let one_schema = Schema::new(vec![Field::new("t1a", DataType::UInt32, false)]); + let t1_scan = table_scan(Some("test1"), &one_schema, None)?.build()?; + let one = LogicalPlanBuilder::from(t1_scan) + .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))? + .build()?; - let plan = LogicalPlanBuilder::from(left) - .cross_join(&right)? - .filter(col("a").lt_eq(lit(1i64)))? + let two_schema = Schema::new(vec![Field::new("t2a", DataType::UInt32, false)]); + let t2_scan = table_scan(Some("test2"), &two_schema, None)?.build()?; + let two = LogicalPlanBuilder::from(t2_scan).build()?; + + let three_schema = Schema::new(vec![Field::new("t3a", DataType::UInt32, false)]); + let t3_scan = table_scan(Some("test3"), &three_schema, None)?.build()?; + let three = LogicalPlanBuilder::from(t3_scan).build()?; + + let plan = LogicalPlanBuilder::from(one) + .union(two)? + .union(three)? .build()?; - let expected = "EmptyRelation"; + let expected = "Union\ + \n TableScan: test2\ + \n TableScan: test3"; assert_together_optimized_plan_eq(&plan, expected); Ok(()) } #[test] - fn cooperate_with_eliminate_filter() -> Result<()> { - let table_scan = test_table_scan()?; - let left = LogicalPlanBuilder::from(table_scan).build()?; - let right_table_scan = test_table_scan_with_name("test2")?; - let right = LogicalPlanBuilder::from(right_table_scan) - .project(vec![col("a")])? + fn propagate_union_alias() -> Result<()> { + let left = LogicalPlanBuilder::from(test_table_scan()?).build()?; + let right = LogicalPlanBuilder::from(test_table_scan_with_name("test2")?) .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))? .build()?; let plan = LogicalPlanBuilder::from(left) - .join_using( - &right, - JoinType::Inner, - vec![Column::from_name("a".to_string())], - )? + .union_with_alias(right, Some("union".to_string()))? + .build()?; + + let expected = "Projection: union.a, union.b, union.c, alias=union\ + \n TableScan: test"; + assert_together_optimized_plan_eq(&plan, expected); + Ok(()) + } + + #[test] + fn cross_join_empty() -> Result<()> { + let table_scan = test_table_scan()?; + let left = LogicalPlanBuilder::from(table_scan).build()?; + let right = LogicalPlanBuilder::empty(false).build()?; + + let plan = LogicalPlanBuilder::from(left) + .cross_join(&right)? .filter(col("a").lt_eq(lit(1i64)))? .build()?; From 1545aeca8df1169526e02042946c4a84a9196169 Mon Sep 17 00:00:00 2001 From: jackwener Date: Mon, 14 Nov 2022 22:11:10 +0800 Subject: [PATCH 08/10] when schema same, don't add projection. --- .../optimizer/src/propagate_empty_relation.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index 015f2e340edb..0c7fafe4912a 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -111,11 +111,16 @@ impl OptimizerRule for PropagateEmptyRelation { schema: plan.schema().clone(), })) } else if new_inputs.len() == 1 { - Ok(LogicalPlan::Projection(Projection::new_from_schema( - Arc::new((**(union.inputs.get(0).unwrap())).clone()), - plan.schema().clone(), - union.alias.clone(), - ))) + let child = (**(union.inputs.get(0).unwrap())).clone(); + if child.schema().eq(plan.schema()) { + Ok(child) + } else { + Ok(LogicalPlan::Projection(Projection::new_from_schema( + Arc::new(child), + plan.schema().clone(), + union.alias.clone(), + ))) + } } else { Ok(LogicalPlan::Union(Union { inputs: new_inputs, From ab947d95028e2708898e8918ac237e6ec932989a Mon Sep 17 00:00:00 2001 From: jackwener Date: Tue, 15 Nov 2022 11:26:39 +0800 Subject: [PATCH 09/10] add integration test --- .../optimizer/src/propagate_empty_relation.rs | 16 +++++++++------- datafusion/optimizer/tests/integration-test.rs | 9 +++++++++ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index 0c7fafe4912a..59f88cef6716 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -55,11 +55,12 @@ impl OptimizerRule for PropagateEmptyRelation { None => Ok(optimized_children_plan), }, LogicalPlan::CrossJoin(_) => { - let (left_empty, right_empty) = binary_plan_children_is_empty(plan)?; + let (left_empty, right_empty) = + binary_plan_children_is_empty(&optimized_children_plan)?; if left_empty || right_empty { Ok(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, - schema: plan.schema().clone(), + schema: optimized_children_plan.schema().clone(), })) } else { Ok(optimized_children_plan) @@ -79,11 +80,12 @@ impl OptimizerRule for PropagateEmptyRelation { // For RightOut/Full Join, if the left side is empty, the Join can be eliminated with a Projection with right side // columns + left side columns replaced with null values. if join.join_type == JoinType::Inner { - let (left_empty, right_empty) = binary_plan_children_is_empty(plan)?; + let (left_empty, right_empty) = + binary_plan_children_is_empty(&optimized_children_plan)?; if left_empty || right_empty { Ok(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, - schema: plan.schema().clone(), + schema: optimized_children_plan.schema().clone(), })) } else { Ok(optimized_children_plan) @@ -108,16 +110,16 @@ impl OptimizerRule for PropagateEmptyRelation { } else if new_inputs.is_empty() { Ok(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, - schema: plan.schema().clone(), + schema: optimized_children_plan.schema().clone(), })) } else if new_inputs.len() == 1 { let child = (**(union.inputs.get(0).unwrap())).clone(); - if child.schema().eq(plan.schema()) { + if child.schema().eq(optimized_children_plan.schema()) { Ok(child) } else { Ok(LogicalPlan::Projection(Projection::new_from_schema( Arc::new(child), - plan.schema().clone(), + optimized_children_plan.schema().clone(), union.alias.clone(), ))) } diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs index 779f156c035f..4a2caeb8c302 100644 --- a/datafusion/optimizer/tests/integration-test.rs +++ b/datafusion/optimizer/tests/integration-test.rs @@ -258,6 +258,15 @@ fn timestamp_nano_ts_utc_predicates() { assert_eq!(expected, format!("{:?}", plan)); } +#[test] +fn propagate_empty_relation() { + let sql = "select col_int32 from test join ( select col_int32 from test where false ) as ta1 on test.col_int32 = ta1.col_int32;"; + let plan = test_sql(sql).unwrap(); + // when children exist EmptyRelation, it will bottom-up propagate. + let expected = "EmptyRelation"; + assert_eq!(expected, format!("{:?}", plan)); +} + fn test_sql(sql: &str) -> Result { // parse the SQL let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ... From fbbf7ccd63f7571545bb58f636ef7a6acbc12853 Mon Sep 17 00:00:00 2001 From: jackwener Date: Tue, 15 Nov 2022 11:29:36 +0800 Subject: [PATCH 10/10] uppercase letter --- datafusion/optimizer/tests/integration-test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs index 4a2caeb8c302..7d092b00bba6 100644 --- a/datafusion/optimizer/tests/integration-test.rs +++ b/datafusion/optimizer/tests/integration-test.rs @@ -260,7 +260,7 @@ fn timestamp_nano_ts_utc_predicates() { #[test] fn propagate_empty_relation() { - let sql = "select col_int32 from test join ( select col_int32 from test where false ) as ta1 on test.col_int32 = ta1.col_int32;"; + let sql = "SELECT col_int32 FROM test JOIN ( SELECT col_int32 FROM test WHERE false ) AS ta1 ON test.col_int32 = ta1.col_int32;"; let plan = test_sql(sql).unwrap(); // when children exist EmptyRelation, it will bottom-up propagate. let expected = "EmptyRelation";