From 9b2ee9fedb7bf63b0cc99dcd63c1052f6cd3c268 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Mon, 13 Mar 2023 17:37:18 +0800 Subject: [PATCH 1/5] Add a utility function to collect all of the PartitionedFile for an ExecutionPlan --- .../src/datasource/file_format/parquet.rs | 20 ++++++ .../src/physical_plan/file_format/json.rs | 5 ++ .../core/src/physical_plan/file_format/mod.rs | 48 +++++++++++++ datafusion/core/src/physical_plan/mod.rs | 2 + .../core/src/physical_plan/tree_node.rs | 67 +++++++++++++++++++ .../core/src/physical_plan/visitable.rs | 45 +++++++++++++ 6 files changed, 187 insertions(+) create mode 100644 datafusion/core/src/physical_plan/tree_node.rs create mode 100644 datafusion/core/src/physical_plan/visitable.rs diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 53e94167d845..ba18e9f62c25 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -620,6 +620,7 @@ mod tests { use super::*; use crate::datasource::file_format::parquet::test_util::store_parquet; + use crate::physical_plan::file_format::get_scan_files; use crate::physical_plan::metrics::MetricValue; use crate::prelude::{SessionConfig, SessionContext}; use arrow::array::{Array, ArrayRef, StringArray}; @@ -1215,6 +1216,25 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_get_scan_files() -> Result<()> { + let session_ctx = SessionContext::new(); + let state = session_ctx.state(); + let projection = Some(vec![9]); + let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?; + let scan_files = get_scan_files(exec)?; + assert_eq!(scan_files.len(), 1); + assert_eq!(scan_files[0].len(), 1); + assert_eq!(scan_files[0][0].len(), 1); + assert!(scan_files[0][0][0] + .object_meta + .location + .to_string() + .contains("alltypes_plain.parquet")); + + Ok(()) + } + fn check_page_index_validation( page_index: Option<&ParquetColumnIndex>, offset_index: Option<&ParquetOffsetIndex>, diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index 6f26080122c6..146227335c99 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -73,6 +73,11 @@ impl NdJsonExec { file_compression_type, } } + + /// Ref to the base configs + pub fn base_config(&self) -> &FileScanConfig { + &self.base_config + } } impl ExecutionPlan for NdJsonExec { diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index 97b091cedfef..c343fe5e5637 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -45,6 +45,10 @@ use crate::datasource::{ listing::{FileRange, PartitionedFile}, object_store::ObjectStoreUrl, }; +use crate::physical_plan::tree_node::{ + TreeNodeVisitable, TreeNodeVisitor, VisitRecursion, +}; +use crate::physical_plan::ExecutionPlan; use crate::{ error::{DataFusionError, Result}, scalar::ScalarValue, @@ -68,6 +72,50 @@ pub fn partition_type_wrap(val_type: DataType) -> DataType { DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type)) } +/// Get all of the [`PartitionedFile`] to be scanned for an [`ExecutionPlan`] +pub fn get_scan_files( + plan: Arc, +) -> Result>>> { + let mut collector = FileScanCollector::new(); + collector = plan.accept(collector)?; + Ok(collector.file_groups) +} + +struct FileScanCollector { + file_groups: Vec>>, +} + +impl FileScanCollector { + fn new() -> Self { + Self { + file_groups: vec![], + } + } +} + +impl TreeNodeVisitor for FileScanCollector { + type N = Arc; + + fn pre_visit(mut self, node: &Self::N) -> Result> { + let plan_any = node.as_any(); + let file_groups = + if let Some(parquet_exec) = plan_any.downcast_ref::() { + parquet_exec.base_config().file_groups.clone() + } else if let Some(avro_exec) = plan_any.downcast_ref::() { + avro_exec.base_config().file_groups.clone() + } else if let Some(json_exec) = plan_any.downcast_ref::() { + json_exec.base_config().file_groups.clone() + } else if let Some(csv_exec) = plan_any.downcast_ref::() { + csv_exec.base_config().file_groups.clone() + } else { + return Ok(VisitRecursion::Continue(self)); + }; + + self.file_groups.push(file_groups); + Ok(VisitRecursion::Stop(self)) + } +} + /// The base configurations to provide when creating a physical plan for /// any given file format. #[derive(Debug, Clone)] diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index dbd1024ae482..f504ca80befe 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -657,10 +657,12 @@ pub mod rewrite; pub mod sorts; pub mod stream; pub mod streaming; +pub mod tree_node; pub mod udaf; pub mod union; pub mod unnest; pub mod values; +pub mod visitable; pub mod windows; use crate::execution::context::TaskContext; diff --git a/datafusion/core/src/physical_plan/tree_node.rs b/datafusion/core/src/physical_plan/tree_node.rs new file mode 100644 index 000000000000..b96d3da39348 --- /dev/null +++ b/datafusion/core/src/physical_plan/tree_node.rs @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module provides common traits for visiting or rewriting tree nodes easily. + +use datafusion_common::Result; + +/// Implements the [visitor +/// pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for recursively walking [`TreeNodeVisitable`]s. +/// +/// [`TreeNodeVisitor`] allows keeping the algorithms +/// separate from the code to traverse the structure of the `TreeNodeVisitable` +/// tree and makes it easier to add new types of tree node and +/// algorithms by. +/// +/// When passed to[`TreeNodeVisitable::accept`], [`TreeNodeVisitor::pre_visit`] +/// and [`TreeNodeVisitor::post_visit`] are invoked recursively +/// on an node tree. +/// +/// If an [`Err`] result is returned, recursion is stopped +/// immediately. +/// +/// If [`Recursion::Stop`] is returned on a call to pre_visit, no +/// children of that tree node are visited, nor is post_visit +/// called on that tree node +pub trait TreeNodeVisitor: Sized { + /// The node type which is visitable. + type N: TreeNodeVisitable; + + /// Invoked before any children of `node` are visited. + fn pre_visit(self, node: &Self::N) -> Result>; + + /// Invoked after all children of `node` are visited. Default + /// implementation does nothing. + fn post_visit(self, _node: &Self::N) -> Result { + Ok(self) + } +} + +/// trait for types that can be visited by [`TreeNodeVisitor`] +pub trait TreeNodeVisitable: Sized { + /// accept a visitor, calling `visit` on all children of this + fn accept>(&self, visitor: V) -> Result; +} + +/// Controls how the visitor recursion should proceed. +pub enum VisitRecursion { + /// Attempt to visit all the children, recursively. + Continue(V), + /// Do not visit the children of this tree node, though the walk + /// of parents of this tree node will not be affected + Stop(V), +} diff --git a/datafusion/core/src/physical_plan/visitable.rs b/datafusion/core/src/physical_plan/visitable.rs new file mode 100644 index 000000000000..ada7820643e7 --- /dev/null +++ b/datafusion/core/src/physical_plan/visitable.rs @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! ExecutionPlan visitor + +use crate::physical_plan::tree_node::{ + TreeNodeVisitable, TreeNodeVisitor, VisitRecursion, +}; +use crate::physical_plan::ExecutionPlan; +use std::sync::Arc; + +impl TreeNodeVisitable for Arc { + /// Performs a depth first walk of a node and + /// its children, see [`TreeNodeVisitor`] for more details + fn accept>>( + &self, + visitor: V, + ) -> datafusion_common::Result { + let mut visitor = match visitor.pre_visit(self)? { + VisitRecursion::Continue(visitor) => visitor, + // If the recursion should stop, do not visit children + VisitRecursion::Stop(visitor) => return Ok(visitor), + }; + + for child in self.children() { + visitor = child.accept(visitor)?; + } + + visitor.post_visit(self) + } +} From 4332add7d69c116c36c62259122b831448ef0109 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Tue, 14 Mar 2023 11:17:46 +0800 Subject: [PATCH 2/5] Reorganize the tree node related files --- datafusion/core/src/physical_plan/mod.rs | 1 - .../{tree_node.rs => tree_node/mod.rs} | 21 ++++++++++++++-- .../{ => tree_node}/visitable.rs | 25 +++---------------- 3 files changed, 23 insertions(+), 24 deletions(-) rename datafusion/core/src/physical_plan/{tree_node.rs => tree_node/mod.rs} (81%) rename datafusion/core/src/physical_plan/{ => tree_node}/visitable.rs (54%) diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index f504ca80befe..7c132246e889 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -662,7 +662,6 @@ pub mod udaf; pub mod union; pub mod unnest; pub mod values; -pub mod visitable; pub mod windows; use crate::execution::context::TaskContext; diff --git a/datafusion/core/src/physical_plan/tree_node.rs b/datafusion/core/src/physical_plan/tree_node/mod.rs similarity index 81% rename from datafusion/core/src/physical_plan/tree_node.rs rename to datafusion/core/src/physical_plan/tree_node/mod.rs index b96d3da39348..7684757f6e48 100644 --- a/datafusion/core/src/physical_plan/tree_node.rs +++ b/datafusion/core/src/physical_plan/tree_node/mod.rs @@ -17,6 +17,8 @@ //! This module provides common traits for visiting or rewriting tree nodes easily. +pub mod visitable; + use datafusion_common::Result; /// Implements the [visitor @@ -53,8 +55,23 @@ pub trait TreeNodeVisitor: Sized { /// trait for types that can be visited by [`TreeNodeVisitor`] pub trait TreeNodeVisitable: Sized { - /// accept a visitor, calling `visit` on all children of this - fn accept>(&self, visitor: V) -> Result; + /// Return the children of this tree node + fn get_children(&self) -> Vec; + + /// Accept a visitor, calling `visit` on all children of this + fn accept>(&self, visitor: V) -> Result { + let mut visitor = match visitor.pre_visit(self)? { + VisitRecursion::Continue(visitor) => visitor, + // If the recursion should stop, do not visit children + VisitRecursion::Stop(visitor) => return Ok(visitor), + }; + + for child in self.get_children() { + visitor = child.accept(visitor)?; + } + + visitor.post_visit(self) + } } /// Controls how the visitor recursion should proceed. diff --git a/datafusion/core/src/physical_plan/visitable.rs b/datafusion/core/src/physical_plan/tree_node/visitable.rs similarity index 54% rename from datafusion/core/src/physical_plan/visitable.rs rename to datafusion/core/src/physical_plan/tree_node/visitable.rs index ada7820643e7..935c8adb7ea7 100644 --- a/datafusion/core/src/physical_plan/visitable.rs +++ b/datafusion/core/src/physical_plan/tree_node/visitable.rs @@ -15,31 +15,14 @@ // specific language governing permissions and limitations // under the License. -//! ExecutionPlan visitor +//! Tree node visitable implementations -use crate::physical_plan::tree_node::{ - TreeNodeVisitable, TreeNodeVisitor, VisitRecursion, -}; +use crate::physical_plan::tree_node::TreeNodeVisitable; use crate::physical_plan::ExecutionPlan; use std::sync::Arc; impl TreeNodeVisitable for Arc { - /// Performs a depth first walk of a node and - /// its children, see [`TreeNodeVisitor`] for more details - fn accept>>( - &self, - visitor: V, - ) -> datafusion_common::Result { - let mut visitor = match visitor.pre_visit(self)? { - VisitRecursion::Continue(visitor) => visitor, - // If the recursion should stop, do not visit children - VisitRecursion::Stop(visitor) => return Ok(visitor), - }; - - for child in self.children() { - visitor = child.accept(visitor)?; - } - - visitor.post_visit(self) + fn get_children(&self) -> Vec { + self.children() } } From a8bb92935fc1ab383b4674b7cbf873de522c8ed7 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Tue, 14 Mar 2023 11:50:59 +0800 Subject: [PATCH 3/5] Move TreeNodeRewritable for ExecutionPlan to the tree_node --- .../physical_optimizer/coalesce_batches.rs | 2 +- .../physical_optimizer/dist_enforcement.rs | 2 +- .../global_sort_selection.rs | 2 +- .../src/physical_optimizer/join_selection.rs | 2 +- .../physical_optimizer/pipeline_checker.rs | 2 +- .../src/physical_optimizer/pipeline_fixer.rs | 2 +- .../physical_optimizer/sort_enforcement.rs | 2 +- datafusion/core/src/physical_plan/mod.rs | 1 - datafusion/core/src/physical_plan/rewrite.rs | 166 ------------------ .../core/src/physical_plan/tree_node/mod.rs | 129 +++++++++++++- .../src/physical_plan/tree_node/rewritable.rs | 39 ++++ 11 files changed, 174 insertions(+), 175 deletions(-) delete mode 100644 datafusion/core/src/physical_plan/rewrite.rs create mode 100644 datafusion/core/src/physical_plan/tree_node/rewritable.rs diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs index a1566c37cd2d..9fd29cf89674 100644 --- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs @@ -24,7 +24,7 @@ use crate::{ physical_optimizer::PhysicalOptimizerRule, physical_plan::{ coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec, - repartition::RepartitionExec, rewrite::TreeNodeRewritable, Partitioning, + repartition::RepartitionExec, tree_node::TreeNodeRewritable, Partitioning, }, }; use std::sync::Arc; diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs index 919273af7467..95d4427e6dc4 100644 --- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs @@ -28,8 +28,8 @@ use crate::physical_plan::joins::{ }; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::repartition::RepartitionExec; -use crate::physical_plan::rewrite::TreeNodeRewritable; use crate::physical_plan::sorts::sort::SortOptions; +use crate::physical_plan::tree_node::TreeNodeRewritable; use crate::physical_plan::windows::WindowAggExec; use crate::physical_plan::Partitioning; use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan}; diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs index 81b4b59e3a14..647558fbff12 100644 --- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs +++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs @@ -22,9 +22,9 @@ use std::sync::Arc; use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_optimizer::PhysicalOptimizerRule; -use crate::physical_plan::rewrite::TreeNodeRewritable; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use crate::physical_plan::tree_node::TreeNodeRewritable; use crate::physical_plan::ExecutionPlan; /// Currently for a sort operator, if diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index d9787881f161..2308b2c85dda 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -32,7 +32,7 @@ use crate::physical_plan::{ExecutionPlan, PhysicalExpr}; use super::optimizer::PhysicalOptimizerRule; use crate::error::Result; -use crate::physical_plan::rewrite::TreeNodeRewritable; +use crate::physical_plan::tree_node::TreeNodeRewritable; /// For hash join with the partition mode [PartitionMode::Auto], JoinSelection rule will make /// a cost based decision to select which PartitionMode mode(Partitioned/CollectLeft) is optimal diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs index 8a6b0e003adf..f097196cd1fc 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs @@ -22,7 +22,7 @@ use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_optimizer::PhysicalOptimizerRule; -use crate::physical_plan::rewrite::TreeNodeRewritable; +use crate::physical_plan::tree_node::TreeNodeRewritable; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; use std::sync::Arc; diff --git a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs index 7e85f0c0d5f7..7532914c1258 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs @@ -34,7 +34,7 @@ use crate::physical_plan::joins::{ convert_sort_expr_with_filter_schema, HashJoinExec, PartitionMode, SymmetricHashJoinExec, }; -use crate::physical_plan::rewrite::TreeNodeRewritable; +use crate::physical_plan::tree_node::TreeNodeRewritable; use crate::physical_plan::ExecutionPlan; use datafusion_common::DataFusionError; use datafusion_expr::logical_plan::JoinType; diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 70880b750552..261c19600c81 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -39,9 +39,9 @@ use crate::physical_optimizer::utils::add_sort_above; use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; -use crate::physical_plan::rewrite::TreeNodeRewritable; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use crate::physical_plan::tree_node::TreeNodeRewritable; use crate::physical_plan::union::UnionExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan}; diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 7c132246e889..9e0e03a77d04 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -653,7 +653,6 @@ pub mod metrics; pub mod planner; pub mod projection; pub mod repartition; -pub mod rewrite; pub mod sorts; pub mod stream; pub mod streaming; diff --git a/datafusion/core/src/physical_plan/rewrite.rs b/datafusion/core/src/physical_plan/rewrite.rs deleted file mode 100644 index 2972b546bb0e..000000000000 --- a/datafusion/core/src/physical_plan/rewrite.rs +++ /dev/null @@ -1,166 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Trait to make Executionplan rewritable - -use crate::physical_plan::with_new_children_if_necessary; -use crate::physical_plan::ExecutionPlan; -use datafusion_common::Result; - -use std::sync::Arc; - -/// a Trait for marking tree node types that are rewritable -pub trait TreeNodeRewritable: Clone { - /// Transform the tree node using the given [TreeNodeRewriter] - /// It performs a depth first walk of an node and its children. - /// - /// For an node tree such as - /// ```text - /// ParentNode - /// left: ChildNode1 - /// right: ChildNode2 - /// ``` - /// - /// The nodes are visited using the following order - /// ```text - /// pre_visit(ParentNode) - /// pre_visit(ChildNode1) - /// mutate(ChildNode1) - /// pre_visit(ChildNode2) - /// mutate(ChildNode2) - /// mutate(ParentNode) - /// ``` - /// - /// If an Err result is returned, recursion is stopped immediately - /// - /// If [`false`] is returned on a call to pre_visit, no - /// children of that node are visited, nor is mutate - /// called on that node - /// - fn transform_using>( - self, - rewriter: &mut R, - ) -> Result { - let need_mutate = match rewriter.pre_visit(&self)? { - RewriteRecursion::Mutate => return rewriter.mutate(self), - RewriteRecursion::Stop => return Ok(self), - RewriteRecursion::Continue => true, - RewriteRecursion::Skip => false, - }; - - let after_op_children = - self.map_children(|node| node.transform_using(rewriter))?; - - // now rewrite this node itself - if need_mutate { - rewriter.mutate(after_op_children) - } else { - Ok(after_op_children) - } - } - - /// Convenience utils for writing optimizers rule: recursively apply the given `op` to the node tree. - /// When `op` does not apply to a given node, it is left unchanged. - /// The default tree traversal direction is transform_up(Postorder Traversal). - fn transform(self, op: &F) -> Result - where - F: Fn(Self) -> Result>, - { - self.transform_up(op) - } - - /// Convenience utils for writing optimizers rule: recursively apply the given 'op' to the node and all of its - /// children(Preorder Traversal). - /// When the `op` does not apply to a given node, it is left unchanged. - fn transform_down(self, op: &F) -> Result - where - F: Fn(Self) -> Result>, - { - let node_cloned = self.clone(); - let after_op = match op(node_cloned)? { - Some(value) => value, - None => self, - }; - after_op.map_children(|node| node.transform_down(op)) - } - - /// Convenience utils for writing optimizers rule: recursively apply the given 'op' first to all of its - /// children and then itself(Postorder Traversal). - /// When the `op` does not apply to a given node, it is left unchanged. - fn transform_up(self, op: &F) -> Result - where - F: Fn(Self) -> Result>, - { - let after_op_children = self.map_children(|node| node.transform_up(op))?; - - let after_op_children_clone = after_op_children.clone(); - let new_node = match op(after_op_children)? { - Some(value) => value, - None => after_op_children_clone, - }; - Ok(new_node) - } - - /// Apply transform `F` to the node's children, the transform `F` might have a direction(Preorder or Postorder) - fn map_children(self, transform: F) -> Result - where - F: FnMut(Self) -> Result; -} - -/// Trait for potentially recursively transform an [`TreeNodeRewritable`] node -/// tree. When passed to `TreeNodeRewritable::transform_using`, `TreeNodeRewriter::mutate` is -/// invoked recursively on all nodes of a tree. -pub trait TreeNodeRewriter: Sized { - /// Invoked before (Preorder) any children of `node` are rewritten / - /// visited. Default implementation returns `Ok(RewriteRecursion::Continue)` - fn pre_visit(&mut self, _node: &N) -> Result { - Ok(RewriteRecursion::Continue) - } - - /// Invoked after (Postorder) all children of `node` have been mutated and - /// returns a potentially modified node. - fn mutate(&mut self, node: N) -> Result; -} - -/// Controls how the [TreeNodeRewriter] recursion should proceed. -#[allow(dead_code)] -pub enum RewriteRecursion { - /// Continue rewrite / visit this node tree. - Continue, - /// Call 'op' immediately and return. - Mutate, - /// Do not rewrite / visit the children of this node. - Stop, - /// Keep recursive but skip apply op on this node - Skip, -} - -impl TreeNodeRewritable for Arc { - fn map_children(self, transform: F) -> Result - where - F: FnMut(Self) -> Result, - { - let children = self.children(); - if !children.is_empty() { - let new_children: Result> = - children.into_iter().map(transform).collect(); - with_new_children_if_necessary(self, new_children?) - } else { - Ok(self) - } - } -} diff --git a/datafusion/core/src/physical_plan/tree_node/mod.rs b/datafusion/core/src/physical_plan/tree_node/mod.rs index 7684757f6e48..ce14a4b90136 100644 --- a/datafusion/core/src/physical_plan/tree_node/mod.rs +++ b/datafusion/core/src/physical_plan/tree_node/mod.rs @@ -17,6 +17,7 @@ //! This module provides common traits for visiting or rewriting tree nodes easily. +pub mod rewritable; pub mod visitable; use datafusion_common::Result; @@ -53,7 +54,7 @@ pub trait TreeNodeVisitor: Sized { } } -/// trait for types that can be visited by [`TreeNodeVisitor`] +/// Trait for types that can be visited by [`TreeNodeVisitor`] pub trait TreeNodeVisitable: Sized { /// Return the children of this tree node fn get_children(&self) -> Vec; @@ -82,3 +83,129 @@ pub enum VisitRecursion { /// of parents of this tree node will not be affected Stop(V), } + +/// Trait for marking tree node as rewritable +pub trait TreeNodeRewritable: Clone { + /// Convenience utils for writing optimizers rule: recursively apply the given `op` to the node tree. + /// When `op` does not apply to a given node, it is left unchanged. + /// The default tree traversal direction is transform_up(Postorder Traversal). + fn transform(self, op: &F) -> Result + where + F: Fn(Self) -> Result>, + { + self.transform_up(op) + } + + /// Convenience utils for writing optimizers rule: recursively apply the given 'op' to the node and all of its + /// children(Preorder Traversal). + /// When the `op` does not apply to a given node, it is left unchanged. + fn transform_down(self, op: &F) -> Result + where + F: Fn(Self) -> Result>, + { + let node_cloned = self.clone(); + let after_op = match op(node_cloned)? { + Some(value) => value, + None => self, + }; + after_op.map_children(|node| node.transform_down(op)) + } + + /// Convenience utils for writing optimizers rule: recursively apply the given 'op' first to all of its + /// children and then itself(Postorder Traversal). + /// When the `op` does not apply to a given node, it is left unchanged. + fn transform_up(self, op: &F) -> Result + where + F: Fn(Self) -> Result>, + { + let after_op_children = self.map_children(|node| node.transform_up(op))?; + + let after_op_children_clone = after_op_children.clone(); + let new_node = match op(after_op_children)? { + Some(value) => value, + None => after_op_children_clone, + }; + Ok(new_node) + } + + /// Transform the tree node using the given [TreeNodeRewriter] + /// It performs a depth first walk of an node and its children. + /// + /// For an node tree such as + /// ```text + /// ParentNode + /// left: ChildNode1 + /// right: ChildNode2 + /// ``` + /// + /// The nodes are visited using the following order + /// ```text + /// pre_visit(ParentNode) + /// pre_visit(ChildNode1) + /// mutate(ChildNode1) + /// pre_visit(ChildNode2) + /// mutate(ChildNode2) + /// mutate(ParentNode) + /// ``` + /// + /// If an Err result is returned, recursion is stopped immediately + /// + /// If [`false`] is returned on a call to pre_visit, no + /// children of that node are visited, nor is mutate + /// called on that node + /// + fn transform_using>( + self, + rewriter: &mut R, + ) -> Result { + let need_mutate = match rewriter.pre_visit(&self)? { + RewriteRecursion::Mutate => return rewriter.mutate(self), + RewriteRecursion::Stop => return Ok(self), + RewriteRecursion::Continue => true, + RewriteRecursion::Skip => false, + }; + + let after_op_children = + self.map_children(|node| node.transform_using(rewriter))?; + + // now rewrite this node itself + if need_mutate { + rewriter.mutate(after_op_children) + } else { + Ok(after_op_children) + } + } + + /// Apply transform `F` to the node's children, the transform `F` might have a direction(Preorder or Postorder) + fn map_children(self, transform: F) -> Result + where + F: FnMut(Self) -> Result; +} + +/// Trait for potentially recursively transform an [`TreeNodeRewritable`] node +/// tree. When passed to `TreeNodeRewritable::transform_using`, `TreeNodeRewriter::mutate` is +/// invoked recursively on all nodes of a tree. +pub trait TreeNodeRewriter: Sized { + /// Invoked before (Preorder) any children of `node` are rewritten / + /// visited. Default implementation returns `Ok(RewriteRecursion::Continue)` + fn pre_visit(&mut self, _node: &N) -> Result { + Ok(RewriteRecursion::Continue) + } + + /// Invoked after (Postorder) all children of `node` have been mutated and + /// returns a potentially modified node. + fn mutate(&mut self, node: N) -> Result; +} + +/// Controls how the [TreeNodeRewriter] recursion should proceed. +#[allow(dead_code)] +pub enum RewriteRecursion { + /// Continue rewrite / visit this node tree. + Continue, + /// Call 'op' immediately and return. + Mutate, + /// Do not rewrite / visit the children of this node. + Stop, + /// Keep recursive but skip apply op on this node + Skip, +} diff --git a/datafusion/core/src/physical_plan/tree_node/rewritable.rs b/datafusion/core/src/physical_plan/tree_node/rewritable.rs new file mode 100644 index 000000000000..004fc47fd7ac --- /dev/null +++ b/datafusion/core/src/physical_plan/tree_node/rewritable.rs @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tree node rewritable implementations + +use crate::physical_plan::tree_node::TreeNodeRewritable; +use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; +use datafusion_common::Result; +use std::sync::Arc; + +impl TreeNodeRewritable for Arc { + fn map_children(self, transform: F) -> Result + where + F: FnMut(Self) -> Result, + { + let children = self.children(); + if !children.is_empty() { + let new_children: Result> = + children.into_iter().map(transform).collect(); + with_new_children_if_necessary(self, new_children?) + } else { + Ok(self) + } + } +} From eb53dc0e2327c4f9d145be58559d266dc91ff812 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Tue, 14 Mar 2023 15:45:22 +0800 Subject: [PATCH 4/5] Change self to &mut self for TreeNodeVisitor --- .../core/src/physical_plan/file_format/mod.rs | 8 ++--- .../core/src/physical_plan/tree_node/mod.rs | 33 ++++++++++--------- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index c343fe5e5637..d7616a3c2d79 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -77,7 +77,7 @@ pub fn get_scan_files( plan: Arc, ) -> Result>>> { let mut collector = FileScanCollector::new(); - collector = plan.accept(collector)?; + plan.accept(&mut collector)?; Ok(collector.file_groups) } @@ -96,7 +96,7 @@ impl FileScanCollector { impl TreeNodeVisitor for FileScanCollector { type N = Arc; - fn pre_visit(mut self, node: &Self::N) -> Result> { + fn pre_visit(&mut self, node: &Self::N) -> Result { let plan_any = node.as_any(); let file_groups = if let Some(parquet_exec) = plan_any.downcast_ref::() { @@ -108,11 +108,11 @@ impl TreeNodeVisitor for FileScanCollector { } else if let Some(csv_exec) = plan_any.downcast_ref::() { csv_exec.base_config().file_groups.clone() } else { - return Ok(VisitRecursion::Continue(self)); + return Ok(VisitRecursion::Continue); }; self.file_groups.push(file_groups); - Ok(VisitRecursion::Stop(self)) + Ok(VisitRecursion::Stop) } } diff --git a/datafusion/core/src/physical_plan/tree_node/mod.rs b/datafusion/core/src/physical_plan/tree_node/mod.rs index ce14a4b90136..327d938d4ec4 100644 --- a/datafusion/core/src/physical_plan/tree_node/mod.rs +++ b/datafusion/core/src/physical_plan/tree_node/mod.rs @@ -45,12 +45,12 @@ pub trait TreeNodeVisitor: Sized { type N: TreeNodeVisitable; /// Invoked before any children of `node` are visited. - fn pre_visit(self, node: &Self::N) -> Result>; + fn pre_visit(&mut self, node: &Self::N) -> Result; /// Invoked after all children of `node` are visited. Default /// implementation does nothing. - fn post_visit(self, _node: &Self::N) -> Result { - Ok(self) + fn post_visit(&mut self, _node: &Self::N) -> Result<()> { + Ok(()) } } @@ -60,15 +60,15 @@ pub trait TreeNodeVisitable: Sized { fn get_children(&self) -> Vec; /// Accept a visitor, calling `visit` on all children of this - fn accept>(&self, visitor: V) -> Result { - let mut visitor = match visitor.pre_visit(self)? { - VisitRecursion::Continue(visitor) => visitor, + fn accept>(&self, visitor: &mut V) -> Result<()> { + match visitor.pre_visit(self)? { + VisitRecursion::Continue => {} // If the recursion should stop, do not visit children - VisitRecursion::Stop(visitor) => return Ok(visitor), + VisitRecursion::Stop => return Ok(()), }; for child in self.get_children() { - visitor = child.accept(visitor)?; + child.accept(visitor)?; } visitor.post_visit(self) @@ -76,12 +76,12 @@ pub trait TreeNodeVisitable: Sized { } /// Controls how the visitor recursion should proceed. -pub enum VisitRecursion { +pub enum VisitRecursion { /// Attempt to visit all the children, recursively. - Continue(V), + Continue, /// Do not visit the children of this tree node, though the walk /// of parents of this tree node will not be affected - Stop(V), + Stop, } /// Trait for marking tree node as rewritable @@ -154,7 +154,7 @@ pub trait TreeNodeRewritable: Clone { /// children of that node are visited, nor is mutate /// called on that node /// - fn transform_using>( + fn transform_using>( self, rewriter: &mut R, ) -> Result { @@ -185,16 +185,19 @@ pub trait TreeNodeRewritable: Clone { /// Trait for potentially recursively transform an [`TreeNodeRewritable`] node /// tree. When passed to `TreeNodeRewritable::transform_using`, `TreeNodeRewriter::mutate` is /// invoked recursively on all nodes of a tree. -pub trait TreeNodeRewriter: Sized { +pub trait TreeNodeRewriter: Sized { + /// The node type which is rewritable. + type N: TreeNodeRewritable; + /// Invoked before (Preorder) any children of `node` are rewritten / /// visited. Default implementation returns `Ok(RewriteRecursion::Continue)` - fn pre_visit(&mut self, _node: &N) -> Result { + fn pre_visit(&mut self, _node: &Self::N) -> Result { Ok(RewriteRecursion::Continue) } /// Invoked after (Postorder) all children of `node` have been mutated and /// returns a potentially modified node. - fn mutate(&mut self, node: N) -> Result; + fn mutate(&mut self, node: Self::N) -> Result; } /// Controls how the [TreeNodeRewriter] recursion should proceed. From b734b4a0419292dcad9d45d1681e777d23739c8d Mon Sep 17 00:00:00 2001 From: yangzhong Date: Tue, 14 Mar 2023 18:35:39 +0800 Subject: [PATCH 5/5] Refine the usage of TreeNodeRewriter --- .../core/src/physical_optimizer/pruning.rs | 25 ++++------------ .../file_format/parquet/row_filter.rs | 3 ++ datafusion/physical-expr/src/utils.rs | 29 ++++--------------- 3 files changed, 15 insertions(+), 42 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 80b72e68f6ea..9185bf04df82 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -46,7 +46,7 @@ use arrow::{ record_batch::RecordBatch, }; use datafusion_common::{downcast_value, ScalarValue}; -use datafusion_physical_expr::rewrite::{TreeNodeRewritable, TreeNodeRewriter}; +use datafusion_physical_expr::rewrite::TreeNodeRewritable; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef}; use log::trace; @@ -643,28 +643,15 @@ fn rewrite_column_expr( column_old: &phys_expr::Column, column_new: &phys_expr::Column, ) -> Result> { - let mut rewriter = RewriteColumnExpr { - column_old, - column_new, - }; - e.transform_using(&mut rewriter) -} - -struct RewriteColumnExpr<'a> { - column_old: &'a phys_expr::Column, - column_new: &'a phys_expr::Column, -} - -impl<'a> TreeNodeRewriter> for RewriteColumnExpr<'a> { - fn mutate(&mut self, expr: Arc) -> Result> { + e.transform(&|expr| { if let Some(column) = expr.as_any().downcast_ref::() { - if column == self.column_old { - return Ok(Arc::new(self.column_new.clone())); + if column == column_old { + return Ok(Some(Arc::new(column_new.clone()))); } } - Ok(expr) - } + Ok(None) + }) } fn reverse_operator(op: Operator) -> Result { diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs index e1feafec1588..54478edc73c5 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs @@ -219,13 +219,16 @@ impl<'a> TreeNodeRewriter> for FilterCandidateBuilder<'a> if DataType::is_nested(self.file_schema.field(idx).data_type()) { self.non_primitive_columns = true; + return Ok(RewriteRecursion::Stop); } } else if self.table_schema.index_of(column.name()).is_err() { // If the column does not exist in the (un-projected) table schema then // it must be a projected column. self.projected_columns = true; + return Ok(RewriteRecursion::Stop); } } + Ok(RewriteRecursion::Continue) } diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index a80a92bc5a5a..5b357d931821 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -377,35 +377,18 @@ pub fn reassign_predicate_columns( schema: &SchemaRef, ignore_not_found: bool, ) -> Result, DataFusionError> { - let mut rewriter = ColumnAssigner { - schema, - ignore_not_found, - }; - pred.transform_using(&mut rewriter) -} - -#[derive(Debug)] -struct ColumnAssigner<'a> { - schema: &'a SchemaRef, - ignore_not_found: bool, -} - -impl<'a> TreeNodeRewriter> for ColumnAssigner<'a> { - fn mutate( - &mut self, - expr: Arc, - ) -> Result, DataFusionError> { + pred.transform(&|expr| { if let Some(column) = expr.as_any().downcast_ref::() { - let index = match self.schema.index_of(column.name()) { + let index = match schema.index_of(column.name()) { Ok(idx) => idx, - Err(_) if self.ignore_not_found => usize::MAX, + Err(_) if ignore_not_found => usize::MAX, Err(e) => return Err(e.into()), }; - return Ok(Arc::new(Column::new(column.name(), index))); + return Ok(Some(Arc::new(Column::new(column.name(), index)))); } - Ok(expr) - } + Ok(None) + }) } #[cfg(test)]