-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Implement FFI_PhysicalExpr and the structs it needs to support it. #18916
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Implement FFI_PhysicalExpr and the structs it needs to support it. #18916
Conversation
| datafusion-expr = { workspace = true } | ||
| datafusion-functions-aggregate-common = { workspace = true } | ||
| datafusion-physical-expr = { workspace = true } | ||
| datafusion-physical-expr-common = { workspace = true } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this and the following PRs I am introducing more of these crates, even when they are re-exported in datafusion core crate so that it will have a smaller PR when we remove the core crate at the end of this work epic.
|
@renato2099 @comphead @paleolimbot @kevinjqliu Would any of you be available to review this PR? I know it is large - I think it's the biggest single PR of the entire FFI update effort. I am particularly interested if anyone disagrees that this is a correct approach to take - implementing the I appreciate any time and thoughts you can spare. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR introduces FFI (Foreign Function Interface) support for PhysicalExpr and its supporting structures, enabling physical expressions to be safely passed across FFI boundaries. This is a foundational change that will eventually allow removal of protobuf usage for physical expressions and improve UDF performance by preserving ColumnarValue::Scalar.
Key changes:
- Implementation of
FFI_PhysicalExprtrait with comprehensive wrapper functions for allPhysicalExprmethods - FFI-stable structures for intervals, distributions, expression properties, columnar values, partitioning, and sort expressions
- Bidirectional conversion support between native DataFusion types and FFI-safe representations
Reviewed changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| datafusion/ffi/src/util.rs | Added FFIResult type alias for FFI return types |
| datafusion/ffi/src/record_batch_stream.rs | Made conversion utilities public for use in physical expression FFI |
| datafusion/ffi/src/physical_expr/sort.rs | Implemented FFI wrapper for physical sort expressions |
| datafusion/ffi/src/physical_expr/partitioning.rs | Implemented FFI wrapper for partitioning schemes |
| datafusion/ffi/src/physical_expr/mod.rs | Core implementation of FFI_PhysicalExpr with wrapper functions and conversions |
| datafusion/ffi/src/lib.rs | Exported new expr and physical_expr modules |
| datafusion/ffi/src/expr/util.rs | Utility functions for scalar value serialization via protobuf |
| datafusion/ffi/src/expr/mod.rs | Module organization for expression-related FFI types |
| datafusion/ffi/src/expr/interval.rs | FFI wrapper for interval arithmetic |
| datafusion/ffi/src/expr/expr_properties.rs | FFI wrappers for expression properties and sort options |
| datafusion/ffi/src/expr/distribution.rs | FFI wrappers for all distribution types |
| datafusion/ffi/src/expr/columnar_value.rs | FFI wrapper for columnar values (arrays and scalars) |
| datafusion/ffi/Cargo.toml | Added dependencies for expression and physical expression crates |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| unsafe extern "C" fn hash_fn_wrapper(expr: &FFI_PhysicalExpr) -> u64 { | ||
| let expr = expr.inner(); | ||
| // let mut hasher = DefaultHasher::new(); |
Copilot
AI
Nov 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicate code with commented-out line. Remove the commented line 375 as it serves no purpose.
| // let mut hasher = DefaultHasher::new(); |
| fn from(expr: &FFI_PhysicalExpr) -> Self { | ||
| if (expr.library_marker_id)() == crate::get_library_marker_id() { | ||
| Arc::clone(expr.inner()) | ||
| } else { | ||
| let children = unsafe { | ||
| (expr.children)(expr) | ||
| .into_iter() | ||
| .map(|expr| <Arc<dyn PhysicalExpr>>::from(&expr)) | ||
| .collect() | ||
| }; | ||
|
|
||
| Arc::new(ForeignPhysicalExpr { | ||
| expr: expr.clone(), |
Copilot
AI
Nov 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The variable name expr is used both for the parameter and the field, which is confusing. Consider renaming to expr: unsafe { (expr.clone)(expr) } or use ffi_expr to clarify that we're cloning the FFI structure.
| fn from(expr: &FFI_PhysicalExpr) -> Self { | |
| if (expr.library_marker_id)() == crate::get_library_marker_id() { | |
| Arc::clone(expr.inner()) | |
| } else { | |
| let children = unsafe { | |
| (expr.children)(expr) | |
| .into_iter() | |
| .map(|expr| <Arc<dyn PhysicalExpr>>::from(&expr)) | |
| .collect() | |
| }; | |
| Arc::new(ForeignPhysicalExpr { | |
| expr: expr.clone(), | |
| fn from(ffi_expr: &FFI_PhysicalExpr) -> Self { | |
| if (ffi_expr.library_marker_id)() == crate::get_library_marker_id() { | |
| Arc::clone(ffi_expr.inner()) | |
| } else { | |
| let children = unsafe { | |
| (ffi_expr.children)(ffi_expr) | |
| .into_iter() | |
| .map(|expr| <Arc<dyn PhysicalExpr>>::from(&expr)) | |
| .collect() | |
| }; | |
| Arc::new(ForeignPhysicalExpr { | |
| expr: ffi_expr.clone(), |
| let batch = df_result!(record_batch_to_wrapped_array(batch.clone()))?; | ||
| // This is not ideal - we are cloning the selection array | ||
| // This is not terrible since it will be a small array. | ||
| // The other alternative is to modify the trait signature. |
Copilot
AI
Nov 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment acknowledges a performance concern but doesn't explain why modifying the trait signature isn't viable. Consider adding context about the constraints that prevent trait signature modification.
| // The other alternative is to modify the trait signature. | |
| // The other alternative is to modify the trait signature. | |
| // However, modifying the trait signature is not viable here because | |
| // the PhysicalExpr trait is part of a stable public API used across | |
| // multiple crates and FFI boundaries. Changing its signature would | |
| // break backward compatibility and require coordinated updates in | |
| // downstream consumers, which is undesirable for maintaining API stability. |
| impl Eq for ForeignPhysicalExpr {} | ||
| impl PartialEq for ForeignPhysicalExpr { | ||
| fn eq(&self, other: &Self) -> bool { | ||
| // FFI_PhysicalExpr cannot be compared, so identity equality is the best we can do. |
Copilot
AI
Nov 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment states "FFI_PhysicalExpr cannot be compared" but doesn't explain why. Consider clarifying that this is due to opaque private_data across FFI boundaries making semantic comparison impossible.
| // FFI_PhysicalExpr cannot be compared, so identity equality is the best we can do. | |
| // FFI_PhysicalExpr cannot be compared semantically because its private_data is opaque | |
| // across FFI boundaries, making it impossible to determine equivalence of underlying state. | |
| // Therefore, identity equality is the best we can do. |
Thanks @timsaucer for the PR I'm planning to look it at it today or tomorrow! |
paleolimbot
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome!
All my comments here likely lack some context, so any comments here are more like questions 🙂 .
The only high-level question I have is whether passing filters across an FFI boundary in this way are going to be useful everywhere. For example, our filtering makes extensive use of downcasting to prune row groups out of GeoParquet files and push bounding boxes into spatial indexes: https://github.com/apache/sedona-db/blob/main/rust/sedona-expr/src/spatial_filter.rs#L176-L211 . (That shouldn't block anything here...we may be totally unique and are fully capable of inventing our own FFI if we need to!)
I also wonder if mismatched struct definitions should be considered at some level here (or likely a battle for a different day given there's no precedent for that yet!)
| pub fn scalar_value_to_rvec_u8(value: &ScalarValue) -> Result<RVec<u8>> { | ||
| let value: datafusion_proto_common::ScalarValue = value.try_into()?; | ||
| Ok(value.encode_to_vec().into()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can these be passed as FFI_ArrowArray of length one? Serializing probably has non-trivial performance implications for things like ScalarValue::List that might otherwise be able to avoid a copy of its values. I had been planning to use this trick to pass around references to imagery.
| ColumnarValue::Scalar(v) => { | ||
| FFI_ColumnarValue::Scalar(scalar_value_to_rvec_u8(&v)?) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left a note in a later section as well, but I think an FFI_ArrowArray of length one is a more flexible choice for passing a scalar across the FFI boundary.
| fn try_from(value: &FFI_Distribution) -> Result<Self, Self::Error> { | ||
| match value { | ||
| FFI_Distribution::Uniform(d) => d.try_into(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's unclear to me what would happen on version mismatch at this point. Does derive(StableAbi) have a mechanism for loading an FFI enum with an unrecognized variant? I feel as though _ => { / * error communicating that there is a version mismatch */ } would be a nice failure route (can/should the FFI enums be non-exhaustive?)
| /// Allocate batches using a round-robin algorithm and the specified number of partitions | ||
| RoundRobinBatch(usize), | ||
|
|
||
| /// Allocate rows based on a hash of one of more expressions and the specified number of | ||
| /// partitions | ||
| Hash(RVec<FFI_PhysicalExpr>, usize), | ||
|
|
||
| /// Unknown partitioning scheme with a known number of partitions | ||
| UnknownPartitioning(usize), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason these are documented and not the other FFI enum items?
| unsafe extern "C" fn release_fn_wrapper(expr: &mut FFI_PhysicalExpr) { | ||
| let private_data = Box::from_raw(expr.private_data as *mut PhysicalExprPrivateData); | ||
| drop(private_data); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may be a good idea to set expr.private_data to null_mut() after dropping (and perhaps debug_assert!() that the private data is non-null before creating the box).
| /// This wrapper struct exists on the receiver side of the FFI interface, so it has | ||
| /// no guarantees about being able to access the data in `private_data`. Any functions | ||
| /// defined on this struct must only use the stable functions provided in | ||
| /// FFI_PhysicalExpr to interact with the foreign table provider. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| /// FFI_PhysicalExpr to interact with the foreign table provider. | |
| /// FFI_PhysicalExpr to interact with the expression. |
| #[cfg(test)] | ||
| mod tests { | ||
| use std::hash::{DefaultHasher, Hash, Hasher}; | ||
| use std::sync::Arc; | ||
|
|
||
| use arrow::array::{record_batch, BooleanArray, RecordBatch}; | ||
| use datafusion_common::{tree_node::DynTreeNode, DataFusionError, ScalarValue}; | ||
| use datafusion_expr::{interval_arithmetic::Interval, statistics::Distribution}; | ||
| use datafusion_physical_expr::expressions::{Column, NegativeExpr, NotExpr}; | ||
| use datafusion_physical_expr_common::physical_expr::{fmt_sql, PhysicalExpr}; | ||
|
|
||
| use crate::physical_expr::FFI_PhysicalExpr; | ||
|
|
||
| fn create_test_expr() -> (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps there is another spot for integration tests that is more appropriate, but I wonder if you could use a physical optimizer rule to replace all (or some) expressions in a planned DataFrame/SQL query and check if it executes.
Which issue does this PR close?
Addresses part of #18671 but does not close it.
Rationale for this change
This PR exposes the
PhysicalExprtrait via FFI. This will allow us to remove using protobuf for transferring physical expressions across the FFI boundary. We will still use protobuf for the logical side.The reason this is important is because it will allow us to eventually remove the
corecrate as described in #18671 but also it will enable keepingColumnarValue::Scalarwhen using UDFs. This is important for UDF performance.Of all of the PRs I have prepared for #18671 this is the largest of the individual PRs. That is because it requires quite a few supporting structures from
datafusion-exprin order to support it.What changes are included in this PR?
This PR introduces the
FFI_PhysicalExprtrait and a variety of enums and structs that are needed to be FFI stable in order to implement it. It does not replace the existing usage in the UDFs and other places with theFFI_PhysicalExpryet. That comes in a later PR in order to keep the size of the individual requests to manageable.Are these changes tested?
Unit tests are included.

Are there any user-facing changes?
Since this is pure addition, no user facing changes in this PR.