From 7200e30f0121d5b83ca5a9337091ec47ea45dbe9 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 22 May 2024 09:38:41 +0200 Subject: [PATCH] use stricter references in apply() and visit() --- .../examples/custom_datasource.rs | 2 +- datafusion/common/src/tree_node.rs | 125 ++++-------------- .../datasource/physical_plan/arrow_file.rs | 2 +- .../core/src/datasource/physical_plan/avro.rs | 2 +- .../core/src/datasource/physical_plan/csv.rs | 2 +- .../core/src/datasource/physical_plan/json.rs | 2 +- .../datasource/physical_plan/parquet/mod.rs | 2 +- datafusion/core/src/execution/context/mod.rs | 4 +- .../aggregate_statistics.rs | 2 +- .../enforce_distribution.rs | 4 +- .../src/physical_optimizer/enforce_sorting.rs | 2 +- .../limited_distinct_aggregation.rs | 4 +- .../physical_optimizer/output_requirements.rs | 6 +- .../physical_optimizer/projection_pushdown.rs | 8 +- .../src/physical_optimizer/sort_pushdown.rs | 2 +- .../physical_optimizer/topk_aggregation.rs | 2 +- datafusion/core/src/physical_planner.rs | 2 +- datafusion/core/src/test/mod.rs | 2 +- datafusion/core/src/test_util/mod.rs | 2 +- datafusion/core/src/test_util/parquet.rs | 2 +- .../provider_filter_pushdown.rs | 2 +- .../tests/custom_sources_cases/statistics.rs | 2 +- .../core/tests/fuzz_cases/aggregate_fuzz.rs | 4 +- .../core/tests/parquet/filter_pushdown.rs | 2 +- .../tests/user_defined/user_defined_plan.rs | 4 +- datafusion/expr/src/logical_plan/display.rs | 14 +- datafusion/expr/src/logical_plan/plan.rs | 18 +-- datafusion/expr/src/logical_plan/tree_node.rs | 10 +- datafusion/expr/src/tree_node.rs | 2 +- .../optimizer/src/common_subexpr_eliminate.rs | 6 +- .../src/expressions/column.rs | 2 +- .../physical-expr-common/src/physical_expr.rs | 2 +- .../physical-expr-common/src/tree_node.rs | 9 +- datafusion/physical-expr-common/src/utils.rs | 7 +- .../physical-expr/src/equivalence/class.rs | 2 +- .../physical-expr/src/expressions/binary.rs | 4 +- .../physical-expr/src/expressions/case.rs | 17 +-- .../physical-expr/src/expressions/cast.rs | 4 +- .../physical-expr/src/expressions/column.rs | 2 +- .../physical-expr/src/expressions/in_list.rs | 6 +- .../src/expressions/is_not_null.rs | 4 +- .../physical-expr/src/expressions/is_null.rs | 4 +- .../physical-expr/src/expressions/like.rs | 4 +- .../physical-expr/src/expressions/literal.rs | 2 +- .../physical-expr/src/expressions/negative.rs | 4 +- .../physical-expr/src/expressions/no_op.rs | 2 +- .../physical-expr/src/expressions/not.rs | 4 +- .../physical-expr/src/expressions/try_cast.rs | 4 +- .../physical-expr/src/scalar_function.rs | 4 +- .../physical-plan/src/aggregates/mod.rs | 6 +- datafusion/physical-plan/src/analyze.rs | 4 +- .../physical-plan/src/coalesce_batches.rs | 4 +- .../physical-plan/src/coalesce_partitions.rs | 4 +- datafusion/physical-plan/src/display.rs | 2 +- datafusion/physical-plan/src/empty.rs | 2 +- datafusion/physical-plan/src/explain.rs | 2 +- datafusion/physical-plan/src/filter.rs | 4 +- datafusion/physical-plan/src/insert.rs | 4 +- .../physical-plan/src/joins/cross_join.rs | 4 +- .../physical-plan/src/joins/hash_join.rs | 4 +- .../src/joins/nested_loop_join.rs | 4 +- .../src/joins/sort_merge_join.rs | 4 +- .../src/joins/symmetric_hash_join.rs | 4 +- datafusion/physical-plan/src/lib.rs | 6 +- datafusion/physical-plan/src/limit.rs | 8 +- datafusion/physical-plan/src/memory.rs | 2 +- .../physical-plan/src/placeholder_row.rs | 2 +- datafusion/physical-plan/src/projection.rs | 4 +- .../physical-plan/src/recursive_query.rs | 8 +- .../physical-plan/src/repartition/mod.rs | 4 +- .../physical-plan/src/sorts/partial_sort.rs | 4 +- datafusion/physical-plan/src/sorts/sort.rs | 4 +- .../src/sorts/sort_preserving_merge.rs | 4 +- datafusion/physical-plan/src/streaming.rs | 2 +- datafusion/physical-plan/src/test/exec.rs | 12 +- datafusion/physical-plan/src/tree_node.rs | 9 +- datafusion/physical-plan/src/union.rs | 8 +- datafusion/physical-plan/src/unnest.rs | 4 +- datafusion/physical-plan/src/values.rs | 2 +- .../src/windows/bounded_window_agg_exec.rs | 4 +- .../src/windows/window_agg_exec.rs | 4 +- datafusion/physical-plan/src/work_table.rs | 2 +- datafusion/proto/src/physical_plan/mod.rs | 1 + 83 files changed, 209 insertions(+), 267 deletions(-) diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs index c2ea6f2b52a..cfb49b02315 100644 --- a/datafusion-examples/examples/custom_datasource.rs +++ b/datafusion-examples/examples/custom_datasource.rs @@ -237,7 +237,7 @@ impl ExecutionPlan for CustomExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index f0b9e39af28..40d9088bf2a 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -123,26 +123,13 @@ pub trait TreeNode: Sized { /// TreeNodeVisitor::f_up(ChildNode2) /// TreeNodeVisitor::f_up(ParentNode) /// ``` - fn visit>( - &self, - visitor: &mut V, - ) -> Result { - visitor - .f_down(self)? - .visit_children(|| self.apply_children(|c| c.visit(visitor)))? - .visit_parent(|| visitor.f_up(self)) - } - - /// Similar to [`TreeNode::visit()`], but the lifetimes of the [`TreeNode`] references - /// passed to [`TreeNodeRefVisitor::f_down()`] and [`TreeNodeRefVisitor::f_up()`] - /// methods match the lifetime of the original root [`TreeNode`] reference. - fn visit_ref<'n, V: TreeNodeRefVisitor<'n, Node = Self>>( + fn visit<'n, V: TreeNodeVisitor<'n, Node = Self>>( &'n self, visitor: &mut V, ) -> Result { visitor .f_down(self)? - .visit_children(|| self.apply_children_ref(|c| c.visit_ref(visitor)))? + .visit_children(|| self.apply_children(|c| c.visit(visitor)))? .visit_parent(|| visitor.f_up(self)) } @@ -203,39 +190,18 @@ pub trait TreeNode: Sized { /// # See Also /// * [`Self::transform_down`] for the equivalent transformation API. /// * [`Self::visit`] for both top-down and bottom up traversal. - fn apply Result>( - &self, - mut f: F, - ) -> Result { - fn apply_impl Result>( - node: &N, - f: &mut F, - ) -> Result { - f(node)?.visit_children(|| node.apply_children(|c| apply_impl(c, f))) - } - - apply_impl(self, &mut f) - } - - /// Similar to [`TreeNode::apply()`], but the lifetime of the [`TreeNode`] references - /// passed to the `f` closures match the lifetime of the original root [`TreeNode`] - /// reference. - fn apply_ref<'n, F: FnMut(&'n Self) -> Result>( + fn apply<'n, F: FnMut(&'n Self) -> Result>( &'n self, mut f: F, ) -> Result { - fn apply_ref_impl< - 'n, - N: TreeNode, - F: FnMut(&'n N) -> Result, - >( + fn apply_impl<'n, N: TreeNode, F: FnMut(&'n N) -> Result>( node: &'n N, f: &mut F, ) -> Result { - f(node)?.visit_children(|| node.apply_children_ref(|c| apply_ref_impl(c, f))) + f(node)?.visit_children(|| node.apply_children(|c| apply_impl(c, f))) } - apply_ref_impl(self, &mut f) + apply_impl(self, &mut f) } /// Recursively rewrite the node's children and then the node using `f` @@ -461,18 +427,7 @@ pub trait TreeNode: Sized { /// /// Description: Apply `f` to inspect node's children (but not the node /// itself). - fn apply_children Result>( - &self, - f: F, - ) -> Result { - // The default implementation is the stricter `apply_children_ref()` - self.apply_children_ref(f) - } - - /// Similar to [`TreeNode::apply_children()`], but the lifetime of the [`TreeNode`] - /// references passed to the `f` closures match the lifetime of the original root - /// [`TreeNode`] reference. - fn apply_children_ref<'n, F: FnMut(&'n Self) -> Result>( + fn apply_children<'n, F: FnMut(&'n Self) -> Result>( &'n self, f: F, ) -> Result; @@ -511,27 +466,7 @@ pub trait TreeNode: Sized { /// /// # See Also: /// * [`TreeNode::rewrite`] to rewrite owned `TreeNode`s -pub trait TreeNodeVisitor: Sized { - /// The node type which is visitable. - type Node: TreeNode; - - /// Invoked while traversing down the tree, before any children are visited. - /// Default implementation continues the recursion. - fn f_down(&mut self, _node: &Self::Node) -> Result { - Ok(TreeNodeRecursion::Continue) - } - - /// Invoked while traversing up the tree after children are visited. Default - /// implementation continues the recursion. - fn f_up(&mut self, _node: &Self::Node) -> Result { - Ok(TreeNodeRecursion::Continue) - } -} - -/// Similar to [`TreeNodeVisitor`], but the lifetimes of the [`TreeNode`] references -/// passed to [`TreeNodeRefVisitor::f_down()`] and [`TreeNodeRefVisitor::f_up()`] methods -/// match the lifetime of the original root [`TreeNode`] reference. -pub trait TreeNodeRefVisitor<'n>: Sized { +pub trait TreeNodeVisitor<'n>: Sized { /// The node type which is visitable. type Node: TreeNode; @@ -920,11 +855,7 @@ impl TransformedResult for Result> { /// its related `Arc` will automatically implement [`TreeNode`]. pub trait DynTreeNode { /// Returns all children of the specified `TreeNode`. - fn arc_children(&self) -> Vec>; - - fn children(&self) -> Vec<&Arc> { - panic!("DynTreeNode::children is not implemented yet") - } + fn arc_children(&self) -> Vec<&Arc>; /// Constructs a new node with the specified children. fn with_new_arc_children( @@ -937,18 +868,11 @@ pub trait DynTreeNode { /// Blanket implementation for any `Arc` where `T` implements [`DynTreeNode`] /// (such as [`Arc`]). impl TreeNode for Arc { - fn apply_children Result>( - &self, - f: F, - ) -> Result { - self.arc_children().iter().apply_until_stop(f) - } - - fn apply_children_ref<'n, F: FnMut(&'n Self) -> Result>( + fn apply_children<'n, F: FnMut(&'n Self) -> Result>( &'n self, f: F, ) -> Result { - self.children().into_iter().apply_until_stop(f) + self.arc_children().into_iter().apply_until_stop(f) } fn map_children Result>>( @@ -957,7 +881,10 @@ impl TreeNode for Arc { ) -> Result> { let children = self.arc_children(); if !children.is_empty() { - let new_children = children.into_iter().map_until_stop_and_collect(f)?; + let new_children = children + .into_iter() + .cloned() + .map_until_stop_and_collect(f)?; // Propagate up `new_children.transformed` and `new_children.tnr` // along with the node containing transformed children. if new_children.transformed { @@ -989,7 +916,7 @@ pub trait ConcreteTreeNode: Sized { } impl TreeNode for T { - fn apply_children_ref<'n, F: FnMut(&'n Self) -> Result>( + fn apply_children<'n, F: FnMut(&'n Self) -> Result>( &'n self, f: F, ) -> Result { @@ -1018,8 +945,8 @@ mod tests { use std::fmt::Display; use crate::tree_node::{ - Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion, TreeNodeRefVisitor, - TreeNodeRewriter, TreeNodeVisitor, + Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion, TreeNodeRewriter, + TreeNodeVisitor, }; use crate::Result; @@ -1036,7 +963,7 @@ mod tests { } impl TreeNode for TestTreeNode { - fn apply_children_ref<'n, F: FnMut(&'n Self) -> Result>( + fn apply_children<'n, F: FnMut(&'n Self) -> Result>( &'n self, f: F, ) -> Result { @@ -1536,15 +1463,15 @@ mod tests { } } - impl TreeNodeVisitor for TestVisitor { + impl<'n, T: Display> TreeNodeVisitor<'n> for TestVisitor { type Node = TestTreeNode; - fn f_down(&mut self, node: &Self::Node) -> Result { + fn f_down(&mut self, node: &'n Self::Node) -> Result { self.visits.push(format!("f_down({})", node.data)); (*self.f_down)(node) } - fn f_up(&mut self, node: &Self::Node) -> Result { + fn f_up(&mut self, node: &'n Self::Node) -> Result { self.visits.push(format!("f_up({})", node.data)); (*self.f_up)(node) } @@ -2001,7 +1928,7 @@ mod tests { // | // A #[test] - fn test_apply_ref() -> Result<()> { + fn test_apply_and_visit_references() -> Result<()> { let node_a = TestTreeNode::new(vec![], "a".to_string()); let node_b = TestTreeNode::new(vec![], "b".to_string()); let node_d = TestTreeNode::new(vec![node_a], "d".to_string()); @@ -2022,7 +1949,7 @@ mod tests { let node_a_ref = &node_d_ref.children[0]; let mut m: HashMap<&TestTreeNode, usize> = HashMap::new(); - tree.apply_ref(|e| { + tree.apply(|e| { *m.entry(e).or_insert(0) += 1; Ok(TreeNodeRecursion::Continue) })?; @@ -2041,7 +1968,7 @@ mod tests { m: HashMap<&'n TestTreeNode, (usize, usize)>, } - impl<'n> TreeNodeRefVisitor<'n> for TestVisitor<'n> { + impl<'n> TreeNodeVisitor<'n> for TestVisitor<'n> { type Node = TestTreeNode; fn f_down(&mut self, node: &'n Self::Node) -> Result { @@ -2058,7 +1985,7 @@ mod tests { } let mut visitor = TestVisitor { m: HashMap::new() }; - tree.visit_ref(&mut visitor)?; + tree.visit(&mut visitor)?; let expected = HashMap::from([ (node_f_ref, (1, 1)), diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 1e877573101..e536ae82323 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -134,7 +134,7 @@ impl ExecutionPlan for ArrowExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { Vec::new() } diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index 4e5140e82d3..934846046a4 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -111,7 +111,7 @@ impl ExecutionPlan for AvroExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { Vec::new() } diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index cc7c837e471..f0aee82457c 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -173,7 +173,7 @@ impl ExecutionPlan for CsvExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { // this is a leaf node and has no children vec![] } diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 0180caa8501..cfd8f00439a 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -138,7 +138,7 @@ impl ExecutionPlan for NdJsonExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { Vec::new() } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index dd953878df4..0c09e1bdf1d 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -363,7 +363,7 @@ impl ExecutionPlan for ParquetExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { // this is a leaf node and has no children vec![] } diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 59238ffc559..a1b91324975 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -2441,10 +2441,10 @@ impl<'a> BadPlanVisitor<'a> { } } -impl<'a> TreeNodeVisitor for BadPlanVisitor<'a> { +impl<'n, 'a> TreeNodeVisitor<'n> for BadPlanVisitor<'a> { type Node = LogicalPlan; - fn f_down(&mut self, node: &Self::Node) -> Result { + fn f_down(&mut self, node: &'n Self::Node) -> Result { match node { LogicalPlan::Ddl(ddl) if !self.options.allow_ddl => { plan_err!("DDL not supported: {}", ddl.name()) diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs index 1a82dac4658..05f05d95b8d 100644 --- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs @@ -123,7 +123,7 @@ fn take_optimizable(node: &dyn ExecutionPlan) -> Option> return Some(child); } } - if let [ref childrens_child] = child.children().as_slice() { + if let [childrens_child] = child.children().as_slice() { child = Arc::clone(childrens_child); } else { break; diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index cd84e911d38..df74a005885 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1375,8 +1375,8 @@ pub(crate) mod tests { vec![false] } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } // model that it requires the output ordering of its input diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index bc435626c6a..24306647c68 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -567,7 +567,7 @@ fn remove_corresponding_sort_from_sub_plan( // Replace with variants that do not preserve order. if is_sort_preserving_merge(&node.plan) { node.children = node.children.swap_remove(0).children; - node.plan = node.plan.children().swap_remove(0); + node.plan = node.plan.children().swap_remove(0).clone(); } else if let Some(repartition) = node.plan.as_any().downcast_ref::() { diff --git a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs index 950bb3c8eeb..1274fbe50a5 100644 --- a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs @@ -78,7 +78,7 @@ impl LimitedDistinctAggregation { let mut is_global_limit = false; if let Some(local_limit) = plan.as_any().downcast_ref::() { limit = local_limit.fetch(); - children = local_limit.children(); + children = local_limit.children().into_iter().cloned().collect(); } else if let Some(global_limit) = plan.as_any().downcast_ref::() { global_fetch = global_limit.fetch(); @@ -86,7 +86,7 @@ impl LimitedDistinctAggregation { global_skip = global_limit.skip(); // the aggregate must read at least fetch+skip number of rows limit = global_fetch.unwrap() + global_skip; - children = global_limit.children(); + children = global_limit.children().into_iter().cloned().collect(); is_global_limit = true } else { return None; diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs b/datafusion/core/src/physical_optimizer/output_requirements.rs index 5bf86e88d64..67b38dba90c 100644 --- a/datafusion/core/src/physical_optimizer/output_requirements.rs +++ b/datafusion/core/src/physical_optimizer/output_requirements.rs @@ -157,8 +157,8 @@ impl ExecutionPlan for OutputRequirementExec { vec![true] } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn required_input_ordering(&self) -> Vec>> { @@ -273,7 +273,7 @@ fn require_top_ordering_helper( // When an operator requires an ordering, any `SortExec` below can not // be responsible for (i.e. the originator of) the global ordering. let (new_child, is_changed) = - require_top_ordering_helper(children.swap_remove(0))?; + require_top_ordering_helper(children.swap_remove(0).clone())?; Ok((plan.with_new_children(vec![new_child])?, is_changed)) } else { // Stop searching, there is no global ordering desired for the query. diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index fe1290e4077..07766884490 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -378,7 +378,7 @@ fn try_swapping_with_coalesce_partitions( return Ok(None); } // CoalescePartitionsExec always has a single child, so zero indexing is safe. - make_with_child(projection, &projection.input().children()[0]) + make_with_child(projection, projection.input().children()[0]) .map(|e| Some(Arc::new(CoalescePartitionsExec::new(e)) as _)) } @@ -526,7 +526,7 @@ fn try_pushdown_through_union( let new_children = union .children() .into_iter() - .map(|child| make_with_child(projection, &child)) + .map(|child| make_with_child(projection, child)) .collect::>>()?; Ok(Some(Arc::new(UnionExec::new(new_children)))) @@ -813,8 +813,8 @@ fn try_swapping_with_sort_merge_join( projection_as_columns, far_right_left_col_ind, far_left_right_col_ind, - &sm_join.children()[0], - &sm_join.children()[1], + sm_join.children()[0], + sm_join.children()[1], )?; Ok(Some(Arc::new(SortMergeJoinExec::try_new( diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index c527819e774..83531da3ca8 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -123,7 +123,7 @@ fn pushdown_requirement_to_children( if is_window(plan) { let required_input_ordering = plan.required_input_ordering(); let request_child = required_input_ordering[0].as_deref().unwrap_or(&[]); - let child_plan = plan.children().swap_remove(0); + let child_plan = plan.children().swap_remove(0).clone(); match determine_children_requirement(parent_required, request_child, child_plan) { RequirementsCompatibility::Satisfy => { let req = (!request_child.is_empty()).then(|| request_child.to_vec()); diff --git a/datafusion/core/src/physical_optimizer/topk_aggregation.rs b/datafusion/core/src/physical_optimizer/topk_aggregation.rs index 7c0519eda3b..b754ee75ef3 100644 --- a/datafusion/core/src/physical_optimizer/topk_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/topk_aggregation.rs @@ -88,7 +88,7 @@ impl TopKAggregation { let sort = plan.as_any().downcast_ref::()?; let children = sort.children(); - let child = children.iter().exactly_one().ok()?; + let child = children.into_iter().exactly_one().ok()?; let order = sort.properties().output_ordering()?; let order = order.iter().exactly_one().ok()?; let limit = sort.fetch()?; diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 090b1d59d9a..f38c34b98a4 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -2876,7 +2876,7 @@ mod tests { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 1152c70d439..133b8d87b53 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -426,7 +426,7 @@ impl ExecutionPlan for StatisticsExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 75ef364d01f..7aec66825de 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -293,7 +293,7 @@ impl ExecutionPlan for UnboundedExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index 1d5668c7ec5..d151afc2c13 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -195,7 +195,7 @@ impl TestParquetFile { /// /// Recursively searches for ParquetExec and returns the metrics /// on the first one it finds - pub fn parquet_metrics(plan: Arc) -> Option { + pub fn parquet_metrics(plan: &Arc) -> Option { if let Some(parquet) = plan.as_any().downcast_ref::() { return parquet.metrics(); } diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index 4579fe806d6..8c9cffcf08d 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -101,7 +101,7 @@ impl ExecutionPlan for CustomPlan { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 85ac47dc97f..c7be89533f1 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -153,7 +153,7 @@ impl ExecutionPlan for StatisticsValidation { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 8df16e7944d..1e4ef0ecb2c 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -312,10 +312,10 @@ async fn verify_ordered_aggregate(frame: &DataFrame, expected_sort: bool) { } let mut visitor = Visitor { expected_sort }; - impl TreeNodeVisitor for Visitor { + impl<'n> TreeNodeVisitor<'n> for Visitor { type Node = Arc; - fn f_down(&mut self, node: &Self::Node) -> Result { + fn f_down(&mut self, node: &'n Self::Node) -> Result { if let Some(exec) = node.as_any().downcast_ref::() { if self.expected_sort { assert!(matches!( diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index feb928a3a47..8c7624f0781 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -529,7 +529,7 @@ impl<'a> TestCase<'a> { // verify expected pushdown let metrics = - TestParquetFile::parquet_metrics(exec).expect("found parquet metrics"); + TestParquetFile::parquet_metrics(&exec).expect("found parquet metrics"); let pushdown_expected = if scan_options.pushdown_filters { self.pushdown_expected diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index 54dcffe35f6..07622e48afa 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -472,8 +472,8 @@ impl ExecutionPlan for TopKExec { vec![Distribution::SinglePartition] } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn with_new_children( diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs index 3a2ed9ffc2d..44ced6dc5ba 100644 --- a/datafusion/expr/src/logical_plan/display.rs +++ b/datafusion/expr/src/logical_plan/display.rs @@ -58,12 +58,12 @@ impl<'a, 'b> IndentVisitor<'a, 'b> { } } -impl<'a, 'b> TreeNodeVisitor for IndentVisitor<'a, 'b> { +impl<'n, 'a, 'b> TreeNodeVisitor<'n> for IndentVisitor<'a, 'b> { type Node = LogicalPlan; fn f_down( &mut self, - plan: &LogicalPlan, + plan: &'n LogicalPlan, ) -> datafusion_common::Result { if self.indent > 0 { writeln!(self.f)?; @@ -84,7 +84,7 @@ impl<'a, 'b> TreeNodeVisitor for IndentVisitor<'a, 'b> { fn f_up( &mut self, - _plan: &LogicalPlan, + _plan: &'n LogicalPlan, ) -> datafusion_common::Result { self.indent -= 1; Ok(TreeNodeRecursion::Continue) @@ -180,12 +180,12 @@ impl<'a, 'b> GraphvizVisitor<'a, 'b> { } } -impl<'a, 'b> TreeNodeVisitor for GraphvizVisitor<'a, 'b> { +impl<'n, 'a, 'b> TreeNodeVisitor<'n> for GraphvizVisitor<'a, 'b> { type Node = LogicalPlan; fn f_down( &mut self, - plan: &LogicalPlan, + plan: &'n LogicalPlan, ) -> datafusion_common::Result { let id = self.graphviz_builder.next_id(); @@ -648,12 +648,12 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { } } -impl<'a, 'b> TreeNodeVisitor for PgJsonVisitor<'a, 'b> { +impl<'n, 'a, 'b> TreeNodeVisitor<'n> for PgJsonVisitor<'a, 'b> { type Node = LogicalPlan; fn f_down( &mut self, - node: &LogicalPlan, + node: &'n LogicalPlan, ) -> datafusion_common::Result { let id = self.next_id; self.next_id += 1; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 42f3e1f163a..243b9203854 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2984,10 +2984,10 @@ digraph { strings: Vec, } - impl TreeNodeVisitor for OkVisitor { + impl<'n> TreeNodeVisitor<'n> for OkVisitor { type Node = LogicalPlan; - fn f_down(&mut self, plan: &LogicalPlan) -> Result { + fn f_down(&mut self, plan: &'n LogicalPlan) -> Result { let s = match plan { LogicalPlan::Projection { .. } => "pre_visit Projection", LogicalPlan::Filter { .. } => "pre_visit Filter", @@ -3001,7 +3001,7 @@ digraph { Ok(TreeNodeRecursion::Continue) } - fn f_up(&mut self, plan: &LogicalPlan) -> Result { + fn f_up(&mut self, plan: &'n LogicalPlan) -> Result { let s = match plan { LogicalPlan::Projection { .. } => "post_visit Projection", LogicalPlan::Filter { .. } => "post_visit Filter", @@ -3067,10 +3067,10 @@ digraph { return_false_from_post_in: OptionalCounter, } - impl TreeNodeVisitor for StoppingVisitor { + impl<'n> TreeNodeVisitor<'n> for StoppingVisitor { type Node = LogicalPlan; - fn f_down(&mut self, plan: &LogicalPlan) -> Result { + fn f_down(&mut self, plan: &'n LogicalPlan) -> Result { if self.return_false_from_pre_in.dec() { return Ok(TreeNodeRecursion::Stop); } @@ -3079,7 +3079,7 @@ digraph { Ok(TreeNodeRecursion::Continue) } - fn f_up(&mut self, plan: &LogicalPlan) -> Result { + fn f_up(&mut self, plan: &'n LogicalPlan) -> Result { if self.return_false_from_post_in.dec() { return Ok(TreeNodeRecursion::Stop); } @@ -3136,10 +3136,10 @@ digraph { return_error_from_post_in: OptionalCounter, } - impl TreeNodeVisitor for ErrorVisitor { + impl<'n> TreeNodeVisitor<'n> for ErrorVisitor { type Node = LogicalPlan; - fn f_down(&mut self, plan: &LogicalPlan) -> Result { + fn f_down(&mut self, plan: &'n LogicalPlan) -> Result { if self.return_error_from_pre_in.dec() { return not_impl_err!("Error in pre_visit"); } @@ -3147,7 +3147,7 @@ digraph { self.inner.f_down(plan) } - fn f_up(&mut self, plan: &LogicalPlan) -> Result { + fn f_up(&mut self, plan: &'n LogicalPlan) -> Result { if self.return_error_from_post_in.dec() { return not_impl_err!("Error in post_visit"); } diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index ed582b6c534..3dc7aad33c6 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -56,7 +56,7 @@ use datafusion_common::{ }; impl TreeNode for LogicalPlan { - fn apply_children_ref<'n, F: FnMut(&'n Self) -> Result>( + fn apply_children<'n, F: FnMut(&'n Self) -> Result>( &'n self, f: F, ) -> Result { @@ -736,10 +736,10 @@ impl LogicalPlan { /// Visits a plan similarly to [`Self::visit`], including subqueries that /// may appear in expressions such as `IN (SELECT ...)`. - pub fn visit_with_subqueries>( - &self, - visitor: &mut V, - ) -> Result { + pub fn visit_with_subqueries(&self, visitor: &mut V) -> Result + where + for<'n> V: TreeNodeVisitor<'n, Node = Self>, + { visitor .f_down(self)? .visit_children(|| { diff --git a/datafusion/expr/src/tree_node.rs b/datafusion/expr/src/tree_node.rs index 7cdcdf93540..b3ef45fa23f 100644 --- a/datafusion/expr/src/tree_node.rs +++ b/datafusion/expr/src/tree_node.rs @@ -30,7 +30,7 @@ use datafusion_common::tree_node::{ use datafusion_common::{map_until_stop_and_collect, Result}; impl TreeNode for Expr { - fn apply_children_ref<'n, F: FnMut(&'n Self) -> Result>( + fn apply_children<'n, F: FnMut(&'n Self) -> Result>( &'n self, f: F, ) -> Result { diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 3532a57f620..174440dac31 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -682,10 +682,10 @@ impl ExprIdentifierVisitor<'_> { } } -impl TreeNodeVisitor for ExprIdentifierVisitor<'_> { +impl<'n> TreeNodeVisitor<'n> for ExprIdentifierVisitor<'_> { type Node = Expr; - fn f_down(&mut self, expr: &Expr) -> Result { + fn f_down(&mut self, expr: &'n Expr) -> Result { // related to https://github.com/apache/arrow-datafusion/issues/8814 // If the expr contain volatile expression or is a short-circuit expression, skip it. // TODO: propagate is_volatile state bottom-up + consider non-volatile sub-expressions for CSE @@ -704,7 +704,7 @@ impl TreeNodeVisitor for ExprIdentifierVisitor<'_> { Ok(TreeNodeRecursion::Continue) } - fn f_up(&mut self, expr: &Expr) -> Result { + fn f_up(&mut self, expr: &'n Expr) -> Result { let Some((down_index, sub_expr_id)) = self.pop_enter_mark() else { return Ok(TreeNodeRecursion::Continue); }; diff --git a/datafusion/physical-expr-common/src/expressions/column.rs b/datafusion/physical-expr-common/src/expressions/column.rs index 2cd52d6332f..956c33d59b2 100644 --- a/datafusion/physical-expr-common/src/expressions/column.rs +++ b/datafusion/physical-expr-common/src/expressions/column.rs @@ -92,7 +92,7 @@ impl PhysicalExpr for Column { Ok(ColumnarValue::Array(batch.column(self.index).clone())) } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 00b3dd725dc..fd8d07dc966 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -65,7 +65,7 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq { } /// Get a list of child PhysicalExpr that provide the input for this expr. - fn children(&self) -> Vec>; + fn children(&self) -> Vec<&Arc>; /// Returns a new PhysicalExpr where all children were replaced by new exprs. fn with_new_children( diff --git a/datafusion/physical-expr-common/src/tree_node.rs b/datafusion/physical-expr-common/src/tree_node.rs index 42dc6673af6..6e5a22199f2 100644 --- a/datafusion/physical-expr-common/src/tree_node.rs +++ b/datafusion/physical-expr-common/src/tree_node.rs @@ -26,7 +26,7 @@ use datafusion_common::tree_node::{ConcreteTreeNode, DynTreeNode}; use datafusion_common::Result; impl DynTreeNode for dyn PhysicalExpr { - fn arc_children(&self) -> Vec> { + fn arc_children(&self) -> Vec<&Arc> { self.children() } @@ -70,7 +70,12 @@ impl ExprContext { impl ExprContext { pub fn new_default(plan: Arc) -> Self { - let children = plan.children().into_iter().map(Self::new_default).collect(); + let children = plan + .children() + .into_iter() + .cloned() + .map(Self::new_default) + .collect(); Self::new(plan, Default::default(), children) } } diff --git a/datafusion/physical-expr-common/src/utils.rs b/datafusion/physical-expr-common/src/utils.rs index 601d344e4aa..487aba945aa 100644 --- a/datafusion/physical-expr-common/src/utils.rs +++ b/datafusion/physical-expr-common/src/utils.rs @@ -35,7 +35,12 @@ impl ExprPropertiesNode { /// given physical expression. This node initializes with default properties /// and recursively applies this to all child expressions. pub fn new_unknown(expr: Arc) -> Self { - let children = expr.children().into_iter().map(Self::new_unknown).collect(); + let children = expr + .children() + .into_iter() + .cloned() + .map(Self::new_unknown) + .collect(); Self { expr, data: ExprProperties::new_unknown(), diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 9ea456b0f87..b4d12e96361 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -374,7 +374,7 @@ impl EquivalenceGroup { } children .into_iter() - .map(|child| self.project_expr(mapping, &child)) + .map(|child| self.project_expr(mapping, child)) .collect::>>() .map(|children| expr.clone().with_new_children(children).unwrap()) } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 08f7523f92f..98df0cba9f3 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -312,8 +312,8 @@ impl PhysicalExpr for BinaryExpr { .map(ColumnarValue::Array) } - fn children(&self) -> Vec> { - vec![self.left.clone(), self.right.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.left, &self.right] } fn with_new_children( diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index 7b10df9ac14..1a9c2d8390b 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -314,20 +314,17 @@ impl PhysicalExpr for CaseExpr { } } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { let mut children = vec![]; - match &self.expr { - Some(expr) => children.push(expr.clone()), - None => children.push(Arc::new(NoOp::new())), + if let Some(e) = self.expr.as_ref() { + children.push(e) } self.when_then_expr.iter().for_each(|(cond, value)| { - children.push(cond.clone()); - children.push(value.clone()); + children.push(cond); + children.push(value); }); - - match &self.else_expr { - Some(expr) => children.push(expr.clone()), - None => children.push(Arc::new(NoOp::new())), + if let Some(e) = self.else_expr.as_ref() { + children.push(e) } children } diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index 79a44ac30cf..4f940a792bb 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -123,8 +123,8 @@ impl PhysicalExpr for CastExpr { value.cast_to(&self.cast_type, Some(&self.cast_options)) } - fn children(&self) -> Vec> { - vec![self.expr.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.expr] } fn with_new_children( diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs index 634a56d1d68..f6525c7c046 100644 --- a/datafusion/physical-expr/src/expressions/column.rs +++ b/datafusion/physical-expr/src/expressions/column.rs @@ -77,7 +77,7 @@ impl PhysicalExpr for UnKnownColumn { internal_err!("UnKnownColumn::evaluate() should not be called") } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 9ae4c2784cc..dd61fc80244 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -372,10 +372,10 @@ impl PhysicalExpr for InListExpr { Ok(ColumnarValue::Array(Arc::new(r))) } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { let mut children = vec![]; - children.push(self.expr.clone()); - children.extend(self.list.clone()); + children.push(&self.expr); + children.extend(&self.list); children } diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index c5c673ec28e..1918f0891ff 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -82,8 +82,8 @@ impl PhysicalExpr for IsNotNullExpr { } } - fn children(&self) -> Vec> { - vec![self.arg.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.arg] } fn with_new_children( diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index b0f70b6f0d7..3430efcd763 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -83,8 +83,8 @@ impl PhysicalExpr for IsNullExpr { } } - fn children(&self) -> Vec> { - vec![self.arg.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.arg] } fn with_new_children( diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs index 6e0beeb0bee..eec347db8ed 100644 --- a/datafusion/physical-expr/src/expressions/like.rs +++ b/datafusion/physical-expr/src/expressions/like.rs @@ -112,8 +112,8 @@ impl PhysicalExpr for LikeExpr { } } - fn children(&self) -> Vec> { - vec![self.expr.clone(), self.pattern.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.expr, &self.pattern] } fn with_new_children( diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 371028959ab..fcaf229af0a 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -75,7 +75,7 @@ impl PhysicalExpr for Literal { Ok(ColumnarValue::Scalar(self.value.clone())) } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs index 62f865bd9b3..aed2675e044 100644 --- a/datafusion/physical-expr/src/expressions/negative.rs +++ b/datafusion/physical-expr/src/expressions/negative.rs @@ -89,8 +89,8 @@ impl PhysicalExpr for NegativeExpr { } } - fn children(&self) -> Vec> { - vec![self.arg.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.arg] } fn with_new_children( diff --git a/datafusion/physical-expr/src/expressions/no_op.rs b/datafusion/physical-expr/src/expressions/no_op.rs index b558ccab154..9148cb7c1c1 100644 --- a/datafusion/physical-expr/src/expressions/no_op.rs +++ b/datafusion/physical-expr/src/expressions/no_op.rs @@ -68,7 +68,7 @@ impl PhysicalExpr for NoOp { internal_err!("NoOp::evaluate() should not be called") } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index 1428be71cc2..9aaab0658d3 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -89,8 +89,8 @@ impl PhysicalExpr for NotExpr { } } - fn children(&self) -> Vec> { - vec![self.arg.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.arg] } fn with_new_children( diff --git a/datafusion/physical-expr/src/expressions/try_cast.rs b/datafusion/physical-expr/src/expressions/try_cast.rs index d25a904f7d6..d31306e239b 100644 --- a/datafusion/physical-expr/src/expressions/try_cast.rs +++ b/datafusion/physical-expr/src/expressions/try_cast.rs @@ -97,8 +97,8 @@ impl PhysicalExpr for TryCastExpr { } } - fn children(&self) -> Vec> { - vec![self.expr.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.expr] } fn with_new_children( diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 14631caec55..10e29b41031 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -143,8 +143,8 @@ impl PhysicalExpr for ScalarFunctionExpr { Ok(output) } - fn children(&self) -> Vec> { - self.args.clone() + fn children(&self) -> Vec<&Arc> { + self.args.iter().collect() } fn with_new_children( diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 21608db40d5..aa197605dc9 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -696,8 +696,8 @@ impl ExecutionPlan for AggregateExec { vec![self.required_input_ordering.clone()] } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn with_new_children( @@ -1640,7 +1640,7 @@ mod tests { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index c420581c432..5b859804163 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -124,8 +124,8 @@ impl ExecutionPlan for AnalyzeExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } /// AnalyzeExec is handled specially so this value is ignored diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index bc7c4a3d067..804fabff71a 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -117,8 +117,8 @@ impl ExecutionPlan for CoalesceBatchesExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn maintains_input_order(&self) -> Vec { diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 1c725ce31f1..ce67cba2cd0 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -102,8 +102,8 @@ impl ExecutionPlan for CoalescePartitionsExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn benefits_from_input_partitioning(&self) -> Vec { diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index ca93ce5e7b8..ed85c80251d 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -501,7 +501,7 @@ mod tests { unimplemented!() } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 33bf1668b3c..11af0624db1 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -114,7 +114,7 @@ impl ExecutionPlan for EmptyExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs index 64994699322..4b2edbf2045 100644 --- a/datafusion/physical-plan/src/explain.rs +++ b/datafusion/physical-plan/src/explain.rs @@ -111,7 +111,7 @@ impl ExecutionPlan for ExplainExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { // this is a leaf node and has no children vec![] } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 6729e3b9e60..6153dbacfbf 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -242,8 +242,8 @@ impl ExecutionPlan for FilterExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn maintains_input_order(&self) -> Vec { diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 259db644ae0..fa30141a193 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -248,8 +248,8 @@ impl ExecutionPlan for DataSinkExec { vec![true] } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn with_new_children( diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 9d1de3715f5..92443d06856 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -207,8 +207,8 @@ impl ExecutionPlan for CrossJoinExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.left.clone(), self.right.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.left, &self.right] } fn metrics(&self) -> Option { diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index d3abedbe380..e669517be40 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -683,8 +683,8 @@ impl ExecutionPlan for HashJoinExec { Self::maintains_input_order(self.join_type) } - fn children(&self) -> Vec> { - vec![self.left.clone(), self.right.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.left, &self.right] } fn with_new_children( diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 47e262c3c8f..18518600ef2 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -292,8 +292,8 @@ impl ExecutionPlan for NestedLoopJoinExec { ] } - fn children(&self) -> Vec> { - vec![self.left.clone(), self.right.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.left, &self.right] } fn with_new_children( diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 1cc7bf4700d..ec83fe3f2af 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -298,8 +298,8 @@ impl ExecutionPlan for SortMergeJoinExec { Self::maintains_input_order(self.join_type) } - fn children(&self) -> Vec> { - vec![self.left.clone(), self.right.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.left, &self.right] } fn with_new_children( diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 9d48c2a7d40..0d902af9c6c 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -427,8 +427,8 @@ impl ExecutionPlan for SymmetricHashJoinExec { ] } - fn children(&self) -> Vec> { - vec![self.left.clone(), self.right.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.left, &self.right] } fn with_new_children( diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 8d8a3e71031..739bff2cfa2 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -215,7 +215,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// The returned list will be empty for leaf nodes such as scans, will contain /// a single value for unary nodes, or two values for binary nodes (such as /// joins). - fn children(&self) -> Vec>; + fn children(&self) -> Vec<&Arc>; /// Returns a new `ExecutionPlan` where all existing children were replaced /// by the `children`, in order @@ -841,7 +841,7 @@ mod tests { unimplemented!() } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } @@ -900,7 +900,7 @@ mod tests { unimplemented!() } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 4f8ff4c5606..4c6d1b3674d 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -124,8 +124,8 @@ impl ExecutionPlan for GlobalLimitExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn required_input_distribution(&self) -> Vec { @@ -334,8 +334,8 @@ impl ExecutionPlan for LocalLimitExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn benefits_from_input_partitioning(&self) -> Vec { diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 883cdb540a9..39ae8d551f4 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -116,7 +116,7 @@ impl ExecutionPlan for MemoryExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { // this is a leaf node and has no children vec![] } diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index c94c2b0607d..3b10cc0ac43 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -132,7 +132,7 @@ impl ExecutionPlan for PlaceholderRowExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index f72815c01a9..8341549340d 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -193,8 +193,8 @@ impl ExecutionPlan for ProjectionExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn maintains_input_order(&self) -> Vec { diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index ed897d78f0c..9a0b66caba3 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -120,8 +120,8 @@ impl ExecutionPlan for RecursiveQueryExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.static_term.clone(), self.recursive_term.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.static_term, &self.recursive_term] } // TODO: control these hints and see whether we can @@ -358,7 +358,9 @@ fn reset_plan_states(plan: Arc) -> Result() { Ok(Transformed::no(plan)) } else { - let new_plan = plan.clone().with_new_children(plan.children())?; + let new_plan = plan + .clone() + .with_new_children(plan.children().into_iter().cloned().collect())?; Ok(Transformed::yes(new_plan)) } }) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index e31fdc6ee2c..65f7d5070a5 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -519,8 +519,8 @@ impl ExecutionPlan for RepartitionExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn with_new_children( diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index d24bc5a670e..ad5d485cffc 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -250,8 +250,8 @@ impl ExecutionPlan for PartialSortExec { vec![false] } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn with_new_children( diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index c684748bb29..2a486253459 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -903,8 +903,8 @@ impl ExecutionPlan for SortExec { } } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn benefits_from_input_partitioning(&self) -> Vec { diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 88c6c312b94..8a349bd22ab 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -173,8 +173,8 @@ impl ExecutionPlan for SortPreservingMergeExec { vec![true] } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn with_new_children( diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index d174e3b8b6c..ff57adde4e2 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -217,7 +217,7 @@ impl ExecutionPlan for StreamingTableExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index b4f1eac0a65..d5ad9292b49 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -185,7 +185,7 @@ impl ExecutionPlan for MockExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } @@ -343,7 +343,7 @@ impl ExecutionPlan for BarrierExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { unimplemented!() } @@ -452,7 +452,7 @@ impl ExecutionPlan for ErrorExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { unimplemented!() } @@ -535,7 +535,7 @@ impl ExecutionPlan for StatisticsExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } @@ -627,7 +627,7 @@ impl ExecutionPlan for BlockingExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { // this is a leaf node and has no children vec![] } @@ -768,7 +768,7 @@ impl ExecutionPlan for PanicExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { // this is a leaf node and has no children vec![] } diff --git a/datafusion/physical-plan/src/tree_node.rs b/datafusion/physical-plan/src/tree_node.rs index 46460cbb668..45bd320d894 100644 --- a/datafusion/physical-plan/src/tree_node.rs +++ b/datafusion/physical-plan/src/tree_node.rs @@ -26,7 +26,7 @@ use datafusion_common::tree_node::{ConcreteTreeNode, DynTreeNode}; use datafusion_common::Result; impl DynTreeNode for dyn ExecutionPlan { - fn arc_children(&self) -> Vec> { + fn arc_children(&self) -> Vec<&Arc> { self.children() } @@ -71,7 +71,12 @@ impl PlanContext { impl PlanContext { pub fn new_default(plan: Arc) -> Self { - let children = plan.children().into_iter().map(Self::new_default).collect(); + let children = plan + .children() + .into_iter() + .cloned() + .map(Self::new_default) + .collect(); Self::new(plan, Default::default(), children) } } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 1354644788e..dc7d270bae2 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -196,8 +196,8 @@ impl ExecutionPlan for UnionExec { &self.cache } - fn children(&self) -> Vec> { - self.inputs.clone() + fn children(&self) -> Vec<&Arc> { + self.inputs.iter().collect() } fn maintains_input_order(&self) -> Vec { @@ -387,8 +387,8 @@ impl ExecutionPlan for InterleaveExec { &self.cache } - fn children(&self) -> Vec> { - self.inputs.clone() + fn children(&self) -> Vec<&Arc> { + self.inputs.iter().collect() } fn maintains_input_order(&self) -> Vec { diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 06dd8230d39..af50a307efb 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -127,8 +127,8 @@ impl ExecutionPlan for UnnestExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn with_new_children( diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 2aa893fd291..4d385812d4a 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -167,7 +167,7 @@ impl ExecutionPlan for ValuesExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index cff91283eb6..48f1bee59bb 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -250,8 +250,8 @@ impl ExecutionPlan for BoundedWindowAggExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn required_input_ordering(&self) -> Vec>> { diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index 1507902c22e..eb01da2ec09 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -184,8 +184,8 @@ impl ExecutionPlan for WindowAggExec { &self.cache } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn maintains_input_order(&self) -> Vec { diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index b3c9043d4fd..003957947fe 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -169,7 +169,7 @@ impl ExecutionPlan for WorkTableExec { &self.cache } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 0515ed5006a..4c581f99b32 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -1972,6 +1972,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let inputs: Vec = plan_clone .children() .into_iter() + .cloned() .map(|i| { protobuf::PhysicalPlanNode::try_from_physical_plan( i,