diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 53e94167d845..ba18e9f62c25 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -620,6 +620,7 @@ mod tests {
     use super::*;
     use crate::datasource::file_format::parquet::test_util::store_parquet;
+    use crate::physical_plan::file_format::get_scan_files;
     use crate::physical_plan::metrics::MetricValue;
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{Array, ArrayRef, StringArray};
@@ -1215,6 +1216,25 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_get_scan_files() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+        let projection = Some(vec![9]);
+        let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;
+        let scan_files = get_scan_files(exec)?;
+        assert_eq!(scan_files.len(), 1);
+        assert_eq!(scan_files[0].len(), 1);
+        assert_eq!(scan_files[0][0].len(), 1);
+        assert!(scan_files[0][0][0]
+            .object_meta
+            .location
+            .to_string()
+            .contains("alltypes_plain.parquet"));
+
+        Ok(())
+    }
+
     fn check_page_index_validation(
         page_index: Option<&ParquetColumnIndex>,
         offset_index: Option<&ParquetOffsetIndex>,
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index a1566c37cd2d..9fd29cf89674 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -24,7 +24,7 @@ use crate::{
     physical_optimizer::PhysicalOptimizerRule,
     physical_plan::{
         coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
-        repartition::RepartitionExec, rewrite::TreeNodeRewritable, Partitioning,
+        repartition::RepartitionExec, tree_node::TreeNodeRewritable, Partitioning,
     },
 };
 use std::sync::Arc;
diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index 919273af7467..95d4427e6dc4 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -28,8 +28,8 @@ use crate::physical_plan::joins::{
 };
 use crate::physical_plan::projection::ProjectionExec;
 use crate::physical_plan::repartition::RepartitionExec;
-use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::sorts::sort::SortOptions;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
 use crate::physical_plan::windows::WindowAggExec;
 use crate::physical_plan::Partitioning;
 use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
index 81b4b59e3a14..647558fbff12 100644
--- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs
+++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
@@ -22,9 +22,9 @@ use std::sync::Arc;
 use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::sorts::sort::SortExec;
 use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
 use crate::physical_plan::ExecutionPlan;
 
 /// Currently for a sort operator, if
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index d9787881f161..2308b2c85dda 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -32,7 +32,7 @@ use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
 
 use super::optimizer::PhysicalOptimizerRule;
 use crate::error::Result;
-use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
 
 /// For hash join with the partition mode [PartitionMode::Auto], JoinSelection rule will make
 /// a cost based decision to select which PartitionMode mode(Partitioned/CollectLeft) is optimal
diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
index 8a6b0e003adf..f097196cd1fc 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -22,7 +22,7 @@
 use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
 use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
 use std::sync::Arc;
diff --git a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
index 7e85f0c0d5f7..7532914c1258 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
@@ -34,7 +34,7 @@ use crate::physical_plan::joins::{
     convert_sort_expr_with_filter_schema, HashJoinExec, PartitionMode,
     SymmetricHashJoinExec,
 };
-use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
 use crate::physical_plan::ExecutionPlan;
 use datafusion_common::DataFusionError;
 use datafusion_expr::logical_plan::JoinType;
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs
index 80b72e68f6ea..9185bf04df82 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -46,7 +46,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use datafusion_common::{downcast_value, ScalarValue};
-use datafusion_physical_expr::rewrite::{TreeNodeRewritable, TreeNodeRewriter};
+use datafusion_physical_expr::rewrite::TreeNodeRewritable;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
 use log::trace;
@@ -643,28 +643,15 @@ fn rewrite_column_expr(
     column_old: &phys_expr::Column,
     column_new: &phys_expr::Column,
 ) -> Result<Arc<dyn PhysicalExpr>> {
-    let mut rewriter = RewriteColumnExpr {
-        column_old,
-        column_new,
-    };
-    e.transform_using(&mut rewriter)
-}
-
-struct RewriteColumnExpr<'a> {
-    column_old: &'a phys_expr::Column,
-    column_new: &'a phys_expr::Column,
-}
-
-impl<'a> TreeNodeRewriter<Arc<dyn PhysicalExpr>> for RewriteColumnExpr<'a> {
-    fn mutate(&mut self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
+    e.transform(&|expr| {
         if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>() {
-            if column == self.column_old {
-                return Ok(Arc::new(self.column_new.clone()));
+            if column == column_old {
+                return Ok(Some(Arc::new(column_new.clone())));
             }
         }
-        Ok(expr)
-    }
+        Ok(None)
+    })
 }
 
 fn reverse_operator(op: Operator) -> Result<Operator> {
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 70880b750552..261c19600c81 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -39,9 +39,9 @@ use crate::physical_optimizer::utils::add_sort_above;
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
-use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::sorts::sort::SortExec;
 use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
 use crate::physical_plan::union::UnionExec;
 use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
 use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 6f26080122c6..146227335c99 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -73,6 +73,11 @@ impl NdJsonExec {
             file_compression_type,
         }
     }
+
+    /// Ref to the base configs
+    pub fn base_config(&self) -> &FileScanConfig {
+        &self.base_config
+    }
 }
 
 impl ExecutionPlan for NdJsonExec {
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 97b091cedfef..d7616a3c2d79 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -45,6 +45,10 @@ use crate::datasource::{
     listing::{FileRange, PartitionedFile},
     object_store::ObjectStoreUrl,
 };
+use crate::physical_plan::tree_node::{
+    TreeNodeVisitable, TreeNodeVisitor, VisitRecursion,
+};
+use crate::physical_plan::ExecutionPlan;
 use crate::{
     error::{DataFusionError, Result},
     scalar::ScalarValue,
@@ -68,6 +72,50 @@ pub fn partition_type_wrap(val_type: DataType) -> DataType {
     DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
 }
 
+/// Get all of the [`PartitionedFile`] to be scanned for an [`ExecutionPlan`]
+pub fn get_scan_files(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Vec<Vec<Vec<PartitionedFile>>>> {
+    let mut collector = FileScanCollector::new();
+    plan.accept(&mut collector)?;
+    Ok(collector.file_groups)
+}
+
+struct FileScanCollector {
+    file_groups: Vec<Vec<Vec<PartitionedFile>>>,
+}
+
+impl FileScanCollector {
+    fn new() -> Self {
+        Self {
+            file_groups: vec![],
+        }
+    }
+}
+
+impl TreeNodeVisitor for FileScanCollector {
+    type N = Arc<dyn ExecutionPlan>;
+
+    fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion> {
+        let plan_any = node.as_any();
+        let file_groups =
+            if let Some(parquet_exec) = plan_any.downcast_ref::<ParquetExec>() {
+                parquet_exec.base_config().file_groups.clone()
+            } else if let Some(avro_exec) = plan_any.downcast_ref::<AvroExec>() {
+                avro_exec.base_config().file_groups.clone()
+            } else if let Some(json_exec) = plan_any.downcast_ref::<NdJsonExec>() {
+                json_exec.base_config().file_groups.clone()
+            } else if let Some(csv_exec) = plan_any.downcast_ref::<CsvExec>() {
+                csv_exec.base_config().file_groups.clone()
+            } else {
+                return Ok(VisitRecursion::Continue);
+            };
+
+        self.file_groups.push(file_groups);
+        Ok(VisitRecursion::Stop)
+    }
+}
+
 /// The base configurations to provide when creating a physical plan for
 /// any given file format.
 #[derive(Debug, Clone)]
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
index e1feafec1588..54478edc73c5 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
@@ -219,13 +219,16 @@ impl<'a> TreeNodeRewriter<Arc<dyn PhysicalExpr>> for FilterCandidateBuilder<'a>
                 if DataType::is_nested(self.file_schema.field(idx).data_type()) {
                     self.non_primitive_columns = true;
+                    return Ok(RewriteRecursion::Stop);
                 }
             } else if self.table_schema.index_of(column.name()).is_err() {
                 // If the column does not exist in the (un-projected) table schema then
                 // it must be a projected column.
                 self.projected_columns = true;
+                return Ok(RewriteRecursion::Stop);
             }
         }
+
         Ok(RewriteRecursion::Continue)
     }
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index dbd1024ae482..9e0e03a77d04 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -653,10 +653,10 @@ pub mod metrics;
 pub mod planner;
 pub mod projection;
 pub mod repartition;
-pub mod rewrite;
 pub mod sorts;
 pub mod stream;
 pub mod streaming;
+pub mod tree_node;
 pub mod udaf;
 pub mod union;
 pub mod unnest;
diff --git a/datafusion/core/src/physical_plan/rewrite.rs b/datafusion/core/src/physical_plan/tree_node/mod.rs
similarity index 66%
rename from datafusion/core/src/physical_plan/rewrite.rs
rename to datafusion/core/src/physical_plan/tree_node/mod.rs
index 2972b546bb0e..327d938d4ec4 100644
--- a/datafusion/core/src/physical_plan/rewrite.rs
+++ b/datafusion/core/src/physical_plan/tree_node/mod.rs
@@ -15,16 +15,119 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Trait to make Executionplan rewritable
+//! This module provides common traits for visiting or rewriting tree nodes easily.
+
+pub mod rewritable;
+pub mod visitable;
 
-use crate::physical_plan::with_new_children_if_necessary;
-use crate::physical_plan::ExecutionPlan;
 use datafusion_common::Result;
-use std::sync::Arc;
 
+/// Implements the [visitor
+/// pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for recursively walking [`TreeNodeVisitable`]s.
+///
+/// [`TreeNodeVisitor`] allows keeping the algorithms
+/// separate from the code to traverse the structure of the `TreeNodeVisitable`
+/// tree and makes it easier to add new types of tree node and
+/// algorithms.
+///
+/// When passed to [`TreeNodeVisitable::accept`], [`TreeNodeVisitor::pre_visit`]
+/// and [`TreeNodeVisitor::post_visit`] are invoked recursively
+/// on a node tree.
+///
+/// If an [`Err`] result is returned, recursion is stopped
+/// immediately.
+///
+/// If [`VisitRecursion::Stop`] is returned on a call to pre_visit, no
+/// children of that tree node are visited, nor is post_visit
+/// called on that tree node
+pub trait TreeNodeVisitor: Sized {
+    /// The node type which is visitable.
+    type N: TreeNodeVisitable;
+
+    /// Invoked before any children of `node` are visited.
+    fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion>;
+
+    /// Invoked after all children of `node` are visited. Default
+    /// implementation does nothing.
+    fn post_visit(&mut self, _node: &Self::N) -> Result<()> {
+        Ok(())
+    }
+}
+
+/// Trait for types that can be visited by [`TreeNodeVisitor`]
+pub trait TreeNodeVisitable: Sized {
+    /// Return the children of this tree node
+    fn get_children(&self) -> Vec<Self>;
+
+    /// Accept a visitor, calling `visit` on all children of this
+    fn accept<V: TreeNodeVisitor<N = Self>>(&self, visitor: &mut V) -> Result<()> {
+        match visitor.pre_visit(self)? {
+            VisitRecursion::Continue => {}
+            // If the recursion should stop, do not visit children
+            VisitRecursion::Stop => return Ok(()),
+        };
+
+        for child in self.get_children() {
+            child.accept(visitor)?;
+        }
+
+        visitor.post_visit(self)
+    }
+}
+
+/// Controls how the visitor recursion should proceed.
+pub enum VisitRecursion {
+    /// Attempt to visit all the children, recursively.
+    Continue,
+    /// Do not visit the children of this tree node, though the walk
+    /// of parents of this tree node will not be affected
+    Stop,
+}
 
-/// a Trait for marking tree node types that are rewritable
+/// Trait for marking tree node as rewritable
 pub trait TreeNodeRewritable: Clone {
+    /// Convenience utils for writing optimizers rule: recursively apply the given `op` to the node tree.
+    /// When `op` does not apply to a given node, it is left unchanged.
+    /// The default tree traversal direction is transform_up(Postorder Traversal).
+    fn transform<F>(self, op: &F) -> Result<Self>
+    where
+        F: Fn(Self) -> Result<Option<Self>>,
+    {
+        self.transform_up(op)
+    }
+
+    /// Convenience utils for writing optimizers rule: recursively apply the given 'op' to the node and all of its
+    /// children(Preorder Traversal).
+    /// When the `op` does not apply to a given node, it is left unchanged.
+    fn transform_down<F>(self, op: &F) -> Result<Self>
+    where
+        F: Fn(Self) -> Result<Option<Self>>,
+    {
+        let node_cloned = self.clone();
+        let after_op = match op(node_cloned)? {
+            Some(value) => value,
+            None => self,
+        };
+        after_op.map_children(|node| node.transform_down(op))
+    }
+
+    /// Convenience utils for writing optimizers rule: recursively apply the given 'op' first to all of its
+    /// children and then itself(Postorder Traversal).
+    /// When the `op` does not apply to a given node, it is left unchanged.
+    fn transform_up<F>(self, op: &F) -> Result<Self>
+    where
+        F: Fn(Self) -> Result<Option<Self>>,
+    {
+        let after_op_children = self.map_children(|node| node.transform_up(op))?;
+
+        let after_op_children_clone = after_op_children.clone();
+        let new_node = match op(after_op_children)? {
+            Some(value) => value,
+            None => after_op_children_clone,
+        };
+        Ok(new_node)
+    }
+
     /// Transform the tree node using the given [TreeNodeRewriter]
     /// It performs a depth first walk of an node and its children.
     ///
@@ -51,7 +154,7 @@ pub trait TreeNodeRewritable: Clone {
     /// children of that node are visited, nor is mutate
     /// called on that node
     ///
-    fn transform_using<R: TreeNodeRewriter<Self>>(
+    fn transform_using<R: TreeNodeRewriter<N = Self>>(
         self,
         rewriter: &mut R,
     ) -> Result<Self> {
@@ -73,48 +176,6 @@ pub trait TreeNodeRewritable: Clone {
         }
     }
 
-    /// Convenience utils for writing optimizers rule: recursively apply the given `op` to the node tree.
-    /// When `op` does not apply to a given node, it is left unchanged.
-    /// The default tree traversal direction is transform_up(Postorder Traversal).
-    fn transform<F>(self, op: &F) -> Result<Self>
-    where
-        F: Fn(Self) -> Result<Option<Self>>,
-    {
-        self.transform_up(op)
-    }
-
-    /// Convenience utils for writing optimizers rule: recursively apply the given 'op' to the node and all of its
-    /// children(Preorder Traversal).
-    /// When the `op` does not apply to a given node, it is left unchanged.
-    fn transform_down<F>(self, op: &F) -> Result<Self>
-    where
-        F: Fn(Self) -> Result<Option<Self>>,
-    {
-        let node_cloned = self.clone();
-        let after_op = match op(node_cloned)? {
-            Some(value) => value,
-            None => self,
-        };
-        after_op.map_children(|node| node.transform_down(op))
-    }
-
-    /// Convenience utils for writing optimizers rule: recursively apply the given 'op' first to all of its
-    /// children and then itself(Postorder Traversal).
-    /// When the `op` does not apply to a given node, it is left unchanged.
-    fn transform_up<F>(self, op: &F) -> Result<Self>
-    where
-        F: Fn(Self) -> Result<Option<Self>>,
-    {
-        let after_op_children = self.map_children(|node| node.transform_up(op))?;
-
-        let after_op_children_clone = after_op_children.clone();
-        let new_node = match op(after_op_children)? {
-            Some(value) => value,
-            None => after_op_children_clone,
-        };
-        Ok(new_node)
-    }
-
     /// Apply transform `F` to the node's children, the transform `F` might have a direction(Preorder or Postorder)
     fn map_children<F>(self, transform: F) -> Result<Self>
     where
         F: FnMut(Self) -> Result<Self>,
@@ -124,16 +185,19 @@ pub trait TreeNodeRewritable: Clone {
 /// Trait for potentially recursively transform an [`TreeNodeRewritable`] node
 /// tree. When passed to `TreeNodeRewritable::transform_using`, `TreeNodeRewriter::mutate` is
 /// invoked recursively on all nodes of a tree.
-pub trait TreeNodeRewriter<N>: Sized {
+pub trait TreeNodeRewriter: Sized {
+    /// The node type which is rewritable.
+    type N: TreeNodeRewritable;
+
     /// Invoked before (Preorder) any children of `node` are rewritten /
     /// visited. Default implementation returns `Ok(RewriteRecursion::Continue)`
-    fn pre_visit(&mut self, _node: &N) -> Result<RewriteRecursion> {
+    fn pre_visit(&mut self, _node: &Self::N) -> Result<RewriteRecursion> {
         Ok(RewriteRecursion::Continue)
     }
 
     /// Invoked after (Postorder) all children of `node` have been mutated and
     /// returns a potentially modified node.
-    fn mutate(&mut self, node: N) -> Result<N>;
+    fn mutate(&mut self, node: Self::N) -> Result<Self::N>;
 }
 
 /// Controls how the [TreeNodeRewriter] recursion should proceed.
@@ -148,19 +212,3 @@ pub enum RewriteRecursion {
     /// Keep recursive but skip apply op on this node
     Skip,
 }
-
-impl TreeNodeRewritable for Arc<dyn ExecutionPlan> {
-    fn map_children<F>(self, transform: F) -> Result<Self>
-    where
-        F: FnMut(Self) -> Result<Self>,
-    {
-        let children = self.children();
-        if !children.is_empty() {
-            let new_children: Result<Vec<_>> =
-                children.into_iter().map(transform).collect();
-            with_new_children_if_necessary(self, new_children?)
-        } else {
-            Ok(self)
-        }
-    }
-}
diff --git a/datafusion/core/src/physical_plan/tree_node/rewritable.rs b/datafusion/core/src/physical_plan/tree_node/rewritable.rs
new file mode 100644
index 000000000000..004fc47fd7ac
--- /dev/null
+++ b/datafusion/core/src/physical_plan/tree_node/rewritable.rs
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tree node rewritable implementations
+
+use crate::physical_plan::tree_node::TreeNodeRewritable;
+use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+use datafusion_common::Result;
+use std::sync::Arc;
+
+impl TreeNodeRewritable for Arc<dyn ExecutionPlan> {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if !children.is_empty() {
+            let new_children: Result<Vec<_>> =
+                children.into_iter().map(transform).collect();
+            with_new_children_if_necessary(self, new_children?)
+        } else {
+            Ok(self)
+        }
+    }
+}
diff --git a/datafusion/core/src/physical_plan/tree_node/visitable.rs b/datafusion/core/src/physical_plan/tree_node/visitable.rs
new file mode 100644
index 000000000000..935c8adb7ea7
--- /dev/null
+++ b/datafusion/core/src/physical_plan/tree_node/visitable.rs
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tree node visitable implementations
+
+use crate::physical_plan::tree_node::TreeNodeVisitable;
+use crate::physical_plan::ExecutionPlan;
+use std::sync::Arc;
+
+impl TreeNodeVisitable for Arc<dyn ExecutionPlan> {
+    fn get_children(&self) -> Vec<Self> {
+        self.children()
+    }
+}
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index a80a92bc5a5a..5b357d931821 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -377,35 +377,18 @@ pub fn reassign_predicate_columns(
     schema: &SchemaRef,
     ignore_not_found: bool,
 ) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
-    let mut rewriter = ColumnAssigner {
-        schema,
-        ignore_not_found,
-    };
-    pred.transform_using(&mut rewriter)
-}
-
-#[derive(Debug)]
-struct ColumnAssigner<'a> {
-    schema: &'a SchemaRef,
-    ignore_not_found: bool,
-}
-
-impl<'a> TreeNodeRewriter<Arc<dyn PhysicalExpr>> for ColumnAssigner<'a> {
-    fn mutate(
-        &mut self,
-        expr: Arc<dyn PhysicalExpr>,
-    ) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
+    pred.transform(&|expr| {
         if let Some(column) = expr.as_any().downcast_ref::<Column>() {
-            let index = match self.schema.index_of(column.name()) {
+            let index = match schema.index_of(column.name()) {
                 Ok(idx) => idx,
-                Err(_) if self.ignore_not_found => usize::MAX,
+                Err(_) if ignore_not_found => usize::MAX,
                 Err(e) => return Err(e.into()),
            };
-            return Ok(Arc::new(Column::new(column.name(), index)));
+            return Ok(Some(Arc::new(Column::new(column.name(), index))));
         }
-        Ok(expr)
-    }
+        Ok(None)
+    })
 }
 
 #[cfg(test)]
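The new `get_scan_files` helper is a thin wrapper over the `TreeNodeVisitable` / `TreeNodeVisitor` pair introduced in `tree_node/mod.rs`: the node type only exposes its children, while the visitor decides what to record and whether to keep descending. Below is a minimal, dependency-free sketch of that same `pre_visit`/`post_visit` protocol; the toy `Node` and `NameCollector` types and the local `Result` alias are illustrative stand-ins, not part of this patch or of DataFusion.

```rust
/// Local Result alias so the sketch has no external dependencies.
type Result<T> = std::result::Result<T, String>;

/// Mirrors the `VisitRecursion` enum added in `tree_node/mod.rs`.
enum VisitRecursion {
    Continue,
    Stop,
}

/// Simplified `TreeNodeVisitor`: `pre_visit` runs before a node's children,
/// `post_visit` runs after them.
trait TreeNodeVisitor: Sized {
    type N: TreeNodeVisitable;

    fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion>;

    fn post_visit(&mut self, _node: &Self::N) -> Result<()> {
        Ok(())
    }
}

/// Simplified `TreeNodeVisitable`: a node exposes its children and drives the walk.
trait TreeNodeVisitable: Sized {
    fn get_children(&self) -> Vec<Self>;

    fn accept<V: TreeNodeVisitor<N = Self>>(&self, visitor: &mut V) -> Result<()> {
        match visitor.pre_visit(self)? {
            VisitRecursion::Continue => {}
            // Stop: skip this node's children, but keep walking the rest of the tree.
            VisitRecursion::Stop => return Ok(()),
        }
        for child in self.get_children() {
            child.accept(visitor)?;
        }
        visitor.post_visit(self)
    }
}

/// Toy plan node standing in for `Arc<dyn ExecutionPlan>`.
#[derive(Clone)]
struct Node {
    name: String,
    children: Vec<Node>,
}

impl TreeNodeVisitable for Node {
    fn get_children(&self) -> Vec<Node> {
        self.children.clone()
    }
}

/// Collector in the spirit of `FileScanCollector`: records every node it pre-visits.
struct NameCollector {
    names: Vec<String>,
}

impl TreeNodeVisitor for NameCollector {
    type N = Node;

    fn pre_visit(&mut self, node: &Node) -> Result<VisitRecursion> {
        self.names.push(node.name.clone());
        if node.name == "scan" {
            // A leaf scan has nothing further of interest: prune the recursion here.
            return Ok(VisitRecursion::Stop);
        }
        Ok(VisitRecursion::Continue)
    }
}

fn main() -> Result<()> {
    let scan = Node { name: "scan".to_string(), children: vec![] };
    let filter = Node { name: "filter".to_string(), children: vec![scan] };

    let mut collector = NameCollector { names: vec![] };
    filter.accept(&mut collector)?;
    assert_eq!(collector.names, vec!["filter", "scan"]);
    Ok(())
}
```

Collecting scan files works the same way: `FileScanCollector::pre_visit` records a node's `file_groups` and returns `Stop` once it has handled a scan node, while any other node returns `Continue` so the walk keeps descending.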
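The patch also drops the one-off `TreeNodeRewriter` structs (`RewriteColumnExpr` in `pruning.rs`, `ColumnAssigner` in `utils.rs`) in favour of closures passed to `TreeNodeRewritable::transform`, where returning `Ok(None)` leaves a node unchanged and `Ok(Some(new))` replaces it. Here is a self-contained sketch of that closure-based, post-order rewrite on a toy expression type; `Expr`, `rename_column`, and the local `Result` alias are illustrative only and not DataFusion APIs.

```rust
/// Local Result alias so the sketch has no external dependencies.
type Result<T> = std::result::Result<T, String>;

/// Simplified `TreeNodeRewritable`: only `map_children` is node-specific;
/// `transform_up` is provided, as in `tree_node/mod.rs`.
trait TreeNodeRewritable: Clone {
    fn map_children<F>(self, transform: F) -> Result<Self>
    where
        F: FnMut(Self) -> Result<Self>;

    /// Post-order rewrite: children first, then the node itself.
    /// `Ok(None)` from `op` leaves the node unchanged.
    fn transform_up<F>(self, op: &F) -> Result<Self>
    where
        F: Fn(Self) -> Result<Option<Self>>,
    {
        let after_children = self.map_children(|c| c.transform_up(op))?;
        let unchanged = after_children.clone();
        Ok(op(after_children)?.unwrap_or(unchanged))
    }
}

/// Toy expression tree standing in for `Arc<dyn PhysicalExpr>`.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(String),
    Not(Box<Expr>),
}

impl TreeNodeRewritable for Expr {
    fn map_children<F>(self, mut transform: F) -> Result<Self>
    where
        F: FnMut(Self) -> Result<Self>,
    {
        match self {
            // Only `Not` has a child to rewrite; leaves are returned as-is.
            Expr::Not(child) => Ok(Expr::Not(Box::new(transform(*child)?))),
            leaf => Ok(leaf),
        }
    }
}

/// Rename a column everywhere in the tree, in the style of the closure-based
/// `rewrite_column_expr` / `reassign_predicate_columns` in this diff.
fn rename_column(expr: Expr, old: &str, new: &str) -> Result<Expr> {
    expr.transform_up(&|e| {
        if let Expr::Column(name) = &e {
            if name.as_str() == old {
                return Ok(Some(Expr::Column(new.to_string())));
            }
        }
        // `None` means "no change"; `transform_up` keeps the node as-is.
        Ok(None)
    })
}

fn main() -> Result<()> {
    let expr = Expr::Not(Box::new(Expr::Column("a".to_string())));
    let rewritten = rename_column(expr, "a", "b")?;
    assert_eq!(rewritten, Expr::Not(Box::new(Expr::Column("b".to_string()))));
    Ok(())
}
```

This mirrors how `rewrite_column_expr` now swaps `column_old` for `column_new` without declaring a dedicated rewriter type.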