Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 108 additions & 2 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,30 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
}
}

/// Returns the plan that provides this plan's public
/// [`ExecutionPlan`] downcast identity.
///
/// This hook is for wrapper nodes that delegate their public downcast
/// identity to another plan while adding cross-cutting behavior such as
/// instrumentation. The default implementation returns `None`, meaning this
/// plan's concrete type is used for type introspection.
///
/// Most `ExecutionPlan` implementations should use the default `None`;
/// override this only for wrapper plans that intentionally delegate their
/// public downcast identity to another plan.
///
/// The `is` and `downcast_ref` helpers follow the returned delegate instead
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be good to add a specific example here (like a wrapper node that wants to observe changes but still be treated as the underlying node when DataFusion checks for patterns using downcast 🤔

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's the unit test with DowncastDelegatingExec which already shows a possible implementation. Would you also like an inline comment?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was thinking an inline comment so that if someone else is reading (just) the ExecutionPlan docs they can more quickly understand if they need to worry about implementing this method or not (probably not)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done: I amended the commit to be clearer in the comment.

/// of checking the current concrete type, making intermediate delegating
/// wrappers invisible to normal downcast-based inspection.
///
/// Implementations that opt in should return the delegate plan, not `self`.
///
/// This is independent from [`Self::children`] and should not be used for
/// plan traversal or optimizer rewrites.
fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> {
None
}

/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef {
Arc::clone(self.properties().schema())
Expand Down Expand Up @@ -718,20 +742,32 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
impl dyn ExecutionPlan {
/// Returns `true` if the plan is of type `T`.
///
/// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates
/// to it.
///
/// Prefer this over `downcast_ref::<T>().is_some()`. Works correctly when
/// called on `Arc<dyn ExecutionPlan>` via auto-deref.
pub fn is<T: ExecutionPlan>(&self) -> bool {
(self as &dyn Any).is::<T>()
match self.downcast_delegate() {
Some(delegate) => delegate.is::<T>(),
None => (self as &dyn Any).is::<T>(),
}
Comment on lines +751 to +754
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this prevent you from downcasting to the wrapper type?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and this is intentional to mimic the old as_any behavior. I've updated the PR description to be clear about this choice.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like the earlier PR would still work for all use cases, right? Because you're always choosing to downcast to a specific type, it seems like it would enable downcasting to either (or many, depending on levels) type.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous PR would stop on the first match, rather than go all the way to the leaf child. I agree this is being pedantic, as "real world" cases would probably never have a case where you'd need to match the leaf child but get interrupted by an intermediate node in a "wrapper chain". However, I still find the current implementation clearer and easier to reason about: the leaf node is what is returned.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we revert the hew :Any instead? I think it was just supposed to be a cleanup but we didn't realize it would have downstream impacts 🤔

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I wonder if there is some way to avoid using as_any / delegating logic at all ... But to do that we would have to remove all uses of as_any / downcasting I suspect

Copy link
Copy Markdown
Contributor Author

@geoffreyclaude geoffreyclaude May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we revert the new :Any instead? I think it was just supposed to be a cleanup but we didn't realize it would have downstream impacts 🤔

@alamb Honestly I prefer the new API with downcast_delegate: it's explicit and clearer.

Also I wonder if there is some way to avoid using as_any / delegating logic at all ... But to do that we would have to remove all uses of as_any / downcasting I suspect

That would be a pretty big breaking change as well, as the is a check is quite widespread.

}

/// Attempts to downcast this plan to a concrete type `T`, returning `None`
/// if the plan is not of that type.
///
/// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates
/// to it.
///
/// Works correctly when called on `Arc<dyn ExecutionPlan>` via auto-deref,
/// unlike `(&arc as &dyn Any).downcast_ref::<T>()` which would attempt to
/// downcast the `Arc` itself.
pub fn downcast_ref<T: ExecutionPlan>(&self) -> Option<&T> {
(self as &dyn Any).downcast_ref()
match self.downcast_delegate() {
Some(delegate) => delegate.downcast_ref::<T>(),
None => (self as &dyn Any).downcast_ref(),
}
}
}

Expand Down Expand Up @@ -1642,6 +1678,58 @@ mod tests {
}
}

#[derive(Debug)]
struct DowncastDelegatingExec(Arc<dyn ExecutionPlan>);

impl DisplayAs for DowncastDelegatingExec {
fn fmt_as(
&self,
_t: DisplayFormatType,
_f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
unimplemented!()
}
}

impl ExecutionPlan for DowncastDelegatingExec {
fn name(&self) -> &'static str {
Self::static_name()
}

fn properties(&self) -> &Arc<PlanProperties> {
unimplemented!()
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}

fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}

fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> {
Some(self.0.as_ref())
}

fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
unimplemented!()
}

fn partition_statistics(
&self,
_partition: Option<usize>,
) -> Result<Arc<Statistics>> {
unimplemented!()
}
}
#[test]
fn test_execution_plan_name() {
let schema1 = Arc::new(Schema::empty());
Expand All @@ -1654,6 +1742,24 @@ mod tests {
assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec");
}

#[test]
fn test_execution_plan_downcast_delegates_to_downcast_delegate() {
let schema = Arc::new(Schema::empty());
let inner: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));
let wrapped: Arc<dyn ExecutionPlan> = Arc::new(DowncastDelegatingExec(inner));
let nested: Arc<dyn ExecutionPlan> =
Arc::new(DowncastDelegatingExec(Arc::clone(&wrapped)));

for plan in [wrapped.as_ref(), nested.as_ref()] {
assert!(!plan.is::<DowncastDelegatingExec>());
assert!(plan.downcast_ref::<DowncastDelegatingExec>().is_none());
assert!(plan.is::<EmptyExec>());
assert!(plan.downcast_ref::<EmptyExec>().is_some());
assert!(!plan.is::<RenamedEmptyExec>());
assert!(plan.downcast_ref::<RenamedEmptyExec>().is_none());
}
}

/// A compilation test to ensure that the `ExecutionPlan::name()` method can
/// be called from a trait object.
/// Related ticket: https://github.com/apache/datafusion/pull/11047
Expand Down
Loading