From 51969abd312a605910a3e5a4ff703cd6a1df49a0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 21 May 2026 16:25:38 -0400 Subject: [PATCH] Revert "Add `ExecutionPlan::apply_expressions()` (#20337)" (#22437) - Reverts #20337 - Addresses concerns raised in https://github.com/apache/datafusion/pull/22415 - Closes https://github.com/apache/datafusion/pull/22415 `ExecutionPlan::apply_expressions()` was added in #20337 with no default implementation, forcing every custom `ExecutionPlan`, `FileSource`, and `DataSource` implementor to add the method as part of upgrading to DataFusion 54. As discussed on #22415, per @LiaCastaneda and @adriangb, the method is not yet called from anywhere in DataFusion and the originally intended use (dynamic-filter discovery/serialization for distributed scenarios) is blocked on other in-progress work (#20009, #21350). The combined effect on downstream users is a required code change with no immediate benefit, and ambiguity about what a "correct" implementation even means today (e.g. returning `Ok(TreeNodeRecursion::Continue)` is safe right now but becomes incorrect as soon as the method starts being used by an optimizer pass). The plan agreed in the discussion is to remove the API from the 54.0 release and re-add it together with the concrete consumer that needs it. cc @adriangb @LiaCastaneda @milenkovicm. `git revert -m 1` of the merge commit, with the following manual conflict resolutions and follow-ups: By CI Yes -- this removes the new public API: - `ExecutionPlan::apply_expressions` - `FileSource::apply_expressions` - `DataSource::apply_expressions` These were only added in 54 and are not yet released. Custom implementors no longer need to implement these methods. 
--- .../custom_data_source/custom_datasource.rs | 17 -- .../memory_pool_execution_plan.rs | 17 -- .../proto/composed_extension_codec.rs | 19 -- .../examples/relation_planner/table_sample.rs | 18 +- datafusion/catalog/src/memory/table.rs | 12 +- datafusion/core/src/physical_planner.rs | 32 --- .../core/tests/custom_sources_cases/mod.rs | 17 -- .../provider_filter_pushdown.rs | 17 -- .../tests/custom_sources_cases/statistics.rs | 17 -- datafusion/core/tests/fuzz_cases/once_exec.rs | 17 -- .../enforce_distribution.rs | 25 +-- .../physical_optimizer/filter_pushdown.rs | 105 ---------- .../physical_optimizer/join_selection.rs | 29 --- .../physical_optimizer/pushdown_utils.rs | 29 --- .../tests/physical_optimizer/test_utils.rs | 40 +--- .../tests/user_defined/insert_operation.rs | 17 -- .../tests/user_defined/user_defined_plan.rs | 20 +- datafusion/datasource-arrow/src/source.rs | 15 -- datafusion/datasource-avro/src/source.rs | 15 -- datafusion/datasource-csv/src/source.rs | 15 -- datafusion/datasource-json/src/source.rs | 15 -- datafusion/datasource-parquet/src/source.rs | 21 -- datafusion/datasource/src/file.rs | 22 -- .../datasource/src/file_scan_config/mod.rs | 28 --- datafusion/datasource/src/memory.rs | 15 -- datafusion/datasource/src/sink.rs | 16 +- datafusion/datasource/src/source.rs | 25 --- datafusion/datasource/src/test_util.rs | 9 +- datafusion/ffi/src/execution_plan.rs | 33 --- datafusion/ffi/src/tests/async_provider.rs | 17 -- .../physical-optimizer/src/ensure_coop.rs | 10 +- .../src/output_requirements.rs | 34 +-- .../physical-plan/src/aggregates/mod.rs | 38 ---- datafusion/physical-plan/src/analyze.rs | 9 - datafusion/physical-plan/src/async_func.rs | 10 +- datafusion/physical-plan/src/buffer.rs | 8 - .../physical-plan/src/coalesce_batches.rs | 8 - .../physical-plan/src/coalesce_partitions.rs | 8 - datafusion/physical-plan/src/coop.rs | 8 - datafusion/physical-plan/src/display.rs | 12 +- datafusion/physical-plan/src/empty.rs | 10 +- 
.../physical-plan/src/execution_plan.rs | 195 +----------------- datafusion/physical-plan/src/explain.rs | 10 +- datafusion/physical-plan/src/filter.rs | 8 - .../physical-plan/src/joins/cross_join.rs | 10 - .../physical-plan/src/joins/hash_join/exec.rs | 25 --- .../src/joins/nested_loop_join.rs | 12 -- .../src/joins/piecewise_merge_join/exec.rs | 9 - .../src/joins/sort_merge_join/exec.rs | 18 -- .../src/joins/symmetric_hash_join.rs | 18 -- datafusion/physical-plan/src/limit.rs | 32 +-- datafusion/physical-plan/src/memory.rs | 10 +- .../src/operator_statistics/mod.rs | 15 -- .../physical-plan/src/placeholder_row.rs | 9 - datafusion/physical-plan/src/projection.rs | 11 - .../physical-plan/src/recursive_query.rs | 9 - .../physical-plan/src/repartition/mod.rs | 16 -- .../physical-plan/src/scalar_subquery.rs | 16 -- .../physical-plan/src/sorts/partial_sort.rs | 14 +- .../src/sorts/partitioned_topk.rs | 12 -- datafusion/physical-plan/src/sorts/sort.rs | 27 --- .../src/sorts/sort_preserving_merge.rs | 19 -- datafusion/physical-plan/src/streaming.rs | 9 - datafusion/physical-plan/src/test.rs | 19 +- datafusion/physical-plan/src/test/exec.rs | 45 +--- datafusion/physical-plan/src/union.rs | 15 -- datafusion/physical-plan/src/unnest.rs | 8 - .../src/windows/bounded_window_agg_exec.rs | 14 -- .../src/windows/window_agg_exec.rs | 14 -- datafusion/physical-plan/src/work_table.rs | 10 +- .../custom-table-providers.md | 8 - .../library-user-guide/upgrading/54.0.0.md | 67 ------ 72 files changed, 22 insertions(+), 1531 deletions(-) diff --git a/datafusion-examples/examples/custom_data_source/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs index 701a886d2a140..937452a286b90 100644 --- a/datafusion-examples/examples/custom_data_source/custom_datasource.rs +++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs @@ -26,7 +26,6 @@ use async_trait::async_trait; use datafusion::arrow::array::{UInt8Builder, UInt64Builder}; use 
datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::datasource::{TableProvider, TableType, provider_as_source}; use datafusion::error::Result; use datafusion::execution::context::TaskContext; @@ -275,20 +274,4 @@ impl ExecutionPlan for CustomExec { None, )?)) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } diff --git a/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs b/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs index dc374c7e02fe5..eab813b7eedbd 100644 --- a/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs +++ b/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs @@ -29,7 +29,6 @@ use arrow::record_batch::RecordBatch; use arrow_schema::SchemaRef; use datafusion::common::record_batch; -use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::common::{exec_datafusion_err, internal_err}; use datafusion::datasource::{DefaultTableSource, memory::MemTable}; use datafusion::error::Result; @@ -292,20 +291,4 @@ impl ExecutionPlan for BufferingExecutionPlan { }), ))) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.properties.output_ordering() { - for sort_expr in ordering { - tnr = 
tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } diff --git a/datafusion-examples/examples/proto/composed_extension_codec.rs b/datafusion-examples/examples/proto/composed_extension_codec.rs index ae9503dd87b19..9a4a76749433a 100644 --- a/datafusion-examples/examples/proto/composed_extension_codec.rs +++ b/datafusion-examples/examples/proto/composed_extension_codec.rs @@ -37,7 +37,6 @@ use std::sync::Arc; use datafusion::common::Result; use datafusion::common::internal_err; -use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::execution::TaskContext; use datafusion::physical_plan::{DisplayAs, ExecutionPlan}; use datafusion::prelude::SessionContext; @@ -124,15 +123,6 @@ impl ExecutionPlan for ParentExec { ) -> Result { unreachable!() } - - fn apply_expressions( - &self, - _f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } /// A PhysicalExtensionCodec that can serialize and deserialize ParentExec @@ -205,15 +195,6 @@ impl ExecutionPlan for ChildExec { ) -> Result { unreachable!() } - - fn apply_expressions( - &self, - _f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } /// A PhysicalExtensionCodec that can serialize and deserialize ChildExec diff --git a/datafusion-examples/examples/relation_planner/table_sample.rs b/datafusion-examples/examples/relation_planner/table_sample.rs index 42342e5f1a641..46826216e28da 100644 --- a/datafusion-examples/examples/relation_planner/table_sample.rs +++ b/datafusion-examples/examples/relation_planner/table_sample.rs @@ -116,7 +116,7 @@ use datafusion::{ }; use datafusion_common::{ DFSchemaRef, DataFusionError, Result, Statistics, internal_err, not_impl_err, - plan_datafusion_err, plan_err, tree_node::TreeNodeRecursion, + plan_datafusion_err, plan_err, }; use datafusion_expr::{ UserDefinedLogicalNode, 
UserDefinedLogicalNodeCore, @@ -738,22 +738,6 @@ impl ExecutionPlan for SampleExec { Ok(Arc::new(stats)) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } /// Bernoulli sampler: includes each row with probability `(upper - lower)`. diff --git a/datafusion/catalog/src/memory/table.rs b/datafusion/catalog/src/memory/table.rs index 8102c15079658..bbc962d9acabf 100644 --- a/datafusion/catalog/src/memory/table.rs +++ b/datafusion/catalog/src/memory/table.rs @@ -31,7 +31,6 @@ use arrow::compute::{and, filter_record_batch}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::error::Result; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Constraints, DFSchema, SchemaExt, not_impl_err, plan_err}; use datafusion_common_runtime::JoinSet; use datafusion_datasource::memory::{MemSink, MemorySourceConfig}; @@ -40,13 +39,13 @@ use datafusion_datasource::source::DataSourceExec; use datafusion_expr::dml::InsertOp; use datafusion_expr::{Expr, SortExpr, TableType}; use datafusion_physical_expr::{ - LexOrdering, create_physical_expr, create_physical_sort_exprs, + LexOrdering, PhysicalExpr, create_physical_expr, create_physical_sort_exprs, }; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning, - PhysicalExpr, PlanProperties, common, + PlanProperties, common, }; use datafusion_session::Session; @@ -627,11 +626,4 @@ impl ExecutionPlan for 
DmlResultExec { stream, ))) } - - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index d225cff1deafc..ee97309c27aae 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -4475,20 +4475,6 @@ mod tests { ) -> Result { unimplemented!("NoOpExecutionPlan::execute"); } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } // Produces an execution plan where the schema is mismatched from @@ -4628,12 +4614,6 @@ digraph { ) -> Result { unimplemented!() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } impl DisplayAs for OkExtensionNode { fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { @@ -4680,12 +4660,6 @@ digraph { ) -> Result { unimplemented!() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } impl DisplayAs for InvariantFailsExtensionNode { fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { @@ -4804,12 +4778,6 @@ digraph { ) -> Result { unimplemented!() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } impl DisplayAs for ExecutableInvariantFails { fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { diff --git 
a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index cef75b444f6fe..06b3701cbe6d6 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -38,7 +38,6 @@ use datafusion_catalog::Session; use datafusion_common::cast::as_primitive_array; use datafusion_common::project_schema; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::PlanProperties; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -205,22 +204,6 @@ impl ExecutionPlan for CustomExecutionPlan { .collect(), })) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } #[async_trait] diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index e52c559ec79ef..18695accd0f2e 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -35,7 +35,6 @@ use datafusion::prelude::*; use datafusion::scalar::ScalarValue; use datafusion_catalog::Session; use datafusion_common::cast::as_primitive_array; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{DataFusionError, internal_err, not_impl_err}; use datafusion_expr::expr::{BinaryExpr, Cast}; use datafusion_functions_aggregate::expr_fn::count; @@ -149,22 +148,6 @@ impl ExecutionPlan for CustomPlan { })), ))) } - 
- fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } #[derive(Clone, Debug)] diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 01c4deac5ccd3..14406c2316da0 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -33,7 +33,6 @@ use datafusion::{ scalar::ScalarValue, }; use datafusion_catalog::Session; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{project_schema, stats::Precision}; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -181,22 +180,6 @@ impl ExecutionPlan for StatisticsValidation { Ok(Arc::new(self.stats.clone())) } } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } fn init_ctx(stats: Statistics, schema: Schema) -> Result { diff --git a/datafusion/core/tests/fuzz_cases/once_exec.rs b/datafusion/core/tests/fuzz_cases/once_exec.rs index 403e377a690e2..9b57141061518 100644 --- a/datafusion/core/tests/fuzz_cases/once_exec.rs +++ b/datafusion/core/tests/fuzz_cases/once_exec.rs @@ -17,7 +17,6 @@ use arrow_schema::SchemaRef; use 
datafusion_common::internal_datafusion_err; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -106,20 +105,4 @@ impl ExecutionPlan for OnceExec { stream.ok_or_else(|| internal_datafusion_err!("Stream already consumed")) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> datafusion_common::Result, - ) -> datafusion_common::Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 78bb02ab1108b..12abf79041091 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -40,9 +40,7 @@ use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::ScalarValue; use datafusion_common::config::CsvOptions; use datafusion_common::error::Result; -use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRecursion, -}; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_datasource::file_groups::FileGroup; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_expr::{JoinType, Operator}; @@ -203,20 +201,6 @@ impl ExecutionPlan for SortRequiredExec { ))) } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence 
properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } - fn execute( &self, _partition: usize, @@ -300,13 +284,6 @@ impl ExecutionPlan for SinglePartitionMaintainsOrderExec { Ok(Arc::new(Self::new(child))) } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn execute( &self, _partition: usize, diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index f56b8c6d70624..b420326596d0d 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -2942,111 +2942,6 @@ async fn test_filter_with_projection_pushdown() { assert_batches_eq!(expected, &result); } -/// Test that ExecutionPlan::apply_expressions() can discover dynamic filters across the plan tree. -/// -/// Not portable to sqllogictest: asserts by walking the plan tree with -/// `apply_expressions` + `downcast_ref::` and -/// counting nodes. Neither API is observable from SQL. 
-#[tokio::test] -async fn test_discover_dynamic_filters_via_expressions_api() { - use datafusion_common::JoinType; - use datafusion_common::tree_node::TreeNodeRecursion; - use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; - use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; - - fn count_dynamic_filters(plan: &Arc) -> usize { - let mut count = 0; - - // Check expressions from this node using apply_expressions - let _ = plan.apply_expressions(&mut |expr| { - if let Some(_df) = expr.downcast_ref::() { - count += 1; - } - Ok(TreeNodeRecursion::Continue) - }); - - // Recursively visit children - for child in plan.children() { - count += count_dynamic_filters(child); - } - - count - } - - // Create build side (left) - let build_batches = - vec![record_batch!(("a", Utf8, ["foo", "bar"]), ("b", Int32, [1, 2])).unwrap()]; - let build_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Utf8, false), - Field::new("b", DataType::Int32, false), - ])); - let build_scan = TestScanBuilder::new(build_schema.clone()) - .with_support(true) - .with_batches(build_batches) - .build(); - - // Create probe side (right) - let probe_batches = vec![ - record_batch!( - ("a", Utf8, ["foo", "bar", "baz", "qux"]), - ("c", Float64, [1.0, 2.0, 3.0, 4.0]) - ) - .unwrap(), - ]; - let probe_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Utf8, false), - Field::new("c", DataType::Float64, false), - ])); - let probe_scan = TestScanBuilder::new(probe_schema.clone()) - .with_support(true) - .with_batches(probe_batches) - .build(); - - // Create HashJoinExec - let plan = Arc::new( - HashJoinExec::try_new( - build_scan, - probe_scan, - vec![( - col("a", &build_schema).unwrap(), - col("a", &probe_schema).unwrap(), - )], - None, - &JoinType::Inner, - None, - PartitionMode::CollectLeft, - datafusion_common::NullEquality::NullEqualsNothing, - false, - ) - .unwrap(), - ) as Arc; - - // Before optimization: no dynamic filters - let count_before = 
count_dynamic_filters(&plan); - assert_eq!( - count_before, 0, - "Before optimization, should have no dynamic filters" - ); - - // Apply filter pushdown optimization (this creates dynamic filters) - let mut config = ConfigOptions::default(); - config.optimizer.enable_dynamic_filter_pushdown = true; - config.execution.parquet.pushdown_filters = true; - let optimized_plan = FilterPushdown::new_post_optimization() - .optimize(plan, &config) - .unwrap(); - - // After optimization: should discover dynamic filters - // We expect 2 dynamic filters: - // 1. In the HashJoinExec (producer) - // 2. In the DataSourceExec (consumer, pushed down to the probe side) - let count_after = count_dynamic_filters(&optimized_plan); - assert_eq!( - count_after, 2, - "After optimization, should discover exactly 2 dynamic filters (1 in HashJoinExec, 1 in DataSourceExec), found {count_after}" - ); -} - // ==== Filter pushdown through SortExec tests ==== /// FilterExec above a plain SortExec (no fetch) should be pushed below it. 
diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index 050baa9e792e9..29a2b59e5725d 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -25,7 +25,6 @@ use std::{ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ColumnStatistics, JoinType, ScalarValue, stats::Precision}; use datafusion_common::{JoinSide, NullEquality}; use datafusion_common::{Result, Statistics}; @@ -1059,20 +1058,6 @@ impl ExecutionPlan for UnboundedExec { batch: self.batch.clone(), })) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } #[derive(Eq, PartialEq, Debug)] @@ -1174,20 +1159,6 @@ impl ExecutionPlan for StatisticsExec { self.stats.clone() })) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } #[test] diff --git a/datafusion/core/tests/physical_optimizer/pushdown_utils.rs b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs index 8b659e757aa2a..61fd0a45952ba 100644 --- a/datafusion/core/tests/physical_optimizer/pushdown_utils.rs +++ 
b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs @@ -18,7 +18,6 @@ use arrow::datatypes::SchemaRef; use arrow::{array::RecordBatch, compute::concat_batches}; use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr}; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, config::ConfigOptions, internal_err}; use datafusion_datasource::{ PartitionedFile, file::FileSource, file_scan_config::FileScanConfig, @@ -235,25 +234,6 @@ impl FileSource for TestSource { fn table_schema(&self) -> &datafusion_datasource::TableSchema { &self.table_schema } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit predicate (filter) expression if present - if let Some(predicate) = &self.predicate { - f(predicate.as_ref())?; - } - - // Visit projection expressions if present - if let Some(projection) = &self.projection { - for proj_expr in projection { - f(proj_expr.expr.as_ref())?; - } - } - - Ok(TreeNodeRecursion::Continue) - } } #[derive(Debug, Clone)] @@ -569,13 +549,4 @@ impl ExecutionPlan for TestNode { Ok(res) } } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit the predicate expression - f(self.predicate.as_ref())?; - Ok(TreeNodeRecursion::Continue) - } } diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 6814ab2358ffc..09225cb0385a7 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -30,9 +30,7 @@ use datafusion::datasource::physical_plan::ParquetSource; use datafusion::datasource::source::DataSourceExec; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRecursion, -}; +use 
datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::utils::expr::COUNT_STAR_EXPANSION; use datafusion_common::{ ColumnStatistics, JoinType, NullEquality, Result, Statistics, internal_err, @@ -489,20 +487,6 @@ impl ExecutionPlan for RequirementsTestExec { ) -> Result { unimplemented!("Test exec does not support execution") } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit expressions in required_input_ordering if present - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = &self.required_input_ordering { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } /// A [`PlanContext`] object is susceptible to being left in an inconsistent state after @@ -1035,28 +1019,6 @@ impl ExecutionPlan for TestScan { }) } } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit expressions in output_ordering - let mut tnr = TreeNodeRecursion::Continue; - for ordering in &self.output_ordering { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - - // Visit expressions in requested_ordering if present - if let Some(ordering) = &self.requested_ordering { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - - Ok(tnr) - } } /// Helper function to create a TestScan with ordering diff --git a/datafusion/core/tests/user_defined/insert_operation.rs b/datafusion/core/tests/user_defined/insert_operation.rs index 326c767d97610..f3d3f70bdf925 100644 --- a/datafusion/core/tests/user_defined/insert_operation.rs +++ b/datafusion/core/tests/user_defined/insert_operation.rs @@ -25,7 +25,6 @@ use datafusion::{ }; use datafusion_catalog::{Session, TableProvider}; use datafusion_common::config::Dialect; -use datafusion_common::tree_node::TreeNodeRecursion; use 
datafusion_expr::{Expr, TableType, dml::InsertOp}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; use datafusion_physical_plan::execution_plan::SchedulingType; @@ -177,22 +176,6 @@ impl ExecutionPlan for TestInsertExec { ) -> Result { unimplemented!("TestInsertExec is a stub for testing.") } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.plan_properties.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } fn make_count_schema() -> SchemaRef { diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index 505468a19cd37..e8ff6758ccdd4 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -90,9 +90,7 @@ use datafusion::{ prelude::{SessionConfig, SessionContext}, }; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRecursion, -}; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ScalarValue, assert_eq_or_internal_err, assert_or_internal_err}; use datafusion_expr::{FetchType, InvariantLevel, Projection, SortExpr}; use datafusion_optimizer::AnalyzerRule; @@ -744,22 +742,6 @@ impl ExecutionPlan for TopKExec { state: BTreeMap::new(), })) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion::physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for 
sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } // A very specialized TopK implementation diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs index 061f130f24131..59c020c779ca2 100644 --- a/datafusion/datasource-arrow/src/source.rs +++ b/datafusion/datasource-arrow/src/source.rs @@ -40,7 +40,6 @@ use arrow::buffer::Buffer; use arrow::ipc::reader::{FileDecoder, FileReader, StreamReader}; use datafusion_common::error::Result; use datafusion_common::exec_datafusion_err; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_datasource::PartitionedFile; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; @@ -393,20 +392,6 @@ impl FileSource for ArrowSource { fn projection(&self) -> Option<&ProjectionExprs> { Some(&self.projection.source) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit projection expressions - let mut tnr = TreeNodeRecursion::Continue; - for proj_expr in &self.projection.source { - tnr = tnr.visit_sibling(|| f(proj_expr.expr.as_ref()))?; - } - Ok(tnr) - } } /// `FileOpener` wrapper for both Arrow IPC file and stream formats diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs index b80d4f462e425..e3be9d8a401d0 100644 --- a/datafusion/datasource-avro/src/source.rs +++ b/datafusion/datasource-avro/src/source.rs @@ -22,7 +22,6 @@ use std::sync::Arc; use arrow::datatypes::{Schema, SchemaRef}; use arrow_avro::reader::{Reader, ReaderBuilder}; use datafusion_common::error::Result; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_datasource::TableSchema; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; @@ -169,20 +168,6 @@ impl FileSource for AvroSource { // Avro OCF does 
not support safe byte-range splitting in this reader path. false } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit projection expressions - let mut tnr = TreeNodeRecursion::Continue; - for proj_expr in &self.projection.source { - tnr = tnr.visit_sibling(|| f(proj_expr.expr.as_ref()))?; - } - Ok(tnr) - } } mod private { diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 611586cee6473..638279f827344 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -34,7 +34,6 @@ use datafusion_datasource::{ use arrow::csv; use datafusion_common::config::CsvOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{DataFusionError, Result}; use datafusion_common_runtime::JoinSet; use datafusion_datasource::file::FileSource; @@ -310,20 +309,6 @@ impl FileSource for CsvSource { DisplayFormatType::TreeRender => Ok(()), } } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit projection expressions - let mut tnr = TreeNodeRecursion::Continue; - for proj_expr in &self.projection.source { - tnr = tnr.visit_sibling(|| f(proj_expr.expr.as_ref()))?; - } - Ok(tnr) - } } impl FileOpener for CsvOpener { diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 2f2f459956f4e..179870673d426 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -29,7 +29,6 @@ use crate::boundary_stream::AlignedBoundaryStream; use datafusion_common::error::{DataFusionError, Result}; use datafusion_common::exec_datafusion_err; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common_runtime::{JoinSet, SpawnedTask}; use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream}; 
use datafusion_datasource::file_compression_type::FileCompressionType; @@ -233,20 +232,6 @@ impl FileSource for JsonSource { fn file_type(&self) -> &str { "json" } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit projection expressions - let mut tnr = TreeNodeRecursion::Continue; - for proj_expr in &self.projection.source { - tnr = tnr.visit_sibling(|| f(proj_expr.expr.as_ref()))?; - } - Ok(tnr) - } } impl FileOpener for JsonOpener { diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 2b367cf7600d5..2e2d0be0da507 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -36,7 +36,6 @@ use arrow::array::timezone::Tz; use arrow::datatypes::TimeUnit; use datafusion_common::DataFusionError; use datafusion_common::config::TableParquetOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_datasource::TableSchema; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; @@ -961,26 +960,6 @@ impl FileSource for ParquetSource { inner: Arc::new(new_source) as Arc, }) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn PhysicalExpr, - ) -> datafusion_common::Result, - ) -> datafusion_common::Result { - // Visit predicate (filter) expression if present - let mut tnr = TreeNodeRecursion::Continue; - if let Some(predicate) = &self.predicate { - tnr = tnr.visit_sibling(|| f(predicate.as_ref()))?; - } - - // Visit projection expressions - for proj_expr in &self.projection { - tnr = tnr.visit_sibling(|| f(proj_expr.expr.as_ref()))?; - } - - Ok(tnr) - } } #[cfg(test)] diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 32bee63b54f23..07460b23694b7 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -29,7 +29,6 @@ use 
crate::morsel::{FileOpenerMorselizer, Morselizer}; #[expect(deprecated)] use crate::schema_adapter::SchemaAdapterFactory; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, not_impl_err}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr}; @@ -352,27 +351,6 @@ pub trait FileSource: Any + Send + Sync { fn schema_adapter_factory(&self) -> Option> { None } - - /// Apply a function to all physical expressions used by this file source. - /// - /// This includes: - /// - Filter predicates (which may contain dynamic filters) - /// - Projection expressions - /// - /// The function `f` is called once for each expression. The function should - /// return `TreeNodeRecursion::Continue` to continue visiting other expressions, - /// or `TreeNodeRecursion::Stop` to stop visiting expressions early. - /// - /// Implementations must explicitly visit all expressions. There is no default - /// implementation to ensure that all FileSource implementations handle this correctly. - /// - /// See [`ExecutionPlan::apply_expressions`] for more details and examples. 
- /// - /// [`ExecutionPlan::apply_expressions`]: datafusion_physical_plan::ExecutionPlan::apply_expressions - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result; } impl dyn FileSource { diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index 04b74528d5ac1..e1fd10324373d 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -30,7 +30,6 @@ use crate::{ use arrow::datatypes::FieldRef; use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err, }; @@ -82,9 +81,7 @@ use std::{fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc}; /// # use arrow::datatypes::{Field, Fields, DataType, Schema, SchemaRef}; /// # use object_store::ObjectStore; /// # use datafusion_common::Result; -/// # use datafusion_common::tree_node::TreeNodeRecursion; /// # use datafusion_datasource::file::FileSource; -/// # use datafusion_physical_plan::PhysicalExpr; /// # use datafusion_datasource::file_groups::FileGroup; /// # use datafusion_datasource::PartitionedFile; /// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; @@ -114,7 +111,6 @@ use std::{fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc}; /// # fn file_type(&self) -> &str { "parquet" } /// # // Note that this implementation drops the projection on the floor, it is not complete! 
/// # fn try_pushdown_projection(&self, projection: &ProjectionExprs) -> Result>> { Ok(Some(Arc::new(self.clone()) as Arc)) } -/// # fn apply_expressions(&self, _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result) -> Result { Ok(TreeNodeRecursion::Continue) } /// # } /// # impl ParquetSource { /// # fn new(table_schema: impl Into) -> Self { Self {table_schema: table_schema.into()} } @@ -999,14 +995,6 @@ impl DataSource for FileScanConfig { Some(Arc::new(new_config)) } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Delegate to the file source - self.file_source.apply_expressions(f) - } - /// Create any shared state that should be passed between sibling streams /// during one execution. /// @@ -1408,11 +1396,9 @@ mod tests { use arrow::datatypes::Field; use datafusion_common::ColumnStatistics; use datafusion_common::stats::Precision; - use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_batches_eq, internal_err}; use datafusion_execution::TaskContext; use datafusion_expr::SortExpr; - use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::create_physical_sort_expr; use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::projection::ProjectionExpr; @@ -1475,13 +1461,6 @@ mod tests { inner: Arc::new(self.clone()) as Arc, }) } - - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } #[test] @@ -2624,13 +2603,6 @@ mod tests { inner: Arc::new(self.clone()) as Arc, }) } - - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } #[test] diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 7c9281dcc2f26..f073b09c5463e 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -28,7 +28,6 @@ use 
crate::source::{DataSource, DataSourceExec}; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::{Schema, SchemaRef}; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Result, ScalarValue, assert_or_internal_err, plan_err, project_schema, }; @@ -257,20 +256,6 @@ impl DataSource for MemorySourceConfig { }) .transpose() } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Visit expressions in sort_information - let mut tnr = TreeNodeRecursion::Continue; - for ordering in &self.sort_information { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } impl MemorySourceConfig { diff --git a/datafusion/datasource/src/sink.rs b/datafusion/datasource/src/sink.rs index 2a1f5c4a2fd02..e3df1ad6381f4 100644 --- a/datafusion/datasource/src/sink.rs +++ b/datafusion/datasource/src/sink.rs @@ -24,10 +24,9 @@ use std::sync::Arc; use arrow::array::{ArrayRef, RecordBatch, UInt64Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_eq_or_internal_err}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalExpr}; +use datafusion_physical_expr::{Distribution, EquivalenceProperties}; use datafusion_physical_expr_common::sort_expr::{LexRequirement, OrderingRequirements}; use datafusion_physical_plan::metrics::MetricsSet; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; @@ -224,19 +223,6 @@ impl ExecutionPlan for DataSinkExec { ))) } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Apply to sort order requirements if present - if let Some(sort_order) = &self.sort_order { - for req in sort_order.iter() { - f(req.expr.as_ref())?; - } - } - Ok(TreeNodeRecursion::Continue) - } - /// Execute the 
plan and return a stream of `RecordBatch`es for /// the specified partition. fn execute( diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 420c6b508ce4f..af4bc09504937 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -40,7 +40,6 @@ use itertools::Itertools; use crate::file::FileSource; use crate::file_scan_config::FileScanConfig; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Constraints, Result, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; @@ -225,22 +224,6 @@ pub trait DataSource: Any + Send + Sync + Debug { None } - /// Apply a closure to each expression used by this data source. - /// - /// This includes filter predicates (which may contain dynamic filters) and any - /// other expressions used during data scanning. - /// - /// Implementations must override this method. If the data source has no expressions, - /// return `Ok(TreeNodeRecursion::Continue)` immediately. - /// - /// See [`ExecutionPlan::apply_expressions`] for more details and implementation examples. - /// - /// [`ExecutionPlan::apply_expressions`]: datafusion_physical_plan::ExecutionPlan::apply_expressions - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result; - /// Injects arbitrary run-time state into this DataSource, returning a new instance /// that incorporates that state *if* it is relevant to the concrete DataSource implementation. 
/// @@ -368,14 +351,6 @@ impl ExecutionPlan for DataSourceExec { Vec::new() } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Delegate to the underlying data source - self.data_source.apply_expressions(f) - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs index b59ce58a420a8..d211319629878 100644 --- a/datafusion/datasource/src/test_util.rs +++ b/datafusion/datasource/src/test_util.rs @@ -22,7 +22,7 @@ use crate::{ use std::sync::Arc; use arrow::datatypes::Schema; -use datafusion_common::{Result, tree_node::TreeNodeRecursion}; +use datafusion_common::Result; use datafusion_physical_expr::{PhysicalExpr, expressions::Column}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use object_store::ObjectStore; @@ -125,13 +125,6 @@ impl FileSource for MockSource { ) -> Option<&datafusion_physical_plan::projection::ProjectionExprs> { Some(&self.projection.source) } - - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } /// Create a column expression diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index ddad605081745..f942916ea19ff 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -20,7 +20,6 @@ use std::pin::Pin; use std::sync::Arc; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{DataFusionError, Result, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr_common::metrics::MetricsSet; @@ -439,22 +438,6 @@ impl ExecutionPlan for ForeignExecutionPlan { } } - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the 
output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.properties.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } - fn repartitioned( &self, target_partitions: usize, @@ -581,22 +564,6 @@ pub mod tests { Statistics::new_unknown(self.props.eq_properties.schema()) }))) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.props.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } #[test] diff --git a/datafusion/ffi/src/tests/async_provider.rs b/datafusion/ffi/src/tests/async_provider.rs index 011d3f0a0a343..69104709b477e 100644 --- a/datafusion/ffi/src/tests/async_provider.rs +++ b/datafusion/ffi/src/tests/async_provider.rs @@ -32,7 +32,6 @@ use arrow::array::RecordBatch; use arrow::datatypes::Schema; use async_trait::async_trait; use datafusion_catalog::TableProvider; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, exec_err}; use datafusion_execution::RecordBatchStream; use datafusion_expr::Expr; @@ -211,22 +210,6 @@ impl ExecutionPlan for AsyncTestExecutionPlan { batch_receiver: self.batch_receiver.resubscribe(), })) } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_plan::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in the output ordering from equivalence properties - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.properties.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } } impl 
datafusion_physical_plan::DisplayAs for AsyncTestExecutionPlan { diff --git a/datafusion/physical-optimizer/src/ensure_coop.rs b/datafusion/physical-optimizer/src/ensure_coop.rs index 102e21a4853a4..e7aacb2321b67 100644 --- a/datafusion/physical-optimizer/src/ensure_coop.rs +++ b/datafusion/physical-optimizer/src/ensure_coop.rs @@ -264,11 +264,10 @@ mod tests { // Test that cooperative context is reset when encountering an eager evaluation boundary. use arrow::datatypes::Schema; use datafusion_common::internal_err; - use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::{ - DisplayAs, DisplayFormatType, Partitioning, PhysicalExpr, PlanProperties, + DisplayAs, DisplayFormatType, Partitioning, PlanProperties, SendableRecordBatchStream, execution_plan::{Boundedness, EmissionType}, }; @@ -346,13 +345,6 @@ mod tests { ) -> Result { internal_err!("DummyExec does not support execution") } - - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } } // Build a plan similar to the original test: diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index 81df6f943c15e..24eb3af5f564c 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -27,9 +27,7 @@ use std::sync::Arc; use crate::PhysicalOptimizerRule; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRecursion, -}; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{Result, Statistics}; use datafusion_execution::TaskContext; use datafusion_physical_expr::Distribution; @@ -294,36 +292,6 @@ impl ExecutionPlan for OutputRequirementExec { 
fn fetch(&self) -> Option { self.fetch } - - fn apply_expressions( - &self, - f: &mut dyn FnMut( - &dyn datafusion_physical_expr_common::physical_expr::PhysicalExpr, - ) -> Result, - ) -> Result { - // Visit expressions in order_requirement - let mut tnr = TreeNodeRecursion::Continue; - if let Some(order_reqs) = &self.order_requirement { - let lexes = match order_reqs { - OrderingRequirements::Hard(alternatives) => alternatives, - OrderingRequirements::Soft(alternatives) => alternatives, - }; - for lex in lexes { - for sort_expr in lex { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - } - - // Visit expressions in dist_requirement if it's HashPartitioned - if let Distribution::HashPartitioned(exprs) = &self.dist_requirement { - for expr in exprs { - tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - } - } - - Ok(tnr) - } } impl PhysicalOptimizerRule for OutputRequirements { diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index d1498e4a3ea55..5e6b8505764a2 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -45,7 +45,6 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_schema::FieldRef; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Constraint, Constraints, Result, ScalarValue, assert_eq_or_internal_err, internal_err, not_impl_err, @@ -1579,36 +1578,6 @@ impl ExecutionPlan for AggregateExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Apply to group by expressions - let mut tnr = TreeNodeRecursion::Continue; - for expr in self.group_by.input_exprs() { - tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - } - - // Apply to aggregate expressions - for aggr in self.aggr_expr.iter() { - for expr in aggr.expressions() { - 
tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - } - } - - // Apply to filter expressions (FILTER WHERE clauses) - for filter in self.filter_expr.iter().flatten() { - tnr = tnr.visit_sibling(|| f(filter.as_ref()))?; - } - - // Apply to dynamic filter expression if present - if let Some(dyn_filter) = &self.dynamic_filter { - tnr = tnr.visit_sibling(|| f(dyn_filter.filter.as_ref()))?; - } - - Ok(tnr) - } - fn with_new_children( self: Arc, children: Vec>, @@ -2764,13 +2733,6 @@ mod tests { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index ea3abf439e4c1..582af8f1e3dae 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -30,11 +30,9 @@ use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::instant::Instant; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{DataFusionError, Result, assert_eq_or_internal_err}; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; -use datafusion_physical_expr::PhysicalExpr; use futures::StreamExt; @@ -149,13 +147,6 @@ impl ExecutionPlan for AnalyzeExec { vec![Distribution::UnspecifiedDistribution] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/async_func.rs b/datafusion/physical-plan/src/async_func.rs index 8ad4ecb096962..1b15bf27e78cc 100644 --- a/datafusion/physical-plan/src/async_func.rs +++ b/datafusion/physical-plan/src/async_func.rs @@ -24,8 +24,7 @@ use crate::{ }; use 
arrow::array::RecordBatch; use arrow_schema::{Fields, Schema, SchemaRef}; -use datafusion_common::tree_node::TreeNodeRecursion; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; use datafusion_common::{Result, assert_eq_or_internal_err}; use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::ScalarFunctionExpr; @@ -165,13 +164,6 @@ impl ExecutionPlan for AsyncFuncExec { vec![&self.input] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/buffer.rs b/datafusion/physical-plan/src/buffer.rs index 0cc4a1d71814e..19a4ebba83eae 100644 --- a/datafusion/physical-plan/src/buffer.rs +++ b/datafusion/physical-plan/src/buffer.rs @@ -31,7 +31,6 @@ use crate::{ }; use arrow::array::RecordBatch; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, Statistics, internal_err, plan_err}; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; @@ -168,13 +167,6 @@ impl ExecutionPlan for BufferExec { vec![&self.input] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 34cd770260915..76b2f63798f88 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -33,7 +33,6 @@ use crate::{ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::Result; -use 
datafusion_common::tree_node::TreeNodeRecursion; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; @@ -184,13 +183,6 @@ impl ExecutionPlan for CoalesceBatchesExec { vec![false] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 3399554612431..fa200ef845f3a 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -34,7 +34,6 @@ use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_proper use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_eq_or_internal_err, internal_err}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; @@ -154,13 +153,6 @@ impl ExecutionPlan for CoalescePartitionsExec { vec![false] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs index fe6a3bc3d5678..111999b71c91d 100644 --- a/datafusion/physical-plan/src/coop.rs +++ b/datafusion/physical-plan/src/coop.rs @@ -71,7 +71,6 @@ //! that report [`SchedulingType::NonCooperative`] in their [plan properties](ExecutionPlan::properties). 
use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_physical_expr::PhysicalExpr; #[cfg(datafusion_coop = "tokio_fallback")] use futures::Future; @@ -277,13 +276,6 @@ impl ExecutionPlan for CooperativeExec { vec![&self.input] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 756a68b1a958d..f7c6de3fc591a 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -1167,11 +1167,8 @@ mod tests { use std::fmt::Write; use std::sync::Arc; - use datafusion_common::{ - Result, Statistics, internal_datafusion_err, tree_node::TreeNodeRecursion, - }; + use datafusion_common::{Result, Statistics, internal_datafusion_err}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; - use datafusion_physical_expr::PhysicalExpr; use crate::{DisplayAs, ExecutionPlan, PlanProperties}; @@ -1214,13 +1211,6 @@ mod tests { unimplemented!() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn execute( &self, _: usize, diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 8103695ad08fa..2e7f982a51a31 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -29,10 +29,9 @@ use crate::{ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ColumnStatistics, Result, ScalarValue, assert_or_internal_err}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; +use 
datafusion_physical_expr::EquivalenceProperties; use crate::execution_plan::SchedulingType; use log::trace; @@ -119,13 +118,6 @@ impl ExecutionPlan for EmptyExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 1a67ea0ded11b..b55d3c32cb569 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -27,9 +27,7 @@ pub use crate::stream::EmptyRecordBatchStream; use arrow_schema::Schema; pub use datafusion_common::hash_utils; -use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRecursion, -}; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; pub use datafusion_common::utils::project_schema; pub use datafusion_common::{ColumnStatistics, Statistics, internal_err}; pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; @@ -203,80 +201,6 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { /// joins). fn children(&self) -> Vec<&Arc>; - /// Apply a closure `f` to each expression (non-recursively) in the current - /// physical plan node. This does not include expressions in any children. - /// - /// The closure `f` is applied to expressions in the order they appear in the plan. - /// The closure can return `TreeNodeRecursion::Continue` to continue visiting, - /// `TreeNodeRecursion::Stop` to stop visiting immediately, or `TreeNodeRecursion::Jump` - /// to skip any remaining expressions (though typically all expressions are visited). - /// - /// The expressions visited do not necessarily represent or even contribute - /// to the output schema of this node. 
For example, `FilterExec` visits the - /// filter predicate even though the output of a Filter has the same columns - /// as the input. - /// - /// # Example Usage - /// ``` - /// # use std::sync::Arc; - /// # use datafusion_physical_plan::ExecutionPlan; - /// # use datafusion_common::tree_node::TreeNodeRecursion; - /// # fn example(plan: Arc) -> datafusion_common::Result<()> { - /// // Count the number of expressions - /// let mut count = 0; - /// plan.apply_expressions(&mut |_expr| { - /// count += 1; - /// Ok(TreeNodeRecursion::Continue) - /// })?; - /// # Ok(()) - /// # } - /// ``` - /// - /// # Implementation Examples - /// - /// ## Node with no expressions (e.g., EmptyExec, MemoryExec) - /// ```ignore - /// fn apply_expressions( - /// &self, - /// _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - /// ) -> Result { - /// Ok(TreeNodeRecursion::Continue) - /// } - /// ``` - /// - /// ## Node with a single expression (e.g., FilterExec) - /// ```ignore - /// fn apply_expressions( - /// &self, - /// f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - /// ) -> Result { - /// f(self.predicate.as_ref()) - /// } - /// ``` - /// - /// ## Node with multiple expressions (e.g., ProjectionExec, JoinExec) - /// - /// Use [`TreeNodeRecursion::visit_sibling`] when iterating over multiple - /// expressions. This correctly propagates [`TreeNodeRecursion::Stop`]: if - /// `f` returns `Stop` for an earlier expression, `visit_sibling` short-circuits - /// and skips the remaining ones. 
- /// ```ignore - /// fn apply_expressions( - /// &self, - /// f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - /// ) -> Result { - /// let mut tnr = TreeNodeRecursion::Continue; - /// for expr in &self.expressions { - /// tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - /// } - /// Ok(tnr) - /// } - /// ``` - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result; - /// Returns a new `ExecutionPlan` where all existing children were replaced /// by the `children`, in order fn with_new_children( @@ -1640,13 +1564,6 @@ mod tests { unimplemented!() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn execute( &self, _partition: usize, @@ -1702,13 +1619,6 @@ mod tests { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, @@ -1732,109 +1642,6 @@ mod tests { } } - /// A test node that holds a fixed list of expressions, used to test - /// `apply_expressions` behavior. 
- #[derive(Debug)] - struct MultiExprExec { - exprs: Vec>, - } - - impl DisplayAs for MultiExprExec { - fn fmt_as( - &self, - _t: DisplayFormatType, - _f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - unimplemented!() - } - } - - impl ExecutionPlan for MultiExprExec { - fn name(&self) -> &'static str { - "MultiExprExec" - } - - fn properties(&self) -> &Arc { - unimplemented!() - } - - fn children(&self) -> Vec<&Arc> { - vec![] - } - - fn with_new_children( - self: Arc, - _: Vec>, - ) -> Result> { - unimplemented!() - } - - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - let mut tnr = TreeNodeRecursion::Continue; - for expr in &self.exprs { - tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - } - Ok(tnr) - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> Result { - unimplemented!() - } - - fn partition_statistics( - &self, - _partition: Option, - ) -> Result> { - unimplemented!() - } - } - - /// Returns a simple literal `Arc` for use in tests. - fn lit_expr(val: i64) -> Arc { - use datafusion_physical_expr::expressions::Literal; - Arc::new(Literal::new(datafusion_common::ScalarValue::Int64(Some( - val, - )))) - } - - /// `apply_expressions` visits all expressions when `f` always returns `Continue`. - #[test] - fn test_apply_expressions_continue_visits_all() -> Result<()> { - let plan = MultiExprExec { - exprs: vec![lit_expr(1), lit_expr(2), lit_expr(3)], - }; - let mut visited = 0usize; - plan.apply_expressions(&mut |_expr| { - visited += 1; - Ok(TreeNodeRecursion::Continue) - })?; - assert_eq!(visited, 3); - Ok(()) - } - - #[test] - fn test_apply_expressions_stop_halts_early() -> Result<()> { - let plan = MultiExprExec { - exprs: vec![lit_expr(1), lit_expr(2), lit_expr(3)], - }; - let mut visited = 0usize; - let tnr = plan.apply_expressions(&mut |_expr| { - visited += 1; - Ok(TreeNodeRecursion::Stop) - })?; - // Only the first expression is visited; the rest are skipped. 
- assert_eq!(visited, 1); - assert_eq!(tnr, TreeNodeRecursion::Stop); - Ok(()) - } - #[test] fn test_execution_plan_name() { let schema1 = Arc::new(Schema::empty()); diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs index 617a1a6cdaf53..98eac3d28b5df 100644 --- a/datafusion/physical-plan/src/explain.rs +++ b/datafusion/physical-plan/src/explain.rs @@ -26,10 +26,9 @@ use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::display::StringifiedPlan; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_eq_or_internal_err}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; +use datafusion_physical_expr::EquivalenceProperties; use log::trace; @@ -117,13 +116,6 @@ impl ExecutionPlan for ExplainExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index c485e181f3826..b3b107dc580df 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -54,7 +54,6 @@ use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ DataFusionError, Result, ScalarValue, internal_err, plan_err, project_schema, }; @@ -521,13 +520,6 @@ impl ExecutionPlan for FilterExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - f(self.predicate.as_ref()) - } - fn maintains_input_order(&self) -> Vec { // Tell optimizer this 
operator doesn't reorder its input vec![true] diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index ab66955dc6034..6661d2782b212 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -42,13 +42,11 @@ use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::compute::concat_batches; use arrow::datatypes::{Fields, Schema, SchemaRef}; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ JoinType, Result, ScalarValue, assert_eq_or_internal_err, internal_err, }; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; -use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::equivalence::join_equivalence_properties; use async_trait::async_trait; @@ -284,14 +282,6 @@ impl ExecutionPlan for CrossJoinExec { Some(self.metrics.clone_inner()) } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // CrossJoin has no join conditions or expressions - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 3cdd60d7ab3c8..f7391feb29cc3 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -72,7 +72,6 @@ use arrow::record_batch::RecordBatch; use arrow::util::bit_util; use arrow_schema::{DataType, Schema}; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::memory::estimate_memory_size; use datafusion_common::{ JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, internal_err, @@ -1251,30 +1250,6 @@ impl ExecutionPlan for HashJoinExec { 
vec![&self.left, &self.right] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Apply to join key expressions from both sides - let mut tnr = TreeNodeRecursion::Continue; - for (left, right) in &self.on { - tnr = tnr.visit_sibling(|| f(left.as_ref()))?; - tnr = tnr.visit_sibling(|| f(right.as_ref()))?; - } - - // Apply to join filter expression if present - if let Some(filter) = &self.filter { - tnr = tnr.visit_sibling(|| f(filter.expression().as_ref()))?; - } - - // Apply to dynamic filter expression if present - if let Some(df) = &self.dynamic_filter { - tnr = tnr.visit_sibling(|| f(df.filter.as_ref()))?; - } - - Ok(tnr) - } - /// Creates a new HashJoinExec with different children while preserving configuration. /// /// This method is called during query optimization when the optimizer creates new diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index feaf344200ac1..15af23b447836 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -60,7 +60,6 @@ use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_schema::DataType; use datafusion_common::cast::as_boolean_array; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ JoinSide, Result, ScalarValue, Statistics, arrow_err, assert_eq_or_internal_err, internal_datafusion_err, internal_err, project_schema, unwrap_or_internal_err, @@ -580,17 +579,6 @@ impl ExecutionPlan for NestedLoopJoinExec { vec![&self.left, &self.right] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn crate::PhysicalExpr) -> Result, - ) -> Result { - // Apply to join filter expressions if present - if let Some(filter) = &self.filter { - f(filter.expression().as_ref())?; - } - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, children: Vec>, diff --git 
a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs index 2b20089f8e221..50e9252a21131 100644 --- a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs @@ -23,7 +23,6 @@ use arrow::{ }; use arrow_schema::{SchemaRef, SortOptions}; use datafusion_common::not_impl_err; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{JoinSide, Result, internal_err}; use datafusion_execution::{ SendableRecordBatchStream, @@ -508,14 +507,6 @@ impl ExecutionPlan for PiecewiseMergeJoinExec { vec![&self.buffered, &self.streamed] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Apply to the two expressions being compared in the range predicate - f(self.on.0.as_ref())?.visit_sibling(|| f(self.on.1.as_ref())) - } - fn required_input_distribution(&self) -> Vec { vec![ Distribution::SinglePartition, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index 3f309431614a4..9e87b52696a57 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -45,7 +45,6 @@ use crate::{ use arrow::compute::SortOptions; use arrow::datatypes::SchemaRef; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ JoinSide, JoinType, NullEquality, Result, assert_eq_or_internal_err, internal_err, plan_err, @@ -450,23 +449,6 @@ impl ExecutionPlan for SortMergeJoinExec { vec![&self.left, &self.right] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn crate::PhysicalExpr) -> Result, - ) -> Result { - // Apply to join keys from both sides - let mut tnr = TreeNodeRecursion::Continue; - for (left, right) in &self.on { - tnr = tnr.visit_sibling(|| f(left.as_ref()))?; - tnr = 
tnr.visit_sibling(|| f(right.as_ref()))?; - } - // Apply to join filter expressions if present - if let Some(filter) = &self.filter { - tnr = tnr.visit_sibling(|| f(filter.expression().as_ref()))?; - } - Ok(tnr) - } - fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 34af88ea4027b..ef92964fadf84 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -66,7 +66,6 @@ use arrow::compute::concat_batches; use arrow::datatypes::{ArrowNativeType, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::hash_utils::create_hashes; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::bisect; use datafusion_common::{ HashSet, JoinSide, JoinType, NullEquality, Result, assert_eq_or_internal_err, @@ -460,23 +459,6 @@ impl ExecutionPlan for SymmetricHashJoinExec { vec![&self.left, &self.right] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn crate::PhysicalExpr) -> Result, - ) -> Result { - // Apply to join keys from both sides - let mut tnr = TreeNodeRecursion::Continue; - for (left, right) in &self.on { - tnr = tnr.visit_sibling(|| f(left.as_ref()))?; - tnr = tnr.visit_sibling(|| f(right.as_ref()))?; - } - // Apply to join filter expressions if present - if let Some(filter) = &self.filter { - tnr = tnr.visit_sibling(|| f(filter.expression().as_ref()))?; - } - Ok(tnr) - } - fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 223a476493b39..7f42c33a79ca0 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -34,11 +34,10 @@ use crate::{ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use datafusion_common::tree_node::TreeNodeRecursion; use 
datafusion_common::{Result, assert_eq_or_internal_err, internal_err}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; +use datafusion_physical_expr::LexOrdering; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -174,20 +173,6 @@ impl ExecutionPlan for GlobalLimitExec { vec![false] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Apply to required ordering expressions if present - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = &self.required_ordering { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } - fn with_new_children( self: Arc, mut children: Vec>, @@ -358,20 +343,6 @@ impl ExecutionPlan for LocalLimitExec { vec![true] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Apply to required ordering expressions if present - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = &self.required_ordering { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } - fn with_new_children( self: Arc, children: Vec>, @@ -565,6 +536,7 @@ mod tests { use arrow::array::RecordBatchOptions; use arrow::datatypes::Schema; use datafusion_common::stats::Precision; + use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::expressions::col; #[tokio::test] diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index e172ef4463ec4..ad54905f474aa 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -32,11 +32,10 @@ use crate::{ use arrow::array::RecordBatch; use arrow::datatypes::SchemaRef; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_eq_or_internal_err, assert_or_internal_err}; use datafusion_execution::TaskContext; 
use datafusion_execution::memory_pool::MemoryReservation; -use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; +use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use futures::Stream; @@ -312,13 +311,6 @@ impl ExecutionPlan for LazyMemoryExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/operator_statistics/mod.rs b/datafusion/physical-plan/src/operator_statistics/mod.rs index eca017cde9d0c..041ef4666658d 100644 --- a/datafusion/physical-plan/src/operator_statistics/mod.rs +++ b/datafusion/physical-plan/src/operator_statistics/mod.rs @@ -1034,7 +1034,6 @@ mod tests { use std::fmt; use crate::execution_plan::{Boundedness, EmissionType}; - use datafusion_common::tree_node::TreeNodeRecursion; fn make_schema() -> Arc { Arc::new(Schema::new(vec![ @@ -1114,13 +1113,6 @@ mod tests { &self.cache } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn execute( &self, _partition: usize, @@ -1222,13 +1214,6 @@ mod tests { self.input.properties() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn execute( &self, _partition: usize, diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index ae8e73cd74ade..b99f9a93045fb 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -29,11 +29,9 @@ use crate::{ use arrow::array::{ArrayRef, NullArray, RecordBatch, RecordBatchOptions}; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; -use datafusion_common::tree_node::TreeNodeRecursion; use 
datafusion_common::{Result, assert_or_internal_err}; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; -use datafusion_physical_expr::PhysicalExpr; use log::trace; @@ -137,13 +135,6 @@ impl ExecutionPlan for PlaceholderRowExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index e5b91fbb1c5d4..d49522dfd2989 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -312,17 +312,6 @@ impl ExecutionPlan for ProjectionExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - let mut tnr = TreeNodeRecursion::Continue; - for proj_expr in self.projector.projection().as_ref().iter() { - tnr = tnr.visit_sibling(|| f(proj_expr.expr.as_ref()))?; - } - Ok(tnr) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index c160f9a0dc763..f34aac3744557 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -37,12 +37,10 @@ use arrow::array::{BooleanArray, BooleanBuilder}; use arrow::compute::filter_record_batch; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{Result, internal_datafusion_err, not_impl_err}; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; -use datafusion_physical_expr::PhysicalExpr; use 
datafusion_physical_expr::{EquivalenceProperties, Partitioning}; use futures::{Stream, StreamExt, ready}; @@ -154,13 +152,6 @@ impl ExecutionPlan for RecursiveQueryExec { vec![&self.static_term, &self.recursive_term] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - // TODO: control these hints and see whether we can // infer some from the child plans (static/recursive terms). fn maintains_input_order(&self) -> Vec { diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 5d87836ba518b..465ca4a99e961 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -50,7 +50,6 @@ use arrow::compute::take_arrays; use arrow::datatypes::{SchemaRef, UInt32Type}; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::transpose; use datafusion_common::{ ColumnStatistics, DataFusionError, HashMap, assert_or_internal_err, internal_err, @@ -1185,21 +1184,6 @@ impl ExecutionPlan for RepartitionExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Apply to hash partition expressions if this is a hash repartition - if let Partitioning::Hash(exprs, _) = self.partitioning() { - let mut tnr = TreeNodeRecursion::Continue; - for expr in exprs { - tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - } - return Ok(tnr); - } - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/scalar_subquery.rs b/datafusion/physical-plan/src/scalar_subquery.rs index 82421d66dee9e..25f7332f95272 100644 --- a/datafusion/physical-plan/src/scalar_subquery.rs +++ b/datafusion/physical-plan/src/scalar_subquery.rs @@ -27,11 +27,9 @@ use 
std::fmt; use std::sync::Arc; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, ScalarValue, Statistics, exec_err, internal_err}; use datafusion_execution::TaskContext; use datafusion_expr::execution_props::{ScalarSubqueryResults, SubqueryIndex}; -use datafusion_physical_expr::PhysicalExpr; use crate::execution_plan::{CardinalityEffect, ExecutionPlan, PlanProperties}; use crate::joins::utils::{OnceAsync, OnceFut}; @@ -225,13 +223,6 @@ impl ExecutionPlan for ScalarSubqueryExec { ))) } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn maintains_input_order(&self) -> Vec { // Only the main input (first child); subquery children don't contribute // to ordering. @@ -388,13 +379,6 @@ mod tests { ))) } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn execute( &self, partition: usize, diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index abd9ebb142a66..3bf16af36c62b 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -69,10 +69,9 @@ use arrow::compute::concat_batches; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::Result; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::evaluate_partition_ranges; use datafusion_execution::{RecordBatchStream, TaskContext}; -use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; +use datafusion_physical_expr::LexOrdering; use futures::{Stream, StreamExt, ready}; use log::trace; @@ -284,17 +283,6 @@ impl ExecutionPlan for PartialSortExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - let mut tnr = TreeNodeRecursion::Continue; 
- for sort_expr in &self.expr { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - Ok(tnr) - } - fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/sorts/partitioned_topk.rs b/datafusion/physical-plan/src/sorts/partitioned_topk.rs index f4c2585ea790d..fe876eeddf7f2 100644 --- a/datafusion/physical-plan/src/sorts/partitioned_topk.rs +++ b/datafusion/physical-plan/src/sorts/partitioned_topk.rs @@ -35,7 +35,6 @@ use arrow::array::{RecordBatch, UInt32Array}; use arrow::compute::{BatchCoalescer, take_record_batch}; use arrow::datatypes::SchemaRef; use arrow::row::{OwnedRow, RowConverter}; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{HashMap, Result}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; @@ -332,17 +331,6 @@ impl ExecutionPlan for PartitionedTopKExec { )?)) } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - let mut tnr = TreeNodeRecursion::Continue; - for sort_expr in &self.expr { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - Ok(tnr) - } - fn execute( &self, partition: usize, diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 90d4b5ec12f91..f715de0b5964b 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -59,7 +59,6 @@ use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays}; use arrow::datatypes::SchemaRef; use datafusion_common::config::SpillCompression; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ DataFusionError, Result, assert_or_internal_err, internal_datafusion_err, unwrap_or_internal_err, @@ -1151,25 +1150,6 @@ impl ExecutionPlan for SortExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> 
Result { - // Apply to sort expressions - let mut tnr = TreeNodeRecursion::Continue; - for sort_expr in &self.expr { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - - // Apply to dynamic filter expression if present (when fetch is Some, TopK mode) - if let Some(filter) = &self.filter { - let filter_guard = filter.read(); - tnr = tnr.visit_sibling(|| f(filter_guard.expr().as_ref()))?; - } - - Ok(tnr) - } - fn benefits_from_input_partitioning(&self) -> Vec { vec![false] } @@ -1530,13 +1510,6 @@ mod tests { Ok(self) } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn execute( &self, _partition: usize, diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 13c28ccb10991..09570f14ba734 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -30,11 +30,9 @@ use crate::{ check_if_same_properties, }; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_eq_or_internal_err, internal_err}; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::MemoryConsumer; -use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements}; use crate::execution_plan::{EvaluationType, SchedulingType}; @@ -287,17 +285,6 @@ impl ExecutionPlan for SortPreservingMergeExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - let mut tnr = TreeNodeRecursion::Continue; - for sort_expr in &self.expr { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - Ok(tnr) - } - fn with_new_children( self: Arc, mut children: Vec>, @@ -1419,12 +1406,6 @@ mod tests { fn children(&self) -> Vec<&Arc> { vec![] } - fn apply_expressions( 
- &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index 250eb59f19b87..cdf4b08f718c6 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -33,10 +33,8 @@ use crate::stream::RecordBatchStreamAdapter; use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; use arrow::datatypes::{Schema, SchemaRef}; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, internal_err, plan_err}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use async_trait::async_trait; @@ -245,13 +243,6 @@ impl ExecutionPlan for StreamingTableExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index 4c4724e4dcc4f..a6e76cebcdee2 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -35,7 +35,6 @@ use crate::{DisplayAs, DisplayFormatType, PlanProperties}; use arrow::array::{Array, ArrayRef, Int32Array, RecordBatch}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Result, Statistics, assert_or_internal_err, config::ConfigOptions, project_schema, }; @@ -45,9 +44,7 @@ use datafusion_physical_expr::equivalence::{ }; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::collect_columns; -use datafusion_physical_expr::{ - EquivalenceProperties, LexOrdering, Partitioning, PhysicalExpr, -}; +use 
datafusion_physical_expr::{EquivalenceProperties, LexOrdering, Partitioning}; use futures::{Future, FutureExt}; @@ -140,20 +137,6 @@ impl ExecutionPlan for TestMemoryExec { Vec::new() } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - // Apply to all sort information orderings - let mut tnr = TreeNodeRecursion::Continue; - for ordering in &self.sort_information { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index 200223b9b660a..e162571e32261 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -35,10 +35,9 @@ use std::{ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{DataFusionError, Result, internal_err}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; +use datafusion_physical_expr::EquivalenceProperties; use futures::Stream; use tokio::sync::Barrier; @@ -196,13 +195,6 @@ impl ExecutionPlan for MockExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, @@ -436,13 +428,6 @@ impl ExecutionPlan for BarrierExec { unimplemented!() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - /// Returns a stream which yields data fn execute( &self, @@ -575,13 +560,6 @@ impl ExecutionPlan for ErrorExec { unimplemented!() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } 
- /// Returns a stream which yields data fn execute( &self, @@ -661,13 +639,6 @@ impl ExecutionPlan for StatisticsExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, @@ -775,13 +746,6 @@ impl ExecutionPlan for BlockingExec { internal_err!("Children cannot be replaced in {self:?}") } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn execute( &self, _partition: usize, @@ -917,13 +881,6 @@ impl ExecutionPlan for PanicExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index ec9ea376e0b6d..3ea2eb5402fe5 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -49,7 +49,6 @@ use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::NdvFallback; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Result, assert_or_internal_err, exec_err, internal_datafusion_err, }; @@ -269,13 +268,6 @@ impl ExecutionPlan for UnionExec { self.inputs.iter().collect() } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, children: Vec>, @@ -589,13 +581,6 @@ impl ExecutionPlan for InterleaveExec { vec![false; self.inputs().len()] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, children: Vec>, diff --git 
a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 3a4b9d7232f4d..c31d0dd23fa68 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -44,7 +44,6 @@ use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_ord::cmp::lt; use async_trait::async_trait; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ Constraints, HashMap, HashSet, Result, UnnestOptions, exec_datafusion_err, exec_err, internal_err, @@ -239,13 +238,6 @@ impl ExecutionPlan for UnnestExec { vec![&self.input] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index f442bcea94be2..6c6b26c9cf49f 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -48,7 +48,6 @@ use arrow::{ }; use datafusion_common::hash_utils::create_hashes; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::{ evaluate_partition_ranges, get_at_indices, get_row_at_idx, }; @@ -321,19 +320,6 @@ impl ExecutionPlan for BoundedWindowAggExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - let mut tnr = TreeNodeRecursion::Continue; - for window_expr in &self.window_expr { - for expr in window_expr.expressions() { - tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - } - } - Ok(tnr) - } - fn required_input_ordering(&self) -> Vec> { let partition_bys = self.window_expr()[0].partition_by(); let order_keys = self.window_expr()[0].order_by(); diff --git 
a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index 9e8fc8a6ebb62..ee3b071fc9167 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -41,7 +41,6 @@ use arrow::datatypes::SchemaRef; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::{evaluate_partition_ranges, transpose}; use datafusion_common::{Result, assert_eq_or_internal_err}; use datafusion_execution::TaskContext; @@ -222,19 +221,6 @@ impl ExecutionPlan for WindowAggExec { vec![&self.input] } - fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - let mut tnr = TreeNodeRecursion::Continue; - for window_expr in &self.window_expr { - for expr in window_expr.expressions() { - tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - } - } - Ok(tnr) - } - fn maintains_input_order(&self) -> Vec { vec![true] } diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index 0855dbf2fd635..28b9c8ddc704c 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -31,11 +31,10 @@ use crate::{ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_eq_or_internal_err, internal_datafusion_err}; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::MemoryReservation; -use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; /// A vector of record batches with a memory reservation. 
#[derive(Debug)] @@ -186,13 +185,6 @@ impl ExecutionPlan for WorkTableExec { vec![] } - fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, - ) -> Result { - Ok(TreeNodeRecursion::Continue) - } - fn with_new_children( self: Arc, _: Vec>, diff --git a/docs/source/library-user-guide/custom-table-providers.md b/docs/source/library-user-guide/custom-table-providers.md index 81b2d131e65c3..540782e3e8bf7 100644 --- a/docs/source/library-user-guide/custom-table-providers.md +++ b/docs/source/library-user-guide/custom-table-providers.md @@ -766,7 +766,6 @@ impl DatePartitionedTable { # fn children(&self) -> Vec<&Arc> { vec![] } # fn with_new_children(self: Arc, _: Vec>) -> Result> { Ok(self) } # fn execute(&self, _: usize, _: Arc) -> Result { todo!() } -# fn apply_expressions(&self, _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result) -> Result { Ok(TreeNodeRecursion::Continue) } # } ``` @@ -910,13 +909,6 @@ impl ExecutionPlan for CountingExec { batch_stream, ))) } - -# fn apply_expressions( -# &self, -# _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, -# ) -> Result { -# Ok(TreeNodeRecursion::Continue) -# } } ``` diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index 0c3bf20a91ed5..8245793ec07de 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -169,73 +169,6 @@ where string types are preferred (`UNION`, `CASE THEN/ELSE`, `NVL2`). string-preferring behavior - Crates that call `get_coerce_type_for_case_expression` -### `ExecutionPlan::apply_expressions` is now a required method - -`apply_expressions` has been added as a **required** method on the `ExecutionPlan` trait (no default implementation). The same applies to the `FileSource` and `DataSource` traits. Any custom implementation of these traits must now implement `apply_expressions`. 
- -**Who is affected:** - -- Users who implement custom `ExecutionPlan` nodes -- Users who implement custom `FileSource` or `DataSource` sources - -**Migration guide:** - -Add `apply_expressions` to your implementation. Call `f` on each top-level `PhysicalExpr` your node owns, using `visit_sibling` to correctly propagate `TreeNodeRecursion`: - -**Node with no expressions:** - -```rust,ignore -fn apply_expressions( - &self, - _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, -) -> Result { - Ok(TreeNodeRecursion::Continue) -} -``` - -**Node with a single expression:** - -```rust,ignore -fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, -) -> Result { - f(self.predicate.as_ref()) -} -``` - -**Node with multiple expressions:** - -```rust,ignore -fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, -) -> Result { - let mut tnr = TreeNodeRecursion::Continue; - for expr in &self.expressions { - tnr = tnr.visit_sibling(|| f(expr.as_ref()))?; - } - Ok(tnr) -} -``` - -**Node whose only expressions are in `output_ordering()` (e.g. a synthetic test node with no owned expression fields):** - -```rust,ignore -fn apply_expressions( - &self, - f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, -) -> Result { - let mut tnr = TreeNodeRecursion::Continue; - if let Some(ordering) = self.cache.output_ordering() { - for sort_expr in ordering { - tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?; - } - } - Ok(tnr) -} -``` - ### `ExecutionPlan::partition_statistics` now returns `Arc` `ExecutionPlan::partition_statistics` now returns `Result>` instead of `Result`. This avoids cloning `Statistics` when it is shared across multiple consumers.