diff --git a/datafusion-examples/examples/custom_data_source/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs index 701a886d2a140..937452a286b90 100644 --- a/datafusion-examples/examples/custom_data_source/custom_datasource.rs +++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs @@ -26,7 +26,6 @@ use async_trait::async_trait; use datafusion::arrow::array::{UInt8Builder, UInt64Builder}; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::datasource::{TableProvider, TableType, provider_as_source}; use datafusion::error::Result; use datafusion::execution::context::TaskContext; @@ -275,20 +274,4 @@ impl ExecutionPlan for CustomExec { None, )?)) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } diff --git a/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs b/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs index dc374c7e02fe5..eab813b7eedbd 100644 --- a/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs +++ b/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs @@ -29,7 +29,6 @@ use arrow::record_batch::RecordBatch; use arrow_schema::SchemaRef; use datafusion::common::record_batch; -use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::common::{exec_datafusion_err, internal_err}; use datafusion::datasource::{DefaultTableSource, memory::MemTable}; use datafusion::error::Result; @@ -292,20 +291,4 @@ impl ExecutionPlan for BufferingExecutionPlan { }), ))) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.properties.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } diff --git a/datafusion-examples/examples/proto/composed_extension_codec.rs b/datafusion-examples/examples/proto/composed_extension_codec.rs index ae9503dd87b19..9a4a76749433a 100644 --- a/datafusion-examples/examples/proto/composed_extension_codec.rs +++ b/datafusion-examples/examples/proto/composed_extension_codec.rs @@ -37,7 +37,6 @@ use std::sync::Arc; use datafusion::common::Result; use datafusion::common::internal_err; -use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::execution::TaskContext; use datafusion::physical_plan::{DisplayAs, ExecutionPlan}; use datafusion::prelude::SessionContext; @@ -124,15 +123,6 @@ impl ExecutionPlan for ParentExec { ) -> Result { unreachable!() } - - fn apply_expressions( - &self, - _f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } /// A PhysicalExtensionCodec that can serialize and deserialize ParentExec @@ -205,15 +195,6 @@ impl ExecutionPlan for ChildExec { ) -> Result { unreachable!() } - - fn apply_expressions( - &self, - _f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } /// A PhysicalExtensionCodec that can serialize and deserialize ChildExec diff --git a/datafusion-examples/examples/relation_planner/table_sample.rs b/datafusion-examples/examples/relation_planner/table_sample.rs index 42342e5f1a641..46826216e28da 100644 --- a/datafusion-examples/examples/relation_planner/table_sample.rs +++ b/datafusion-examples/examples/relation_planner/table_sample.rs @@ -116,7 +116,7 @@ use datafusion::{ }; use datafusion_common::{ DFSchemaRef, DataFusionError, Result, Statistics, internal_err, not_impl_err, - plan_datafusion_err, plan_err, tree_node::TreeNodeRecursion, + plan_datafusion_err, plan_err, }; use datafusion_expr::{ UserDefinedLogicalNode, UserDefinedLogicalNodeCore, @@ -738,22 +738,6 @@ impl ExecutionPlan for SampleExec { Ok(Arc::new(stats)) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } /// Bernoulli sampler: includes each row with probability `(upper - lower)`. diff --git a/datafusion/catalog/src/memory/table.rs b/datafusion/catalog/src/memory/table.rs index 8102c15079658..bbc962d9acabf 100644 --- a/datafusion/catalog/src/memory/table.rs +++ b/datafusion/catalog/src/memory/table.rs @@ -31,7 +31,6 @@ use arrow::compute::{and, filter_record_batch}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::error::Result; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Constraints, DFSchema, SchemaExt, not_impl_err, plan_err}; use datafusion_common_runtime::JoinSet; use datafusion_datasource::memory::{MemSink, MemorySourceConfig}; @@ -40,13 +39,13 @@ use datafusion_datasource::source::DataSourceExec; use datafusion_expr::dml::InsertOp; use datafusion_expr::{Expr, SortExpr, TableType}; use datafusion_physical_expr::{ - LexOrdering, create_physical_expr, create_physical_sort_exprs, + LexOrdering, PhysicalExpr, create_physical_expr, create_physical_sort_exprs, }; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning, - PhysicalExpr, PlanProperties, common, + PlanProperties, common, }; use datafusion_session::Session; @@ -627,11 +626,4 @@ impl ExecutionPlan for DmlResultExec { stream, ))) } - - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index d225cff1deafc..ee97309c27aae 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -4475,20 +4475,6 @@ mod tests { ) -> Result { unimplemented!("NoOpExecutionPlan::execute"); } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } // Produces an execution plan where the schema is mismatched from @@ -4628,12 +4614,6 @@ digraph { ) -> Result { unimplemented!() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } impl DisplayAs for OkExtensionNode { fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { @@ -4680,12 +4660,6 @@ digraph { ) -> Result { unimplemented!() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } impl DisplayAs for InvariantFailsExtensionNode { fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { @@ -4804,12 +4778,6 @@ digraph { ) -> Result { unimplemented!() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } impl DisplayAs for ExecutableInvariantFails { fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index cef75b444f6fe..06b3701cbe6d6 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -38,7 +38,6 @@ use datafusion_catalog::Session; use datafusion_common::cast::as_primitive_array; use datafusion_common::project_schema; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::PlanProperties; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -205,22 +204,6 @@ impl ExecutionPlan for CustomExecutionPlan { .collect(), })) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } #[async_trait] diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index e52c559ec79ef..18695accd0f2e 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -35,7 +35,6 @@ use datafusion::prelude::*; use datafusion::scalar::ScalarValue; use datafusion_catalog::Session; use datafusion_common::cast::as_primitive_array; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{DataFusionError, internal_err, not_impl_err}; use datafusion_expr::expr::{BinaryExpr, Cast}; use datafusion_functions_aggregate::expr_fn::count; @@ -149,22 +148,6 @@ impl ExecutionPlan for CustomPlan { })), ))) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } #[derive(Clone, Debug)] diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 01c4deac5ccd3..14406c2316da0 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -33,7 +33,6 @@ use datafusion::{ scalar::ScalarValue, }; use datafusion_catalog::Session; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{project_schema, stats::Precision}; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -181,22 +180,6 @@ impl ExecutionPlan for StatisticsValidation { Ok(Arc::new(self.stats.clone())) } } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } fn init_ctx(stats: Statistics, schema: Schema) -> Result { diff --git a/datafusion/core/tests/fuzz_cases/once_exec.rs b/datafusion/core/tests/fuzz_cases/once_exec.rs index 403e377a690e2..9b57141061518 100644 --- a/datafusion/core/tests/fuzz_cases/once_exec.rs +++ b/datafusion/core/tests/fuzz_cases/once_exec.rs @@ -17,7 +17,6 @@ use arrow_schema::SchemaRef; use datafusion_common::internal_datafusion_err; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -106,20 +105,4 @@ impl ExecutionPlan for OnceExec { stream.ok_or_else(|| internal_datafusion_err!("Stream already consumed")) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> datafusion_common::Result, - ) -> datafusion_common::Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 78bb02ab1108b..12abf79041091 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -40,9 +40,7 @@ use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::ScalarValue; use datafusion_common::config::CsvOptions; use datafusion_common::error::Result; -use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRecursion, -}; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_datasource::file_groups::FileGroup; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_expr::{JoinType, Operator}; @@ -203,20 +201,6 @@ impl ExecutionPlan for SortRequiredExec { ))) } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } - fn execute( &self, _partition: usize, @@ -300,13 +284,6 @@ impl ExecutionPlan for SinglePartitionMaintainsOrderExec { Ok(Arc::new(Self::new(child))) } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn execute( &self, _partition: usize, diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index f56b8c6d70624..b420326596d0d 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -2942,111 +2942,6 @@ async fn test_filter_with_projection_pushdown() { assert_batches_eq!(expected, &result); } -/// Test that ExecutionPlan::apply_expressions() can discover dynamic filters across the plan tree. -/// -/// Not portable to sqllogictest: asserts by walking the plan tree with -/// `apply_expressions` + `downcast_ref::` and -/// counting nodes. Neither API is observable from SQL. -#[tokio::test] -async fn test_discover_dynamic_filters_via_expressions_api() { - use datafusion_common::JoinType; - use datafusion_common::tree_node::TreeNodeRecursion; - use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; - use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; - - fn count_dynamic_filters(plan: &Arc) -> usize { - let mut count = 0; - - // Check expressions from this node using apply_expressions - let _ = plan.apply_expressions(&mut |expr| { - if let Some(_df) = expr.downcast_ref::() { - count += 1; - } - Ok(TreeNodeRecursion::Continue) - }); - - // Recursively visit children - for child in plan.children() { - count += count_dynamic_filters(child); - } - - count - } - - // Create build side (left) - let build_batches = - vec![record_batch!(("a", Utf8, ["foo", "bar"]), ("b", Int32, [1, 2])).unwrap()]; - let build_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Utf8, false), - Field::new("b", DataType::Int32, false), - ])); - let build_scan = TestScanBuilder::new(build_schema.clone()) - .with_support(true) - .with_batches(build_batches) - .build(); - - // Create probe side (right) - let probe_batches = vec![ - record_batch!( - ("a", Utf8, ["foo", "bar", "baz", "qux"]), - ("c", Float64, [1.0, 2.0, 3.0, 4.0]) - ) - .unwrap(), - ]; - let probe_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Utf8, false), - Field::new("c", DataType::Float64, false), - ])); - let probe_scan = TestScanBuilder::new(probe_schema.clone()) - .with_support(true) - .with_batches(probe_batches) - .build(); - - // Create HashJoinExec - let plan = Arc::new( - HashJoinExec::try_new( - build_scan, - probe_scan, - vec![( - col("a", &build_schema).unwrap(), - col("a", &probe_schema).unwrap(), - )], - None, - &JoinType::Inner, - None, - PartitionMode::CollectLeft, - datafusion_common::NullEquality::NullEqualsNothing, - false, - ) - .unwrap(), - ) as Arc; - - // Before optimization: no dynamic filters - let count_before = count_dynamic_filters(&plan); - assert_eq!( - count_before, 0, - "Before optimization, should have no dynamic filters" - ); - - // Apply filter pushdown optimization (this creates dynamic filters) - let mut config = ConfigOptions::default(); - config.optimizer.enable_dynamic_filter_pushdown = true; - config.execution.parquet.pushdown_filters = true; - let optimized_plan = FilterPushdown::new_post_optimization() - .optimize(plan, &config) - .unwrap(); - - // After optimization: should discover dynamic filters - // We expect 2 dynamic filters: - // 1. In the HashJoinExec (producer) - // 2. In the DataSourceExec (consumer, pushed down to the probe side) - let count_after = count_dynamic_filters(&optimized_plan); - assert_eq!( - count_after, 2, - "After optimization, should discover exactly 2 dynamic filters (1 in HashJoinExec, 1 in DataSourceExec), found {count_after}" - ); -} - // ==== Filter pushdown through SortExec tests ==== /// FilterExec above a plain SortExec (no fetch) should be pushed below it. diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index 050baa9e792e9..29a2b59e5725d 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -25,7 +25,6 @@ use std::{ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ColumnStatistics, JoinType, ScalarValue, stats::Precision}; use datafusion_common::{JoinSide, NullEquality}; use datafusion_common::{Result, Statistics}; @@ -1059,20 +1058,6 @@ impl ExecutionPlan for UnboundedExec { batch: self.batch.clone(), })) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } #[derive(Eq, PartialEq, Debug)] @@ -1174,20 +1159,6 @@ impl ExecutionPlan for StatisticsExec { self.stats.clone() })) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } #[test] diff --git a/datafusion/core/tests/physical_optimizer/pushdown_utils.rs b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs index 8b659e757aa2a..61fd0a45952ba 100644 --- a/datafusion/core/tests/physical_optimizer/pushdown_utils.rs +++ b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs @@ -18,7 +18,6 @@ use arrow::datatypes::SchemaRef; use arrow::{array::RecordBatch, compute::concat_batches}; use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr}; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, config::ConfigOptions, internal_err}; use datafusion_datasource::{ PartitionedFile, file::FileSource, file_scan_config::FileScanConfig, @@ -235,25 +234,6 @@ impl FileSource for TestSource { fn table_schema(&self) -> &datafusion_datasource::TableSchema { &self.table_schema } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit predicate (filter) expression if present - if let Some(predicate) = &self.predicate { - f(predicate.as_ref())?; - } - - // Visit projection expressions if present - if let Some(projection) = &self.projection { - for proj_expr in projection { - f(proj_expr.expr.as_ref())?; - } - } - - Ok(TreeNodeRecursion::Continue) - } } #[derive(Debug, Clone)] @@ -569,13 +549,4 @@ impl ExecutionPlan for TestNode { Ok(res) } } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit the predicate expression - f(self.predicate.as_ref())?; - Ok(TreeNodeRecursion::Continue) - } } diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 6814ab2358ffc..09225cb0385a7 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -30,9 +30,7 @@ use datafusion::datasource::physical_plan::ParquetSource; use datafusion::datasource::source::DataSourceExec; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRecursion, -}; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::utils::expr::COUNT_STAR_EXPANSION; use datafusion_common::{ ColumnStatistics, JoinType, NullEquality, Result, Statistics, internal_err, @@ -489,20 +487,6 @@ impl ExecutionPlan for RequirementsTestExec { ) -> Result { unimplemented!("Test exec does not support execution") } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit expressions in required_input_ordering if present - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = &self.required_input_ordering { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } /// A [`PlanContext`] object is susceptible to being left in an inconsistent state after @@ -1035,28 +1019,6 @@ impl ExecutionPlan for TestScan { }) } } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit expressions in output_ordering - let mut tnr = TreeNodeRecursion::Continue; - for ordering in &self.output_ordering { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - - // Visit expressions in requested_ordering if present - if let Some(ordering) = &self.requested_ordering { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - - Ok(tnr) - } } /// Helper function to create a TestScan with ordering diff --git a/datafusion/core/tests/user_defined/insert_operation.rs b/datafusion/core/tests/user_defined/insert_operation.rs index 326c767d97610..f3d3f70bdf925 100644 --- a/datafusion/core/tests/user_defined/insert_operation.rs +++ b/datafusion/core/tests/user_defined/insert_operation.rs @@ -25,7 +25,6 @@ use datafusion::{ }; use datafusion_catalog::{Session, TableProvider}; use datafusion_common::config::Dialect; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_expr::{Expr, TableType, dml::InsertOp}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; use datafusion_physical_plan::execution_plan::SchedulingType; @@ -177,22 +176,6 @@ impl ExecutionPlan for TestInsertExec { ) -> Result { unimplemented!("TestInsertExec is a stub for testing.") } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.plan_properties.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } fn make_count_schema() -> SchemaRef { diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index 505468a19cd37..e8ff6758ccdd4 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -90,9 +90,7 @@ use datafusion::{ prelude::{SessionConfig, SessionContext}, }; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRecursion, -}; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ScalarValue, assert_eq_or_internal_err, assert_or_internal_err}; use datafusion_expr::{FetchType, InvariantLevel, Projection, SortExpr}; use datafusion_optimizer::AnalyzerRule; @@ -744,22 +742,6 @@ impl ExecutionPlan for TopKExec { state: BTreeMap::new(), })) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } // A very specialized TopK implementation diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs index 061f130f24131..59c020c779ca2 100644 --- a/datafusion/datasource-arrow/src/source.rs +++ b/datafusion/datasource-arrow/src/source.rs @@ -40,7 +40,6 @@ use arrow::buffer::Buffer; use arrow::ipc::reader::{FileDecoder, FileReader, StreamReader}; use datafusion_common::error::Result; use datafusion_common::exec_datafusion_err; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_datasource::PartitionedFile; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; @@ -393,20 +392,6 @@ impl FileSource for ArrowSource { fn projection(&self) -> Option<&ProjectionExprs> { Some(&self.projection.source) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit projection expressions - let mut tnr = TreeNodeRecursion::Continue; - for proj_expr in &self.projection.source { - tnr = tnr.visit_sibling(|| f(proj_expr.expr.as_ref()))?; - } - Ok(tnr) - } } /// `FileOpener` wrapper for both Arrow IPC file and stream formats diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs index b80d4f462e425..e3be9d8a401d0 100644 --- a/datafusion/datasource-avro/src/source.rs +++ b/datafusion/datasource-avro/src/source.rs @@ -22,7 +22,6 @@ use std::sync::Arc; use arrow::datatypes::{Schema, SchemaRef}; use arrow_avro::reader::{Reader, ReaderBuilder}; use datafusion_common::error::Result; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_datasource::TableSchema; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; @@ -169,20 +168,6 @@ impl FileSource for AvroSource { // Avro OCF does not support safe byte-range splitting in this reader path. false } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit projection expressions - let mut tnr = TreeNodeRecursion::Continue; - for proj_expr in &self.projection.source { - tnr = tnr.visit_sibling(|| f(proj_expr.expr.as_ref()))?; - } - Ok(tnr) - } } mod private { diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 611586cee6473..638279f827344 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -34,7 +34,6 @@ use datafusion_datasource::{ use arrow::csv; use datafusion_common::config::CsvOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{DataFusionError, Result}; use datafusion_common_runtime::JoinSet; use datafusion_datasource::file::FileSource; @@ -310,20 +309,6 @@ impl FileSource for CsvSource { DisplayFormatType::TreeRender => Ok(()), } } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit projection expressions - let mut tnr = TreeNodeRecursion::Continue; - for proj_expr in &self.projection.source { - tnr = tnr.visit_sibling(|| f(proj_expr.expr.as_ref()))?; - } - Ok(tnr) - } } impl FileOpener for CsvOpener { diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 2f2f459956f4e..179870673d426 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -29,7 +29,6 @@ use crate::boundary_stream::AlignedBoundaryStream; use datafusion_common::error::{DataFusionError, Result}; use datafusion_common::exec_datafusion_err; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common_runtime::{JoinSet, SpawnedTask}; use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream}; use datafusion_datasource::file_compression_type::FileCompressionType; @@ -233,20 +232,6 @@ impl FileSource for JsonSource { fn file_type(&self) -> &str { "json" } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit projection expressions - let mut tnr = TreeNodeRecursion::Continue; - for proj_expr in &self.projection.source { - tnr = tnr.visit_sibling(|| f(proj_expr.expr.as_ref()))?; - } - Ok(tnr) - } } impl FileOpener for JsonOpener { diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 2b367cf7600d5..2e2d0be0da507 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -36,7 +36,6 @@ use arrow::array::timezone::Tz; use arrow::datatypes::TimeUnit; use datafusion_common::DataFusionError; use datafusion_common::config::TableParquetOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_datasource::TableSchema; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; @@ -961,26 +960,6 @@ impl FileSource for ParquetSource { inner: Arc::new(new_source) as Arc, }) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn PhysicalExpr, - ) -> datafusion_common::Result, - ) -> datafusion_common::Result { - // Visit predicate (filter) expression if present - let mut tnr = TreeNodeRecursion::Continue; - if let Some(predicate) = &self.predicate { - tnr = tnr.visit_sibling(|| f(predicate.as_ref()))?; - } - - // Visit projection expressions - for proj_expr in &self.projection { - tnr = tnr.visit_sibling(|| f(proj_expr.expr.as_ref()))?; - } - - Ok(tnr) - } } #[cfg(test)] diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 32bee63b54f23..07460b23694b7 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -29,7 +29,6 @@ use crate::morsel::{FileOpenerMorselizer, Morselizer}; #[expect(deprecated)] use crate::schema_adapter::SchemaAdapterFactory; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, not_impl_err}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr}; @@ -352,27 +351,6 @@ pub trait FileSource: Any + Send + Sync { fn schema_adapter_factory(&self) -> Option> { None } - - /// Apply a function to all physical expressions used by this file source. - /// - /// This includes: - /// - Filter predicates (which may contain dynamic filters) - /// - Projection expressions - /// - /// The function `f` is called once for each expression. The function should - /// return `TreeNodeRecursion::Continue` to continue visiting other expressions, - /// or `TreeNodeRecursion::Stop` to stop visiting expressions early. - /// - /// Implementations must explicitly visit all expressions. There is no default - /// implementation to ensure that all FileSource implementations handle this correctly. - /// - /// See [`ExecutionPlan::apply_expressions`] for more details and examples. - /// - /// [`ExecutionPlan::apply_expressions`]: datafusion_physical_plan::ExecutionPlan::apply_expressions - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result; } impl dyn FileSource { diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index 04b74528d5ac1..e1fd10324373d 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -30,7 +30,6 @@ use crate::{ use arrow::datatypes::FieldRef; use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err, }; @@ -82,9 +81,7 @@ use std::{fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc}; /// # use arrow::datatypes::{Field, Fields, DataType, Schema, SchemaRef}; /// # use object_store::ObjectStore; /// # use datafusion_common::Result; -/// # use datafusion_common::tree_node::TreeNodeRecursion; /// # use datafusion_datasource::file::FileSource; -/// # use datafusion_physical_plan::PhysicalExpr; /// # use datafusion_datasource::file_groups::FileGroup; /// # use datafusion_datasource::PartitionedFile; /// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; @@ -114,7 +111,6 @@ use std::{fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc}; /// # fn file_type(&self) -> &str { "parquet" } /// # // Note that this implementation drops the projection on the floor, it is not complete! /// # fn try_pushdown_projection(&self, projection: &ProjectionExprs) -> Result>> { Ok(Some(Arc::new(self.clone()) as Arc)) } -/// # fn apply_expressions(&self, _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result) -> Result { Ok(TreeNodeRecursion::Continue) } /// # } /// # impl ParquetSource { /// # fn new(table_schema: impl Into) -> Self { Self {table_schema: table_schema.into()} } @@ -999,14 +995,6 @@ impl DataSource for FileScanConfig { Some(Arc::new(new_config)) } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Delegate to the file source - self.file_source.apply_expressions(f) - } - /// Create any shared state that should be passed between sibling streams /// during one execution. /// @@ -1408,11 +1396,9 @@ mod tests { use arrow::datatypes::Field; use datafusion_common::ColumnStatistics; use datafusion_common::stats::Precision; - use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_batches_eq, internal_err}; use datafusion_execution::TaskContext; use datafusion_expr::SortExpr; - use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::create_physical_sort_expr; use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::projection::ProjectionExpr; @@ -1475,13 +1461,6 @@ mod tests { inner: Arc::new(self.clone()) as Arc, }) } - - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } #[test] @@ -2624,13 +2603,6 @@ mod tests { inner: Arc::new(self.clone()) as Arc, }) } - - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } #[test] diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 7c9281dcc2f26..f073b09c5463e 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -28,7 +28,6 @@ use crate::source::{DataSource, DataSourceExec}; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::{Schema, SchemaRef}; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Result, ScalarValue, assert_or_internal_err, plan_err, project_schema, }; @@ -257,20 +256,6 @@ impl DataSource for MemorySourceConfig { }) .transpose() } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit expressions in sort_information - let mut tnr = TreeNodeRecursion::Continue; - for ordering in &self.sort_information { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } impl MemorySourceConfig { diff --git a/datafusion/datasource/src/sink.rs b/datafusion/datasource/src/sink.rs index 2a1f5c4a2fd02..e3df1ad6381f4 100644 --- a/datafusion/datasource/src/sink.rs +++ b/datafusion/datasource/src/sink.rs @@ -24,10 +24,9 @@ use std::sync::Arc; use arrow::array::{ArrayRef, RecordBatch, UInt64Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_eq_or_internal_err}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalExpr}; +use datafusion_physical_expr::{Distribution, EquivalenceProperties}; use datafusion_physical_expr_common::sort_expr::{LexRequirement, OrderingRequirements}; use datafusion_physical_plan::metrics::MetricsSet; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; @@ -224,19 +223,6 @@ impl ExecutionPlan for DataSinkExec { ))) } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Apply to sort order requirements if present - if let Some(sort_order) = &self.sort_order { - for req in sort_order.iter() { - f(req.expr.as_ref())?; - } - } - Ok(TreeNodeRecursion::Continue) - } - /// Execute the plan and return a stream of `RecordBatch`es for /// the specified partition. fn execute( diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 420c6b508ce4f..af4bc09504937 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -40,7 +40,6 @@ use itertools::Itertools; use crate::file::FileSource; use crate::file_scan_config::FileScanConfig; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Constraints, Result, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; @@ -225,22 +224,6 @@ pub trait DataSource: Any + Send + Sync + Debug { None } - /// Apply a closure to each expression used by this data source. - /// - /// This includes filter predicates (which may contain dynamic filters) and any - /// other expressions used during data scanning. - /// - /// Implementations must override this method. If the data source has no expressions, - /// return `Ok(TreeNodeRecursion::Continue)` immediately. - /// - /// See [`ExecutionPlan::apply_expressions`] for more details and implementation examples. - /// - /// [`ExecutionPlan::apply_expressions`]: datafusion_physical_plan::ExecutionPlan::apply_expressions - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result; - /// Injects arbitrary run-time state into this DataSource, returning a new instance /// that incorporates that state *if* it is relevant to the concrete DataSource implementation. /// @@ -368,14 +351,6 @@ impl ExecutionPlan for DataSourceExec { Vec::new() } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Delegate to the underlying data source - self.data_source.apply_expressions(f) - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs index b59ce58a420a8..d211319629878 100644 --- a/datafusion/datasource/src/test_util.rs +++ b/datafusion/datasource/src/test_util.rs @@ -22,7 +22,7 @@ use crate::{ use std::sync::Arc; use arrow::datatypes::Schema; -use datafusion_common::{Result, tree_node::TreeNodeRecursion}; +use datafusion_common::Result; use datafusion_physical_expr::{PhysicalExpr, expressions::Column}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use object_store::ObjectStore; @@ -125,13 +125,6 @@ impl FileSource for MockSource { ) -> Option<&datafusion_physical_plan::projection::ProjectionExprs> { Some(&self.projection.source) } - - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } /// Create a column expression diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index ddad605081745..f942916ea19ff 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -20,7 +20,6 @@ use std::pin::Pin; use std::sync::Arc; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{DataFusionError, Result, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr_common::metrics::MetricsSet; @@ -439,22 +438,6 @@ impl ExecutionPlan for ForeignExecutionPlan { } } - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.properties.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } - fn repartitioned( &self, target_partitions: usize, @@ -581,22 +564,6 @@ pub mod tests { Statistics::new_unknown(self.props.eq_properties.schema()) }))) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.props.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } #[test] diff --git a/datafusion/ffi/src/tests/async_provider.rs b/datafusion/ffi/src/tests/async_provider.rs index 011d3f0a0a343..69104709b477e 100644 --- a/datafusion/ffi/src/tests/async_provider.rs +++ b/datafusion/ffi/src/tests/async_provider.rs @@ -32,7 +32,6 @@ use arrow::array::RecordBatch; use arrow::datatypes::Schema; use async_trait::async_trait; use datafusion_catalog::TableProvider; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, exec_err}; use datafusion_execution::RecordBatchStream; use datafusion_expr::Expr; @@ -211,22 +210,6 @@ impl ExecutionPlan for AsyncTestExecutionPlan { batch_receiver: self.batch_receiver.resubscribe(), })) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.properties.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } impl datafusion_physical_plan::DisplayAs for AsyncTestExecutionPlan { diff --git a/datafusion/physical-optimizer/src/ensure_coop.rs b/datafusion/physical-optimizer/src/ensure_coop.rs index 102e21a4853a4..e7aacb2321b67 100644 --- a/datafusion/physical-optimizer/src/ensure_coop.rs +++ b/datafusion/physical-optimizer/src/ensure_coop.rs @@ -264,11 +264,10 @@ mod tests { // Test that cooperative context is reset when encountering an eager evaluation boundary. use arrow::datatypes::Schema; use datafusion_common::internal_err; - use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::{ - DisplayAs, DisplayFormatType, Partitioning, PhysicalExpr, PlanProperties, + DisplayAs, DisplayFormatType, Partitioning, PlanProperties, SendableRecordBatchStream, execution_plan::{Boundedness, EmissionType}, }; @@ -346,13 +345,6 @@ mod tests { ) -> Result { internal_err!("DummyExec does not support execution") } - - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } // Build a plan similar to the original test: diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index 81df6f943c15e..24eb3af5f564c 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -27,9 +27,7 @@ use std::sync::Arc; use crate::PhysicalOptimizerRule; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRecursion, -}; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{Result, Statistics}; use datafusion_execution::TaskContext; use datafusion_physical_expr::Distribution; @@ -294,36 +292,6 @@ impl ExecutionPlan for OutputRequirementExec { fn fetch(&self) -> Option { self.fetch } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_expr_common::physical_expr::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in order_requirement - let mut tnr = TreeNodeRecursion::Continue; - if let Some(order_reqs) = &self.order_requirement { - let lexes = match order_reqs { - OrderingRequirements::Hard(alternatives) => alternatives, - OrderingRequirements::Soft(alternatives) => alternatives, - }; - for lex in lexes { - for sort_expr in lex { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - } - - // Visit expressions in dist_requirement if it's HashPartitioned - if let Distribution::HashPartitioned(exprs) = &self.dist_requirement { - for expr in exprs { - tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - } - } - - Ok(tnr) - } } impl PhysicalOptimizerRule for OutputRequirements { diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index d1498e4a3ea55..5e6b8505764a2 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -45,7 +45,6 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_schema::FieldRef; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Constraint, Constraints, Result, ScalarValue, assert_eq_or_internal_err, internal_err, not_impl_err, @@ -1579,36 +1578,6 @@ impl ExecutionPlan for AggregateExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Apply to group by expressions - let mut tnr = TreeNodeRecursion::Continue; - for expr in self.group_by.input_exprs() { - tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - } - - // Apply to aggregate expressions - for aggr in self.aggr_expr.iter() { - for expr in aggr.expressions() { - tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - } - } - - // Apply to filter expressions (FILTER WHERE clauses) - for filter in self.filter_expr.iter().flatten() { - tnr = tnr.visit_sibling(|| f(filter.as_ref()))?; - } - - // Apply to dynamic filter expression if present - if let Some(dyn_filter) = &self.dynamic_filter { - tnr = tnr.visit_sibling(|| f(dyn_filter.filter.as_ref()))?; - } - - Ok(tnr) - } - fn with_new_children( self: Arc, children: Vec>, @@ -2764,13 +2733,6 @@ mod tests { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index ea3abf439e4c1..582af8f1e3dae 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -30,11 +30,9 @@ use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::instant::Instant; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{DataFusionError, Result, assert_eq_or_internal_err}; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; -use datafusion_physical_expr::PhysicalExpr; use futures::StreamExt; @@ -149,13 +147,6 @@ impl ExecutionPlan for AnalyzeExec { vec![Distribution::UnspecifiedDistribution] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/async_func.rs b/datafusion/physical-plan/src/async_func.rs index 8ad4ecb096962..1b15bf27e78cc 100644 --- a/datafusion/physical-plan/src/async_func.rs +++ b/datafusion/physical-plan/src/async_func.rs @@ -24,8 +24,7 @@ use crate::{ }; use arrow::array::RecordBatch; use arrow_schema::{Fields, Schema, SchemaRef}; -use datafusion_common::tree_node::TreeNodeRecursion; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; use datafusion_common::{Result, assert_eq_or_internal_err}; use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::ScalarFunctionExpr; @@ -165,13 +164,6 @@ impl ExecutionPlan for AsyncFuncExec { vec![&self.input] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/buffer.rs b/datafusion/physical-plan/src/buffer.rs index 0cc4a1d71814e..19a4ebba83eae 100644 --- a/datafusion/physical-plan/src/buffer.rs +++ b/datafusion/physical-plan/src/buffer.rs @@ -31,7 +31,6 @@ use crate::{ }; use arrow::array::RecordBatch; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, Statistics, internal_err, plan_err}; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; @@ -168,13 +167,6 @@ impl ExecutionPlan for BufferExec { vec![&self.input] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 34cd770260915..76b2f63798f88 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -33,7 +33,6 @@ use crate::{ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::Result; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; @@ -184,13 +183,6 @@ impl ExecutionPlan for CoalesceBatchesExec { vec![false] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 3399554612431..fa200ef845f3a 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -34,7 +34,6 @@ use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_proper use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_eq_or_internal_err, internal_err}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; @@ -154,13 +153,6 @@ impl ExecutionPlan for CoalescePartitionsExec { vec![false] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs index fe6a3bc3d5678..111999b71c91d 100644 --- a/datafusion/physical-plan/src/coop.rs +++ b/datafusion/physical-plan/src/coop.rs @@ -71,7 +71,6 @@ //! that report [`SchedulingType::NonCooperative`] in their [plan properties](ExecutionPlan::properties). use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_physical_expr::PhysicalExpr; #[cfg(datafusion_coop = "tokio_fallback")] use futures::Future; @@ -277,13 +276,6 @@ impl ExecutionPlan for CooperativeExec { vec![&self.input] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 756a68b1a958d..f7c6de3fc591a 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -1167,11 +1167,8 @@ mod tests { use std::fmt::Write; use std::sync::Arc; - use datafusion_common::{ - Result, Statistics, internal_datafusion_err, tree_node::TreeNodeRecursion, - }; + use datafusion_common::{Result, Statistics, internal_datafusion_err}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; - use datafusion_physical_expr::PhysicalExpr; use crate::{DisplayAs, ExecutionPlan, PlanProperties}; @@ -1214,13 +1211,6 @@ mod tests { unimplemented!() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn execute( &self, _: usize, diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 8103695ad08fa..2e7f982a51a31 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -29,10 +29,9 @@ use crate::{ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ColumnStatistics, Result, ScalarValue, assert_or_internal_err}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; +use datafusion_physical_expr::EquivalenceProperties; use crate::execution_plan::SchedulingType; use log::trace; @@ -119,13 +118,6 @@ impl ExecutionPlan for EmptyExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 1a67ea0ded11b..b55d3c32cb569 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -27,9 +27,7 @@ pub use crate::stream::EmptyRecordBatchStream; use arrow_schema::Schema; pub use datafusion_common::hash_utils; -use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRecursion, -}; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; pub use datafusion_common::utils::project_schema; pub use datafusion_common::{ColumnStatistics, Statistics, internal_err}; pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; @@ -203,80 +201,6 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { /// joins). fn children(&self) -> Vec<&Arc>; - /// Apply a closure `f` to each expression (non-recursively) in the current - /// physical plan node. This does not include expressions in any children. - /// - /// The closure `f` is applied to expressions in the order they appear in the plan. - /// The closure can return `TreeNodeRecursion::Continue` to continue visiting, - /// `TreeNodeRecursion::Stop` to stop visiting immediately, or `TreeNodeRecursion::Jump` - /// to skip any remaining expressions (though typically all expressions are visited). - /// - /// The expressions visited do not necessarily represent or even contribute - /// to the output schema of this node. For example, `FilterExec` visits the - /// filter predicate even though the output of a Filter has the same columns - /// as the input. - /// - /// # Example Usage - /// ``` - /// # use std::sync::Arc; - /// # use datafusion_physical_plan::ExecutionPlan; - /// # use datafusion_common::tree_node::TreeNodeRecursion; - /// # fn example(plan: Arc) -> datafusion_common::Result<()> { - /// // Count the number of expressions - /// let mut count = 0; - /// plan.apply_expressions(&mut |_expr| { - /// count += 1; - /// Ok(TreeNodeRecursion::Continue) - /// })?; - /// # Ok(()) - /// # } - /// ``` - /// - /// # Implementation Examples - /// - /// ## Node with no expressions (e.g., EmptyExec, MemoryExec) - /// ```ignore - /// fn apply_expressions( - /// &self, - /// _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - /// ) -> Result { - /// Ok(TreeNodeRecursion::Continue) - /// } - /// ``` - /// - /// ## Node with a single expression (e.g., FilterExec) - /// ```ignore - /// fn apply_expressions( - /// &self, - /// f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - /// ) -> Result { - /// f(self.predicate.as_ref()) - /// } - /// ``` - /// - /// ## Node with multiple expressions (e.g., ProjectionExec, JoinExec) - /// - /// Use [`TreeNodeRecursion::visit_sibling`] when iterating over multiple - /// expressions. This correctly propagates [`TreeNodeRecursion::Stop`]: if - /// `f` returns `Stop` for an earlier expression, `visit_sibling` short-circuits - /// and skips the remaining ones. - /// ```ignore - /// fn apply_expressions( - /// &self, - /// f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - /// ) -> Result { - /// let mut tnr = TreeNodeRecursion::Continue; - /// for expr in &self.expressions { - /// tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - /// } - /// Ok(tnr) - /// } - /// ``` - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result; - /// Returns a new `ExecutionPlan` where all existing children were replaced /// by the `children`, in order fn with_new_children( @@ -1640,13 +1564,6 @@ mod tests { unimplemented!() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn execute( &self, _partition: usize, @@ -1702,13 +1619,6 @@ mod tests { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, @@ -1732,109 +1642,6 @@ mod tests { } } - /// A test node that holds a fixed list of expressions, used to test - /// `apply_expressions` behavior. - #[derive(Debug)] - struct MultiExprExec { - exprs: Vec>, - } - - impl DisplayAs for MultiExprExec { - fn fmt_as( - &self, - _t: DisplayFormatType, - _f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - unimplemented!() - } - } - - impl ExecutionPlan for MultiExprExec { - fn name(&self) -> &'static str { - "MultiExprExec" - } - - fn properties(&self) -> &Arc { - unimplemented!() - } - - fn children(&self) -> Vec<&Arc> { - vec![] - } - - fn with_new_children( - self: Arc, - _: Vec>, - ) -> Result> { - unimplemented!() - } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - let mut tnr = TreeNodeRecursion::Continue; - for expr in &self.exprs { - tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - } - Ok(tnr) - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> Result { - unimplemented!() - } - - fn partition_statistics( - &self, - _partition: Option, - ) -> Result> { - unimplemented!() - } - } - - /// Returns a simple literal `Arc` for use in tests. - fn lit_expr(val: i64) -> Arc { - use datafusion_physical_expr::expressions::Literal; - Arc::new(Literal::new(datafusion_common::ScalarValue::Int64(Some( - val, - )))) - } - - /// `apply_expressions` visits all expressions when `f` always returns `Continue`. - #[test] - fn test_apply_expressions_continue_visits_all() -> Result<()> { - let plan = MultiExprExec { - exprs: vec![lit_expr(1), lit_expr(2), lit_expr(3)], - }; - let mut visited = 0usize; - plan.apply_expressions(&mut |_expr| { - visited += 1; - Ok(TreeNodeRecursion::Continue) - })?; - assert_eq!(visited, 3); - Ok(()) - } - - #[test] - fn test_apply_expressions_stop_halts_early() -> Result<()> { - let plan = MultiExprExec { - exprs: vec![lit_expr(1), lit_expr(2), lit_expr(3)], - }; - let mut visited = 0usize; - let tnr = plan.apply_expressions(&mut |_expr| { - visited += 1; - Ok(TreeNodeRecursion::Stop) - })?; - // Only the first expression is visited; the rest are skipped. - assert_eq!(visited, 1); - assert_eq!(tnr, TreeNodeRecursion::Stop); - Ok(()) - } - #[test] fn test_execution_plan_name() { let schema1 = Arc::new(Schema::empty()); diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs index 617a1a6cdaf53..98eac3d28b5df 100644 --- a/datafusion/physical-plan/src/explain.rs +++ b/datafusion/physical-plan/src/explain.rs @@ -26,10 +26,9 @@ use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::display::StringifiedPlan; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_eq_or_internal_err}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; +use datafusion_physical_expr::EquivalenceProperties; use log::trace; @@ -117,13 +116,6 @@ impl ExecutionPlan for ExplainExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index c485e181f3826..b3b107dc580df 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -54,7 +54,6 @@ use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ DataFusionError, Result, ScalarValue, internal_err, plan_err, project_schema, }; @@ -521,13 +520,6 @@ impl ExecutionPlan for FilterExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - f(self.predicate.as_ref()) - } - fn maintains_input_order(&self) -> Vec { // Tell optimizer this operator doesn't reorder its input vec![true] diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index ab66955dc6034..6661d2782b212 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -42,13 +42,11 @@ use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::compute::concat_batches; use arrow::datatypes::{Fields, Schema, SchemaRef}; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ JoinType, Result, ScalarValue, assert_eq_or_internal_err, internal_err, }; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; -use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::equivalence::join_equivalence_properties; use async_trait::async_trait; @@ -284,14 +282,6 @@ impl ExecutionPlan for CrossJoinExec { Some(self.metrics.clone_inner()) } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // CrossJoin has no join conditions or expressions - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 3cdd60d7ab3c8..f7391feb29cc3 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -72,7 +72,6 @@ use arrow::record_batch::RecordBatch; use arrow::util::bit_util; use arrow_schema::{DataType, Schema}; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::memory::estimate_memory_size; use datafusion_common::{ JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, internal_err, @@ -1251,30 +1250,6 @@ impl ExecutionPlan for HashJoinExec { vec![&self.left, &self.right] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Apply to join key expressions from both sides - let mut tnr = TreeNodeRecursion::Continue; - for (left, right) in &self.on { - tnr = tnr.visit_sibling(|| f(left.as_ref()))?; - tnr = tnr.visit_sibling(|| f(right.as_ref()))?; - } - - // Apply to join filter expression if present - if let Some(filter) = &self.filter { - tnr = tnr.visit_sibling(|| f(filter.expression().as_ref()))?; - } - - // Apply to dynamic filter expression if present - if let Some(df) = &self.dynamic_filter { - tnr = tnr.visit_sibling(|| f(df.filter.as_ref()))?; - } - - Ok(tnr) - } - /// Creates a new HashJoinExec with different children while preserving configuration. /// /// This method is called during query optimization when the optimizer creates new diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index feaf344200ac1..15af23b447836 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -60,7 +60,6 @@ use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_schema::DataType; use datafusion_common::cast::as_boolean_array; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ JoinSide, Result, ScalarValue, Statistics, arrow_err, assert_eq_or_internal_err, internal_datafusion_err, internal_err, project_schema, unwrap_or_internal_err, @@ -580,17 +579,6 @@ impl ExecutionPlan for NestedLoopJoinExec { vec![&self.left, &self.right] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn crate::PhysicalExpr) -> Result, - ) -> Result { - // Apply to join filter expressions if present - if let Some(filter) = &self.filter { - f(filter.expression().as_ref())?; - } - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs index 2b20089f8e221..50e9252a21131 100644 --- a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs @@ -23,7 +23,6 @@ use arrow::{ }; use arrow_schema::{SchemaRef, SortOptions}; use datafusion_common::not_impl_err; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{JoinSide, Result, internal_err}; use datafusion_execution::{ SendableRecordBatchStream, @@ -508,14 +507,6 @@ impl ExecutionPlan for PiecewiseMergeJoinExec { vec![&self.buffered, &self.streamed] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Apply to the two expressions being compared in the range predicate - f(self.on.0.as_ref())?.visit_sibling(|| f(self.on.1.as_ref())) - } - fn required_input_distribution(&self) -> Vec { vec![ Distribution::SinglePartition, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index 3f309431614a4..9e87b52696a57 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -45,7 +45,6 @@ use crate::{ use arrow::compute::SortOptions; use arrow::datatypes::SchemaRef; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ JoinSide, JoinType, NullEquality, Result, assert_eq_or_internal_err, internal_err, plan_err, @@ -450,23 +449,6 @@ impl ExecutionPlan for SortMergeJoinExec { vec![&self.left, &self.right] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn crate::PhysicalExpr) -> Result, - ) -> Result { - // Apply to join keys from both sides - let mut tnr = TreeNodeRecursion::Continue; - for (left, right) in &self.on { - tnr = tnr.visit_sibling(|| f(left.as_ref()))?; - tnr = tnr.visit_sibling(|| f(right.as_ref()))?; - } - // Apply to join filter expressions if present - if let Some(filter) = &self.filter { - tnr = tnr.visit_sibling(|| f(filter.expression().as_ref()))?; - } - Ok(tnr) - } - fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 34af88ea4027b..ef92964fadf84 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -66,7 +66,6 @@ use arrow::compute::concat_batches; use arrow::datatypes::{ArrowNativeType, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::hash_utils::create_hashes; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::bisect; use datafusion_common::{ HashSet, JoinSide, JoinType, NullEquality, Result, assert_eq_or_internal_err, @@ -460,23 +459,6 @@ impl ExecutionPlan for SymmetricHashJoinExec { vec![&self.left, &self.right] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn crate::PhysicalExpr) -> Result, - ) -> Result { - // Apply to join keys from both sides - let mut tnr = TreeNodeRecursion::Continue; - for (left, right) in &self.on { - tnr = tnr.visit_sibling(|| f(left.as_ref()))?; - tnr = tnr.visit_sibling(|| f(right.as_ref()))?; - } - // Apply to join filter expressions if present - if let Some(filter) = &self.filter { - tnr = tnr.visit_sibling(|| f(filter.expression().as_ref()))?; - } - Ok(tnr) - } - fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 223a476493b39..7f42c33a79ca0 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -34,11 +34,10 @@ use crate::{ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_eq_or_internal_err, internal_err}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; +use datafusion_physical_expr::LexOrdering; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -174,20 +173,6 @@ impl ExecutionPlan for GlobalLimitExec { vec![false] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Apply to required ordering expressions if present - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = &self.required_ordering { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } - fn with_new_children( self: Arc, mut children: Vec>, @@ -358,20 +343,6 @@ impl ExecutionPlan for LocalLimitExec { vec![true] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Apply to required ordering expressions if present - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = &self.required_ordering { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } - fn with_new_children( self: Arc, children: Vec>, @@ -565,6 +536,7 @@ mod tests { use arrow::array::RecordBatchOptions; use arrow::datatypes::Schema; use datafusion_common::stats::Precision; + use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::expressions::col; #[tokio::test] diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index e172ef4463ec4..ad54905f474aa 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -32,11 +32,10 @@ use crate::{ use arrow::array::RecordBatch; use arrow::datatypes::SchemaRef; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_eq_or_internal_err, assert_or_internal_err}; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::MemoryReservation; -use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; +use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use futures::Stream; @@ -312,13 +311,6 @@ impl ExecutionPlan for LazyMemoryExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/operator_statistics/mod.rs b/datafusion/physical-plan/src/operator_statistics/mod.rs index eca017cde9d0c..041ef4666658d 100644 --- a/datafusion/physical-plan/src/operator_statistics/mod.rs +++ b/datafusion/physical-plan/src/operator_statistics/mod.rs @@ -1034,7 +1034,6 @@ mod tests { use std::fmt; use crate::execution_plan::{Boundedness, EmissionType}; - use datafusion_common::tree_node::TreeNodeRecursion; fn make_schema() -> Arc { Arc::new(Schema::new(vec![ @@ -1114,13 +1113,6 @@ mod tests { &self.cache } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn execute( &self, _partition: usize, @@ -1222,13 +1214,6 @@ mod tests { self.input.properties() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn execute( &self, _partition: usize, diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index ae8e73cd74ade..b99f9a93045fb 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -29,11 +29,9 @@ use crate::{ use arrow::array::{ArrayRef, NullArray, RecordBatch, RecordBatchOptions}; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_or_internal_err}; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; -use datafusion_physical_expr::PhysicalExpr; use log::trace; @@ -137,13 +135,6 @@ impl ExecutionPlan for PlaceholderRowExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index e5b91fbb1c5d4..d49522dfd2989 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -312,17 +312,6 @@ impl ExecutionPlan for ProjectionExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - let mut tnr = TreeNodeRecursion::Continue; - for proj_expr in self.projector.projection().as_ref().iter() { - tnr = tnr.visit_sibling(|| f(proj_expr.expr.as_ref()))?; - } - Ok(tnr) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index c160f9a0dc763..f34aac3744557 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -37,12 +37,10 @@ use arrow::array::{BooleanArray, BooleanBuilder}; use arrow::compute::filter_record_batch; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{Result, internal_datafusion_err, not_impl_err}; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; -use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; use futures::{Stream, StreamExt, ready}; @@ -154,13 +152,6 @@ impl ExecutionPlan for RecursiveQueryExec { vec![&self.static_term, &self.recursive_term] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - // TODO: control these hints and see whether we can // infer some from the child plans (static/recursive terms). fn maintains_input_order(&self) -> Vec { diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 5d87836ba518b..465ca4a99e961 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -50,7 +50,6 @@ use arrow::compute::take_arrays; use arrow::datatypes::{SchemaRef, UInt32Type}; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::transpose; use datafusion_common::{ ColumnStatistics, DataFusionError, HashMap, assert_or_internal_err, internal_err, @@ -1185,21 +1184,6 @@ impl ExecutionPlan for RepartitionExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Apply to hash partition expressions if this is a hash repartition - if let Partitioning::Hash(exprs, _) = self.partitioning() { - let mut tnr = TreeNodeRecursion::Continue; - for expr in exprs { - tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - } - return Ok(tnr); - } - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/scalar_subquery.rs b/datafusion/physical-plan/src/scalar_subquery.rs index 82421d66dee9e..25f7332f95272 100644 --- a/datafusion/physical-plan/src/scalar_subquery.rs +++ b/datafusion/physical-plan/src/scalar_subquery.rs @@ -27,11 +27,9 @@ use std::fmt; use std::sync::Arc; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, ScalarValue, Statistics, exec_err, internal_err}; use datafusion_execution::TaskContext; use datafusion_expr::execution_props::{ScalarSubqueryResults, SubqueryIndex}; -use datafusion_physical_expr::PhysicalExpr; use crate::execution_plan::{CardinalityEffect, ExecutionPlan, PlanProperties}; use crate::joins::utils::{OnceAsync, OnceFut}; @@ -225,13 +223,6 @@ impl ExecutionPlan for ScalarSubqueryExec { ))) } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn maintains_input_order(&self) -> Vec { // Only the main input (first child); subquery children don't contribute // to ordering. @@ -388,13 +379,6 @@ mod tests { ))) } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn execute( &self, partition: usize, diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index abd9ebb142a66..3bf16af36c62b 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -69,10 +69,9 @@ use arrow::compute::concat_batches; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::Result; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::evaluate_partition_ranges; use datafusion_execution::{RecordBatchStream, TaskContext}; -use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; +use datafusion_physical_expr::LexOrdering; use futures::{Stream, StreamExt, ready}; use log::trace; @@ -284,17 +283,6 @@ impl ExecutionPlan for PartialSortExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - let mut tnr = TreeNodeRecursion::Continue; - for sort_expr in &self.expr { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - Ok(tnr) - } - fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/sorts/partitioned_topk.rs b/datafusion/physical-plan/src/sorts/partitioned_topk.rs index f4c2585ea790d..fe876eeddf7f2 100644 --- a/datafusion/physical-plan/src/sorts/partitioned_topk.rs +++ b/datafusion/physical-plan/src/sorts/partitioned_topk.rs @@ -35,7 +35,6 @@ use arrow::array::{RecordBatch, UInt32Array}; use arrow::compute::{BatchCoalescer, take_record_batch}; use arrow::datatypes::SchemaRef; use arrow::row::{OwnedRow, RowConverter}; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{HashMap, Result}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; @@ -332,17 +331,6 @@ impl ExecutionPlan for PartitionedTopKExec { )?)) } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - let mut tnr = TreeNodeRecursion::Continue; - for sort_expr in &self.expr { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - Ok(tnr) - } - fn execute( &self, partition: usize, diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 90d4b5ec12f91..f715de0b5964b 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -59,7 +59,6 @@ use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays}; use arrow::datatypes::SchemaRef; use datafusion_common::config::SpillCompression; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ DataFusionError, Result, assert_or_internal_err, internal_datafusion_err, unwrap_or_internal_err, @@ -1151,25 +1150,6 @@ impl ExecutionPlan for SortExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Apply to sort expressions - let mut tnr = TreeNodeRecursion::Continue; - for sort_expr in &self.expr { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - - // Apply to dynamic filter expression if present (when fetch is Some, TopK mode) - if let Some(filter) = &self.filter { - let filter_guard = filter.read(); - tnr = tnr.visit_sibling(|| f(filter_guard.expr().as_ref()))?; - } - - Ok(tnr) - } - fn benefits_from_input_partitioning(&self) -> Vec { vec![false] } @@ -1530,13 +1510,6 @@ mod tests { Ok(self) } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn execute( &self, _partition: usize, diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 13c28ccb10991..09570f14ba734 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -30,11 +30,9 @@ use crate::{ check_if_same_properties, }; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_eq_or_internal_err, internal_err}; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::MemoryConsumer; -use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements}; use crate::execution_plan::{EvaluationType, SchedulingType}; @@ -287,17 +285,6 @@ impl ExecutionPlan for SortPreservingMergeExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - let mut tnr = TreeNodeRecursion::Continue; - for sort_expr in &self.expr { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - Ok(tnr) - } - fn with_new_children( self: Arc, mut children: Vec>, @@ -1419,12 +1406,6 @@ mod tests { fn children(&self) -> Vec<&Arc> { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index 250eb59f19b87..cdf4b08f718c6 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -33,10 +33,8 @@ use crate::stream::RecordBatchStreamAdapter; use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; use arrow::datatypes::{Schema, SchemaRef}; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, internal_err, plan_err}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use async_trait::async_trait; @@ -245,13 +243,6 @@ impl ExecutionPlan for StreamingTableExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index 4c4724e4dcc4f..a6e76cebcdee2 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -35,7 +35,6 @@ use crate::{DisplayAs, DisplayFormatType, PlanProperties}; use arrow::array::{Array, ArrayRef, Int32Array, RecordBatch}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Result, Statistics, assert_or_internal_err, config::ConfigOptions, project_schema, }; @@ -45,9 +44,7 @@ use datafusion_physical_expr::equivalence::{ }; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::collect_columns; -use datafusion_physical_expr::{ - EquivalenceProperties, LexOrdering, Partitioning, PhysicalExpr, -}; +use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, Partitioning}; use futures::{Future, FutureExt}; @@ -140,20 +137,6 @@ impl ExecutionPlan for TestMemoryExec { Vec::new() } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Apply to all sort information orderings - let mut tnr = TreeNodeRecursion::Continue; - for ordering in &self.sort_information { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index 200223b9b660a..e162571e32261 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -35,10 +35,9 @@ use std::{ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{DataFusionError, Result, internal_err}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; +use datafusion_physical_expr::EquivalenceProperties; use futures::Stream; use tokio::sync::Barrier; @@ -196,13 +195,6 @@ impl ExecutionPlan for MockExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, @@ -436,13 +428,6 @@ impl ExecutionPlan for BarrierExec { unimplemented!() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - /// Returns a stream which yields data fn execute( &self, @@ -575,13 +560,6 @@ impl ExecutionPlan for ErrorExec { unimplemented!() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - /// Returns a stream which yields data fn execute( &self, @@ -661,13 +639,6 @@ impl ExecutionPlan for StatisticsExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, @@ -775,13 +746,6 @@ impl ExecutionPlan for BlockingExec { internal_err!("Children cannot be replaced in {self:?}") } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn execute( &self, _partition: usize, @@ -917,13 +881,6 @@ impl ExecutionPlan for PanicExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index ec9ea376e0b6d..3ea2eb5402fe5 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -49,7 +49,6 @@ use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::NdvFallback; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Result, assert_or_internal_err, exec_err, internal_datafusion_err, }; @@ -269,13 +268,6 @@ impl ExecutionPlan for UnionExec { self.inputs.iter().collect() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, children: Vec>, @@ -589,13 +581,6 @@ impl ExecutionPlan for InterleaveExec { vec![false; self.inputs().len()] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 3a4b9d7232f4d..c31d0dd23fa68 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -44,7 +44,6 @@ use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_ord::cmp::lt; use async_trait::async_trait; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Constraints, HashMap, HashSet, Result, UnnestOptions, exec_datafusion_err, exec_err, internal_err, @@ -239,13 +238,6 @@ impl ExecutionPlan for UnnestExec { vec![&self.input] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index f442bcea94be2..6c6b26c9cf49f 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -48,7 +48,6 @@ use arrow::{ }; use datafusion_common::hash_utils::create_hashes; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::{ evaluate_partition_ranges, get_at_indices, get_row_at_idx, }; @@ -321,19 +320,6 @@ impl ExecutionPlan for BoundedWindowAggExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - let mut tnr = TreeNodeRecursion::Continue; - for window_expr in &self.window_expr { - for expr in window_expr.expressions() { - tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - } - } - Ok(tnr) - } - fn required_input_ordering(&self) -> Vec> { let partition_bys = self.window_expr()[0].partition_by(); let order_keys = self.window_expr()[0].order_by(); diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index 9e8fc8a6ebb62..ee3b071fc9167 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -41,7 +41,6 @@ use arrow::datatypes::SchemaRef; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::{evaluate_partition_ranges, transpose}; use datafusion_common::{Result, assert_eq_or_internal_err}; use datafusion_execution::TaskContext; @@ -222,19 +221,6 @@ impl ExecutionPlan for WindowAggExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - let mut tnr = TreeNodeRecursion::Continue; - for window_expr in &self.window_expr { - for expr in window_expr.expressions() { - tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - } - } - Ok(tnr) - } - fn maintains_input_order(&self) -> Vec { vec![true] } diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index 0855dbf2fd635..28b9c8ddc704c 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -31,11 +31,10 @@ use crate::{ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_eq_or_internal_err, internal_datafusion_err}; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::MemoryReservation; -use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; /// A vector of record batches with a memory reservation. #[derive(Debug)] @@ -186,13 +185,6 @@ impl ExecutionPlan for WorkTableExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/docs/source/library-user-guide/custom-table-providers.md b/docs/source/library-user-guide/custom-table-providers.md index 81b2d131e65c3..540782e3e8bf7 100644 --- a/docs/source/library-user-guide/custom-table-providers.md +++ b/docs/source/library-user-guide/custom-table-providers.md @@ -766,7 +766,6 @@ impl DatePartitionedTable { # fn children(&self) -> Vec<&Arc> { vec![] } # fn with_new_children(self: Arc, _: Vec>) -> Result> { Ok(self) } # fn execute(&self, _: usize, _: Arc) -> Result { todo!() } -# fn apply_expressions(&self, _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result) -> Result { Ok(TreeNodeRecursion::Continue) } # } ``` @@ -910,13 +909,6 @@ impl ExecutionPlan for CountingExec { batch_stream, ))) } - -# fn apply_expressions( -# &self, -# _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, -# ) -> Result { -# Ok(TreeNodeRecursion::Continue) -# } } ``` diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index 0c3bf20a91ed5..8245793ec07de 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -169,73 +169,6 @@ where string types are preferred (`UNION`, `CASE THEN/ELSE`, `NVL2`). string-preferring behavior - Crates that call `get_coerce_type_for_case_expression` -### `ExecutionPlan::apply_expressions` is now a required method - -`apply_expressions` has been added as a **required** method on the `ExecutionPlan` trait (no default implementation). The same applies to the `FileSource` and `DataSource` traits. Any custom implementation of these traits must now implement `apply_expressions`. - -**Who is affected:** - -- Users who implement custom `ExecutionPlan` nodes -- Users who implement custom `FileSource` or `DataSource` sources - -**Migration guide:** - -Add `apply_expressions` to your implementation. Call `f` on each top-level `PhysicalExpr` your node owns, using `visit_sibling` to correctly propagate `TreeNodeRecursion`: - -**Node with no expressions:** - -```rust,ignore -fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, -) -> Result { - Ok(TreeNodeRecursion::Continue) -} -``` - -**Node with a single expression:** - -```rust,ignore -fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, -) -> Result { - f(self.predicate.as_ref()) -} -``` - -**Node with multiple expressions:** - -```rust,ignore -fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, -) -> Result { - let mut tnr = TreeNodeRecursion::Continue; - for expr in &self.expressions { - tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - } - Ok(tnr) -} -``` - -**Node whose only expressions are in `output_ordering()` (e.g. a synthetic test node with no owned expression fields):** - -```rust,ignore -fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, -) -> Result { - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) -} -``` - ### `ExecutionPlan::partition_statistics` now returns `Arc` `ExecutionPlan::partition_statistics` now returns `Result>` instead of `Result`. This avoids cloning `Statistics` when it is shared across multiple consumers.