diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index b7ba661d2343a..4711cf99b7d51 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -29,6 +29,7 @@ mod join_selection; #[expect(clippy::needless_pass_by_value)] mod limit_pushdown; mod limited_distinct_aggregation; +mod output_requirements; mod partition_statistics; mod projection_pushdown; mod pushdown_sort; diff --git a/datafusion/core/tests/physical_optimizer/output_requirements.rs b/datafusion/core/tests/physical_optimizer/output_requirements.rs new file mode 100644 index 0000000000000..846589104e4ca --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/output_requirements.rs @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use crate::physical_optimizer::test_utils::{parquet_exec, schema, sort_exec, sort_expr}; + +use datafusion_common::config::ConfigOptions; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_optimizer::output_requirements::OutputRequirements; +use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::get_plan_string; + +/// `OutputRequirements::new_add_mode()` must be idempotent: re-applying it to +/// its own output must not stack additional `OutputRequirementExec` wrappers. +/// +/// AQE (datafusion-ballista#1359) re-runs the optimizer chain after every +/// completed stage; without this guarantee, every replan adds another wrapper. +#[test] +fn add_mode_is_idempotent_on_bare_scan() { + // Exercises the path where `require_top_ordering_helper` returns + // `is_changed = false` and the rule adds a default (empty-requirement) + // wrapper. + assert_add_mode_idempotent(parquet_exec(schema())); +} + +#[test] +fn add_mode_is_idempotent_on_sorted_plan() { + // Exercises the path where the helper recognizes a top-level `SortExec` + // and produces a wrapper carrying that ordering requirement + // (`is_changed = true` branch). 
+ let s = schema(); + let ordering: LexOrdering = [sort_expr("a", &s)].into(); + let plan = sort_exec(ordering, parquet_exec(Arc::clone(&s))); + assert_add_mode_idempotent(plan); +} + +fn assert_add_mode_idempotent(plan: Arc<dyn ExecutionPlan>) { + let config = ConfigOptions::new(); + let rule = OutputRequirements::new_add_mode(); + + let once = rule.optimize(plan, &config).unwrap(); + let twice = rule.optimize(Arc::clone(&once), &config).unwrap(); + + assert_eq!( + get_plan_string(&once), + get_plan_string(&twice), + "second invocation of OutputRequirements::new_add_mode mutated the plan", + ); +} diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index 81df6f943c15e..f5f3141fbcc71 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -63,7 +63,9 @@ impl OutputRequirements { /// Create a new rule which works in `Add` mode; i.e. it simply adds a /// top-level [`OutputRequirementExec`] into the physical plan to keep track /// of global ordering and distribution requirements if there are any. - /// Note that this rule should run at the beginning. + /// Note that this rule should run at the beginning. It is idempotent: when + /// invoked on a plan that is already topped by an `OutputRequirementExec`, + /// it returns the plan unchanged. pub fn new_add_mode() -> Self { Self { mode: RuleMode::Add, @@ -357,7 +359,15 @@ impl PhysicalOptimizerRule for OutputRequirements { /// This functions adds ancillary `OutputRequirementExec` to the physical plan, so that /// global requirements are not lost during optimization. +/// +/// Idempotent: if the plan is already topped by an `OutputRequirementExec`, it +/// is returned unchanged so that re-running this rule (as adaptive execution +/// in datafusion-ballista AQE does after every completed stage, see +/// datafusion-ballista#1359) does not stack wrappers. 
fn require_top_ordering(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> { + if plan.downcast_ref::<OutputRequirementExec>().is_some() { + return Ok(plan); + } let (new_plan, is_changed) = require_top_ordering_helper(plan)?; if is_changed { Ok(new_plan)