diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 559908bcfdfa..c858a9873434 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -41,9 +41,8 @@ use crate::utils::{ find_valid_equijoin_key_pair, group_window_expr_by_sort_keys, }; use crate::{ - and, binary_expr, logical_plan::tree_node::unwrap_arc, DmlStatement, Expr, - ExprSchemable, Operator, RecursiveQuery, TableProviderFilterPushDown, TableSource, - WriteOp, + and, binary_expr, DmlStatement, Expr, ExprSchemable, Operator, RecursiveQuery, + TableProviderFilterPushDown, TableSource, WriteOp, }; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; @@ -376,7 +375,7 @@ impl LogicalPlanBuilder { self, expr: impl IntoIterator>, ) -> Result { - project(unwrap_arc(self.plan), expr).map(Self::new) + project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new) } /// Select the given column indices @@ -429,7 +428,7 @@ impl LogicalPlanBuilder { /// Apply an alias pub fn alias(self, alias: impl Into) -> Result { - subquery_alias(unwrap_arc(self.plan), alias).map(Self::new) + subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new) } /// Add missing sort columns to all downstream projection @@ -484,7 +483,7 @@ impl LogicalPlanBuilder { Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?; } expr.extend(missing_exprs); - project(unwrap_arc(input), expr) + project(Arc::unwrap_or_clone(input), expr) } _ => { let is_distinct = @@ -580,8 +579,11 @@ impl LogicalPlanBuilder { let new_expr = schema.columns().into_iter().map(Expr::Column).collect(); let is_distinct = false; - let plan = - Self::add_missing_columns(unwrap_arc(self.plan), &missing_cols, is_distinct)?; + let plan = Self::add_missing_columns( + Arc::unwrap_or_clone(self.plan), + &missing_cols, + is_distinct, + )?; let sort_plan = LogicalPlan::Sort(Sort { expr: normalize_cols(exprs, &plan)?, input: Arc::new(plan), @@ -595,12 +597,12 @@ impl LogicalPlanBuilder { /// Apply a union, preserving duplicate rows pub fn union(self, plan: LogicalPlan) -> Result { - union(unwrap_arc(self.plan), plan).map(Self::new) + union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new) } /// Apply a union, removing duplicate rows pub fn union_distinct(self, plan: LogicalPlan) -> Result { - let left_plan: LogicalPlan = unwrap_arc(self.plan); + let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan); let right_plan: LogicalPlan = plan; Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new( @@ -1064,7 +1066,7 @@ impl LogicalPlanBuilder { /// Build the plan pub fn build(self) -> Result { - Ok(unwrap_arc(self.plan)) + Ok(Arc::unwrap_or_clone(self.plan)) } /// Apply a join with the expression on constraint. @@ -1138,7 +1140,7 @@ impl LogicalPlanBuilder { /// Unnest the given column. pub fn unnest_column(self, column: impl Into) -> Result { - unnest(unwrap_arc(self.plan), vec![column.into()]).map(Self::new) + unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new) } /// Unnest the given column given [`UnnestOptions`] @@ -1147,8 +1149,12 @@ impl LogicalPlanBuilder { column: impl Into, options: UnnestOptions, ) -> Result { - unnest_with_options(unwrap_arc(self.plan), vec![column.into()], options) - .map(Self::new) + unnest_with_options( + Arc::unwrap_or_clone(self.plan), + vec![column.into()], + options, + ) + .map(Self::new) } /// Unnest the given columns with the given [`UnnestOptions`] @@ -1157,7 +1163,8 @@ impl LogicalPlanBuilder { columns: Vec, options: UnnestOptions, ) -> Result { - unnest_with_options(unwrap_arc(self.plan), columns, options).map(Self::new) + unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options) + .map(Self::new) } } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 3ede7f25b753..03ebdd8fad46 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -51,7 +51,6 @@ use datafusion_common::{ // backwards compatibility use crate::display::PgJsonVisitor; -use crate::logical_plan::tree_node::unwrap_arc; pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan}; pub use datafusion_common::{JoinConstraint, JoinType}; @@ -770,7 +769,7 @@ impl LogicalPlan { .. }) => { // Update schema with unnested column type. - unnest_with_options(unwrap_arc(input), exec_columns, options) + unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options) } } } diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 539cb1cf5fb2..273404c8df31 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -379,21 +379,12 @@ impl TreeNode for LogicalPlan { } } -/// Converts a `Arc` without copying, if possible. Copies the plan -/// if there is a shared reference -pub fn unwrap_arc(plan: Arc) -> LogicalPlan { - Arc::try_unwrap(plan) - // if None is returned, there is another reference to this - // LogicalPlan, so we can not own it, and must clone instead - .unwrap_or_else(|node| node.as_ref().clone()) -} - /// Applies `f` to rewrite a `Arc` without copying, if possible fn rewrite_arc Result>>( plan: Arc, mut f: F, ) -> Result>> { - f(unwrap_arc(plan))?.map_data(|new_plan| Ok(Arc::new(new_plan))) + f(Arc::unwrap_or_clone(plan))?.map_data(|new_plan| Ok(Arc::new(new_plan))) } /// rewrite a `Vec` of `Arc` without copying, if possible diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 68ab2e13005f..0fd4a5dfdd78 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -37,7 +37,6 @@ use datafusion_expr::expr::{ }; use datafusion_expr::expr_rewriter::coerce_plan_expr_for_schema; use datafusion_expr::expr_schema::cast_subquery; -use datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::logical_plan::Subquery; use datafusion_expr::type_coercion::binary::{ comparison_coercion, get_input_types, like_coercion, @@ -241,15 +240,19 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { subquery, outer_ref_columns, }) => { - let new_plan = analyze_internal(self.schema, unwrap_arc(subquery))?.data; + let new_plan = + analyze_internal(self.schema, Arc::unwrap_or_clone(subquery))?.data; Ok(Transformed::yes(Expr::ScalarSubquery(Subquery { subquery: Arc::new(new_plan), outer_ref_columns, }))) } Expr::Exists(Exists { subquery, negated }) => { - let new_plan = - analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data; + let new_plan = analyze_internal( + self.schema, + Arc::unwrap_or_clone(subquery.subquery), + )? + .data; Ok(Transformed::yes(Expr::Exists(Exists { subquery: Subquery { subquery: Arc::new(new_plan), @@ -263,8 +266,11 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { subquery, negated, }) => { - let new_plan = - analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data; + let new_plan = analyze_internal( + self.schema, + Arc::unwrap_or_clone(subquery.subquery), + )? + .data; let expr_type = expr.get_type(self.schema)?; let subquery_type = new_plan.schema().field(0).data_type(); let common_type = comparison_coercion(&expr_type, subquery_type).ok_or(plan_datafusion_err!( diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index feccf5679efb..3fcee4123b76 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -33,7 +33,6 @@ use datafusion_common::tree_node::{ }; use datafusion_common::{qualified_name, Column, DFSchema, DFSchemaRef, Result}; use datafusion_expr::expr::{Alias, ScalarFunction}; -use datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::logical_plan::{ Aggregate, Filter, LogicalPlan, Projection, Sort, Window, }; @@ -314,7 +313,7 @@ impl CommonSubexprEliminate { schema, .. } = projection; - let input = unwrap_arc(input); + let input = Arc::unwrap_or_clone(input); self.try_unary_plan(expr, input, config)? .map_data(|(new_expr, new_input)| { Projection::try_new_with_schema(new_expr, Arc::new(new_input), schema) @@ -327,7 +326,7 @@ impl CommonSubexprEliminate { config: &dyn OptimizerConfig, ) -> Result> { let Sort { expr, input, fetch } = sort; - let input = unwrap_arc(input); + let input = Arc::unwrap_or_clone(input); let new_sort = self.try_unary_plan(expr, input, config)?.update_data( |(new_expr, new_input)| { LogicalPlan::Sort(Sort { @@ -348,7 +347,7 @@ impl CommonSubexprEliminate { let Filter { predicate, input, .. } = filter; - let input = unwrap_arc(input); + let input = Arc::unwrap_or_clone(input); let expr = vec![predicate]; self.try_unary_plan(expr, input, config)? .map_data(|(mut new_expr, new_input)| { @@ -458,7 +457,7 @@ impl CommonSubexprEliminate { schema, .. } = aggregate; - let input = unwrap_arc(input); + let input = Arc::unwrap_or_clone(input); // Extract common sub-expressions from the aggregate and grouping expressions. self.find_common_exprs(vec![group_expr, aggr_expr], config, ExprMask::Normal)? .map_data(|common| { @@ -729,7 +728,7 @@ fn get_consecutive_window_exprs( window_expr_list.push(window_expr); window_schemas.push(schema); - plan = unwrap_arc(input); + plan = Arc::unwrap_or_clone(input); } (window_expr_list, window_schemas, plan) } diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index b6d49490d437..f1cae1099a4d 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -37,7 +37,6 @@ use datafusion_expr::{ LogicalPlan, LogicalPlanBuilder, Operator, }; -use datafusion_expr::logical_plan::tree_node::unwrap_arc; use log::debug; /// Optimizer rule for rewriting predicate(IN/EXISTS) subquery to left semi/anti joins @@ -55,8 +54,10 @@ impl DecorrelatePredicateSubquery { mut subquery: Subquery, config: &dyn OptimizerConfig, ) -> Result { - subquery.subquery = - Arc::new(self.rewrite(unwrap_arc(subquery.subquery), config)?.data); + subquery.subquery = Arc::new( + self.rewrite(Arc::unwrap_or_clone(subquery.subquery), config)? + .data, + ); Ok(subquery) } @@ -164,7 +165,7 @@ impl OptimizerRule for DecorrelatePredicateSubquery { } // iterate through all exists clauses in predicate, turning each into a join - let mut cur_input = unwrap_arc(input); + let mut cur_input = Arc::unwrap_or_clone(input); for subquery in subqueries { if let Some(plan) = build_join(&subquery, &cur_input, config.alias_generator())? diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index fc4eaef80903..20e6641e4d62 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -24,7 +24,6 @@ use crate::join_key_set::JoinKeySet; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{internal_err, Result}; use datafusion_expr::expr::{BinaryExpr, Expr}; -use datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::logical_plan::{ CrossJoin, Filter, Join, JoinConstraint, JoinType, LogicalPlan, Projection, }; @@ -114,7 +113,7 @@ impl OptimizerRule for EliminateCrossJoin { input, predicate, .. } = filter; flatten_join_inputs( - unwrap_arc(input), + Arc::unwrap_or_clone(input), &mut possible_join_keys, &mut all_inputs, )?; @@ -217,12 +216,28 @@ fn flatten_join_inputs( ); } possible_join_keys.insert_all_owned(join.on); - flatten_join_inputs(unwrap_arc(join.left), possible_join_keys, all_inputs)?; - flatten_join_inputs(unwrap_arc(join.right), possible_join_keys, all_inputs)?; + flatten_join_inputs( + Arc::unwrap_or_clone(join.left), + possible_join_keys, + all_inputs, + )?; + flatten_join_inputs( + Arc::unwrap_or_clone(join.right), + possible_join_keys, + all_inputs, + )?; } LogicalPlan::CrossJoin(join) => { - flatten_join_inputs(unwrap_arc(join.left), possible_join_keys, all_inputs)?; - flatten_join_inputs(unwrap_arc(join.right), possible_join_keys, all_inputs)?; + flatten_join_inputs( + Arc::unwrap_or_clone(join.left), + possible_join_keys, + all_inputs, + )?; + flatten_join_inputs( + Arc::unwrap_or_clone(join.right), + possible_join_keys, + all_inputs, + )?; } _ => { all_inputs.push(plan); diff --git a/datafusion/optimizer/src/eliminate_filter.rs b/datafusion/optimizer/src/eliminate_filter.rs index 84bb8e782142..bb2b4547e9c2 100644 --- a/datafusion/optimizer/src/eliminate_filter.rs +++ b/datafusion/optimizer/src/eliminate_filter.rs @@ -19,7 +19,6 @@ use datafusion_common::tree_node::Transformed; use datafusion_common::{Result, ScalarValue}; -use datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::{EmptyRelation, Expr, Filter, LogicalPlan}; use std::sync::Arc; @@ -65,7 +64,7 @@ impl OptimizerRule for EliminateFilter { input, .. }) => match v { - Some(true) => Ok(Transformed::yes(unwrap_arc(input))), + Some(true) => Ok(Transformed::yes(Arc::unwrap_or_clone(input))), Some(false) | None => Ok(Transformed::yes(LogicalPlan::EmptyRelation( EmptyRelation { produce_one_row: false, diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs index a42fe6a6f95b..e48f37a77cd3 100644 --- a/datafusion/optimizer/src/eliminate_limit.rs +++ b/datafusion/optimizer/src/eliminate_limit.rs @@ -20,7 +20,7 @@ use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::tree_node::Transformed; use datafusion_common::Result; -use datafusion_expr::logical_plan::{tree_node::unwrap_arc, EmptyRelation, LogicalPlan}; +use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan}; use std::sync::Arc; /// Optimizer rule to replace `LIMIT 0` or `LIMIT` whose ancestor LIMIT's skip is @@ -74,7 +74,9 @@ impl OptimizerRule for EliminateLimit { } } else if limit.skip == 0 { // input also can be Limit, so we should apply again. - return Ok(self.rewrite(unwrap_arc(limit.input), _config).unwrap()); + return Ok(self + .rewrite(Arc::unwrap_or_clone(limit.input), _config) + .unwrap()); } Ok(Transformed::no(LogicalPlan::Limit(limit))) } diff --git a/datafusion/optimizer/src/eliminate_nested_union.rs b/datafusion/optimizer/src/eliminate_nested_union.rs index 5d7895bba4d8..e9b38567a982 100644 --- a/datafusion/optimizer/src/eliminate_nested_union.rs +++ b/datafusion/optimizer/src/eliminate_nested_union.rs @@ -21,7 +21,6 @@ use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::tree_node::Transformed; use datafusion_common::Result; use datafusion_expr::expr_rewriter::coerce_plan_expr_for_schema; -use datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::{Distinct, LogicalPlan, Union}; use itertools::Itertools; use std::sync::Arc; @@ -69,7 +68,7 @@ impl OptimizerRule for EliminateNestedUnion { }))) } LogicalPlan::Distinct(Distinct::All(nested_plan)) => { - match unwrap_arc(nested_plan) { + match Arc::unwrap_or_clone(nested_plan) { LogicalPlan::Union(Union { inputs, schema }) => { let inputs = inputs .into_iter() @@ -96,16 +95,17 @@ impl OptimizerRule for EliminateNestedUnion { } fn extract_plans_from_union(plan: Arc) -> Vec { - match unwrap_arc(plan) { - LogicalPlan::Union(Union { inputs, .. }) => { - inputs.into_iter().map(unwrap_arc).collect::>() - } + match Arc::unwrap_or_clone(plan) { + LogicalPlan::Union(Union { inputs, .. }) => inputs + .into_iter() + .map(Arc::unwrap_or_clone) + .collect::>(), plan => vec![plan], } } fn extract_plan_from_distinct(plan: Arc) -> Arc { - match unwrap_arc(plan) { + match Arc::unwrap_or_clone(plan) { LogicalPlan::Distinct(Distinct::All(plan)) => plan, plan => Arc::new(plan), } diff --git a/datafusion/optimizer/src/eliminate_one_union.rs b/datafusion/optimizer/src/eliminate_one_union.rs index 43024107c4f8..383b33637f6f 100644 --- a/datafusion/optimizer/src/eliminate_one_union.rs +++ b/datafusion/optimizer/src/eliminate_one_union.rs @@ -16,9 +16,11 @@ // under the License. //! [`EliminateOneUnion`] eliminates single element `Union` + use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::{tree_node::Transformed, Result}; -use datafusion_expr::logical_plan::{tree_node::unwrap_arc, LogicalPlan, Union}; +use datafusion_expr::logical_plan::{LogicalPlan, Union}; +use std::sync::Arc; use crate::optimizer::ApplyOrder; @@ -48,9 +50,9 @@ impl OptimizerRule for EliminateOneUnion { _config: &dyn OptimizerConfig, ) -> Result> { match plan { - LogicalPlan::Union(Union { mut inputs, .. }) if inputs.len() == 1 => { - Ok(Transformed::yes(unwrap_arc(inputs.pop().unwrap()))) - } + LogicalPlan::Union(Union { mut inputs, .. }) if inputs.len() == 1 => Ok( + Transformed::yes(Arc::unwrap_or_clone(inputs.pop().unwrap())), + ), _ => Ok(Transformed::no(plan)), } } diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs index 12534e058152..e7c88df55122 100644 --- a/datafusion/optimizer/src/eliminate_outer_join.rs +++ b/datafusion/optimizer/src/eliminate_outer_join.rs @@ -18,7 +18,6 @@ //! [`EliminateOuterJoin`] converts `LEFT/RIGHT/FULL` joins to `INNER` joins use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::{Column, DFSchema, Result}; -use datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::logical_plan::{Join, JoinType, LogicalPlan}; use datafusion_expr::{Expr, Filter, Operator}; @@ -79,7 +78,7 @@ impl OptimizerRule for EliminateOuterJoin { _config: &dyn OptimizerConfig, ) -> Result> { match plan { - LogicalPlan::Filter(mut filter) => match unwrap_arc(filter.input) { + LogicalPlan::Filter(mut filter) => match Arc::unwrap_or_clone(filter.input) { LogicalPlan::Join(join) => { let mut non_nullable_cols: Vec = vec![]; diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index ac4ed87a4a1a..5db5afd11062 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -41,7 +41,6 @@ use crate::utils::NamePreserver; use datafusion_common::tree_node::{ Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion, }; -use datafusion_expr::logical_plan::tree_node::unwrap_arc; /// Optimizer rule to prune unnecessary columns from intermediate schemas /// inside the [`LogicalPlan`]. This rule: @@ -181,7 +180,7 @@ fn optimize_projections( let necessary_exprs = necessary_indices.get_required_exprs(schema); return optimize_projections( - unwrap_arc(aggregate.input), + Arc::unwrap_or_clone(aggregate.input), config, necessary_indices, )? @@ -221,7 +220,7 @@ fn optimize_projections( child_reqs.with_exprs(&input_schema, &new_window_expr)?; return optimize_projections( - unwrap_arc(window.input), + Arc::unwrap_or_clone(window.input), config, required_indices.clone(), )? @@ -488,7 +487,7 @@ fn merge_consecutive_projections(proj: Projection) -> Result { let parents_predicates = split_conjunction_owned(filter.predicate); @@ -1139,19 +1138,19 @@ fn convert_to_cross_join_if_beneficial( match plan { // Can be converted back to cross join LogicalPlan::Join(join) if join.on.is_empty() && join.filter.is_none() => { - LogicalPlanBuilder::from(unwrap_arc(join.left)) - .cross_join(unwrap_arc(join.right))? + LogicalPlanBuilder::from(Arc::unwrap_or_clone(join.left)) + .cross_join(Arc::unwrap_or_clone(join.right))? .build() .map(Transformed::yes) } - LogicalPlan::Filter(filter) => convert_to_cross_join_if_beneficial(unwrap_arc( - filter.input, - ))? - .transform_data(|child_plan| { - Filter::try_new(filter.predicate, Arc::new(child_plan)) - .map(LogicalPlan::Filter) - .map(Transformed::yes) - }), + LogicalPlan::Filter(filter) => { + convert_to_cross_join_if_beneficial(Arc::unwrap_or_clone(filter.input))? + .transform_data(|child_plan| { + Filter::try_new(filter.predicate, Arc::new(child_plan)) + .map(LogicalPlan::Filter) + .map(Transformed::yes) + }) + } plan => Ok(Transformed::no(plan)), } } diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index 290b893577b8..dff0b61c6b22 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -26,7 +26,6 @@ use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::tree_node::Transformed; use datafusion_common::utils::combine_limit; use datafusion_common::Result; -use datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::logical_plan::{Join, JoinType, Limit, LogicalPlan}; /// Optimization rule that tries to push down `LIMIT`. @@ -83,7 +82,7 @@ impl OptimizerRule for PushDownLimit { }))); }; - match unwrap_arc(input) { + match Arc::unwrap_or_clone(input) { LogicalPlan::TableScan(mut scan) => { let rows_needed = if fetch != 0 { fetch + skip } else { 0 }; let new_fetch = scan diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 45fda094557b..384893bfa94c 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -33,7 +33,6 @@ use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions}; use datafusion_expr::expr_rewriter::{ normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_cols, }; -use datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::utils::{ expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs, }; @@ -361,9 +360,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .build() } LogicalPlan::Filter(mut filter) => { - filter.input = Arc::new( - self.try_process_aggregate_unnest(unwrap_arc(filter.input))?, - ); + filter.input = + Arc::new(self.try_process_aggregate_unnest(Arc::unwrap_or_clone( + filter.input, + ))?); Ok(LogicalPlan::Filter(filter)) } _ => Ok(input), @@ -401,7 +401,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // Projection: tab.array_col AS unnest(tab.array_col) // TableScan: tab // ``` - let mut intermediate_plan = unwrap_arc(input); + let mut intermediate_plan = Arc::unwrap_or_clone(input); let mut intermediate_select_exprs = group_expr; loop {