From cae813a17db7f2ea99ada5503fa6cb8978462fcf Mon Sep 17 00:00:00 2001 From: xuanyili Date: Mon, 18 May 2026 06:13:52 +0000 Subject: [PATCH] fix(physical-optimizer): make OutputRequirements idempotent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `OutputRequirements::new_add_mode()` was stacking a new `OutputRequirementExec` wrapper at the top of the plan on every invocation. `require_top_ordering_helper` descends through any operator that maintains input order and has no hard ordering requirement, and `OutputRequirementExec` itself qualifies — so on a second pass the helper walked past the existing wrapper, found no SortExec/SPM beneath, and `require_top_ordering` added a fresh wrapper above the old one. Fix: at the start of `require_top_ordering`, return the plan unchanged when it is already topped by an `OutputRequirementExec`. The existing wrapper was either added by this rule's prior run (in which case it is already correct) or inserted intentionally by the caller (in which case we should not disturb it). Motivation: adaptive execution in datafusion-ballista AQE (apache/datafusion-ballista#1359) re-runs the entire `PhysicalOptimizer` chain after every completed stage. Although the chain-level effect is masked when `OutputRequirements::new_remove_mode()` later strips the wrapper, the rule itself violates the idempotence property that `PhysicalOptimizerRule`s are expected to satisfy. Surfaced by the discovery harness added in a sibling PR. Adds `datafusion/core/tests/physical_optimizer/output_requirements.rs` with a focused test that invokes `new_add_mode` twice on a bare parquet scan and asserts structural equality. Fails before this fix (two stacked wrappers); passes after. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../core/tests/physical_optimizer/mod.rs | 1 + .../physical_optimizer/output_requirements.rs | 65 +++++++++++++++++++ .../src/output_requirements.rs | 12 +++- 3 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 datafusion/core/tests/physical_optimizer/output_requirements.rs diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index b7ba661d2343a..4711cf99b7d51 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -29,6 +29,7 @@ mod join_selection; #[expect(clippy::needless_pass_by_value)] mod limit_pushdown; mod limited_distinct_aggregation; +mod output_requirements; mod partition_statistics; mod projection_pushdown; mod pushdown_sort; diff --git a/datafusion/core/tests/physical_optimizer/output_requirements.rs b/datafusion/core/tests/physical_optimizer/output_requirements.rs new file mode 100644 index 0000000000000..846589104e4ca --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/output_requirements.rs @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use crate::physical_optimizer::test_utils::{parquet_exec, schema, sort_exec, sort_expr}; + +use datafusion_common::config::ConfigOptions; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_optimizer::output_requirements::OutputRequirements; +use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::get_plan_string; + +/// `OutputRequirements::new_add_mode()` must be idempotent: re-applying it to +/// its own output must not stack additional `OutputRequirementExec` wrappers. +/// +/// AQE (datafusion-ballista#1359) re-runs the optimizer chain after every +/// completed stage; without this guarantee, every replan adds another wrapper. +#[test] +fn add_mode_is_idempotent_on_bare_scan() { + // Exercises the path where `require_top_ordering_helper` returns + // `is_changed = false` and the rule adds a default (empty-requirement) + // wrapper. + assert_add_mode_idempotent(parquet_exec(schema())); +} + +#[test] +fn add_mode_is_idempotent_on_sorted_plan() { + // Exercises the path where the helper recognizes a top-level `SortExec` + // and produces a wrapper carrying that ordering requirement + // (`is_changed = true` branch). + let s = schema(); + let ordering: LexOrdering = [sort_expr("a", &s)].into(); + let plan = sort_exec(ordering, parquet_exec(Arc::clone(&s))); + assert_add_mode_idempotent(plan); +} + +fn assert_add_mode_idempotent(plan: Arc) { + let config = ConfigOptions::new(); + let rule = OutputRequirements::new_add_mode(); + + let once = rule.optimize(plan, &config).unwrap(); + let twice = rule.optimize(Arc::clone(&once), &config).unwrap(); + + assert_eq!( + get_plan_string(&once), + get_plan_string(&twice), + "second invocation of OutputRequirements::new_add_mode mutated the plan", + ); +} diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index 81df6f943c15e..f5f3141fbcc71 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -63,7 +63,9 @@ impl OutputRequirements { /// Create a new rule which works in `Add` mode; i.e. it simply adds a /// top-level [`OutputRequirementExec`] into the physical plan to keep track /// of global ordering and distribution requirements if there are any. - /// Note that this rule should run at the beginning. + /// Note that this rule should run at the beginning. It is idempotent: when + /// invoked on a plan that is already topped by an `OutputRequirementExec`, + /// it returns the plan unchanged. pub fn new_add_mode() -> Self { Self { mode: RuleMode::Add, @@ -357,7 +359,15 @@ impl PhysicalOptimizerRule for OutputRequirements { /// This functions adds ancillary `OutputRequirementExec` to the physical plan, so that /// global requirements are not lost during optimization. +/// +/// Idempotent: if the plan is already topped by an `OutputRequirementExec`, it +/// is returned unchanged so that re-running this rule (as adaptive execution +/// in datafusion-ballista AQE does after every completed stage, see +/// datafusion-ballista#1359) does not stack wrappers. fn require_top_ordering(plan: Arc) -> Result> { + if plan.downcast_ref::().is_some() { + return Ok(plan); + } let (new_plan, is_changed) = require_top_ordering_helper(plan)?; if is_changed { Ok(new_plan)