Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::sync::Arc;
use std::sync::{Arc, LazyLock};

use crate::error::{_plan_err, _schema_err, DataFusionError, Result};
use crate::{
Expand Down Expand Up @@ -129,6 +129,13 @@ impl DFSchema {
}
}

/// Returns a `'static` reference to a shared, lazily initialized empty
/// [`DFSchema`].
///
/// The schema is built with [`DFSchema::empty`] the first time this function
/// is called; every later call returns a reference to that same
/// [`DFSchemaRef`]. Callers that need an owned handle can `Arc::clone` the
/// result instead of constructing a fresh empty schema.
pub fn empty_ref() -> &'static DFSchemaRef {
    // One instance for the whole program, initialized on first access.
    static SHARED_EMPTY_SCHEMA: LazyLock<DFSchemaRef> =
        LazyLock::new(|| Arc::new(DFSchema::empty()));
    LazyLock::force(&SHARED_EMPTY_SCHEMA)
}

/// Return a reference to the inner Arrow [`Schema`]
///
/// Note this does not have the qualifier information
Expand Down
79 changes: 70 additions & 9 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,9 @@ pub trait TreeNodeContainer<'a, T: 'a>: Sized {
) -> Result<Transformed<Self>>;
}

impl<'a, T: 'a, C: TreeNodeContainer<'a, T>> TreeNodeContainer<'a, T> for Box<C> {
impl<'a, T: 'a, C: TreeNodeContainer<'a, T> + Default> TreeNodeContainer<'a, T>
for Box<C>
{
fn apply_elements<F: FnMut(&'a T) -> Result<TreeNodeRecursion>>(
&'a self,
f: F,
Expand All @@ -805,14 +807,24 @@ impl<'a, T: 'a, C: TreeNodeContainer<'a, T>> TreeNodeContainer<'a, T> for Box<C>
}

fn map_elements<F: FnMut(T) -> Result<Transformed<T>>>(
self,
mut self,
f: F,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimizing this API is great, but I wonder two things:

  • Do we call this API too much in some passes? i.e. should we make some optimizer pass more efficient by avoiding / reducing the need to call this API at all in some places?
  • Are we using the "wrong" abstractions that inherently lead to inefficient code? (I.e., lots of closures, branches, and nesting, which are hard for the compiler to optimize.)

) -> Result<Transformed<Self>> {
(*self).map_elements(f)?.map_data(|c| Ok(Self::new(c)))
// Rewrite in place so the existing heap allocation can be reused.
// `mem::take` hands the inner `C` to `f` while leaving
// `C::default()` in the slot, so an unwinding drop finds a valid
// `C` even if `f` panics or the `?` short-circuits.
let inner = std::mem::take(&mut *self);
Ok(inner.map_elements(f)?.update_data(|c| {
*self = c;
self
}))
}
}

impl<'a, T: 'a, C: TreeNodeContainer<'a, T> + Clone> TreeNodeContainer<'a, T> for Arc<C> {
impl<'a, T: 'a, C: TreeNodeContainer<'a, T> + Clone + Default> TreeNodeContainer<'a, T>
for Arc<C>
{
fn apply_elements<F: FnMut(&'a T) -> Result<TreeNodeRecursion>>(
&'a self,
f: F,
Expand All @@ -821,12 +833,18 @@ impl<'a, T: 'a, C: TreeNodeContainer<'a, T> + Clone> TreeNodeContainer<'a, T> fo
}

fn map_elements<F: FnMut(T) -> Result<Transformed<T>>>(
self,
mut self,
f: F,
) -> Result<Transformed<Self>> {
Arc::unwrap_or_clone(self)
.map_elements(f)?
.map_data(|c| Ok(Arc::new(c)))
// Rewrite in place using the same `mem::take` strategy as
// `Box<C>::map_elements`. `Arc::make_mut` gives us exclusive
// access (cloning `C` first if we were sharing), after which
// `get_mut` is infallible.
let inner = std::mem::take(Arc::make_mut(&mut self));
Ok(inner.map_elements(f)?.update_data(|c| {
*Arc::get_mut(&mut self).unwrap() = c;
self
}))
}
}

Expand Down Expand Up @@ -1335,14 +1353,15 @@ impl<T: ConcreteTreeNode> TreeNode for T {
pub(crate) mod tests {
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;

use crate::Result;
use crate::tree_node::{
Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion, TreeNodeRewriter,
TreeNodeVisitor,
};

#[derive(Debug, Eq, Hash, PartialEq, Clone)]
#[derive(Debug, Default, Eq, Hash, PartialEq, Clone)]
pub struct TestTreeNode<T> {
pub(crate) children: Vec<TestTreeNode<T>>,
pub(crate) data: T,
Expand Down Expand Up @@ -2431,4 +2450,46 @@ pub(crate) mod tests {

item.visit(&mut visitor).unwrap();
}

#[test]
fn box_map_elements_reuses_allocation() {
    // An identity rewrite over a `Box` should hand back the same heap slot it
    // was given, so record the payload address up front and compare after.
    let original = Box::new(TestTreeNode::new_leaf(42i32));
    let addr_before: *const TestTreeNode<i32> = &*original;

    let rewritten = original
        .map_elements(|node| Ok(Transformed::no(node)))
        .unwrap();

    let addr_after: *const TestTreeNode<i32> = &*rewritten.data;
    assert_eq!(addr_after, addr_before);
}

#[test]
fn arc_map_elements_reuses_allocation_when_unique() {
    // With only one strong reference, an identity rewrite is expected to
    // return an `Arc` pointing at the same allocation as the input.
    let sole_owner = Arc::new(TestTreeNode::new_leaf(42i32));
    let addr_before = Arc::as_ptr(&sole_owner);

    let rewritten = sole_owner
        .map_elements(|node| Ok(Transformed::no(node)))
        .unwrap();

    assert_eq!(Arc::as_ptr(&rewritten.data), addr_before);
}

#[test]
fn arc_map_elements_clones_when_shared() {
    // A second strong reference means the input `Arc` is shared, so
    // `make_mut` has to clone the node into a fresh allocation and the
    // allocation-reuse optimization cannot apply.
    let shared = Arc::new(TestTreeNode::new_leaf(42i32));
    let _second_owner = Arc::clone(&shared);
    let addr_before = Arc::as_ptr(&shared);

    let rewritten = shared
        .map_elements(|node| Ok(Transformed::no(node)))
        .unwrap();

    assert_ne!(Arc::as_ptr(&rewritten.data), addr_before);
}

#[test]
fn box_map_elements_panic() {
use std::panic::{AssertUnwindSafe, catch_unwind};
let boxed = Box::new(TestTreeNode::new_leaf(42i32));
let result = catch_unwind(AssertUnwindSafe(|| {
boxed
.map_elements(|_: TestTreeNode<i32>| -> Result<_> {
panic!("simulated panic during rewrite")
})
.ok()
}));
assert!(result.is_err());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test doesn't verify the actual panic-safety invariant — that dropping an unwound `Box` doesn't double-free or access garbage.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, although I'm not sure there's a good way to validate that in a standard unit test (unless we run under miri or similar). wdyt?

}
}
5 changes: 4 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,12 @@ pub enum LogicalPlan {

impl Default for LogicalPlan {
fn default() -> Self {
// `Default` is used as a transient placeholder on hot paths (e.g.
// `Box`/`Arc` `map_elements`), so use a shared empty schema to avoid
// allocating.
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(DFSchema::empty()),
schema: Arc::clone(DFSchema::empty_ref()),
})
}
}
Expand Down
7 changes: 2 additions & 5 deletions datafusion/expr/src/logical_plan/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use datafusion_common::metadata::format_type_and_metadata;
use datafusion_common::{DFSchema, DFSchemaRef};
use itertools::Itertools as _;
use std::fmt::{self, Display};
use std::sync::{Arc, LazyLock};
use std::sync::Arc;

use crate::{Expr, LogicalPlan, expr_vec_fmt};

Expand Down Expand Up @@ -55,10 +55,7 @@ impl Statement {
/// Get a reference to the logical plan's schema
pub fn schema(&self) -> &DFSchemaRef {
// Statements have an unchanging empty schema.
static STATEMENT_EMPTY_SCHEMA: LazyLock<DFSchemaRef> =
LazyLock::new(|| Arc::new(DFSchema::empty()));

&STATEMENT_EMPTY_SCHEMA
DFSchema::empty_ref()
}

/// Return a descriptive string describing the type of this
Expand Down
11 changes: 8 additions & 3 deletions datafusion/expr/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl TreeNode for Expr {
/// indicating whether the expression was transformed or left unchanged.
fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
self,
mut f: F,
f: F,
) -> Result<Transformed<Self>> {
Ok(match self {
// TODO: remove the next line after `Expr::Wildcard` is removed
Expand Down Expand Up @@ -150,8 +150,13 @@ impl TreeNode for Expr {
relation,
name,
metadata,
}) => f(*expr)?.update_data(|e| {
e.alias_qualified_with_metadata(relation, name, metadata)
}) => expr.map_elements(f)?.update_data(|expr| {
Expr::Alias(Alias {
expr,
relation,
name,
metadata,
})
}),
Expr::InSubquery(InSubquery {
expr,
Expand Down
25 changes: 25 additions & 0 deletions docs/source/library-user-guide/upgrading/54.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -347,3 +347,28 @@ SELECT CAST(approx_percentile_cont(quantity, 0.5) AS BIGINT) FROM orders;
```

[#21074]: https://github.com/apache/datafusion/pull/21074

### `Box<C>` and `Arc<C>` `TreeNodeContainer` impls now require `C: Default`

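The generic `TreeNodeContainer` implementations for `Box<C>` and `Arc<C>` now
require `C: Default`. Their `map_elements` methods now rewrite the contents in
place using `std::mem::take`, which leaves `C::default()` behind as a temporary
placeholder so that the existing heap allocation can be reused instead of
allocating a new one.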

**Who is affected:**

- Users who implement `TreeNodeContainer` on a custom type and wrap it in
`Box` or `Arc` when walking trees.

**Migration guide:**

Add a `Default` implementation to your type. The default value is used as a
temporary placeholder during query optimization, so when possible, pick a cheap,
allocation-free variant:

```rust,ignore
impl Default for MyTreeNode {
fn default() -> Self {
MyTreeNode::Leaf // or whichever variant is cheapest to construct
}
}
```
Loading