From 3f870c4cf6a08a56b1de752c6869c5cc162bd857 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Wed, 23 Jun 2021 18:05:16 +0800 Subject: [PATCH] fix 593 --- .../core/src/serde/logical_plan/from_proto.rs | 18 +++---- datafusion/src/execution/context.rs | 4 +- datafusion/src/execution/dataframe_impl.rs | 22 ++++---- datafusion/src/logical_plan/builder.rs | 36 ++++++------- datafusion/src/optimizer/constant_folding.rs | 28 +++++----- datafusion/src/optimizer/eliminate_limit.rs | 6 +-- datafusion/src/optimizer/filter_push_down.rs | 54 +++++++++---------- .../src/optimizer/hash_build_probe_order.rs | 3 +- datafusion/src/optimizer/limit_push_down.rs | 12 ++--- .../src/optimizer/projection_push_down.rs | 28 +++++----- .../src/optimizer/simplify_expressions.rs | 4 +- datafusion/src/optimizer/utils.rs | 2 +- datafusion/src/sql/planner.rs | 54 +++++++++---------- datafusion/tests/custom_sources.rs | 2 +- 14 files changed, 137 insertions(+), 136 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index 418d60de3e7a..6d82629016d5 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -61,14 +61,14 @@ impl TryInto for &protobuf::LogicalPlanNode { .iter() .map(|expr| expr.try_into()) .collect::, _>>()?; - LogicalPlanBuilder::from(&input) + LogicalPlanBuilder::from(input) .project(x)? .build() .map_err(|e| e.into()) } LogicalPlanType::Selection(selection) => { let input: LogicalPlan = convert_box_required!(selection.input)?; - LogicalPlanBuilder::from(&input) + LogicalPlanBuilder::from(input) .filter( selection .expr @@ -86,7 +86,7 @@ impl TryInto for &protobuf::LogicalPlanNode { .iter() .map(|expr| expr.try_into()) .collect::, _>>()?; - LogicalPlanBuilder::from(&input) + LogicalPlanBuilder::from(input) .window(window_expr)? .build() .map_err(|e| e.into()) @@ -103,7 +103,7 @@ impl TryInto for &protobuf::LogicalPlanNode { .iter() .map(|expr| expr.try_into()) .collect::, _>>()?; - LogicalPlanBuilder::from(&input) + LogicalPlanBuilder::from(input) .aggregate(group_expr, aggr_expr)? .build() .map_err(|e| e.into()) @@ -162,7 +162,7 @@ impl TryInto for &protobuf::LogicalPlanNode { .iter() .map(|expr| expr.try_into()) .collect::, _>>()?; - LogicalPlanBuilder::from(&input) + LogicalPlanBuilder::from(input) .sort(sort_expr)? .build() .map_err(|e| e.into()) @@ -193,7 +193,7 @@ impl TryInto for &protobuf::LogicalPlanNode { } }; - LogicalPlanBuilder::from(&input) + LogicalPlanBuilder::from(input) .repartition(partitioning_scheme)? .build() .map_err(|e| e.into()) @@ -223,14 +223,14 @@ impl TryInto for &protobuf::LogicalPlanNode { } LogicalPlanType::Explain(explain) => { let input: LogicalPlan = convert_box_required!(explain.input)?; - LogicalPlanBuilder::from(&input) + LogicalPlanBuilder::from(input) .explain(explain.verbose)? .build() .map_err(|e| e.into()) } LogicalPlanType::Limit(limit) => { let input: LogicalPlan = convert_box_required!(limit.input)?; - LogicalPlanBuilder::from(&input) + LogicalPlanBuilder::from(input) .limit(limit.limit as usize)? .build() .map_err(|e| e.into()) @@ -255,7 +255,7 @@ impl TryInto for &protobuf::LogicalPlanNode { protobuf::JoinType::Semi => JoinType::Semi, protobuf::JoinType::Anti => JoinType::Anti, }; - LogicalPlanBuilder::from(&convert_box_required!(join.left)?) + LogicalPlanBuilder::from(convert_box_required!(join.left)?) .join( &convert_box_required!(join.right)?, join_type, diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 165263084cc7..8ce408de86a5 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1073,7 +1073,7 @@ mod tests { let ctx = create_ctx(&tmp_dir, partition_count)?; let table = ctx.table("test")?; - let logical_plan = LogicalPlanBuilder::from(&table.to_logical_plan()) + let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan()) .project(vec![col("c2")])? .build()?; @@ -2566,7 +2566,7 @@ mod tests { let t = ctx.table("t")?; - let plan = LogicalPlanBuilder::from(&t.to_logical_plan()) + let plan = LogicalPlanBuilder::from(t.to_logical_plan()) .project(vec![ col("a"), col("b"), diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs index 99eb7f077c96..7cf779740c47 100644 --- a/datafusion/src/execution/dataframe_impl.rs +++ b/datafusion/src/execution/dataframe_impl.rs @@ -63,7 +63,7 @@ impl DataFrame for DataFrameImpl { /// Create a projection based on arbitrary expressions fn select(&self, expr_list: Vec) -> Result> { - let plan = LogicalPlanBuilder::from(&self.plan) + let plan = LogicalPlanBuilder::from(self.to_logical_plan()) .project(expr_list)? .build()?; Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan))) @@ -71,7 +71,7 @@ impl DataFrame for DataFrameImpl { /// Create a filter based on a predicate expression fn filter(&self, predicate: Expr) -> Result> { - let plan = LogicalPlanBuilder::from(&self.plan) + let plan = LogicalPlanBuilder::from(self.to_logical_plan()) .filter(predicate)? .build()?; Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan))) @@ -83,7 +83,7 @@ impl DataFrame for DataFrameImpl { group_expr: Vec, aggr_expr: Vec, ) -> Result> { - let plan = LogicalPlanBuilder::from(&self.plan) + let plan = LogicalPlanBuilder::from(self.to_logical_plan()) .aggregate(group_expr, aggr_expr)? .build()?; Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan))) @@ -91,13 +91,17 @@ impl DataFrame for DataFrameImpl { /// Limit the number of rows fn limit(&self, n: usize) -> Result> { - let plan = LogicalPlanBuilder::from(&self.plan).limit(n)?.build()?; + let plan = LogicalPlanBuilder::from(self.to_logical_plan()) + .limit(n)? + .build()?; Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan))) } /// Sort by specified sorting expressions fn sort(&self, expr: Vec) -> Result> { - let plan = LogicalPlanBuilder::from(&self.plan).sort(expr)?.build()?; + let plan = LogicalPlanBuilder::from(self.to_logical_plan()) + .sort(expr)? + .build()?; Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan))) } @@ -109,7 +113,7 @@ impl DataFrame for DataFrameImpl { left_cols: &[&str], right_cols: &[&str], ) -> Result> { - let plan = LogicalPlanBuilder::from(&self.plan) + let plan = LogicalPlanBuilder::from(self.to_logical_plan()) .join( &right.to_logical_plan(), join_type, @@ -124,7 +128,7 @@ impl DataFrame for DataFrameImpl { &self, partitioning_scheme: Partitioning, ) -> Result> { - let plan = LogicalPlanBuilder::from(&self.plan) + let plan = LogicalPlanBuilder::from(self.to_logical_plan()) .repartition(partitioning_scheme)? .build()?; Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan))) @@ -161,7 +165,7 @@ impl DataFrame for DataFrameImpl { } fn explain(&self, verbose: bool) -> Result> { - let plan = LogicalPlanBuilder::from(&self.plan) + let plan = LogicalPlanBuilder::from(self.to_logical_plan()) .explain(verbose)? .build()?; Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan))) @@ -173,7 +177,7 @@ impl DataFrame for DataFrameImpl { } fn union(&self, dataframe: Arc) -> Result> { - let plan = LogicalPlanBuilder::from(&self.plan) + let plan = LogicalPlanBuilder::from(self.to_logical_plan()) .union(dataframe.to_logical_plan())? .build()?; Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan))) diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 147f8322df5d..640b37f66b21 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -89,15 +89,15 @@ pub struct LogicalPlanBuilder { impl LogicalPlanBuilder { /// Create a builder from an existing plan - pub fn from(plan: &LogicalPlan) -> Self { - Self { plan: plan.clone() } + pub fn from(plan: LogicalPlan) -> Self { + Self { plan } } /// Create an empty relation. /// /// `produce_one_row` set to true means this empty node needs to produce a placeholder row. pub fn empty(produce_one_row: bool) -> Self { - Self::from(&LogicalPlan::EmptyRelation { + Self::from(LogicalPlan::EmptyRelation { produce_one_row, schema: DFSchemaRef::new(DFSchema::empty()), }) @@ -182,7 +182,7 @@ impl LogicalPlanBuilder { limit: None, }; - Ok(Self::from(&table_scan)) + Ok(Self::from(table_scan)) } /// Apply a projection. @@ -214,7 +214,7 @@ impl LogicalPlanBuilder { let schema = DFSchema::new(exprlist_to_fields(&projected_expr, input_schema)?)?; - Ok(Self::from(&LogicalPlan::Projection { + Ok(Self::from(LogicalPlan::Projection { expr: projected_expr, input: Arc::new(self.plan.clone()), schema: DFSchemaRef::new(schema), @@ -224,7 +224,7 @@ impl LogicalPlanBuilder { /// Apply a filter pub fn filter(&self, expr: Expr) -> Result { let expr = normalize_col(expr, &self.plan.all_schemas())?; - Ok(Self::from(&LogicalPlan::Filter { + Ok(Self::from(LogicalPlan::Filter { predicate: expr, input: Arc::new(self.plan.clone()), })) @@ -232,7 +232,7 @@ impl LogicalPlanBuilder { /// Apply a limit pub fn limit(&self, n: usize) -> Result { - Ok(Self::from(&LogicalPlan::Limit { + Ok(Self::from(LogicalPlan::Limit { n, input: Arc::new(self.plan.clone()), })) @@ -241,7 +241,7 @@ impl LogicalPlanBuilder { /// Apply a sort pub fn sort(&self, exprs: impl IntoIterator) -> Result { let schemas = self.plan.all_schemas(); - Ok(Self::from(&LogicalPlan::Sort { + Ok(Self::from(LogicalPlan::Sort { expr: normalize_cols(exprs, &schemas)?, input: Arc::new(self.plan.clone()), })) @@ -249,11 +249,7 @@ impl LogicalPlanBuilder { /// Apply a union pub fn union(&self, plan: LogicalPlan) -> Result { - Ok(Self::from(&union_with_alias( - self.plan.clone(), - plan, - None, - )?)) + Ok(Self::from(union_with_alias(self.plan.clone(), plan, None)?)) } /// Apply a join with on constraint @@ -287,7 +283,7 @@ impl LogicalPlanBuilder { &JoinConstraint::On, )?; - Ok(Self::from(&LogicalPlan::Join { + Ok(Self::from(LogicalPlan::Join { left: Arc::new(self.plan.clone()), right: Arc::new(right.clone()), on, @@ -323,7 +319,7 @@ impl LogicalPlanBuilder { &JoinConstraint::Using, )?; - Ok(Self::from(&LogicalPlan::Join { + Ok(Self::from(LogicalPlan::Join { left: Arc::new(self.plan.clone()), right: Arc::new(right.clone()), on, @@ -336,7 +332,7 @@ impl LogicalPlanBuilder { /// Apply a cross join pub fn cross_join(&self, right: &LogicalPlan) -> Result { let schema = self.plan.schema().join(right.schema())?; - Ok(Self::from(&LogicalPlan::CrossJoin { + Ok(Self::from(LogicalPlan::CrossJoin { left: Arc::new(self.plan.clone()), right: Arc::new(right.clone()), schema: DFSchemaRef::new(schema), @@ -345,7 +341,7 @@ impl LogicalPlanBuilder { /// Repartition pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result { - Ok(Self::from(&LogicalPlan::Repartition { + Ok(Self::from(LogicalPlan::Repartition { input: Arc::new(self.plan.clone()), partitioning_scheme, })) @@ -359,7 +355,7 @@ impl LogicalPlanBuilder { let mut window_fields: Vec = exprlist_to_fields(all_expr, self.plan.schema())?; window_fields.extend_from_slice(self.plan.schema().fields()); - Ok(Self::from(&LogicalPlan::Window { + Ok(Self::from(LogicalPlan::Window { input: Arc::new(self.plan.clone()), window_expr, schema: Arc::new(DFSchema::new(window_fields)?), @@ -384,7 +380,7 @@ impl LogicalPlanBuilder { let aggr_schema = DFSchema::new(exprlist_to_fields(all_expr, self.plan.schema())?)?; - Ok(Self::from(&LogicalPlan::Aggregate { + Ok(Self::from(LogicalPlan::Aggregate { input: Arc::new(self.plan.clone()), group_expr, aggr_expr, @@ -401,7 +397,7 @@ impl LogicalPlanBuilder { let schema = LogicalPlan::explain_schema(); - Ok(Self::from(&LogicalPlan::Explain { + Ok(Self::from(LogicalPlan::Explain { verbose, plan: Arc::new(self.plan.clone()), stringified_plans, diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs index 956f74adc28f..79833df66129 100644 --- a/datafusion/src/optimizer/constant_folding.rs +++ b/datafusion/src/optimizer/constant_folding.rs @@ -544,7 +544,7 @@ mod tests { #[test] fn optimize_plan_eq_expr() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .filter(col("b").eq(lit(true)))? .filter(col("c").eq(lit(false)))? .project(vec![col("a")])? @@ -563,7 +563,7 @@ mod tests { #[test] fn optimize_plan_not_eq_expr() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .filter(col("b").not_eq(lit(true)))? .filter(col("c").not_eq(lit(false)))? .limit(1)? @@ -584,7 +584,7 @@ mod tests { #[test] fn optimize_plan_and_expr() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .filter(col("b").not_eq(lit(true)).and(col("c").eq(lit(true))))? .project(vec![col("a")])? .build()?; @@ -601,7 +601,7 @@ mod tests { #[test] fn optimize_plan_or_expr() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .filter(col("b").not_eq(lit(true)).or(col("c").eq(lit(false))))? .project(vec![col("a")])? .build()?; @@ -618,7 +618,7 @@ mod tests { #[test] fn optimize_plan_not_expr() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .filter(col("b").eq(lit(false)).not())? .project(vec![col("a")])? .build()?; @@ -635,7 +635,7 @@ mod tests { #[test] fn optimize_plan_support_projection() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("a"), col("d"), col("b").eq(lit(false))])? .build()?; @@ -650,7 +650,7 @@ mod tests { #[test] fn optimize_plan_support_aggregate() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("a"), col("c"), col("b")])? .aggregate( vec![col("a"), col("c")], @@ -691,7 +691,7 @@ mod tests { )))], fun: BuiltinScalarFunction::ToTimestamp, }]; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(proj) .unwrap() .build() @@ -713,7 +713,7 @@ mod tests { )))], fun: BuiltinScalarFunction::ToTimestamp, }]; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(proj) .unwrap() .build() @@ -732,7 +732,7 @@ mod tests { args: vec![], fun: BuiltinScalarFunction::ToTimestamp, }]; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(proj) .unwrap() .build() @@ -751,7 +751,7 @@ mod tests { expr: Box::new(Expr::Literal(ScalarValue::Utf8(Some("0".to_string())))), data_type: DataType::Int32, }]; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(proj) .unwrap() .build() @@ -770,7 +770,7 @@ mod tests { expr: Box::new(Expr::Literal(ScalarValue::Utf8(Some("".to_string())))), data_type: DataType::Int32, }]; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(proj) .unwrap() .build() @@ -790,7 +790,7 @@ mod tests { fun: BuiltinScalarFunction::Now, }]; let time = chrono::Utc::now(); - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(proj) .unwrap() .build() @@ -823,7 +823,7 @@ mod tests { "t2".to_string(), ), ]; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(proj) .unwrap() .build() diff --git a/datafusion/src/optimizer/eliminate_limit.rs b/datafusion/src/optimizer/eliminate_limit.rs index 4b5a634889a7..bf3f2b3be283 100644 --- a/datafusion/src/optimizer/eliminate_limit.rs +++ b/datafusion/src/optimizer/eliminate_limit.rs @@ -88,7 +88,7 @@ mod tests { #[test] fn limit_0_root() { let table_scan = test_table_scan().unwrap(); - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .aggregate(vec![col("a")], vec![sum(col("b"))]) .unwrap() .limit(0) @@ -104,12 +104,12 @@ mod tests { #[test] fn limit_0_nested() { let table_scan = test_table_scan().unwrap(); - let plan1 = LogicalPlanBuilder::from(&table_scan) + let plan1 = LogicalPlanBuilder::from(table_scan.clone()) .aggregate(vec![col("a")], vec![sum(col("b"))]) .unwrap() .build() .unwrap(); - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .aggregate(vec![col("a")], vec![sum(col("b"))]) .unwrap() .limit(0) diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs index e5f8dcfbfffd..7b1ff326c3c6 100644 --- a/datafusion/src/optimizer/filter_push_down.rs +++ b/datafusion/src/optimizer/filter_push_down.rs @@ -483,7 +483,7 @@ mod tests { #[test] fn filter_before_projection() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("a"), col("b")])? .filter(col("a").eq(lit(1i64)))? .build()?; @@ -499,7 +499,7 @@ mod tests { #[test] fn filter_after_limit() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("a"), col("b")])? .limit(10)? .filter(col("a").eq(lit(1i64)))? @@ -517,7 +517,7 @@ mod tests { #[test] fn filter_no_columns() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .filter(lit(0i64).eq(lit(1i64)))? .build()?; let expected = "\ @@ -530,7 +530,7 @@ mod tests { #[test] fn filter_jump_2_plans() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("a"), col("b"), col("c")])? .project(vec![col("c"), col("b")])? .filter(col("a").eq(lit(1i64)))? @@ -548,7 +548,7 @@ mod tests { #[test] fn filter_move_agg() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .aggregate(vec![col("a")], vec![sum(col("b")).alias("total_salary")])? .filter(col("a").gt(lit(10i64)))? .build()?; @@ -564,7 +564,7 @@ mod tests { #[test] fn filter_keep_agg() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .aggregate(vec![col("a")], vec![sum(col("b")).alias("b")])? .filter(col("b").gt(lit(10i64)))? .build()?; @@ -581,7 +581,7 @@ mod tests { #[test] fn alias() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("a").alias("b"), col("c")])? .filter(col("b").eq(lit(1i64)))? .build()?; @@ -614,7 +614,7 @@ mod tests { #[test] fn complex_expression() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![ add(multiply(col("a"), lit(2)), col("c")).alias("b"), col("c"), @@ -644,7 +644,7 @@ mod tests { #[test] fn complex_plan() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![ add(multiply(col("a"), lit(2)), col("c")).alias("b"), col("c"), @@ -680,7 +680,7 @@ mod tests { fn multi_filter() -> Result<()> { // the aggregation allows one filter to pass (b), and the other one to not pass (SUM(c)) let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("a").alias("b"), col("c")])? .aggregate(vec![col("b")], vec![sum(col("c"))])? .filter(col("b").gt(lit(10i64)))? @@ -716,7 +716,7 @@ mod tests { fn split_filter() -> Result<()> { // the aggregation allows one filter to pass (b), and the other one to not pass (SUM(c)) let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("a").alias("b"), col("c")])? .aggregate(vec![col("b")], vec![sum(col("c"))])? .filter(and( @@ -751,7 +751,7 @@ mod tests { #[test] fn double_limit() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("a"), col("b")])? .limit(20)? .limit(10)? @@ -773,8 +773,8 @@ mod tests { #[test] fn union_all() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) - .union(LogicalPlanBuilder::from(&table_scan).build()?)? + let plan = LogicalPlanBuilder::from(table_scan.clone()) + .union(LogicalPlanBuilder::from(table_scan).build()?)? .filter(col("a").eq(lit(1i64)))? .build()?; // filter appears below Union @@ -792,7 +792,7 @@ mod tests { #[test] fn filter_2_breaks_limits() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("a")])? .filter(col("a").lt_eq(lit(1i64)))? .limit(1)? @@ -828,7 +828,7 @@ mod tests { #[test] fn two_filters_on_same_depth() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .limit(1)? .filter(col("a").lt_eq(lit(1i64)))? .filter(col("a").gt_eq(lit(1i64)))? @@ -860,7 +860,7 @@ mod tests { #[test] fn filters_user_defined_node() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .filter(col("a").lt_eq(lit(1i64)))? .build()?; @@ -882,11 +882,11 @@ mod tests { #[test] fn filter_join_on_common_independent() -> Result<()> { let table_scan = test_table_scan()?; - let left = LogicalPlanBuilder::from(&table_scan).build()?; - let right = LogicalPlanBuilder::from(&table_scan) + let left = LogicalPlanBuilder::from(table_scan.clone()).build()?; + let right = LogicalPlanBuilder::from(table_scan) .project(vec![col("a")])? .build()?; - let plan = LogicalPlanBuilder::from(&left) + let plan = LogicalPlanBuilder::from(left) .join( &right, JoinType::Inner, @@ -923,13 +923,13 @@ mod tests { #[test] fn filter_join_on_common_dependent() -> Result<()> { let table_scan = test_table_scan()?; - let left = LogicalPlanBuilder::from(&table_scan) + let left = LogicalPlanBuilder::from(table_scan.clone()) .project(vec![col("a"), col("c")])? .build()?; - let right = LogicalPlanBuilder::from(&table_scan) + let right = LogicalPlanBuilder::from(table_scan) .project(vec![col("a"), col("b")])? .build()?; - let plan = LogicalPlanBuilder::from(&left) + let plan = LogicalPlanBuilder::from(left) .join( &right, JoinType::Inner, @@ -962,13 +962,13 @@ mod tests { #[test] fn filter_join_on_one_side() -> Result<()> { let table_scan = test_table_scan()?; - let left = LogicalPlanBuilder::from(&table_scan) + let left = LogicalPlanBuilder::from(table_scan.clone()) .project(vec![col("a"), col("b")])? .build()?; - let right = LogicalPlanBuilder::from(&table_scan) + let right = LogicalPlanBuilder::from(table_scan) .project(vec![col("a"), col("c")])? .build()?; - let plan = LogicalPlanBuilder::from(&left) + let plan = LogicalPlanBuilder::from(left) .join( &right, JoinType::Inner, @@ -1060,7 +1060,7 @@ mod tests { limit: None, }; - LogicalPlanBuilder::from(&table_scan) + LogicalPlanBuilder::from(table_scan) .filter(col("a").eq(lit(1i64)))? .build() } diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs b/datafusion/src/optimizer/hash_build_probe_order.rs index a2a99ae364a7..ecb3b40e3203 100644 --- a/datafusion/src/optimizer/hash_build_probe_order.rs +++ b/datafusion/src/optimizer/hash_build_probe_order.rs @@ -166,7 +166,8 @@ impl OptimizerRule for HashBuildProbeOrder { let left = self.optimize(left, execution_props)?; let right = self.optimize(right, execution_props)?; if should_swap_join_order(&left, &right) { - let swapped = LogicalPlanBuilder::from(&right).cross_join(&left)?; + let swapped = + LogicalPlanBuilder::from(right.clone()).cross_join(&left)?; // wrap plan with projection to maintain column order let left_cols = left .schema() diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs index afd993710a5f..21b82a6c9698 100644 --- a/datafusion/src/optimizer/limit_push_down.rs +++ b/datafusion/src/optimizer/limit_push_down.rs @@ -155,7 +155,7 @@ mod test { fn limit_pushdown_projection_table_provider() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("a")])? .limit(1000)? .build()?; @@ -174,7 +174,7 @@ mod test { fn limit_push_down_take_smaller_limit() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .limit(1000)? .limit(10)? .build()?; @@ -195,7 +195,7 @@ mod test { fn limit_doesnt_push_down_aggregation() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .aggregate(vec![col("a")], vec![max(col("b"))])? .limit(1000)? .build()?; @@ -214,8 +214,8 @@ mod test { fn limit_should_push_down_union() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) - .union(LogicalPlanBuilder::from(&table_scan).build()?)? + let plan = LogicalPlanBuilder::from(table_scan.clone()) + .union(LogicalPlanBuilder::from(table_scan).build()?)? .limit(1000)? .build()?; @@ -236,7 +236,7 @@ mod test { fn multi_stage_limit_recurses_to_deeper_limit() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .limit(1000)? .aggregate(vec![col("a")], vec![max(col("b"))])? .limit(10)? diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index a9e571f3d00b..790731d8f4ff 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -255,7 +255,7 @@ fn optimize_plan( &mut new_required_columns, )?; - LogicalPlanBuilder::from(&optimize_plan( + LogicalPlanBuilder::from(optimize_plan( optimizer, input, &new_required_columns, @@ -446,7 +446,7 @@ mod tests { fn aggregate_no_group_by() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .aggregate(vec![], vec![max(col("b"))])? .build()?; @@ -462,7 +462,7 @@ mod tests { fn aggregate_group_by() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .aggregate(vec![col("c")], vec![max(col("b"))])? .build()?; @@ -478,7 +478,7 @@ mod tests { fn aggregate_no_group_by_with_filter() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .filter(col("c"))? .aggregate(vec![], vec![max(col("b"))])? .build()?; @@ -500,7 +500,7 @@ mod tests { let table2_scan = LogicalPlanBuilder::scan_empty(Some("test2"), &schema, None)?.build()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .join(&table2_scan, JoinType::Left, vec!["a"], vec!["c1"])? .project(vec![col("a"), col("b"), col("c1")])? .build()?; @@ -533,7 +533,7 @@ mod tests { fn cast() -> Result<()> { let table_scan = test_table_scan()?; - let projection = LogicalPlanBuilder::from(&table_scan) + let projection = LogicalPlanBuilder::from(table_scan) .project(vec![Expr::Cast { expr: Box::new(col("c")), data_type: DataType::Float64, @@ -554,7 +554,7 @@ mod tests { assert_eq!(3, table_scan.schema().fields().len()); assert_fields_eq(&table_scan, vec!["a", "b", "c"]); - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("a"), col("b")])? .build()?; @@ -574,7 +574,7 @@ mod tests { assert_eq!(3, table_scan.schema().fields().len()); assert_fields_eq(&table_scan, vec!["a", "b", "c"]); - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("c"), col("a")])? .limit(5)? .build()?; @@ -593,7 +593,7 @@ mod tests { #[test] fn table_scan_without_projection() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan).build()?; + let plan = LogicalPlanBuilder::from(table_scan).build()?; // should expand projection to all columns without projection let expected = "TableScan: test projection=Some([0, 1, 2])"; assert_optimized_plan_eq(&plan, expected); @@ -603,7 +603,7 @@ mod tests { #[test] fn table_scan_with_literal_projection() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![lit(1_i64), lit(2_i64)])? .build()?; let expected = "Projection: Int64(1), Int64(2)\ @@ -620,7 +620,7 @@ mod tests { assert_fields_eq(&table_scan, vec!["a", "b", "c"]); // we never use "b" in the first projection => remove it - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("c"), col("a"), col("b")])? .filter(col("c").gt(lit(1)))? .aggregate(vec![col("c")], vec![max(col("a"))])? @@ -647,7 +647,7 @@ mod tests { assert_fields_eq(&table_scan, vec!["a", "b", "c"]); // there is no need for the first projection - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("b")])? .project(vec![lit(1).alias("a")])? .build()?; @@ -668,7 +668,7 @@ mod tests { fn test_double_optimization() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("b")])? .project(vec![lit(1).alias("a")])? .build()?; @@ -691,7 +691,7 @@ mod tests { assert_fields_eq(&table_scan, vec!["a", "b", "c"]); // we never use "min(b)" => remove it - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .aggregate(vec![col("a"), col("c")], vec![max(col("b")), min(col("b"))])? .filter(col("c").gt(lit(1)))? .project(vec![col("c"), col("a"), col("MAX(test.b)")])? diff --git a/datafusion/src/optimizer/simplify_expressions.rs b/datafusion/src/optimizer/simplify_expressions.rs index 4253d2fd4f00..0e65de07305f 100644 --- a/datafusion/src/optimizer/simplify_expressions.rs +++ b/datafusion/src/optimizer/simplify_expressions.rs @@ -502,7 +502,7 @@ mod tests { #[test] fn test_simplify_optimized_plan() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("a")])? .filter(and(col("b").gt(lit(1)), col("b").gt(lit(1))))? .build()?; @@ -521,7 +521,7 @@ mod tests { #[test] fn test_simplify_optimized_plan_with_composed_and() -> Result<()> { let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(&table_scan) + let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("a")])? .filter(and( and(col("a").gt(lit(5)), col("b").lt(lit(6))), diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index 76f44b84657c..394308f5af80 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -232,7 +232,7 @@ pub fn from_plan( }) } LogicalPlan::CrossJoin { .. } => { - let left = &inputs[0]; + let left = inputs[0].clone(); let right = &inputs[1]; LogicalPlanBuilder::from(left).cross_join(right)?.build() } diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index 17181230c26c..eac48d340192 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -143,9 +143,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } let plan = self.set_expr_to_plan(set_expr, alias, ctes)?; - let plan = self.order_by(&plan, &query.order_by)?; + let plan = self.order_by(plan, &query.order_by)?; - self.limit(&plan, &query.limit) + self.limit(plan, &query.limit) } fn set_expr_to_plan( @@ -309,9 +309,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { match t.joins.len() { 0 => Ok(left), n => { - let mut left = self.parse_relation_join(&left, &t.joins[0], ctes)?; + let mut left = self.parse_relation_join(left, &t.joins[0], ctes)?; for i in 1..n { - left = self.parse_relation_join(&left, &t.joins[i], ctes)?; + left = self.parse_relation_join(left, &t.joins[i], ctes)?; } Ok(left) } @@ -320,7 +320,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { fn parse_relation_join( &self, - left: &LogicalPlan, + left: LogicalPlan, join: &Join, ctes: &mut HashMap, ) -> Result { @@ -347,7 +347,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } fn parse_cross_join( &self, - left: &LogicalPlan, + left: LogicalPlan, right: &LogicalPlan, ) -> Result { LogicalPlanBuilder::from(left).cross_join(right)?.build() @@ -355,7 +355,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { fn parse_join( &self, - left: &LogicalPlan, + left: LogicalPlan, right: &LogicalPlan, constraint: &JoinConstraint, join_type: JoinType, @@ -489,13 +489,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } if join_keys.is_empty() { left = - LogicalPlanBuilder::from(&left).cross_join(right)?.build()?; + LogicalPlanBuilder::from(left).cross_join(right)?.build()?; } else { let left_keys: Vec = join_keys.iter().map(|(l, _)| l.clone()).collect(); let right_keys: Vec = join_keys.iter().map(|(_, r)| r.clone()).collect(); - let builder = LogicalPlanBuilder::from(&left); + let builder = LogicalPlanBuilder::from(left); left = builder .join(right, JoinType::Inner, left_keys, right_keys)? .build()?; @@ -507,7 +507,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // remove join expressions from filter match remove_join_expressions(&filter_expr, &all_join_keys)? { Some(filter_expr) => { - LogicalPlanBuilder::from(&left).filter(filter_expr)?.build() + LogicalPlanBuilder::from(left).filter(filter_expr)?.build() } _ => Ok(left), } @@ -519,7 +519,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let mut left = plans[0].clone(); for right in plans.iter().skip(1) { left = - LogicalPlanBuilder::from(&left).cross_join(right)?.build()?; + LogicalPlanBuilder::from(left).cross_join(right)?.build()?; } Ok(left) } @@ -531,7 +531,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let select_exprs = self.prepare_select_exprs(&plan, &select.projection)?; // having and group by clause may reference aliases defined in select projection - let projected_plan = self.project(&plan, select_exprs.clone())?; + let projected_plan = self.project(plan.clone(), select_exprs.clone())?; let mut combined_schema = (**projected_plan.schema()).clone(); combined_schema.merge(plan.schema()); @@ -597,7 +597,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { || !aggr_exprs.is_empty() { self.aggregate( - &plan, + plan, &select_exprs, &having_expr_opt, group_by_exprs, @@ -625,7 +625,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }; let plan = if let Some(having_expr_post_aggr) = having_expr_post_aggr_opt { - LogicalPlanBuilder::from(&plan) + LogicalPlanBuilder::from(plan) .filter(having_expr_post_aggr)? .build()? } else { @@ -642,14 +642,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }; let plan = if select.distinct { - return LogicalPlanBuilder::from(&plan) + return LogicalPlanBuilder::from(plan) .aggregate(select_exprs_post_aggr, vec![])? .build(); } else { plan }; - self.project(&plan, select_exprs_post_aggr) + self.project(plan, select_exprs_post_aggr) } /// Returns the `Expr`'s corresponding to a SQL query's SELECT expressions. @@ -672,7 +672,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } /// Wrap a plan in a projection - fn project(&self, input: &LogicalPlan, expr: Vec) -> Result { + fn project(&self, input: LogicalPlan, expr: Vec) -> Result { self.validate_schema_satisfies_exprs(input.schema(), &expr)?; LogicalPlanBuilder::from(input).project(expr)?.build() } @@ -691,7 +691,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let window_exprs = exprs.into_iter().cloned().collect::>(); // the partition and sort itself is done at physical level, see physical_planner's // fn create_initial_plan - plan = LogicalPlanBuilder::from(&plan) + plan = LogicalPlanBuilder::from(plan) .window(window_exprs)? .build()?; } @@ -702,7 +702,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { /// Wrap a plan in an aggregate fn aggregate( &self, - input: &LogicalPlan, + input: LogicalPlan, select_exprs: &[Expr], having_expr_opt: &Option, group_by_exprs: Vec, @@ -714,7 +714,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .cloned() .collect::>(); - let plan = LogicalPlanBuilder::from(input) + let plan = LogicalPlanBuilder::from(input.clone()) .aggregate(group_by_exprs, aggr_exprs)? .build()?; @@ -722,14 +722,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // available to next phases of planning. let column_exprs_post_aggr = aggr_projection_exprs .iter() - .map(|expr| expr_as_column_expr(expr, input)) + .map(|expr| expr_as_column_expr(expr, &input)) .collect::>>()?; // Rewrite the SELECT expression to use the columns produced by the // aggregation. let select_exprs_post_aggr = select_exprs .iter() - .map(|expr| rebase_expr(expr, &aggr_projection_exprs, input)) + .map(|expr| rebase_expr(expr, &aggr_projection_exprs, &input)) .collect::>>()?; if !can_columns_satisfy_exprs(&column_exprs_post_aggr, &select_exprs_post_aggr)? { @@ -742,7 +742,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // aggregation. let having_expr_post_aggr_opt = if let Some(having_expr) = having_expr_opt { let having_expr_post_aggr = - rebase_expr(having_expr, &aggr_projection_exprs, input)?; + rebase_expr(having_expr, &aggr_projection_exprs, &input)?; if !can_columns_satisfy_exprs( &column_exprs_post_aggr, @@ -762,7 +762,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } /// Wrap a plan in a limit - fn limit(&self, input: &LogicalPlan, limit: &Option) -> Result { + fn limit(&self, input: LogicalPlan, limit: &Option) -> Result { match *limit { Some(ref limit_expr) => { let n = match self.sql_to_rex(limit_expr, input.schema())? { @@ -774,18 +774,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { LogicalPlanBuilder::from(input).limit(n)?.build() } - _ => Ok(input.clone()), + _ => Ok(input), } } /// Wrap the logical in a sort fn order_by( &self, - plan: &LogicalPlan, + plan: LogicalPlan, order_by: &[OrderByExpr], ) -> Result { if order_by.is_empty() { - return Ok(plan.clone()); + return Ok(plan); } let order_by_rex = order_by diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs index 75fbe8e8eede..36adbea1be0e 100644 --- a/datafusion/tests/custom_sources.rs +++ b/datafusion/tests/custom_sources.rs @@ -178,7 +178,7 @@ async fn custom_source_dataframe() -> Result<()> { let mut ctx = ExecutionContext::new(); let table = ctx.read_table(Arc::new(CustomTableProvider))?; - let logical_plan = LogicalPlanBuilder::from(&table.to_logical_plan()) + let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan()) .project(vec![col("c2")])? .build()?;