From 7e14b2b244dfe39f93129267cd196ac0c5bcb4fe Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 16 Jul 2021 14:20:21 +0800 Subject: [PATCH 1/3] eliminate super-set project Signed-off-by: Ruihang Xia --- datafusion/src/execution/context.rs | 51 ++++++---------- .../src/optimizer/projection_push_down.rs | 61 ++++++++++++++----- 2 files changed, 67 insertions(+), 45 deletions(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index d2dcec5f47d7..53aa5cc78968 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1098,22 +1098,18 @@ mod tests { let optimized_plan = ctx.optimize(&logical_plan)?; match &optimized_plan { - LogicalPlan::Projection { input, .. } => match &**input { - LogicalPlan::TableScan { - source, - projected_schema, - .. - } => { - assert_eq!(source.schema().fields().len(), 3); - assert_eq!(projected_schema.fields().len(), 1); - } - _ => panic!("input to projection should be TableScan"), - }, - _ => panic!("expect optimized_plan to be projection"), + LogicalPlan::TableScan { + source, + projected_schema, + .. + } => { + assert_eq!(source.schema().fields().len(), 3); + assert_eq!(projected_schema.fields().len(), 1); + } + _ => panic!("input to projection should be TableScan"), } - let expected = "Projection: #test.c2\ - \n TableScan: test projection=Some([1])"; + let expected = "TableScan: test projection=Some([1])"; assert_eq!(format!("{:?}", optimized_plan), expected); let physical_plan = ctx.create_physical_plan(&optimized_plan)?; @@ -1171,25 +1167,18 @@ mod tests { let ctx = ExecutionContext::new(); let optimized_plan = ctx.optimize(&plan)?; match &optimized_plan { - LogicalPlan::Projection { input, .. } => match &**input { - LogicalPlan::TableScan { - source, - projected_schema, - .. - } => { - assert_eq!(source.schema().fields().len(), 3); - assert_eq!(projected_schema.fields().len(), 1); - } - _ => panic!("input to projection should be InMemoryScan"), - }, - _ => panic!("expect optimized_plan to be projection"), + LogicalPlan::TableScan { + source, + projected_schema, + .. + } => { + assert_eq!(source.schema().fields().len(), 3); + assert_eq!(projected_schema.fields().len(), 1); + } + _ => panic!("input to projection should be InMemoryScan"), } - let expected = format!( - "Projection: #{}.b\ - \n TableScan: {} projection=Some([1])", - UNNAMED_TABLE, UNNAMED_TABLE - ); + let expected = format!("TableScan: {} projection=Some([1])", UNNAMED_TABLE); assert_eq!(format!("{:?}", optimized_plan), expected); let physical_plan = ctx.create_physical_plan(&optimized_plan)?; diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index 0272b9f7872c..cd542068099c 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -21,7 +21,7 @@ use crate::error::Result; use crate::execution::context::ExecutionProps; use crate::logical_plan::{ - build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan, + build_join_schema, Column, DFField, DFSchema, DFSchemaRef, Expr, LogicalPlan, LogicalPlanBuilder, ToDFSchema, }; use crate::optimizer::optimizer::OptimizerRule; @@ -45,6 +45,8 @@ impl OptimizerRule for ProjectionPushDown { plan: &LogicalPlan, execution_props: &ExecutionProps, ) -> Result { + println!("before optimize: {:?}", plan); + // set of all columns refered by the plan (and thus considered required by the root) let required_columns = plan .schema() @@ -52,7 +54,10 @@ impl OptimizerRule for ProjectionPushDown { .iter() .map(|f| f.qualified_column()) .collect::>(); - optimize_plan(self, plan, &required_columns, false, execution_props) + let optimized = + optimize_plan(self, plan, &required_columns, false, execution_props)?; + println!("after optimize: {:?}", optimized); + Ok(optimized) } fn name(&self) -> &str { @@ -173,7 +178,26 @@ fn optimize_plan( true, execution_props, )?; - if new_fields.is_empty() { + + let new_required_columns_optimized = new_input + .schema() + .fields() + .iter() + .map(|f| f.qualified_column()) + .collect::>(); + + println!("a:\t{:?}", required_columns); + println!("b:\t{:?}", new_required_columns); + println!("c:\t{:?}", new_required_columns_optimized); + + let is_all_column_expr = + expr.iter().all(|expr| matches!(expr, Expr::Column(_))); + println!("d: {}", is_all_column_expr); + + if new_fields.is_empty() + || (new_required_columns_optimized.is_subset(required_columns) + && is_all_column_expr) + { // no need for an expression at all Ok(new_input) } else { @@ -496,6 +520,20 @@ mod tests { Ok(()) } + #[test] + fn redundunt_project() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a"), col("b"), col("c")])? + .project(vec![col("a"), col("c"), col("b")])? + .build()?; + + assert_optimized_plan_eq(&plan, "TableScan: test projection=Some([0, 1, 2])"); + + Ok(()) + } + #[test] fn join_schema_trim_full_join_column_projection() -> Result<()> { let table_scan = test_table_scan()?; @@ -644,8 +682,7 @@ mod tests { assert_fields_eq(&plan, vec!["a", "b"]); - let expected = "Projection: #test.a, #test.b\ - \n TableScan: test projection=Some([0, 1])"; + let expected = "TableScan: test projection=Some([0, 1])"; assert_optimized_plan_eq(&plan, expected); @@ -695,8 +732,7 @@ mod tests { assert_fields_eq(&plan, vec!["c", "a"]); let expected = "Limit: 5\ - \n Projection: #test.c, #test.a\ - \n TableScan: test projection=Some([0, 2])"; + \n TableScan: test projection=Some([0, 2])"; assert_optimized_plan_eq(&plan, expected); @@ -744,8 +780,7 @@ mod tests { let expected = "\ Aggregate: groupBy=[[#test.c]], aggr=[[MAX(#test.a)]]\ \n Filter: #test.c Gt Int32(1)\ - \n Projection: #test.c, #test.a\ - \n TableScan: test projection=Some([0, 2])"; + \n TableScan: test projection=Some([0, 2])"; assert_optimized_plan_eq(&plan, expected); @@ -812,11 +847,9 @@ mod tests { assert_fields_eq(&plan, vec!["c", "a", "MAX(test.b)"]); - let expected = "\ - Projection: #test.c, #test.a, #MAX(test.b)\ - \n Filter: #test.c Gt Int32(1)\ - \n Aggregate: groupBy=[[#test.a, #test.c]], aggr=[[MAX(#test.b)]]\ - \n TableScan: test projection=Some([0, 1, 2])"; + let expected = "Filter: #test.c Gt Int32(1)\ + \n Aggregate: groupBy=[[#test.a, #test.c]], aggr=[[MAX(#test.b)]]\ + \n TableScan: test projection=Some([0, 1, 2])"; assert_optimized_plan_eq(&plan, expected); From d872788fe050494203634379297bf557a8930597 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 18 Jul 2021 18:17:44 +0800 Subject: [PATCH 2/3] keep projection right before table scan Signed-off-by: Ruihang Xia --- datafusion/src/execution/context.rs | 52 ++-- .../src/optimizer/projection_push_down.rs | 247 +++++++++++++++++- 2 files changed, 267 insertions(+), 32 deletions(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 53aa5cc78968..9625959cd4ae 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1098,18 +1098,22 @@ mod tests { let optimized_plan = ctx.optimize(&logical_plan)?; match &optimized_plan { - LogicalPlan::TableScan { - source, - projected_schema, - .. - } => { - assert_eq!(source.schema().fields().len(), 3); - assert_eq!(projected_schema.fields().len(), 1); - } - _ => panic!("input to projection should be TableScan"), + LogicalPlan::Projection { input, .. } => match &**input { + LogicalPlan::TableScan { + source, + projected_schema, + .. + } => { + assert_eq!(source.schema().fields().len(), 3); + assert_eq!(projected_schema.fields().len(), 1); + } + _ => panic!("input to projection should be TableScan"), + }, + _ => panic!("expect optimized_plan to be projection"), } - let expected = "TableScan: test projection=Some([1])"; + let expected = "Projection: #test.c2\ + \n TableScan: test projection=Some([1])"; assert_eq!(format!("{:?}", optimized_plan), expected); let physical_plan = ctx.create_physical_plan(&optimized_plan)?; @@ -1167,18 +1171,26 @@ mod tests { let ctx = ExecutionContext::new(); let optimized_plan = ctx.optimize(&plan)?; match &optimized_plan { - LogicalPlan::TableScan { - source, - projected_schema, - .. - } => { - assert_eq!(source.schema().fields().len(), 3); - assert_eq!(projected_schema.fields().len(), 1); - } - _ => panic!("input to projection should be InMemoryScan"), + LogicalPlan::Projection { input, .. } => match &**input { + LogicalPlan::TableScan { + source, + projected_schema, + .. + } => { + assert_eq!(source.schema().fields().len(), 3); + assert_eq!(projected_schema.fields().len(), 1); + } + _ => panic!("input to projection should be InMemoryScan"), + }, + _ => panic!("expect optimized_plan to be projection"), } - let expected = format!("TableScan: {} projection=Some([1])", UNNAMED_TABLE); + // let expected = format!("TableScan: {} projection=Some([1])", UNNAMED_TABLE); + let expected = format!( + "Projection: #{}.b\ + \n TableScan: {} projection=Some([1])", + UNNAMED_TABLE, UNNAMED_TABLE + ); assert_eq!(format!("{:?}", optimized_plan), expected); let physical_plan = ctx.create_physical_plan(&optimized_plan)?; diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index cd542068099c..3fa1c6b048c8 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -56,7 +56,13 @@ impl OptimizerRule for ProjectionPushDown { .collect::>(); let optimized = optimize_plan(self, plan, &required_columns, false, execution_props)?; - println!("after optimize: {:?}", optimized); + println!("after optimize:\n{:?}", optimized); + + // let eliminated = + // eliminate_projection(&optimized, plan.expressions(), &required_columns)?; + // println!("after eliminated:\n{:?}", eliminated); + // Ok(eliminated) + Ok(optimized) } @@ -132,6 +138,178 @@ fn get_projected_schema( Ok((projection, projected_fields.to_dfschema_ref()?)) } +// check and remove redundent projection. +fn eliminate_projection( + plan: &LogicalPlan, + new_expr: Vec, + // new_fields: Vec, + required_columns: &HashSet, +) -> Result { + let is_all_column_expr = new_expr.iter().all(|expr| matches!(expr, Expr::Column(_))); + println!("all column: {}", is_all_column_expr); + + println!("enter plan: {:?}", plan); + + // if &new_required_columns_optimized == required_columns + // && is_all_column_expr + // && !matches!(new_input, LogicalPlan::TableScan { .. }) + // { + // return Ok(plan); + // } + + match plan { + LogicalPlan::Projection { + input, + expr, + schema, + } => { + let new_required_columns = input + .schema() + .fields() + .iter() + .map(|f| f.qualified_column()) + .collect::>(); + + // println!("new fields: {:?}", new_fields); + + // if &new_required_columns != required_columns { + // if schema.fields() == input.schema().fields() { + if required_columns == &new_required_columns { + return Ok(eliminate_projection(input, new_expr, required_columns)?); + } else { + let expr = expr.clone(); + let new_input = + eliminate_projection(input, expr.clone(), required_columns)?; + return Ok(LogicalPlan::Projection { + input: Arc::new(new_input), + expr, + schema: schema.clone(), + }); + } + + // Ok(( + // true, + // LogicalPlan::Projection { + // input: input.clone(), + // expr: new_expr, + // schema: DFSchemaRef::new(DFSchema::new(new_fields)?), + // }, + // )) + } + LogicalPlan::Window { + input, + window_expr, + schema, + } => + // + // { + // Ok(LogicalPlan::Window { + // input: input.clone(), + // window_expr: window_expr.clone(), + // schema: DFSchemaRef::new(DFSchema::new(new_fields)?), + // }) + // } + { + // eliminate_projection(input, new_expr, new_fields, required_columns) + eliminate_projection(plan, new_expr, required_columns) + } + LogicalPlan::Aggregate { + input, + group_expr, + aggr_expr, + schema, + } => todo!(), + // Ok(LogicalPlan::Aggregate { + // input: input.clone(), + // group_expr: group_expr.clone(), + // aggr_expr: aggr_expr.clone(), + // schema: DFSchemaRef::new(DFSchema::new(new_fields)?), + // }), + LogicalPlan::Join { + left, + right, + on, + join_type, + join_constraint, + schema, + } => todo!(), + // Ok(LogicalPlan::Join { + // left: left.clone(), + // right: right.clone(), + // on: on.clone(), + // join_type: join_type.clone(), + // join_constraint: join_constraint.clone(), + // schema: DFSchemaRef::new(DFSchema::new(new_fields)?), + // }), + LogicalPlan::CrossJoin { + left, + right, + schema, + } => todo!(), + // Ok(LogicalPlan::CrossJoin { + // left: left.clone(), + // right: right.clone(), + // schema: DFSchemaRef::new(DFSchema::new(new_fields)?), + // }), + LogicalPlan::Union { + inputs, + schema, + alias, + } => todo!(), + // Ok(LogicalPlan::Union { + // inputs: inputs.clone(), + // schema: DFSchemaRef::new(DFSchema::new(new_fields)?), + // alias: alias.clone(), + // }), + LogicalPlan::TableScan { .. } => Ok(plan.to_owned()), + LogicalPlan::EmptyRelation { + produce_one_row, + schema, + } => todo!(), + // Ok(LogicalPlan::EmptyRelation { + // produce_one_row: *produce_one_row, + // schema: DFSchemaRef::new(DFSchema::new(new_fields)?), + // }), + LogicalPlan::Explain { + verbose, + plan, + stringified_plans, + schema, + } => todo!(), + // Ok(LogicalPlan::Explain { + // verbose: *verbose, + // plan: plan.clone(), + // stringified_plans: stringified_plans.clone(), + // schema: DFSchemaRef::new(DFSchema::new(new_fields)?), + // }), + LogicalPlan::Filter { .. } + | LogicalPlan::Extension { .. } + | LogicalPlan::Sort { .. } + | LogicalPlan::Repartition { .. } + | LogicalPlan::Limit { .. } + | LogicalPlan::CreateExternalTable { .. } => + // Ok(( + // true, + // utils::from_plan(plan, &plan.expressions(), &plan.inputs())?, + // )), + { + let expr = plan.expressions(); + let inputs = plan.inputs(); + let new_inputs = inputs + .iter() + .map(|input_plan| { + eliminate_projection(input_plan, new_expr.clone(), required_columns) + }) + .collect::>>()?; + // eliminate_projection(plan, new_expr, new_fields, required_columns) + + utils::from_plan(plan, &expr, &new_inputs) + } + } + + // todo!() +} + /// Recursively transverses the logical plan removing expressions and that are not needed. fn optimize_plan( optimizer: &ProjectionPushDown, @@ -194,12 +372,13 @@ fn optimize_plan( expr.iter().all(|expr| matches!(expr, Expr::Column(_))); println!("d: {}", is_all_column_expr); - if new_fields.is_empty() - || (new_required_columns_optimized.is_subset(required_columns) - && is_all_column_expr) - { + if new_fields.is_empty() { // no need for an expression at all Ok(new_input) + } else if &new_required_columns_optimized == required_columns + && has_projection + { + Ok(new_input) } else { Ok(LogicalPlan::Projection { expr: new_expr, @@ -528,8 +707,48 @@ mod tests { .project(vec![col("a"), col("b"), col("c")])? .project(vec![col("a"), col("c"), col("b")])? .build()?; + let expected = "Projection: #test.a, #test.c, #test.b\ + \n TableScan: test projection=Some([0, 1, 2])"; - assert_optimized_plan_eq(&plan, "TableScan: test projection=Some([0, 1, 2])"); + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn reorder_projection() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("c"), col("b"), col("a")])? + .build()?; + let expected = "Projection: #test.c, #test.b, #test.a\ + \n TableScan: test projection=Some([0, 1, 2])"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn noncontiguous_redundunt_projection() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("c"), col("b"), col("a")])? + .filter(col("c").gt(lit(1)))? + .project(vec![col("c"), col("a"), col("b")])? + .filter(col("b").gt(lit(1)))? + .filter(col("a").gt(lit(1)))? + .project(vec![col("a"), col("c"), col("b")])? + .build()?; + let expected = "Projection: #test.a, #test.c, #test.b\ + \n Filter: #test.a Gt Int32(1)\ + \n Filter: #test.b Gt Int32(1)\ + \n Filter: #test.c Gt Int32(1)\ + \n TableScan: test projection=Some([0, 1, 2])"; + + assert_optimized_plan_eq(&plan, expected); Ok(()) } @@ -682,7 +901,8 @@ mod tests { assert_fields_eq(&plan, vec!["a", "b"]); - let expected = "TableScan: test projection=Some([0, 1])"; + let expected = "Projection: #test.a, #test.b\ + \n TableScan: test projection=Some([0, 1])"; assert_optimized_plan_eq(&plan, expected); @@ -732,7 +952,8 @@ mod tests { assert_fields_eq(&plan, vec!["c", "a"]); let expected = "Limit: 5\ - \n TableScan: test projection=Some([0, 2])"; + \n Projection: #test.c, #test.a\ + \n TableScan: test projection=Some([0, 2])"; assert_optimized_plan_eq(&plan, expected); @@ -780,7 +1001,8 @@ mod tests { let expected = "\ Aggregate: groupBy=[[#test.c]], aggr=[[MAX(#test.a)]]\ \n Filter: #test.c Gt Int32(1)\ - \n TableScan: test projection=Some([0, 2])"; + \n Projection: #test.c, #test.a\ + \n TableScan: test projection=Some([0, 2])"; assert_optimized_plan_eq(&plan, expected); @@ -847,9 +1069,10 @@ mod tests { assert_fields_eq(&plan, vec!["c", "a", "MAX(test.b)"]); - let expected = "Filter: #test.c Gt Int32(1)\ - \n Aggregate: groupBy=[[#test.a, #test.c]], aggr=[[MAX(#test.b)]]\ - \n TableScan: test projection=Some([0, 1, 2])"; + let expected = "Projection: #test.c, #test.a, #MAX(test.b)\ + \n Filter: #test.c Gt Int32(1)\ + \n Aggregate: groupBy=[[#test.a, #test.c]], aggr=[[MAX(#test.b)]]\ + \n TableScan: test projection=Some([0, 1, 2])"; assert_optimized_plan_eq(&plan, expected); From ab5f67ca9ada7e9722bcc4d6e1029111765d054e Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 18 Jul 2021 19:35:39 +0800 Subject: [PATCH 3/3] tidy Signed-off-by: Ruihang Xia --- datafusion/src/execution/context.rs | 1 - .../src/optimizer/projection_push_down.rs | 203 +----------------- 2 files changed, 5 insertions(+), 199 deletions(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 9625959cd4ae..d2dcec5f47d7 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1185,7 +1185,6 @@ mod tests { _ => panic!("expect optimized_plan to be projection"), } - // let expected = format!("TableScan: {} projection=Some([1])", UNNAMED_TABLE); let expected = format!( "Projection: #{}.b\ \n TableScan: {} projection=Some([1])", diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index 3fa1c6b048c8..089dca2318c9 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -21,7 +21,7 @@ use crate::error::Result; use crate::execution::context::ExecutionProps; use crate::logical_plan::{ - build_join_schema, Column, DFField, DFSchema, DFSchemaRef, Expr, LogicalPlan, + build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan, LogicalPlanBuilder, ToDFSchema, }; use crate::optimizer::optimizer::OptimizerRule; @@ -45,8 +45,6 @@ impl OptimizerRule for ProjectionPushDown { plan: &LogicalPlan, execution_props: &ExecutionProps, ) -> Result { - println!("before optimize: {:?}", plan); - // set of all columns refered by the plan (and thus considered required by the root) let required_columns = plan .schema() @@ -54,16 +52,7 @@ impl OptimizerRule for ProjectionPushDown { .iter() .map(|f| f.qualified_column()) .collect::>(); - let optimized = - optimize_plan(self, plan, &required_columns, false, execution_props)?; - println!("after optimize:\n{:?}", optimized); - - // let eliminated = - // eliminate_projection(&optimized, plan.expressions(), &required_columns)?; - // println!("after eliminated:\n{:?}", eliminated); - // Ok(eliminated) - - Ok(optimized) + optimize_plan(self, plan, &required_columns, false, execution_props) } fn name(&self) -> &str { @@ -138,178 +127,6 @@ fn get_projected_schema( Ok((projection, projected_fields.to_dfschema_ref()?)) } -// check and remove redundent projection. -fn eliminate_projection( - plan: &LogicalPlan, - new_expr: Vec, - // new_fields: Vec, - required_columns: &HashSet, -) -> Result { - let is_all_column_expr = new_expr.iter().all(|expr| matches!(expr, Expr::Column(_))); - println!("all column: {}", is_all_column_expr); - - println!("enter plan: {:?}", plan); - - // if &new_required_columns_optimized == required_columns - // && is_all_column_expr - // && !matches!(new_input, LogicalPlan::TableScan { .. }) - // { - // return Ok(plan); - // } - - match plan { - LogicalPlan::Projection { - input, - expr, - schema, - } => { - let new_required_columns = input - .schema() - .fields() - .iter() - .map(|f| f.qualified_column()) - .collect::>(); - - // println!("new fields: {:?}", new_fields); - - // if &new_required_columns != required_columns { - // if schema.fields() == input.schema().fields() { - if required_columns == &new_required_columns { - return Ok(eliminate_projection(input, new_expr, required_columns)?); - } else { - let expr = expr.clone(); - let new_input = - eliminate_projection(input, expr.clone(), required_columns)?; - return Ok(LogicalPlan::Projection { - input: Arc::new(new_input), - expr, - schema: schema.clone(), - }); - } - - // Ok(( - // true, - // LogicalPlan::Projection { - // input: input.clone(), - // expr: new_expr, - // schema: DFSchemaRef::new(DFSchema::new(new_fields)?), - // }, - // )) - } - LogicalPlan::Window { - input, - window_expr, - schema, - } => - // - // { - // Ok(LogicalPlan::Window { - // input: input.clone(), - // window_expr: window_expr.clone(), - // schema: DFSchemaRef::new(DFSchema::new(new_fields)?), - // }) - // } - { - // eliminate_projection(input, new_expr, new_fields, required_columns) - eliminate_projection(plan, new_expr, required_columns) - } - LogicalPlan::Aggregate { - input, - group_expr, - aggr_expr, - schema, - } => todo!(), - // Ok(LogicalPlan::Aggregate { - // input: input.clone(), - // group_expr: group_expr.clone(), - // aggr_expr: aggr_expr.clone(), - // schema: DFSchemaRef::new(DFSchema::new(new_fields)?), - // }), - LogicalPlan::Join { - left, - right, - on, - join_type, - join_constraint, - schema, - } => todo!(), - // Ok(LogicalPlan::Join { - // left: left.clone(), - // right: right.clone(), - // on: on.clone(), - // join_type: join_type.clone(), - // join_constraint: join_constraint.clone(), - // schema: DFSchemaRef::new(DFSchema::new(new_fields)?), - // }), - LogicalPlan::CrossJoin { - left, - right, - schema, - } => todo!(), - // Ok(LogicalPlan::CrossJoin { - // left: left.clone(), - // right: right.clone(), - // schema: DFSchemaRef::new(DFSchema::new(new_fields)?), - // }), - LogicalPlan::Union { - inputs, - schema, - alias, - } => todo!(), - // Ok(LogicalPlan::Union { - // inputs: inputs.clone(), - // schema: DFSchemaRef::new(DFSchema::new(new_fields)?), - // alias: alias.clone(), - // }), - LogicalPlan::TableScan { .. } => Ok(plan.to_owned()), - LogicalPlan::EmptyRelation { - produce_one_row, - schema, - } => todo!(), - // Ok(LogicalPlan::EmptyRelation { - // produce_one_row: *produce_one_row, - // schema: DFSchemaRef::new(DFSchema::new(new_fields)?), - // }), - LogicalPlan::Explain { - verbose, - plan, - stringified_plans, - schema, - } => todo!(), - // Ok(LogicalPlan::Explain { - // verbose: *verbose, - // plan: plan.clone(), - // stringified_plans: stringified_plans.clone(), - // schema: DFSchemaRef::new(DFSchema::new(new_fields)?), - // }), - LogicalPlan::Filter { .. } - | LogicalPlan::Extension { .. } - | LogicalPlan::Sort { .. } - | LogicalPlan::Repartition { .. } - | LogicalPlan::Limit { .. } - | LogicalPlan::CreateExternalTable { .. } => - // Ok(( - // true, - // utils::from_plan(plan, &plan.expressions(), &plan.inputs())?, - // )), - { - let expr = plan.expressions(); - let inputs = plan.inputs(); - let new_inputs = inputs - .iter() - .map(|input_plan| { - eliminate_projection(input_plan, new_expr.clone(), required_columns) - }) - .collect::>>()?; - // eliminate_projection(plan, new_expr, new_fields, required_columns) - - utils::from_plan(plan, &expr, &new_inputs) - } - } - - // todo!() -} - /// Recursively transverses the logical plan removing expressions and that are not needed. fn optimize_plan( optimizer: &ProjectionPushDown, @@ -364,20 +181,10 @@ fn optimize_plan( .map(|f| f.qualified_column()) .collect::>(); - println!("a:\t{:?}", required_columns); - println!("b:\t{:?}", new_required_columns); - println!("c:\t{:?}", new_required_columns_optimized); - - let is_all_column_expr = - expr.iter().all(|expr| matches!(expr, Expr::Column(_))); - println!("d: {}", is_all_column_expr); - - if new_fields.is_empty() { - // no need for an expression at all - Ok(new_input) - } else if &new_required_columns_optimized == required_columns - && has_projection + if new_fields.is_empty() + || (has_projection && &new_required_columns_optimized == required_columns) { + // no need for an expression at all Ok(new_input) } else { Ok(LogicalPlan::Projection {