diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index e0cdec9e2c08..99302095f014 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -858,7 +858,7 @@ impl LogicalPlan { /// Returns true if any expression in this node contains a subquery /// (Exists, InSubquery, SetComparison, or ScalarSubquery). - fn has_subquery_expressions(&self) -> bool { + pub fn has_subquery_expressions(&self) -> bool { let mut found = false; let _ = self.apply_expressions(|expr| { if found { diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 2775d62144c5..02a59545f1ed 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -28,7 +28,7 @@ use crate::utils::NamePreserver; use datafusion_common::alias::AliasGenerator; use datafusion_common::cse::{CSE, CSEController, FoundCommonNodes}; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::Transformed; use datafusion_common::{Column, DFSchema, DFSchemaRef, Result, qualified_name}; use datafusion_expr::expr::{Alias, HigherOrderFunction, ScalarFunction}; use datafusion_expr::logical_plan::{ @@ -586,12 +586,19 @@ impl OptimizerRule for CommonSubexprEliminate { | LogicalPlan::Unnest(_) | LogicalPlan::RecursiveQuery(_) => { // This rule handles recursion itself in a `ApplyOrder::TopDown` like - // manner. Process uncorrelated subqueries in expressions - // (e.g., Expr::ScalarSubquery), then direct children. - plan.map_uncorrelated_subqueries(|c| self.rewrite(c, config))? - .transform_sibling(|plan| { - plan.map_children(|c| self.rewrite(c, config)) - })? + // manner. In-place recursion via `Arc::make_mut` + `mem::take` + // — when a child returns `Transformed::no` its `Arc` is reused + // without rebuilding the parent variant. 
Replaces the + // previous `map_uncorrelated_subqueries` + `map_children` pair + // (note: direct children are now visited before subquery plans). + let mut plan = plan; + let changed = + crate::optimizer::rewrite_children_in_place(&mut plan, self, config)?; + if changed { + Transformed::yes(plan) + } else { + Transformed::no(plan) + } } }; diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index 95b70da443d8..ae1e385fb64f 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -20,7 +20,7 @@ use crate::{OptimizerConfig, OptimizerRule}; use std::sync::Arc; use crate::join_key_set::JoinKeySet; -use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{NullEquality, Result}; use datafusion_expr::expr::{BinaryExpr, Expr}; use datafusion_expr::logical_plan::{ @@ -251,18 +251,19 @@ fn rewrite_children( plan: LogicalPlan, config: &dyn OptimizerConfig, ) -> Result> { - // Process uncorrelated subqueries in expressions, then direct children. - let transformed_plan = plan - .map_uncorrelated_subqueries(|input| optimizer.rewrite(input, config))? - .transform_sibling(|plan| { - plan.map_children(|input| optimizer.rewrite(input, config)) - })?; - - // recompute schema if the plan was transformed - if transformed_plan.transformed { - transformed_plan.map_data(|plan| plan.recompute_schema()) + // In-place recursion via `Arc::make_mut` + `std::mem::take`. When the + // recursive `optimizer.rewrite(child, config)` call returns + // `Transformed::no`, the child's `Arc` is reused without rebuilding + // the parent's `LogicalPlan` variant — same shape as the + // ownership-based `map_uncorrelated_subqueries` + `map_children` it + // replaces, just without the per-node destructure/restructure cost. 
+ let mut plan = plan; + let changed = + crate::optimizer::rewrite_children_in_place(&mut plan, optimizer, config)?; + if changed { + plan.recompute_schema().map(Transformed::yes) } else { - Ok(transformed_plan) + Ok(Transformed::no(plan)) } } diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index a765d7f27a51..39c8b5ec1d05 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -28,15 +28,13 @@ use log::{debug, warn}; use datafusion_common::alias::AliasGenerator; use datafusion_common::config::ConfigOptions; use datafusion_common::instant::Instant; -use datafusion_common::tree_node::{ - Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, -}; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; use datafusion_common::{DFSchema, DataFusionError, HashSet, Result, internal_err}; use datafusion_expr::dml::CopyTo; use datafusion_expr::logical_plan::LogicalPlan; use datafusion_expr::{ Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement, Distinct, - DistinctOn, DmlStatement, Explain, Expr, Extension, Filter, Join, Limit, Projection, + DistinctOn, DmlStatement, Explain, Extension, Filter, Join, Limit, Projection, RecursiveQuery, Repartition, Sort, Statement, Subquery, SubqueryAlias, Union, Unnest, Window, }; @@ -327,47 +325,6 @@ impl Optimizer { } } -/// Recursively rewrites LogicalPlans -struct Rewriter<'a> { - apply_order: ApplyOrder, - rule: &'a dyn OptimizerRule, - config: &'a dyn OptimizerConfig, -} - -impl<'a> Rewriter<'a> { - fn new( - apply_order: ApplyOrder, - rule: &'a dyn OptimizerRule, - config: &'a dyn OptimizerConfig, - ) -> Self { - Self { - apply_order, - rule, - config, - } - } -} - -impl TreeNodeRewriter for Rewriter<'_> { - type Node = LogicalPlan; - - fn f_down(&mut self, node: LogicalPlan) -> Result> { - if self.apply_order == ApplyOrder::TopDown { - self.rule.rewrite(node, self.config) - } else { - Ok(Transformed::no(node)) 
- } - } - - fn f_up(&mut self, node: LogicalPlan) -> Result> { - if self.apply_order == ApplyOrder::BottomUp { - self.rule.rewrite(node, self.config) - } else { - Ok(Transformed::no(node)) - } - } -} - /// Applies `f` to each child (input) of `plan` in place, using /// [`Arc::make_mut`] for copy-on-write semantics on `Arc` /// children. When the `Arc` refcount is 1 (the common case here) @@ -483,23 +440,98 @@ fn map_children_mut Result>( }) } +/// Like [`map_children_mut`], but additionally descends into the subquery +/// plans referenced from `Expr::ScalarSubquery` / `InSubquery` / `Exists` / +/// `SetComparison`. +/// +/// Direct children stay on the in-place path (`Arc::make_mut`, zero-cost +/// when refcount == 1). The subquery descent borrows the plan briefly via +/// `std::mem::take` to call the ownership-based [`LogicalPlan::map_subqueries`] +/// — that part allocates only when an expression actually contains a +/// subquery, and the per-node `has_subquery_expressions` fast-path inside +/// `map_subqueries` keeps subquery-free nodes free of allocation. +fn map_children_and_subqueries_mut Result>( + plan: &mut LogicalPlan, + mut f: F, +) -> Result { + let mut changed = map_children_mut(plan, &mut f)?; + + // Per-node fast path: skip the `mem::take` + `map_subqueries` roundtrip + // entirely when this node carries no subquery expressions. `map_subqueries` + // has the same check internally, but going through it still pays for the + // take/put and a `Transformed::no(self)` wrapper per node. Hot rules + // (every `ApplyOrder::None` rule that drives its own recursion) call this + // helper at every node, so eliminating the per-node roundtrip matters. 
+ if !plan.has_subquery_expressions() { + return Ok(changed); + } + + let owned = std::mem::take(plan); + let result = owned.map_subqueries(|mut sub| { + let sq_changed = f(&mut sub)?; + Ok(Transformed::new( + sub, + sq_changed, + TreeNodeRecursion::Continue, + )) + })?; + *plan = result.data; + changed |= result.transformed; + + Ok(changed) +} + +/// Returns true if any node in the plan tree (including subquery plans) +/// references a subquery in its expressions. +/// +/// Used as a one-shot whole-plan gate at the optimizer driver level so that +/// rules with [`ApplyOrder::TopDown`]/[`ApplyOrder::BottomUp`] can skip the +/// per-node `map_subqueries` descent entirely when the plan is +/// subquery-free. The check itself walks the plan once (O(plan size)), but +/// it pays for itself once the rule visits more than a handful of nodes +/// — and most plans run hundreds of node visits per rule × per pass. +fn plan_has_subqueries(plan: &LogicalPlan) -> bool { + let mut found = false; + let _ = plan.apply(|node| { + if node.has_subquery_expressions() { + found = true; + Ok(TreeNodeRecursion::Stop) + } else { + Ok(TreeNodeRecursion::Continue) + } + }); + found +} + /// Rewrites a plan tree in place using `Arc::make_mut` for /// copy-on-write semantics on `Arc` children. /// /// This avoids the `Arc::unwrap_or_clone` + `Arc::new` cycle that the -/// ownership-based `TreeNode::rewrite` performs at every child node. +/// ownership-based `TreeNode`-style rewrite performs at every child node. +/// +/// # Subquery handling +/// +/// `has_subqueries` is the caller-computed result of [`plan_has_subqueries`] +/// for the original plan, hoisted once per rule application. 
When `false` +/// the recursion uses [`map_children_mut`] directly and skips all subquery +/// machinery — that's the hot path for join-heavy / subquery-free plans +/// where the per-node `has_subquery_expressions` walk inside +/// [`map_children_and_subqueries_mut`] would otherwise dominate the +/// optimizer's per-rule cost. /// /// # Error semantics /// -/// On `Err`, `*plan` is left in an **unspecified** state and must not be used. -/// Note this is different than consuming APIs such as [`TreeNode::rewrite`] -/// where the original plan is freed and no longer available on error +/// On `Err`, `*plan` is left in an **unspecified** state and must not be +/// used. Note this is different than consuming APIs such as +/// `TreeNode::rewrite` where the original plan is freed and no longer +/// available on error. #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn rewrite_plan_in_place( plan: &mut LogicalPlan, apply_order: ApplyOrder, rule: &dyn OptimizerRule, config: &dyn OptimizerConfig, + has_subqueries: bool, ) -> Result { // f_down phase let mut changed = false; @@ -518,10 +550,20 @@ fn rewrite_plan_in_place( } } - // Recurse into children using Arc::make_mut (zero-cost when refcount == 1) - changed |= map_children_mut(plan, |child| { - rewrite_plan_in_place(child, apply_order, rule, config) - })?; + // Recurse into direct children using Arc::make_mut (zero-cost when + // refcount == 1). When the plan has no subqueries anywhere, skip the + // subquery-aware helper — that path's per-node check is the main + // source of regression on subquery-free workloads (join chains, wide + // filters / aggregates). + changed |= if has_subqueries { + map_children_and_subqueries_mut(plan, |child| { + rewrite_plan_in_place(child, apply_order, rule, config, true) + })? + } else { + map_children_mut(plan, |child| { + rewrite_plan_in_place(child, apply_order, rule, config, false) + })? 
+ }; // f_up phase if apply_order == ApplyOrder::BottomUp { @@ -534,45 +576,35 @@ fn rewrite_plan_in_place( Ok(changed) } -/// Returns true if the plan contains any subquery expressions -/// (EXISTS, IN subquery, scalar subquery, set comparison). +/// In-place replacement for the `map_uncorrelated_subqueries` + `map_children` +/// pattern used by rules with `apply_order == None` to drive their own +/// recursion (e.g. [`EliminateCrossJoin`], [`CommonSubexprEliminate`]). /// -/// Used to determine whether the more expensive `rewrite_with_subqueries` -/// traversal is needed. When the plan has no subqueries, the cheaper -/// `rewrite` traversal is sufficient since all plan nodes are reachable -/// via direct children. -fn plan_has_subqueries(plan: &LogicalPlan) -> bool { - let mut found = false; - let _ = plan.apply(|node| { - if found { - return Ok(TreeNodeRecursion::Stop); - } - node.apply_expressions(|expr| { - if found { - return Ok(TreeNodeRecursion::Stop); - } - expr.apply(|e| { - if matches!( - e, - Expr::Exists(_) - | Expr::InSubquery(_) - | Expr::SetComparison(_) - | Expr::ScalarSubquery(_) - ) { - found = true; - Ok(TreeNodeRecursion::Stop) - } else { - Ok(TreeNodeRecursion::Continue) - } - }) - })?; - Ok(if found { - TreeNodeRecursion::Stop - } else { - TreeNodeRecursion::Continue - }) - }); - found +/// Walks the plan's direct children, then its subquery plans, calling +/// `rule.rewrite(child, config)` on each via the [`Arc::make_mut`] + +/// [`std::mem::take`] bridge. When `rule.rewrite` returns +/// [`Transformed::no`] (the no-op case) the child's [`Arc`] is reused +/// without re-allocation, in contrast to the ownership-based path which +/// rebuilds the parent `LogicalPlan` variant on every recursion. +/// +/// Returns `Ok(true)` if any child was modified. 
+/// +/// # Schema +/// +/// Like the ownership-based `map_children`, this does **not** call +/// `recompute_schema` on the parent — callers should do so if any child +/// was changed (this matches the existing `rewrite_children` shape). +pub(crate) fn rewrite_children_in_place( + plan: &mut LogicalPlan, + rule: &dyn OptimizerRule, + config: &dyn OptimizerConfig, +) -> Result { + map_children_and_subqueries_mut(plan, |child| { + let owned = std::mem::take(child); + let result = rule.rewrite(owned, config)?; + *child = result.data; + Ok(result.transformed) + }) } impl Optimizer { @@ -604,14 +636,6 @@ impl Optimizer { while i < options.optimizer.max_passes { log_plan(&format!("Optimizer input (pass {i})"), &new_plan); - // Check once per pass whether the plan contains subquery - // expressions. When there are no subqueries, we use the - // cheaper `rewrite` traversal instead of - // `rewrite_with_subqueries`, avoiding the per-node - // map_subqueries call that walks all expression trees - // via ownership-based transform_down. - let has_subqueries = plan_has_subqueries(&new_plan); - for rule in &self.rules { // If skipping failed rules, copy plan before attempting to rewrite // as rewriting is destructive @@ -623,47 +647,37 @@ impl Optimizer { let starting_schema = Arc::clone(new_plan.schema()); let result = match rule.apply_order() { - // optimizer handles recursion + // optimizer handles recursion: use in-place rewriting + // with `Arc::make_mut` for zero-cost CoW on children. + // Subquery descent is gated by a one-shot whole-plan + // check so subquery-free plans (the common case for + // join-heavy workloads) don't pay the per-node + // `has_subquery_expressions` walk on every rule + // iteration. + // + // On error `new_plan` is left in an unspecified + // state (see `rewrite_plan_in_place`); the result + // handling below discards it, restoring `prev_plan` + // when `skip_failed_rules` is set or propagating + // the error otherwise. 
Some(apply_order) => { - if has_subqueries { - // Plans with subqueries need the full - // rewrite_with_subqueries traversal to - // recurse into subquery plans. - new_plan.rewrite_with_subqueries( - &mut Rewriter::new( - apply_order, - rule.as_ref(), - config, - ), + let has_subqueries = plan_has_subqueries(&new_plan); + rewrite_plan_in_place( + &mut new_plan, + apply_order, + rule.as_ref(), + config, + has_subqueries, + ) + .map(|transformed| { + Transformed::new_transformed( + std::mem::take(&mut new_plan), + transformed, ) - } else { - // No subqueries: use in-place rewriting - // with Arc::make_mut for zero-cost CoW on - // children, avoiding Arc unwrap/rewrap. - // - // On error `new_plan` is left in an unspecified - // state (see `rewrite_plan_in_place`); the result - // handling below discards it, restoring `prev_plan` - // when `skip_failed_rules` is set or propagating - // the error otherwise. - rewrite_plan_in_place( - &mut new_plan, - apply_order, - rule.as_ref(), - config, - ) - .map(|transformed| { - Transformed::new_transformed( - std::mem::take(&mut new_plan), - transformed, - ) - }) - } + }) } // rule handles recursion itself - None => { - rule.rewrite(new_plan, config) - }, + None => rule.rewrite(new_plan, config), } .and_then(|tnr| { // run checks optimizer invariant checks, per optimizer rule applied @@ -1035,4 +1049,156 @@ mod tests { ))) } } + + // --------------------------------------------------------------- + // Tests for the in-place rewrite helpers + // (`map_children_and_subqueries_mut`, `rewrite_children_in_place`) + // --------------------------------------------------------------- + + use datafusion_expr::in_subquery; + + type VisitLog = Arc>>; + + /// Counts how many `LogicalPlan` nodes a closure visits. 
+ fn counting_visitor() -> (VisitLog, impl FnMut(&mut LogicalPlan) -> Result) { + let log: VisitLog = Arc::new(Mutex::new(Vec::new())); + let log_for_closure = Arc::clone(&log); + let visitor = move |plan: &mut LogicalPlan| -> Result { + log_for_closure + .lock() + .unwrap() + .push(plan.display().to_string()); + Ok(false) // never mutate + }; + (log, visitor) + } + + /// `map_children_and_subqueries_mut` on a plan with no children and no + /// subqueries → no calls to `f`, `Ok(false)`. + #[test] + fn map_children_and_subqueries_mut_no_children() -> Result<()> { + let mut plan = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(DFSchema::empty()), + }); + let (log, visitor) = counting_visitor(); + let changed = super::map_children_and_subqueries_mut(&mut plan, visitor)?; + assert!(!changed); + assert_eq!(log.lock().unwrap().len(), 0); + Ok(()) + } + + /// `map_children_and_subqueries_mut` walks direct children (a single + /// `TableScan` under a `Filter`) and reports `Ok(false)` when the + /// closure doesn't mutate. + #[test] + fn map_children_and_subqueries_mut_walks_direct_children() -> Result<()> { + let scan = test_table_scan()?; + let mut plan = LogicalPlanBuilder::from(scan) + .filter(col("a").gt(lit(0)))? + .build()?; + + let (log, visitor) = counting_visitor(); + let changed = super::map_children_and_subqueries_mut(&mut plan, visitor)?; + assert!(!changed); + // `Filter.input` is the `TableScan` — exactly one child visited. + let log = log.lock().unwrap(); + assert_eq!(log.len(), 1, "expected 1 child visit, got: {log:?}"); + assert!( + log[0].contains("TableScan"), + "child should be the TableScan, got: {}", + log[0] + ); + Ok(()) + } + + /// `map_children_and_subqueries_mut` descends into a subquery plan + /// reachable from a `Filter`'s `IN (SELECT ...)` predicate. Without + /// the subquery descent it would visit only the outer `TableScan`. 
+ #[test] + fn map_children_and_subqueries_mut_descends_into_subquery() -> Result<()> { + let inner = LogicalPlanBuilder::from(test_table_scan()?) + .project(vec![col("a")])? + .build()?; + let outer_scan = test_table_scan()?; + let mut plan = LogicalPlanBuilder::from(outer_scan) + .filter(in_subquery(col("a"), Arc::new(inner)))? + .build()?; + + let (log, visitor) = counting_visitor(); + let changed = super::map_children_and_subqueries_mut(&mut plan, visitor)?; + assert!(!changed); + let log = log.lock().unwrap(); + // 1 direct child (outer TableScan) + 1 subquery plan + assert_eq!(log.len(), 2, "expected 2 visits, got: {log:?}"); + Ok(()) + } + + /// `map_children_and_subqueries_mut` propagates `Ok(true)` when the + /// closure reports a change on any child. + #[test] + fn map_children_and_subqueries_mut_reports_changes() -> Result<()> { + let scan = test_table_scan()?; + let mut plan = LogicalPlanBuilder::from(scan) + .filter(col("a").gt(lit(0)))? + .build()?; + + // Closure that claims the child changed without actually mutating it. + let mut hits = 0usize; + let changed = super::map_children_and_subqueries_mut(&mut plan, |_| { + hits += 1; + Ok(true) + })?; + assert!(changed, "child claim of `true` should bubble up"); + assert_eq!(hits, 1); + Ok(()) + } + + /// `rewrite_children_in_place` drives recursion through a rule, + /// visiting each child via `rule.rewrite` and returning `true` when + /// any child was rewritten. Verifies the bridge between the + /// `&mut LogicalPlan` callback shape and the ownership-based + /// `OptimizerRule::rewrite` API. + #[test] + fn rewrite_children_in_place_drives_rule_recursion() -> Result<()> { + // Rule that records every plan it sees but never claims a change. 
+ #[derive(Debug)] + struct RecordingRule { + seen: Arc>>, + } + impl OptimizerRule for RecordingRule { + fn name(&self) -> &str { + "recording" + } + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { + self.seen.lock().unwrap().push(plan.display().to_string()); + Ok(Transformed::no(plan)) + } + } + + let scan = test_table_scan()?; + let mut plan = LogicalPlanBuilder::from(scan) + .filter(col("a").gt(lit(0)))? + .project(vec![col("a")])? + .build()?; + + let seen = Arc::new(Mutex::new(Vec::new())); + let rule = RecordingRule { + seen: Arc::clone(&seen), + }; + let config = OptimizerContext::new(); + let changed = super::rewrite_children_in_place(&mut plan, &rule, &config)?; + assert!(!changed); + // Outer plan is `Projection`; `rewrite_children_in_place` visits its + // direct children — that's the `Filter`. (It does NOT visit the + // root itself; rules call this from their own `rewrite_children`.) + let seen = seen.lock().unwrap(); + assert_eq!(seen.len(), 1, "expected 1 child visit, got: {seen:?}"); + assert!(seen[0].contains("Filter")); + Ok(()) + } }