diff --git a/Cargo.lock b/Cargo.lock index e0f811f16f1b..20aabb996662 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3042,7 +3042,6 @@ version = "0.1.0" dependencies = [ "abi_stable", "datafusion", - "datafusion-ffi", "ffi_module_interface", "tokio", ] diff --git a/datafusion-examples/examples/ffi/ffi_module_loader/Cargo.toml b/datafusion-examples/examples/ffi/ffi_module_loader/Cargo.toml index 028a366aab1c..1f68bb1bb1be 100644 --- a/datafusion-examples/examples/ffi/ffi_module_loader/Cargo.toml +++ b/datafusion-examples/examples/ffi/ffi_module_loader/Cargo.toml @@ -24,6 +24,5 @@ publish = false [dependencies] abi_stable = "0.11.3" datafusion = { workspace = true } -datafusion-ffi = { workspace = true } ffi_module_interface = { path = "../ffi_module_interface" } tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] } diff --git a/datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs b/datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs index 6e376ca866e8..170c7c3eff8a 100644 --- a/datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs +++ b/datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs @@ -23,7 +23,7 @@ use datafusion::{ }; use abi_stable::library::{development_utils::compute_library_path, RootModule}; -use datafusion_ffi::table_provider::ForeignTableProvider; +use datafusion::datasource::TableProvider; use ffi_module_interface::TableProviderModuleRef; #[tokio::main] @@ -49,13 +49,13 @@ async fn main() -> Result<()> { ))?(); // In order to access the table provider within this executable, we need to - // turn it into a `ForeignTableProvider`. - let foreign_table_provider: ForeignTableProvider = (&ffi_table_provider).into(); + // turn it into a `TableProvider`. + let foreign_table_provider: Arc = (&ffi_table_provider).into(); let ctx = SessionContext::new(); // Display the data to show the full cycle works. 
- ctx.register_table("external_table", Arc::new(foreign_table_provider))?; + ctx.register_table("external_table", foreign_table_provider)?; let df = ctx.table("external_table").await?; df.show().await?; diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml index 3ac08180fb68..b797804731f4 100644 --- a/datafusion/ffi/Cargo.toml +++ b/datafusion/ffi/Cargo.toml @@ -58,6 +58,7 @@ semver = "1.0.27" tokio = { workspace = true } [dev-dependencies] +datafusion = { workspace = true, default-features = false, features = ["sql"] } doc-comment = { workspace = true } [features] diff --git a/datafusion/ffi/README.md b/datafusion/ffi/README.md index 72070984f931..afbda95d45e2 100644 --- a/datafusion/ffi/README.md +++ b/datafusion/ffi/README.md @@ -101,6 +101,75 @@ In this crate we have a variety of structs which closely mimic the behavior of their internal counterparts. To see detailed notes about how to use them, see the example in `FFI_TableProvider`. +## Memory Management + +One of the advantages of Rust is the ownership model, which means programmers +_usually_ do not need to worry about memory management. When interacting with +foreign code, this is not necessarily true. If you review the structures in +this crate, you will find that many of them implement the `Drop` trait and +perform a foreign call. + +Suppose we have a `FFI_CatalogProvider`, for example. This struct is safe to +pass across the FFI boundary, so it may be owned by either the library that +produces the underlying `CatalogProvider` or by another library that consumes +it. If we look closer at the `FFI_CatalogProvider`, it has a pointer to +some private data. That private data is only accessible on the producer's +side. If you attempt to access it on the consumer's side, you may get +segmentation faults or other bad behavior. 
Within that private data is the +actual `Arc<dyn CatalogProvider>`, which must be freed, but if the +`FFI_CatalogProvider` is only owned on the consumer's side, we have no way +to access the private data and free it. + +To account for this, most structs in this crate have a `release` method that +is used to clean up any privately held data. This calls into the producer's +side, regardless of whether it is called on the local or foreign side. +Most of the structs in this crate carry atomic reference counts to the +underlying data, and this is straightforward. Some structs like the +`FFI_Accumulator` contain an inner `Box<dyn Accumulator>`. The reason for +this is that we need to be able to mutably access these based on the +`Accumulator` trait definition. For these we have slightly more complicated +release code based on whether it is being dropped on the local or foreign side. +Structs that use a `Box<>` for their underlying data also cannot implement +`Clone`. + +## Library Marker ID + +When reviewing the code, you will find that many of the structs in this crate +contain a call to a `library_marker_id`. The purpose of this call is to determine if a library is +accessing _local_ code through the FFI structs. Consider this example: you have +a `primary` program that exposes functions to create a schema provider. You +have a `secondary` library that exposes a function to create a catalog provider +and the `secondary` library uses the schema provider of the `primary` program. +From the point of view of the `secondary` library, the schema provider is +foreign code. + +Now when we register the `secondary` library with the `primary` program as a +catalog provider and we make calls to get a schema, the `secondary` library +will return an FFI-wrapped schema provider back to the `primary` program. In +this case that schema provider is actually local code to the `primary` program +except that it is wrapped in the FFI code! + +We work around this with the `library_marker_id` calls. 
What this does is it +creates a global variable within each library and returns a `usize` address +of that library. This is guaranteed to be unique for every library that contains +FFI code. By comparing these `usize` addresses we can determine if a FFI struct +is local or foreign. + +In our example of the schema provider, if you were to make a call in your +primary program to get the schema provider, it would reach out to the foreign +catalog provider and send back a `FFI_SchemaProvider` object. By then +comparing the `library_marker_id` of this object to the `primary` program, we +determine it is local code. This means it is safe to access the underlying +private data. + +Users of the FFI code should not need to access these function. If you are +implementing a new FFI struct, then it is recommended that you follow the +established patterns for converting from FFI struct into the underlying +traits. Specifically you should use `crate::get_library_marker_id` and in +your unit tests you should override this with +`crate::mock_foreign_marker_id` to force your test to create the foreign +variant of your struct. + [apache datafusion]: https://datafusion.apache.org/ [api docs]: http://docs.rs/datafusion-ffi/latest [rust abi]: https://doc.rust-lang.org/reference/abi.html diff --git a/datafusion/ffi/src/catalog_provider.rs b/datafusion/ffi/src/catalog_provider.rs index d279951783b4..00e8dc315811 100644 --- a/datafusion/ffi/src/catalog_provider.rs +++ b/datafusion/ffi/src/catalog_provider.rs @@ -70,6 +70,11 @@ pub struct FFI_CatalogProvider { /// Internal data. This is only to be accessed by the provider of the plan. /// A [`ForeignCatalogProvider`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. 
+ pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_CatalogProvider {} @@ -116,7 +121,7 @@ unsafe extern "C" fn register_schema_fn_wrapper( ) -> RResult, RString> { let runtime = provider.runtime(); let provider = provider.inner(); - let schema = Arc::new(ForeignSchemaProvider::from(schema)); + let schema: Arc = schema.into(); let returned_schema = rresult_return!(provider.register_schema(name.as_str(), schema)) @@ -169,6 +174,7 @@ unsafe extern "C" fn clone_fn_wrapper( release: release_fn_wrapper, version: super::version, private_data, + library_marker_id: crate::get_library_marker_id, } } @@ -195,6 +201,7 @@ impl FFI_CatalogProvider { release: release_fn_wrapper, version: super::version, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -209,9 +216,14 @@ pub struct ForeignCatalogProvider(pub(crate) FFI_CatalogProvider); unsafe impl Send for ForeignCatalogProvider {} unsafe impl Sync for ForeignCatalogProvider {} -impl From<&FFI_CatalogProvider> for ForeignCatalogProvider { +impl From<&FFI_CatalogProvider> for Arc { fn from(provider: &FFI_CatalogProvider) -> Self { - Self(provider.clone()) + if (provider.library_marker_id)() == crate::get_library_marker_id() { + return Arc::clone(unsafe { provider.inner() }); + } + + Arc::new(ForeignCatalogProvider(provider.clone())) + as Arc } } @@ -298,9 +310,10 @@ mod tests { .unwrap() .is_none()); - let ffi_catalog = FFI_CatalogProvider::new(catalog, None); + let mut ffi_catalog = FFI_CatalogProvider::new(catalog, None); + ffi_catalog.library_marker_id = crate::mock_foreign_marker_id; - let foreign_catalog: ForeignCatalogProvider = (&ffi_catalog).into(); + let foreign_catalog: Arc = (&ffi_catalog).into(); let prior_schema_names = foreign_catalog.schema_names(); assert_eq!(prior_schema_names.len(), 1); @@ -335,4 +348,26 @@ mod tests { let returned_schema = foreign_catalog.schema("second_schema"); assert!(returned_schema.is_some()); } + 
+ #[test] + fn test_ffi_catalog_provider_local_bypass() { + let catalog = Arc::new(MemoryCatalogProvider::new()); + + let mut ffi_catalog = FFI_CatalogProvider::new(catalog, None); + + // Verify local libraries can be downcast to their original + let foreign_catalog: Arc = (&ffi_catalog).into(); + assert!(foreign_catalog + .as_any() + .downcast_ref::() + .is_some()); + + // Verify different library markers generate foreign providers + ffi_catalog.library_marker_id = crate::mock_foreign_marker_id; + let foreign_catalog: Arc = (&ffi_catalog).into(); + assert!(foreign_catalog + .as_any() + .downcast_ref::() + .is_some()); + } } diff --git a/datafusion/ffi/src/catalog_provider_list.rs b/datafusion/ffi/src/catalog_provider_list.rs index b09f06d318c1..429897269470 100644 --- a/datafusion/ffi/src/catalog_provider_list.rs +++ b/datafusion/ffi/src/catalog_provider_list.rs @@ -58,6 +58,11 @@ pub struct FFI_CatalogProviderList { /// Internal data. This is only to be accessed by the provider of the plan. /// A [`ForeignCatalogProviderList`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. 
+ pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_CatalogProviderList {} @@ -94,7 +99,7 @@ unsafe extern "C" fn register_catalog_fn_wrapper( ) -> ROption { let runtime = provider.runtime(); let provider = provider.inner(); - let catalog = Arc::new(ForeignCatalogProvider::from(catalog)); + let catalog: Arc = catalog.into(); provider .register_catalog(name.into(), catalog) @@ -138,6 +143,7 @@ unsafe extern "C" fn clone_fn_wrapper( release: release_fn_wrapper, version: super::version, private_data, + library_marker_id: crate::get_library_marker_id, } } @@ -163,6 +169,7 @@ impl FFI_CatalogProviderList { release: release_fn_wrapper, version: super::version, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -177,9 +184,14 @@ pub struct ForeignCatalogProviderList(FFI_CatalogProviderList); unsafe impl Send for ForeignCatalogProviderList {} unsafe impl Sync for ForeignCatalogProviderList {} -impl From<&FFI_CatalogProviderList> for ForeignCatalogProviderList { +impl From<&FFI_CatalogProviderList> for Arc { fn from(provider: &FFI_CatalogProviderList) -> Self { - Self(provider.clone()) + if (provider.library_marker_id)() == crate::get_library_marker_id() { + return Arc::clone(unsafe { provider.inner() }); + } + + Arc::new(ForeignCatalogProviderList(provider.clone())) + as Arc } } @@ -248,9 +260,11 @@ mod tests { .register_catalog("prior_catalog".to_owned(), prior_catalog) .is_none()); - let ffi_catalog_list = FFI_CatalogProviderList::new(catalog_list, None); + let mut ffi_catalog_list = FFI_CatalogProviderList::new(catalog_list, None); + ffi_catalog_list.library_marker_id = crate::mock_foreign_marker_id; - let foreign_catalog_list: ForeignCatalogProviderList = (&ffi_catalog_list).into(); + let foreign_catalog_list: Arc = + (&ffi_catalog_list).into(); let prior_catalog_names = foreign_catalog_list.catalog_names(); assert_eq!(prior_catalog_names.len(), 1); @@ -280,4 +294,28 @@ mod 
tests { let returned_catalog = foreign_catalog_list.catalog("second_catalog"); assert!(returned_catalog.is_some()); } + + #[test] + fn test_ffi_catalog_provider_list_local_bypass() { + let catalog_list = Arc::new(MemoryCatalogProviderList::new()); + + let mut ffi_catalog_list = FFI_CatalogProviderList::new(catalog_list, None); + + // Verify local libraries can be downcast to their original + let foreign_catalog_list: Arc = + (&ffi_catalog_list).into(); + assert!(foreign_catalog_list + .as_any() + .downcast_ref::() + .is_some()); + + // Verify different library markers generate foreign providers + ffi_catalog_list.library_marker_id = crate::mock_foreign_marker_id; + let foreign_catalog_list: Arc = + (&ffi_catalog_list).into(); + assert!(foreign_catalog_list + .as_any() + .downcast_ref::() + .is_some()); + } } diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index 70c957d8c373..d76dcd8dd0c9 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -65,6 +65,11 @@ pub struct FFI_ExecutionPlan { /// Internal data. This is only to be accessed by the provider of the plan. /// A [`ForeignExecutionPlan`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. 
+ pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_ExecutionPlan {} @@ -76,13 +81,17 @@ pub struct ExecutionPlanPrivateData { pub runtime: Option, } +impl FFI_ExecutionPlan { + fn inner(&self) -> &Arc { + let private_data = self.private_data as *const ExecutionPlanPrivateData; + unsafe { &(*private_data).plan } + } +} + unsafe extern "C" fn properties_fn_wrapper( plan: &FFI_ExecutionPlan, ) -> FFI_PlanProperties { - let private_data = plan.private_data as *const ExecutionPlanPrivateData; - let plan = &(*private_data).plan; - - plan.properties().into() + plan.inner().properties().into() } unsafe extern "C" fn children_fn_wrapper( @@ -119,10 +128,7 @@ unsafe extern "C" fn execute_fn_wrapper( } unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString { - let private_data = plan.private_data as *const ExecutionPlanPrivateData; - let plan = &(*private_data).plan; - - plan.name().into() + plan.inner().name().into() } unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) { @@ -168,6 +174,7 @@ impl FFI_ExecutionPlan { clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -218,10 +225,14 @@ impl DisplayAs for ForeignExecutionPlan { } } -impl TryFrom<&FFI_ExecutionPlan> for ForeignExecutionPlan { +impl TryFrom<&FFI_ExecutionPlan> for Arc { type Error = DataFusionError; fn try_from(plan: &FFI_ExecutionPlan) -> Result { + if (plan.library_marker_id)() == crate::get_library_marker_id() { + return Ok(Arc::clone(plan.inner())); + } + unsafe { let name = (plan.name)(plan).into(); @@ -230,16 +241,17 @@ impl TryFrom<&FFI_ExecutionPlan> for ForeignExecutionPlan { let children_rvec = (plan.children)(plan); let children = children_rvec .iter() - .map(ForeignExecutionPlan::try_from) - .map(|child| child.map(|c| Arc::new(c) as Arc)) + .map(>::try_from) .collect::>>()?; - Ok(Self { + let plan = 
ForeignExecutionPlan { name, plan: plan.clone(), properties, children, - }) + }; + + Ok(Arc::new(plan)) } } } @@ -258,10 +270,7 @@ impl ExecutionPlan for ForeignExecutionPlan { } fn children(&self) -> Vec<&Arc> { - self.children - .iter() - .map(|p| p as &Arc) - .collect() + self.children.iter().collect() } fn with_new_children( @@ -290,6 +299,7 @@ impl ExecutionPlan for ForeignExecutionPlan { #[cfg(test)] mod tests { + use super::*; use arrow::datatypes::{DataType, Field, Schema}; use datafusion::{ physical_plan::{ @@ -299,8 +309,6 @@ mod tests { prelude::SessionContext, }; - use super::*; - #[derive(Debug)] pub struct EmptyExec { props: PlanProperties, @@ -380,14 +388,15 @@ mod tests { let original_plan = Arc::new(EmptyExec::new(schema)); let original_name = original_plan.name().to_string(); - let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx(), None); + let mut local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx(), None); + local_plan.library_marker_id = crate::mock_foreign_marker_id; - let foreign_plan: ForeignExecutionPlan = (&local_plan).try_into()?; + let foreign_plan: Arc = (&local_plan).try_into()?; assert!(original_name == foreign_plan.name()); let display = datafusion::physical_plan::display::DisplayableExecutionPlan::new( - &foreign_plan, + foreign_plan.as_ref(), ); let buf = display.one_line().to_string(); @@ -407,12 +416,14 @@ mod tests { // Version 1: Adding child to the foreign plan let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema))); - let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None); - let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?); + let mut child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None); + child_local.library_marker_id = crate::mock_foreign_marker_id; + let child_foreign = >::try_from(&child_local)?; let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema))); - let parent_local = FFI_ExecutionPlan::new(parent_plan, 
ctx.task_ctx(), None); - let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?); + let mut parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None); + parent_local.library_marker_id = crate::mock_foreign_marker_id; + let parent_foreign = >::try_from(&parent_local)?; assert_eq!(parent_foreign.children().len(), 0); assert_eq!(child_foreign.children().len(), 0); @@ -422,16 +433,41 @@ mod tests { // Version 2: Adding child to the local plan let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema))); - let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None); - let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?); + let mut child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None); + child_local.library_marker_id = crate::mock_foreign_marker_id; + let child_foreign = >::try_from(&child_local)?; let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema))); let parent_plan = parent_plan.with_new_children(vec![child_foreign])?; - let parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None); - let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?); + let mut parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None); + parent_local.library_marker_id = crate::mock_foreign_marker_id; + let parent_foreign = >::try_from(&parent_local)?; assert_eq!(parent_foreign.children().len(), 1); Ok(()) } + + #[test] + fn test_ffi_execution_plan_local_bypass() { + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); + let ctx = SessionContext::new(); + + let plan = Arc::new(EmptyExec::new(schema)); + + let mut ffi_plan = FFI_ExecutionPlan::new(plan, ctx.task_ctx(), None); + + // Verify local libraries can be downcast to their original + let foreign_plan: Arc = (&ffi_plan).try_into().unwrap(); + assert!(foreign_plan.as_any().downcast_ref::().is_some()); + + // Verify different library markers generate 
foreign providers + ffi_plan.library_marker_id = crate::mock_foreign_marker_id; + let foreign_plan: Arc = (&ffi_plan).try_into().unwrap(); + assert!(foreign_plan + .as_any() + .downcast_ref::() + .is_some()); + } } diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index 39eb7babd90d..c85d16f5db33 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -58,5 +58,33 @@ pub extern "C" fn version() -> u64 { version.major } +static LIBRARY_MARKER: u8 = 0; + +/// This utility is used to determine if two FFI structs are within +/// the same library. It is possible that the interplay between +/// foreign and local functions calls create one FFI struct that +/// references another. It is helpful to determine if a foreign +/// struct is truly foreign or in the same library. If we are in the +/// same library, then we can access the underlying types directly. +/// +/// This function works by checking the address of the library +/// marker. Each library that implements the FFI code will have +/// a different address for the marker. By checking the marker +/// address we can determine if a struct is truly Foreign or is +/// actually within the same originating library. +/// +/// See the crate's `README.md` for additional information. +pub extern "C" fn get_library_marker_id() -> usize { + &LIBRARY_MARKER as *const u8 as usize +} + +/// For unit testing in this crate we need to trick the providers +/// into thinking we have a foreign call. We do this by overwriting +/// their `library_marker_id` function to return a different value. 
+#[cfg(test)] +pub(crate) extern "C" fn mock_foreign_marker_id() -> usize { + get_library_marker_id() + 1 +} + #[cfg(doctest)] doc_comment::doctest!("../README.md", readme_example_test); diff --git a/datafusion/ffi/src/plan_properties.rs b/datafusion/ffi/src/plan_properties.rs index 48c2698a58c7..0b8177a41242 100644 --- a/datafusion/ffi/src/plan_properties.rs +++ b/datafusion/ffi/src/plan_properties.rs @@ -75,21 +75,32 @@ pub struct FFI_PlanProperties { /// Internal data. This is only to be accessed by the provider of the plan. /// The foreign library should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. + pub library_marker_id: extern "C" fn() -> usize, } struct PlanPropertiesPrivateData { props: PlanProperties, } +impl FFI_PlanProperties { + fn inner(&self) -> &PlanProperties { + let private_data = self.private_data as *const PlanPropertiesPrivateData; + unsafe { &(*private_data).props } + } +} + unsafe extern "C" fn output_partitioning_fn_wrapper( properties: &FFI_PlanProperties, ) -> RResult, RString> { - let private_data = properties.private_data as *const PlanPropertiesPrivateData; - let props = &(*private_data).props; - let codec = DefaultPhysicalExtensionCodec {}; - let partitioning_data = - rresult_return!(serialize_partitioning(props.output_partitioning(), &codec)); + let partitioning_data = rresult_return!(serialize_partitioning( + properties.inner().output_partitioning(), + &codec + )); let output_partitioning = partitioning_data.encode_to_vec(); ROk(output_partitioning.into()) @@ -98,27 +109,20 @@ unsafe extern "C" fn output_partitioning_fn_wrapper( unsafe extern "C" fn emission_type_fn_wrapper( properties: &FFI_PlanProperties, ) -> FFI_EmissionType { - let private_data = properties.private_data as *const PlanPropertiesPrivateData; - let 
props = &(*private_data).props; - props.emission_type.into() + properties.inner().emission_type.into() } unsafe extern "C" fn boundedness_fn_wrapper( properties: &FFI_PlanProperties, ) -> FFI_Boundedness { - let private_data = properties.private_data as *const PlanPropertiesPrivateData; - let props = &(*private_data).props; - props.boundedness.into() + properties.inner().boundedness.into() } unsafe extern "C" fn output_ordering_fn_wrapper( properties: &FFI_PlanProperties, ) -> RResult, RString> { - let private_data = properties.private_data as *const PlanPropertiesPrivateData; - let props = &(*private_data).props; - let codec = DefaultPhysicalExtensionCodec {}; - let output_ordering = match props.output_ordering() { + let output_ordering = match properties.inner().output_ordering() { Some(ordering) => { let physical_sort_expr_nodes = rresult_return!( serialize_physical_sort_exprs(ordering.to_owned(), &codec) @@ -135,10 +139,7 @@ unsafe extern "C" fn output_ordering_fn_wrapper( } unsafe extern "C" fn schema_fn_wrapper(properties: &FFI_PlanProperties) -> WrappedSchema { - let private_data = properties.private_data as *const PlanPropertiesPrivateData; - let props = &(*private_data).props; - - let schema: SchemaRef = Arc::clone(props.eq_properties.schema()); + let schema: SchemaRef = Arc::clone(properties.inner().eq_properties.schema()); schema.into() } @@ -168,6 +169,7 @@ impl From<&PlanProperties> for FFI_PlanProperties { schema: schema_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -176,6 +178,10 @@ impl TryFrom for PlanProperties { type Error = DataFusionError; fn try_from(ffi_props: FFI_PlanProperties) -> Result { + if (ffi_props.library_marker_id)() == crate::get_library_marker_id() { + return Ok(ffi_props.inner().clone()); + } + let ffi_schema = unsafe { (ffi_props.schema)(&ffi_props) }; let schema = (&ffi_schema.0).try_into()?; @@ -304,8 +310,7 @@ mod 
tests { use super::*; - #[test] - fn test_round_trip_ffi_plan_properties() -> Result<()> { + fn create_test_props() -> Result { use arrow::datatypes::{DataType, Field, Schema}; let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); @@ -314,14 +319,19 @@ mod tests { let _ = eqp.reorder([PhysicalSortExpr::new_default( datafusion::physical_plan::expressions::col("a", &schema)?, )]); - let original_props = PlanProperties::new( + Ok(PlanProperties::new( eqp, Partitioning::RoundRobinBatch(3), EmissionType::Incremental, Boundedness::Bounded, - ); + )) + } - let local_props_ptr = FFI_PlanProperties::from(&original_props); + #[test] + fn test_round_trip_ffi_plan_properties() -> Result<()> { + let original_props = create_test_props()?; + let mut local_props_ptr = FFI_PlanProperties::from(&original_props); + local_props_ptr.library_marker_id = crate::mock_foreign_marker_id; let foreign_props: PlanProperties = local_props_ptr.try_into()?; @@ -329,4 +339,23 @@ mod tests { Ok(()) } + + #[test] + fn test_ffi_execution_plan_local_bypass() -> Result<()> { + let props = create_test_props()?; + + let ffi_plan = FFI_PlanProperties::from(&props); + + // Verify local libraries + let foreign_plan: PlanProperties = ffi_plan.try_into()?; + assert_eq!(format!("{foreign_plan:?}"), format!("{props:?}")); + + // Verify different library markers still can produce identical properties + let mut ffi_plan = FFI_PlanProperties::from(&props); + ffi_plan.library_marker_id = crate::mock_foreign_marker_id; + let foreign_plan: PlanProperties = ffi_plan.try_into()?; + assert_eq!(format!("{foreign_plan:?}"), format!("{props:?}")); + + Ok(()) + } } diff --git a/datafusion/ffi/src/schema_provider.rs b/datafusion/ffi/src/schema_provider.rs index b5970d5881d6..d6feeb6b8fb3 100644 --- a/datafusion/ffi/src/schema_provider.rs +++ b/datafusion/ffi/src/schema_provider.rs @@ -80,6 +80,11 @@ pub struct FFI_SchemaProvider { /// Internal data. 
This is only to be accessed by the provider of the plan. /// A [`ForeignSchemaProvider`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_SchemaProvider {} @@ -191,6 +196,7 @@ unsafe extern "C" fn clone_fn_wrapper( register_table: register_table_fn_wrapper, deregister_table: deregister_table_fn_wrapper, table_exist: table_exist_fn_wrapper, + library_marker_id: crate::get_library_marker_id, } } @@ -220,6 +226,7 @@ impl FFI_SchemaProvider { register_table: register_table_fn_wrapper, deregister_table: deregister_table_fn_wrapper, table_exist: table_exist_fn_wrapper, + library_marker_id: crate::get_library_marker_id, } } } @@ -234,9 +241,14 @@ pub struct ForeignSchemaProvider(pub FFI_SchemaProvider); unsafe impl Send for ForeignSchemaProvider {} unsafe impl Sync for ForeignSchemaProvider {} -impl From<&FFI_SchemaProvider> for ForeignSchemaProvider { +impl From<&FFI_SchemaProvider> for Arc { fn from(provider: &FFI_SchemaProvider) -> Self { - Self(provider.clone()) + if (provider.library_marker_id)() == crate::get_library_marker_id() { + return Arc::clone(unsafe { provider.inner() }); + } + + Arc::new(ForeignSchemaProvider(provider.clone())) + as Arc } } @@ -274,9 +286,7 @@ impl SchemaProvider for ForeignSchemaProvider { let table: Option = df_result!((self.0.table)(&self.0, name.into()).await)?.into(); - let table = table.as_ref().map(|t| { - Arc::new(ForeignTableProvider::from(t)) as Arc - }); + let table = table.as_ref().map(>::from); Ok(table) } @@ -319,11 +329,10 @@ impl SchemaProvider for ForeignSchemaProvider { #[cfg(test)] mod tests { + use super::*; use arrow::datatypes::Schema; use datafusion::{catalog::MemorySchemaProvider, datasource::empty::EmptyTable}; - use 
super::*; - fn empty_table() -> Arc { Arc::new(EmptyTable::new(Arc::new(Schema::empty()))) } @@ -337,9 +346,10 @@ mod tests { .unwrap() .is_none()); - let ffi_schema_provider = FFI_SchemaProvider::new(schema_provider, None); + let mut ffi_schema_provider = FFI_SchemaProvider::new(schema_provider, None); + ffi_schema_provider.library_marker_id = crate::mock_foreign_marker_id; - let foreign_schema_provider: ForeignSchemaProvider = + let foreign_schema_provider: Arc = (&ffi_schema_provider).into(); let prior_table_names = foreign_schema_provider.table_names(); @@ -382,4 +392,26 @@ mod tests { assert!(returned_schema.is_some()); assert!(foreign_schema_provider.table_exist("second_table")); } + + #[test] + fn test_ffi_schema_provider_local_bypass() { + let schema_provider = Arc::new(MemorySchemaProvider::new()); + + let mut ffi_schema = FFI_SchemaProvider::new(schema_provider, None); + + // Verify local libraries can be downcast to their original + let foreign_schema: Arc = (&ffi_schema).into(); + assert!(foreign_schema + .as_any() + .downcast_ref::() + .is_some()); + + // Verify different library markers generate foreign providers + ffi_schema.library_marker_id = crate::mock_foreign_marker_id; + let foreign_schema: Arc = (&ffi_schema).into(); + assert!(foreign_schema + .as_any() + .downcast_ref::() + .is_some()); + } } diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index 890511997a70..10b44a147fa0 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -50,8 +50,7 @@ use crate::{ }; use super::{ - execution_plan::{FFI_ExecutionPlan, ForeignExecutionPlan}, - insert_op::FFI_InsertOp, + execution_plan::FFI_ExecutionPlan, insert_op::FFI_InsertOp, session_config::FFI_SessionConfig, }; use datafusion::error::Result; @@ -154,8 +153,13 @@ pub struct FFI_TableProvider { pub version: unsafe extern "C" fn() -> u64, /// Internal data. This is only to be accessed by the provider of the plan. 
- /// A [`ForeignExecutionPlan`] should never attempt to access this data. + /// A [`ForeignTableProvider`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_TableProvider {} @@ -166,20 +170,26 @@ struct ProviderPrivateData { runtime: Option, } -unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedSchema { - let private_data = provider.private_data as *const ProviderPrivateData; - let provider = &(*private_data).provider; +impl FFI_TableProvider { + fn inner(&self) -> &Arc { + let private_data = self.private_data as *const ProviderPrivateData; + unsafe { &(*private_data).provider } + } - provider.schema().into() + fn runtime(&self) -> &Option { + let private_data = self.private_data as *const ProviderPrivateData; + unsafe { &(*private_data).runtime } + } +} + +unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedSchema { + provider.inner().schema().into() } unsafe extern "C" fn table_type_fn_wrapper( provider: &FFI_TableProvider, ) -> FFI_TableType { - let private_data = provider.private_data as *const ProviderPrivateData; - let provider = &(*private_data).provider; - - provider.table_type().into() + provider.inner().table_type().into() } fn supports_filters_pushdown_internal( @@ -213,10 +223,7 @@ unsafe extern "C" fn supports_filters_pushdown_fn_wrapper( provider: &FFI_TableProvider, filters_serialized: RVec, ) -> RResult, RString> { - let private_data = provider.private_data as *const ProviderPrivateData; - let provider = &(*private_data).provider; - - supports_filters_pushdown_internal(provider, &filters_serialized) + supports_filters_pushdown_internal(provider.inner(), &filters_serialized) .map_err(|e| 
e.to_string().into()) .into() } @@ -228,10 +235,9 @@ unsafe extern "C" fn scan_fn_wrapper( filters_serialized: RVec, limit: ROption, ) -> FfiFuture> { - let private_data = provider.private_data as *mut ProviderPrivateData; - let internal_provider = &(*private_data).provider; + let runtime = provider.runtime().clone(); + let internal_provider = Arc::clone(provider.inner()); let session_config = session_config.clone(); - let runtime = &(*private_data).runtime; async move { let config = rresult_return!(ForeignSessionConfig::try_from(&session_config)); @@ -281,11 +287,10 @@ unsafe extern "C" fn insert_into_fn_wrapper( input: &FFI_ExecutionPlan, insert_op: FFI_InsertOp, ) -> FfiFuture> { - let private_data = provider.private_data as *mut ProviderPrivateData; - let internal_provider = &(*private_data).provider; + let runtime = provider.runtime().clone(); + let internal_provider = Arc::clone(provider.inner()); let session_config = session_config.clone(); let input = input.clone(); - let runtime = &(*private_data).runtime; async move { let config = rresult_return!(ForeignSessionConfig::try_from(&session_config)); @@ -295,7 +300,7 @@ unsafe extern "C" fn insert_into_fn_wrapper( .build(); let ctx = SessionContext::new_with_state(session); - let input = rresult_return!(ForeignExecutionPlan::try_from(&input).map(Arc::new)); + let input = rresult_return!(>::try_from(&input)); let insert_op = InsertOp::from(insert_op); @@ -320,11 +325,11 @@ unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_TableProvider) { } unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_TableProvider { - let old_private_data = provider.private_data as *const ProviderPrivateData; - let runtime = (*old_private_data).runtime.clone(); + let runtime = provider.runtime().clone(); + let old_provider = Arc::clone(provider.inner()); let private_data = Box::into_raw(Box::new(ProviderPrivateData { - provider: Arc::clone(&(*old_private_data).provider), + provider: old_provider, runtime, 
})) as *mut c_void; @@ -338,6 +343,7 @@ unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_Table release: release_fn_wrapper, version: super::version, private_data, + library_marker_id: crate::get_library_marker_id, } } @@ -369,6 +375,7 @@ impl FFI_TableProvider { release: release_fn_wrapper, version: super::version, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -383,9 +390,13 @@ pub struct ForeignTableProvider(pub FFI_TableProvider); unsafe impl Send for ForeignTableProvider {} unsafe impl Sync for ForeignTableProvider {} -impl From<&FFI_TableProvider> for ForeignTableProvider { +impl From<&FFI_TableProvider> for Arc { fn from(provider: &FFI_TableProvider) -> Self { - Self(provider.clone()) + if (provider.library_marker_id)() == crate::get_library_marker_id() { + Arc::clone(provider.inner()) as Arc + } else { + Arc::new(ForeignTableProvider(provider.clone())) + } } } @@ -438,10 +449,10 @@ impl TableProvider for ForeignTableProvider { ) .await; - ForeignExecutionPlan::try_from(&df_result!(maybe_plan)?)? + >::try_from(&df_result!(maybe_plan)?)? }; - Ok(Arc::new(plan)) + Ok(plan) } /// Tests whether the table provider can make use of a filter expression @@ -491,22 +502,20 @@ impl TableProvider for ForeignTableProvider { let maybe_plan = (self.0.insert_into)(&self.0, &session_config, &input, insert_op).await; - ForeignExecutionPlan::try_from(&df_result!(maybe_plan)?)? + >::try_from(&df_result!(maybe_plan)?)? 
}; - Ok(Arc::new(plan)) + Ok(plan) } } #[cfg(test)] mod tests { + use super::*; use arrow::datatypes::Schema; use datafusion::prelude::{col, lit}; - use super::*; - - #[tokio::test] - async fn test_round_trip_ffi_table_provider_scan() -> Result<()> { + fn create_test_table_provider() -> Result> { use arrow::datatypes::Field; use datafusion::arrow::{ array::Float32Array, datatypes::DataType, record_batch::RecordBatch, @@ -526,16 +535,23 @@ mod tests { vec![Arc::new(Float32Array::from(vec![64.0]))], )?; - let ctx = SessionContext::new(); + Ok(Arc::new(MemTable::try_new( + schema, + vec![vec![batch1], vec![batch2]], + )?)) + } - let provider = - Arc::new(MemTable::try_new(schema, vec![vec![batch1], vec![batch2]])?); + #[tokio::test] + async fn test_round_trip_ffi_table_provider_scan() -> Result<()> { + let provider = create_test_table_provider()?; + let ctx = SessionContext::new(); - let ffi_provider = FFI_TableProvider::new(provider, true, None); + let mut ffi_provider = FFI_TableProvider::new(provider, true, None); + ffi_provider.library_marker_id = crate::mock_foreign_marker_id; - let foreign_table_provider: ForeignTableProvider = (&ffi_provider).into(); + let foreign_table_provider: Arc = (&ffi_provider).into(); - ctx.register_table("t", Arc::new(foreign_table_provider))?; + ctx.register_table("t", foreign_table_provider)?; let df = ctx.table("t").await?; @@ -549,35 +565,15 @@ mod tests { #[tokio::test] async fn test_round_trip_ffi_table_provider_insert_into() -> Result<()> { - use arrow::datatypes::Field; - use datafusion::arrow::{ - array::Float32Array, datatypes::DataType, record_batch::RecordBatch, - }; - use datafusion::datasource::MemTable; - - let schema = - Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); - - // define data in two partitions - let batch1 = RecordBatch::try_new( - Arc::clone(&schema), - vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))], - )?; - let batch2 = RecordBatch::try_new( - Arc::clone(&schema), - 
vec![Arc::new(Float32Array::from(vec![64.0]))], - )?; - + let provider = create_test_table_provider()?; let ctx = SessionContext::new(); - let provider = - Arc::new(MemTable::try_new(schema, vec![vec![batch1], vec![batch2]])?); + let mut ffi_provider = FFI_TableProvider::new(provider, true, None); + ffi_provider.library_marker_id = crate::mock_foreign_marker_id; - let ffi_provider = FFI_TableProvider::new(provider, true, None); - - let foreign_table_provider: ForeignTableProvider = (&ffi_provider).into(); + let foreign_table_provider: Arc = (&ffi_provider).into(); - ctx.register_table("t", Arc::new(foreign_table_provider))?; + ctx.register_table("t", foreign_table_provider)?; let result = ctx .sql("INSERT INTO t VALUES (128.0);") @@ -621,9 +617,9 @@ mod tests { let ffi_provider = FFI_TableProvider::new(provider, true, None); - let foreign_table_provider: ForeignTableProvider = (&ffi_provider).into(); + let foreign_table_provider: Arc = (&ffi_provider).into(); - ctx.register_table("t", Arc::new(foreign_table_provider))?; + ctx.register_table("t", foreign_table_provider)?; let result = ctx .sql("SELECT COUNT(*) as cnt FROM t") @@ -641,4 +637,28 @@ mod tests { assert_batches_eq!(expected, &result); Ok(()) } + + #[test] + fn test_ffi_table_provider_local_bypass() -> Result<()> { + let table_provider = create_test_table_provider()?; + + let mut ffi_table = FFI_TableProvider::new(table_provider, false, None); + + // Verify local libraries can be downcast to their original + let foreign_table: Arc = (&ffi_table).into(); + assert!(foreign_table + .as_any() + .downcast_ref::() + .is_some()); + + // Verify different library markers generate foreign providers + ffi_table.library_marker_id = crate::mock_foreign_marker_id; + let foreign_table: Arc = (&ffi_table).into(); + assert!(foreign_table + .as_any() + .downcast_ref::() + .is_some()); + + Ok(()) + } } diff --git a/datafusion/ffi/src/udaf/accumulator.rs b/datafusion/ffi/src/udaf/accumulator.rs index 
80b872159f48..8626d1a42679 100644 --- a/datafusion/ffi/src/udaf/accumulator.rs +++ b/datafusion/ffi/src/udaf/accumulator.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::{ffi::c_void, ops::Deref}; - use abi_stable::{ std_types::{RResult, RString, RVec}, StableAbi, @@ -28,6 +26,8 @@ use datafusion::{ scalar::ScalarValue, }; use prost::Message; +use std::ptr::null_mut; +use std::{ffi::c_void, ops::Deref}; use crate::{arrow_wrappers::WrappedArray, df_result, rresult, rresult_return}; @@ -70,6 +70,11 @@ pub struct FFI_Accumulator { /// Internal data. This is only to be accessed by the provider of the accumulator. /// A [`ForeignAccumulator`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_Accumulator {} @@ -173,9 +178,11 @@ unsafe extern "C" fn retract_batch_fn_wrapper( } unsafe extern "C" fn release_fn_wrapper(accumulator: &mut FFI_Accumulator) { - let private_data = - Box::from_raw(accumulator.private_data as *mut AccumulatorPrivateData); - drop(private_data); + if !accumulator.private_data.is_null() { + let private_data = + Box::from_raw(accumulator.private_data as *mut AccumulatorPrivateData); + drop(private_data); + } } impl From> for FFI_Accumulator { @@ -193,6 +200,7 @@ impl From> for FFI_Accumulator { supports_retract_batch, release: release_fn_wrapper, private_data: Box::into_raw(Box::new(private_data)) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -217,9 +225,20 @@ pub struct ForeignAccumulator { unsafe impl Send for ForeignAccumulator {} unsafe impl Sync for ForeignAccumulator {} -impl From for ForeignAccumulator { - fn from(accumulator: FFI_Accumulator) -> Self { - Self { 
accumulator } +impl From for Box { + fn from(mut accumulator: FFI_Accumulator) -> Self { + if (accumulator.library_marker_id)() == crate::get_library_marker_id() { + unsafe { + let private_data = Box::from_raw( + accumulator.private_data as *mut AccumulatorPrivateData, + ); + // We must set this to null to avoid a double free + accumulator.private_data = null_mut(); + private_data.accumulator + } + } else { + Box::new(ForeignAccumulator { accumulator }) + } } } @@ -306,6 +325,7 @@ impl Accumulator for ForeignAccumulator { #[cfg(test)] mod tests { + use super::{FFI_Accumulator, ForeignAccumulator}; use arrow::array::{make_array, Array}; use datafusion::{ common::create_array, error::Result, @@ -313,8 +333,6 @@ mod tests { scalar::ScalarValue, }; - use super::{FFI_Accumulator, ForeignAccumulator}; - #[test] fn test_foreign_avg_accumulator() -> Result<()> { let original_accum = AvgAccumulator::default(); @@ -322,8 +340,9 @@ mod tests { let original_supports_retract = original_accum.supports_retract_batch(); let boxed_accum: Box = Box::new(original_accum); - let ffi_accum: FFI_Accumulator = boxed_accum.into(); - let mut foreign_accum: ForeignAccumulator = ffi_accum.into(); + let mut ffi_accum: FFI_Accumulator = boxed_accum.into(); + ffi_accum.library_marker_id = crate::mock_foreign_marker_id; + let mut foreign_accum: Box = ffi_accum.into(); // Send in an array to average. 
There are 5 values and it should average to 30.0 let values = create_array!(Float64, vec![10., 20., 30., 40., 50.]); @@ -363,4 +382,35 @@ mod tests { Ok(()) } + + #[test] + fn test_ffi_accumulator_local_bypass() -> Result<()> { + let original_accum = AvgAccumulator::default(); + let boxed_accum: Box = Box::new(original_accum); + let original_size = boxed_accum.size(); + + let ffi_accum: FFI_Accumulator = boxed_accum.into(); + + // Verify local libraries can be downcast to their original + let foreign_accum: Box = ffi_accum.into(); + unsafe { + let concrete = &*(foreign_accum.as_ref() as *const dyn Accumulator + as *const AvgAccumulator); + assert_eq!(original_size, concrete.size()); + } + + // Verify different library markers generate foreign accumulator + let original_accum = AvgAccumulator::default(); + let boxed_accum: Box = Box::new(original_accum); + let mut ffi_accum: FFI_Accumulator = boxed_accum.into(); + ffi_accum.library_marker_id = crate::mock_foreign_marker_id; + let foreign_accum: Box = ffi_accum.into(); + unsafe { + let concrete = &*(foreign_accum.as_ref() as *const dyn Accumulator + as *const ForeignAccumulator); + assert_eq!(original_size, concrete.size()); + } + + Ok(()) + } } diff --git a/datafusion/ffi/src/udaf/groups_accumulator.rs b/datafusion/ffi/src/udaf/groups_accumulator.rs index 58a18c69db7c..b088804678b9 100644 --- a/datafusion/ffi/src/udaf/groups_accumulator.rs +++ b/datafusion/ffi/src/udaf/groups_accumulator.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::{ffi::c_void, ops::Deref, sync::Arc}; - use crate::{ arrow_wrappers::{WrappedArray, WrappedSchema}, df_result, rresult, rresult_return, @@ -34,6 +32,8 @@ use datafusion::{ error::{DataFusionError, Result}, logical_expr::{EmitTo, GroupsAccumulator}, }; +use std::ptr::null_mut; +use std::{ffi::c_void, ops::Deref, sync::Arc}; /// A stable struct for sharing [`GroupsAccumulator`] across FFI boundaries. 
/// For an explanation of each field, see the corresponding function @@ -86,6 +86,11 @@ pub struct FFI_GroupsAccumulator { /// Internal data. This is only to be accessed by the provider of the accumulator. /// A [`ForeignGroupsAccumulator`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_GroupsAccumulator {} @@ -215,9 +220,11 @@ unsafe extern "C" fn convert_to_state_fn_wrapper( } unsafe extern "C" fn release_fn_wrapper(accumulator: &mut FFI_GroupsAccumulator) { - let private_data = - Box::from_raw(accumulator.private_data as *mut GroupsAccumulatorPrivateData); - drop(private_data); + if !accumulator.private_data.is_null() { + let private_data = + Box::from_raw(accumulator.private_data as *mut GroupsAccumulatorPrivateData); + drop(private_data); + } } impl From> for FFI_GroupsAccumulator { @@ -236,6 +243,7 @@ impl From> for FFI_GroupsAccumulator { release: release_fn_wrapper, private_data: Box::into_raw(Box::new(private_data)) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -260,9 +268,19 @@ pub struct ForeignGroupsAccumulator { unsafe impl Send for ForeignGroupsAccumulator {} unsafe impl Sync for ForeignGroupsAccumulator {} -impl From for ForeignGroupsAccumulator { - fn from(accumulator: FFI_GroupsAccumulator) -> Self { - Self { accumulator } +impl From for Box { + fn from(mut accumulator: FFI_GroupsAccumulator) -> Self { + if (accumulator.library_marker_id)() == crate::get_library_marker_id() { + unsafe { + let private_data = Box::from_raw( + accumulator.private_data as *mut GroupsAccumulatorPrivateData, + ); + accumulator.private_data = null_mut(); + private_data.accumulator + } + } else { + Box::new(ForeignGroupsAccumulator { accumulator }) 
+ } } } @@ -428,22 +446,24 @@ impl From for EmitTo { #[cfg(test)] mod tests { + use super::{FFI_EmitTo, FFI_GroupsAccumulator, ForeignGroupsAccumulator}; use arrow::array::{make_array, Array, BooleanArray}; + use datafusion::functions_aggregate::stddev::StddevGroupsAccumulator; use datafusion::{ common::create_array, error::Result, logical_expr::{EmitTo, GroupsAccumulator}, }; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator; - - use super::{FFI_EmitTo, FFI_GroupsAccumulator, ForeignGroupsAccumulator}; + use datafusion_functions_aggregate_common::stats::StatsType; #[test] fn test_foreign_avg_accumulator() -> Result<()> { let boxed_accum: Box = Box::new(BooleanGroupsAccumulator::new(|a, b| a && b, true)); - let ffi_accum: FFI_GroupsAccumulator = boxed_accum.into(); - let mut foreign_accum: ForeignGroupsAccumulator = ffi_accum.into(); + let mut ffi_accum: FFI_GroupsAccumulator = boxed_accum.into(); + ffi_accum.library_marker_id = crate::mock_foreign_marker_id; + let mut foreign_accum: Box = ffi_accum.into(); // Send in an array to evaluate. We want a mean of 30 and standard deviation of 4. 
let values = create_array!(Boolean, vec![true, true, true, false, true, true]); @@ -510,4 +530,35 @@ mod tests { Ok(()) } + + #[test] + fn test_ffi_groups_accumulator_local_bypass_inner() -> Result<()> { + let original_accum = StddevGroupsAccumulator::new(StatsType::Population); + let boxed_accum: Box = Box::new(original_accum); + let original_size = boxed_accum.size(); + + let ffi_accum: FFI_GroupsAccumulator = boxed_accum.into(); + + // Verify local libraries can be downcast to their original + let foreign_accum: Box = ffi_accum.into(); + unsafe { + let concrete = &*(foreign_accum.as_ref() as *const dyn GroupsAccumulator + as *const StddevGroupsAccumulator); + assert_eq!(original_size, concrete.size()); + } + + // Verify different library markers generate foreign accumulator + let original_accum = StddevGroupsAccumulator::new(StatsType::Population); + let boxed_accum: Box = Box::new(original_accum); + let mut ffi_accum: FFI_GroupsAccumulator = boxed_accum.into(); + ffi_accum.library_marker_id = crate::mock_foreign_marker_id; + let foreign_accum: Box = ffi_accum.into(); + unsafe { + let concrete = &*(foreign_accum.as_ref() as *const dyn GroupsAccumulator + as *const ForeignGroupsAccumulator); + assert_eq!(original_size, concrete.size()); + } + + Ok(()) + } } diff --git a/datafusion/ffi/src/udaf/mod.rs b/datafusion/ffi/src/udaf/mod.rs index ce5611590b67..a416753c371b 100644 --- a/datafusion/ffi/src/udaf/mod.rs +++ b/datafusion/ffi/src/udaf/mod.rs @@ -19,7 +19,7 @@ use abi_stable::{ std_types::{ROption, RResult, RStr, RString, RVec}, StableAbi, }; -use accumulator::{FFI_Accumulator, ForeignAccumulator}; +use accumulator::FFI_Accumulator; use accumulator_args::{FFI_AccumulatorArgs, ForeignAccumulatorArgs}; use arrow::datatypes::{DataType, Field}; use arrow::ffi::FFI_ArrowSchema; @@ -39,7 +39,7 @@ use datafusion::{ }; use datafusion_common::exec_datafusion_err; use datafusion_proto_common::from_proto::parse_proto_fields_to_fields; -use 
groups_accumulator::{FFI_GroupsAccumulator, ForeignGroupsAccumulator}; +use groups_accumulator::FFI_GroupsAccumulator; use std::hash::{Hash, Hasher}; use std::{ffi::c_void, sync::Arc}; @@ -145,6 +145,11 @@ pub struct FFI_AggregateUDF { /// Internal data. This is only to be accessed by the provider of the udaf. /// A [`ForeignAggregateUDF`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_AggregateUDF {} @@ -361,6 +366,7 @@ impl From> for FFI_AggregateUDF { clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -400,18 +406,22 @@ impl Hash for ForeignAggregateUDF { } } -impl TryFrom<&FFI_AggregateUDF> for ForeignAggregateUDF { +impl TryFrom<&FFI_AggregateUDF> for Arc { type Error = DataFusionError; fn try_from(udaf: &FFI_AggregateUDF) -> Result { + if (udaf.library_marker_id)() == crate::get_library_marker_id() { + return Ok(Arc::clone(unsafe { udaf.inner().inner() })); + } + let signature = Signature::user_defined((&udaf.volatility).into()); let aliases = udaf.aliases.iter().map(|s| s.to_string()).collect(); - Ok(Self { + Ok(Arc::new(ForeignAggregateUDF { udaf: udaf.clone(), signature, aliases, - }) + })) } } @@ -453,9 +463,8 @@ impl AggregateUDFImpl for ForeignAggregateUDF { fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { let args = acc_args.try_into()?; unsafe { - df_result!((self.udaf.accumulator)(&self.udaf, args)).map(|accum| { - Box::new(ForeignAccumulator::from(accum)) as Box - }) + df_result!((self.udaf.accumulator)(&self.udaf, args)) + .map(>::from) } } @@ -517,12 +526,8 @@ impl AggregateUDFImpl for ForeignAggregateUDF { let args = 
FFI_AccumulatorArgs::try_from(args)?; unsafe { - df_result!((self.udaf.create_groups_accumulator)(&self.udaf, args)).map( - |accum| { - Box::new(ForeignGroupsAccumulator::from(accum)) - as Box - }, - ) + df_result!((self.udaf.create_groups_accumulator)(&self.udaf, args)) + .map(>::from) } } @@ -536,9 +541,8 @@ impl AggregateUDFImpl for ForeignAggregateUDF { ) -> Result> { let args = args.try_into()?; unsafe { - df_result!((self.udaf.create_sliding_accumulator)(&self.udaf, args)).map( - |accum| Box::new(ForeignAccumulator::from(accum)) as Box, - ) + df_result!((self.udaf.create_sliding_accumulator)(&self.udaf, args)) + .map(>::from) } } @@ -554,10 +558,10 @@ impl AggregateUDFImpl for ForeignAggregateUDF { .into_option(); let result = result - .map(|func| ForeignAggregateUDF::try_from(&func)) + .map(|func| >::try_from(&func)) .transpose()?; - Ok(result.map(|func| Arc::new(func) as Arc)) + Ok(result) } } @@ -613,6 +617,7 @@ impl From for FFI_AggregateOrderSensitivity { #[cfg(test)] mod tests { + use super::*; use arrow::datatypes::Schema; use datafusion::{ common::create_array, functions_aggregate::sum::Sum, @@ -622,8 +627,6 @@ mod tests { use std::any::Any; use std::collections::HashMap; - use super::*; - #[derive(Default, Debug, Hash, Eq, PartialEq)] struct SumWithCopiedMetadata { inner: Sum, @@ -661,10 +664,11 @@ mod tests { ) -> Result { let original_udaf = Arc::new(AggregateUDF::from(original_udaf)); - let local_udaf: FFI_AggregateUDF = Arc::clone(&original_udaf).into(); + let mut local_udaf: FFI_AggregateUDF = Arc::clone(&original_udaf).into(); + local_udaf.library_marker_id = crate::mock_foreign_marker_id; - let foreign_udaf: ForeignAggregateUDF = (&local_udaf).try_into()?; - Ok(foreign_udaf.into()) + let foreign_udaf: Arc = (&local_udaf).try_into()?; + Ok(AggregateUDF::new_from_shared_impl(foreign_udaf)) } #[test] @@ -674,11 +678,12 @@ mod tests { let original_udaf = Arc::new(AggregateUDF::from(original_udaf)); // Convert to FFI format - let local_udaf: 
FFI_AggregateUDF = Arc::clone(&original_udaf).into(); + let mut local_udaf: FFI_AggregateUDF = Arc::clone(&original_udaf).into(); + local_udaf.library_marker_id = crate::mock_foreign_marker_id; // Convert back to native format - let foreign_udaf: ForeignAggregateUDF = (&local_udaf).try_into()?; - let foreign_udaf: AggregateUDF = foreign_udaf.into(); + let foreign_udaf: Arc = (&local_udaf).try_into()?; + let foreign_udaf = AggregateUDF::new_from_shared_impl(foreign_udaf); assert_eq!(original_name, foreign_udaf.name()); Ok(()) @@ -731,8 +736,8 @@ mod tests { let local_udaf: FFI_AggregateUDF = Arc::clone(&original_udaf).into(); // Convert back to native format - let foreign_udaf: ForeignAggregateUDF = (&local_udaf).try_into()?; - let foreign_udaf: AggregateUDF = foreign_udaf.into(); + let foreign_udaf: Arc = (&local_udaf).try_into()?; + let foreign_udaf = AggregateUDF::new_from_shared_impl(foreign_udaf); let metadata: HashMap = [("a_key".to_string(), "a_value".to_string())] @@ -815,4 +820,26 @@ mod tests { test_round_trip_order_sensitivity(AggregateOrderSensitivity::SoftRequirement); test_round_trip_order_sensitivity(AggregateOrderSensitivity::Beneficial); } + + #[test] + fn test_ffi_udaf_local_bypass() -> Result<()> { + let original_udaf = Sum::new(); + let original_udaf = Arc::new(AggregateUDF::from(original_udaf)); + + let mut ffi_udaf = FFI_AggregateUDF::from(original_udaf); + + // Verify local libraries can be downcast to their original + let foreign_udaf: Arc = (&ffi_udaf).try_into()?; + assert!(foreign_udaf.as_any().downcast_ref::().is_some()); + + // Verify different library markers generate foreign providers + ffi_udaf.library_marker_id = crate::mock_foreign_marker_id; + let foreign_udaf: Arc = (&ffi_udaf).try_into()?; + assert!(foreign_udaf + .as_any() + .downcast_ref::() + .is_some()); + + Ok(()) + } } diff --git a/datafusion/ffi/src/udf/mod.rs b/datafusion/ffi/src/udf/mod.rs index 5e59cfc5ecb0..b90cc267e0bd 100644 --- a/datafusion/ffi/src/udf/mod.rs +++ 
b/datafusion/ffi/src/udf/mod.rs @@ -44,6 +44,7 @@ use datafusion::{ ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, }, }; +use datafusion_common::internal_err; use return_type_args::{ FFI_ReturnFieldArgs, ForeignReturnFieldArgs, ForeignReturnFieldArgsOwned, }; @@ -66,15 +67,7 @@ pub struct FFI_ScalarUDF { /// FFI equivalent to the `volatility` of a [`ScalarUDF`] pub volatility: FFI_Volatility, - /// Determines the return type of the underlying [`ScalarUDF`] based on the - /// argument types. - pub return_type: unsafe extern "C" fn( - udf: &Self, - arg_types: RVec, - ) -> RResult, - - /// Determines the return info of the underlying [`ScalarUDF`]. Either this - /// or return_type may be implemented on a UDF. + /// Determines the return info of the underlying [`ScalarUDF`]. pub return_field_from_args: unsafe extern "C" fn( udf: &Self, args: FFI_ReturnFieldArgs, @@ -114,6 +107,11 @@ pub struct FFI_ScalarUDF { /// Internal data. This is only to be accessed by the provider of the udf. /// A [`ForeignScalarUDF`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. 
+ pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_ScalarUDF {} @@ -123,34 +121,22 @@ pub struct ScalarUDFPrivateData { pub udf: Arc, } -unsafe extern "C" fn return_type_fn_wrapper( - udf: &FFI_ScalarUDF, - arg_types: RVec, -) -> RResult { - let private_data = udf.private_data as *const ScalarUDFPrivateData; - let udf = &(*private_data).udf; - - let arg_types = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types)); - - let return_type = udf - .return_type(&arg_types) - .and_then(|v| FFI_ArrowSchema::try_from(v).map_err(DataFusionError::from)) - .map(WrappedSchema); - - rresult!(return_type) +impl FFI_ScalarUDF { + fn inner(&self) -> &Arc { + let private_data = self.private_data as *const ScalarUDFPrivateData; + unsafe { &(*private_data).udf } + } } unsafe extern "C" fn return_field_from_args_fn_wrapper( udf: &FFI_ScalarUDF, args: FFI_ReturnFieldArgs, ) -> RResult { - let private_data = udf.private_data as *const ScalarUDFPrivateData; - let udf = &(*private_data).udf; - let args: ForeignReturnFieldArgsOwned = rresult_return!((&args).try_into()); let args_ref: ForeignReturnFieldArgs = (&args).into(); let return_type = udf + .inner() .return_field_from_args((&args_ref).into()) .and_then(|f| FFI_ArrowSchema::try_from(&f).map_err(DataFusionError::from)) .map(WrappedSchema); @@ -162,12 +148,10 @@ unsafe extern "C" fn coerce_types_fn_wrapper( udf: &FFI_ScalarUDF, arg_types: RVec, ) -> RResult, RString> { - let private_data = udf.private_data as *const ScalarUDFPrivateData; - let udf = &(*private_data).udf; - let arg_types = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types)); - let return_types = rresult_return!(data_types_with_scalar_udf(&arg_types, udf)); + let return_types = + rresult_return!(data_types_with_scalar_udf(&arg_types, udf.inner())); rresult!(vec_datatype_to_rvec_wrapped(&return_types)) } @@ -179,9 +163,6 @@ unsafe extern "C" fn invoke_with_args_fn_wrapper( number_rows: usize, return_field: WrappedSchema, ) -> RResult { 
- let private_data = udf.private_data as *const ScalarUDFPrivateData; - let udf = &(*private_data).udf; - let args = args .into_iter() .map(|arr| { @@ -213,6 +194,7 @@ unsafe extern "C" fn invoke_with_args_fn_wrapper( }; let result = rresult_return!(udf + .inner() .invoke_with_args(args) .and_then(|r| r.to_array(number_rows))); @@ -257,12 +239,12 @@ impl From> for FFI_ScalarUDF { volatility, short_circuits, invoke_with_args: invoke_with_args_fn_wrapper, - return_type: return_type_fn_wrapper, return_field_from_args: return_field_from_args_fn_wrapper, coerce_types: coerce_types_fn_wrapper, clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -321,21 +303,25 @@ impl Hash for ForeignScalarUDF { } } -impl TryFrom<&FFI_ScalarUDF> for ForeignScalarUDF { +impl TryFrom<&FFI_ScalarUDF> for Arc { type Error = DataFusionError; fn try_from(udf: &FFI_ScalarUDF) -> Result { - let name = udf.name.to_owned().into(); - let signature = Signature::user_defined((&udf.volatility).into()); - - let aliases = udf.aliases.iter().map(|s| s.to_string()).collect(); - - Ok(Self { - name, - udf: udf.clone(), - aliases, - signature, - }) + if (udf.library_marker_id)() == crate::get_library_marker_id() { + Ok(Arc::clone(udf.inner().inner())) + } else { + let name = udf.name.to_owned().into(); + let signature = Signature::user_defined((&udf.volatility).into()); + + let aliases = udf.aliases.iter().map(|s| s.to_string()).collect(); + + Ok(Arc::new(ForeignScalarUDF { + name, + udf: udf.clone(), + aliases, + signature, + })) + } } } @@ -352,14 +338,8 @@ impl ScalarUDFImpl for ForeignScalarUDF { &self.signature } - fn return_type(&self, arg_types: &[DataType]) -> Result { - let arg_types = vec_datatype_to_rvec_wrapped(arg_types)?; - - let result = unsafe { (self.udf.return_type)(&self.udf, arg_types) }; - - let result = df_result!(result); - - result.and_then(|r| 
(&r.0).try_into().map_err(DataFusionError::from)) + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("ForeignScalarUDF implements return_field_from_args instead.") } fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { @@ -455,12 +435,36 @@ mod tests { let original_udf = datafusion::functions::math::abs::AbsFunc::new(); let original_udf = Arc::new(ScalarUDF::from(original_udf)); - let local_udf: FFI_ScalarUDF = Arc::clone(&original_udf).into(); + let mut local_udf: FFI_ScalarUDF = Arc::clone(&original_udf).into(); + local_udf.library_marker_id = crate::mock_foreign_marker_id; - let foreign_udf: ForeignScalarUDF = (&local_udf).try_into()?; + let foreign_udf: Arc = (&local_udf).try_into()?; assert_eq!(original_udf.name(), foreign_udf.name()); Ok(()) } + + #[test] + fn test_ffi_udf_local_bypass() -> Result<()> { + use datafusion::functions::math::abs::AbsFunc; + let original_udf = AbsFunc::new(); + let original_udf = Arc::new(ScalarUDF::from(original_udf)); + + let mut ffi_udf = FFI_ScalarUDF::from(original_udf); + + // Verify local libraries can be downcast to their original + let foreign_udf: Arc = (&ffi_udf).try_into()?; + assert!(foreign_udf.as_any().downcast_ref::().is_some()); + + // Verify different library markers generate foreign providers + ffi_udf.library_marker_id = crate::mock_foreign_marker_id; + let foreign_udf: Arc = (&ffi_udf).try_into()?; + assert!(foreign_udf + .as_any() + .downcast_ref::() + .is_some()); + + Ok(()) + } } diff --git a/datafusion/ffi/src/udtf.rs b/datafusion/ffi/src/udtf.rs index edd5273c70a8..e603b9234c33 100644 --- a/datafusion/ffi/src/udtf.rs +++ b/datafusion/ffi/src/udtf.rs @@ -36,10 +36,7 @@ use datafusion_proto::{ use prost::Message; use tokio::runtime::Handle; -use crate::{ - df_result, rresult_return, - table_provider::{FFI_TableProvider, ForeignTableProvider}, -}; +use crate::{df_result, rresult_return, table_provider::FFI_TableProvider}; /// A stable struct for sharing a 
[`TableFunctionImpl`] across FFI boundaries. #[repr(C)] @@ -63,6 +60,11 @@ pub struct FFI_TableFunction { /// Internal data. This is only to be accessed by the provider of the udtf. /// A [`ForeignTableFunction`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_TableFunction {} @@ -131,6 +133,7 @@ impl FFI_TableFunction { clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -147,6 +150,7 @@ impl From> for FFI_TableFunction { clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -169,9 +173,13 @@ pub struct ForeignTableFunction(FFI_TableFunction); unsafe impl Send for ForeignTableFunction {} unsafe impl Sync for ForeignTableFunction {} -impl From for ForeignTableFunction { +impl From for Arc { fn from(value: FFI_TableFunction) -> Self { - Self(value) + if (value.library_marker_id)() == crate::get_library_marker_id() { + Arc::clone(value.inner()) + } else { + Arc::new(ForeignTableFunction(value)) + } } } @@ -186,26 +194,26 @@ impl TableFunctionImpl for ForeignTableFunction { let table_provider = unsafe { (self.0.call)(&self.0, filters_serialized) }; let table_provider = df_result!(table_provider)?; - let table_provider: ForeignTableProvider = (&table_provider).into(); + let table_provider: Arc = (&table_provider).into(); - Ok(Arc::new(table_provider)) + Ok(table_provider) } } #[cfg(test)] mod tests { + use super::*; use arrow::{ array::{ record_batch, ArrayRef, Float64Array, RecordBatch, StringArray, UInt64Array, }, datatypes::{DataType, 
Field, Schema}, }; + use datafusion::logical_expr::ptr_eq::arc_ptr_eq; use datafusion::{ catalog::MemTable, common::exec_err, prelude::lit, scalar::ScalarValue, }; - use super::*; - #[derive(Debug)] struct TestUDTF {} @@ -288,10 +296,11 @@ mod tests { async fn test_round_trip_udtf() -> Result<()> { let original_udtf = Arc::new(TestUDTF {}) as Arc; - let local_udtf: FFI_TableFunction = + let mut local_udtf: FFI_TableFunction = FFI_TableFunction::new(Arc::clone(&original_udtf), None); + local_udtf.library_marker_id = crate::mock_foreign_marker_id; - let foreign_udf: ForeignTableFunction = local_udtf.into(); + let foreign_udf: Arc = local_udtf.into(); let table = foreign_udf.call(&[lit(6_u64), lit("one"), lit(2.0), lit(3_u64)])?; @@ -317,4 +326,22 @@ mod tests { Ok(()) } + + #[test] + fn test_ffi_udtf_local_bypass() -> Result<()> { + let original_udtf = Arc::new(TestUDTF {}) as Arc; + + let mut ffi_udtf = FFI_TableFunction::from(Arc::clone(&original_udtf)); + + // Verify local libraries can be downcast to their original + let foreign_udtf: Arc = ffi_udtf.clone().into(); + assert!(arc_ptr_eq(&original_udtf, &foreign_udtf)); + + // Verify different library markers generate foreign providers + ffi_udtf.library_marker_id = crate::mock_foreign_marker_id; + let foreign_udtf: Arc = ffi_udtf.into(); + assert!(!arc_ptr_eq(&original_udtf, &foreign_udtf)); + + Ok(()) + } } diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs index 9f56e2d4788b..d961ffa5b59b 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -39,7 +39,7 @@ use datafusion::{ logical_expr::{Signature, WindowUDF, WindowUDFImpl}, }; use datafusion_common::exec_err; -use partition_evaluator::{FFI_PartitionEvaluator, ForeignPartitionEvaluator}; +use partition_evaluator::FFI_PartitionEvaluator; use partition_evaluator_args::{ FFI_PartitionEvaluatorArgs, ForeignPartitionEvaluatorArgs, }; @@ -105,6 +105,11 @@ pub struct FFI_WindowUDF { /// Internal data. 
This is only to be accessed by the provider of the udf. /// A [`ForeignWindowUDF`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_WindowUDF {} @@ -201,6 +206,7 @@ unsafe extern "C" fn clone_fn_wrapper(udwf: &FFI_WindowUDF) -> FFI_WindowUDF { clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } @@ -230,6 +236,7 @@ impl From> for FFI_WindowUDF { clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -270,21 +277,25 @@ impl Hash for ForeignWindowUDF { } } -impl TryFrom<&FFI_WindowUDF> for ForeignWindowUDF { +impl TryFrom<&FFI_WindowUDF> for Arc { type Error = DataFusionError; fn try_from(udf: &FFI_WindowUDF) -> Result { - let name = udf.name.to_owned().into(); - let signature = Signature::user_defined((&udf.volatility).into()); - - let aliases = udf.aliases.iter().map(|s| s.to_string()).collect(); - - Ok(Self { - name, - udf: udf.clone(), - aliases, - signature, - }) + if (udf.library_marker_id)() == crate::get_library_marker_id() { + Ok(Arc::clone(unsafe { udf.inner().inner() })) + } else { + let name = udf.name.to_owned().into(); + let signature = Signature::user_defined((&udf.volatility).into()); + + let aliases = udf.aliases.iter().map(|s| s.to_string()).collect(); + + Ok(Arc::new(ForeignWindowUDF { + name, + udf: udf.clone(), + aliases, + signature, + })) + } } } @@ -322,10 +333,7 @@ impl WindowUDFImpl for ForeignWindowUDF { (self.udf.partition_evaluator)(&self.udf, args) }; - df_result!(evaluator).map(|evaluator| { - 
Box::new(ForeignPartitionEvaluator::from(evaluator)) - as Box - }) + df_result!(evaluator).map(>::from) } fn field(&self, field_args: WindowUDFFieldArgs) -> Result { @@ -400,10 +408,11 @@ mod tests { ) -> datafusion::common::Result { let original_udwf = Arc::new(WindowUDF::from(original_udwf)); - let local_udwf: FFI_WindowUDF = Arc::clone(&original_udwf).into(); + let mut local_udwf: FFI_WindowUDF = Arc::clone(&original_udwf).into(); + local_udwf.library_marker_id = crate::mock_foreign_marker_id; - let foreign_udwf: ForeignWindowUDF = (&local_udwf).try_into()?; - Ok(foreign_udwf.into()) + let foreign_udwf: Arc = (&local_udwf).try_into()?; + Ok(WindowUDF::new_from_shared_impl(foreign_udwf)) } #[test] @@ -412,11 +421,12 @@ mod tests { let original_name = original_udwf.name().to_owned(); // Convert to FFI format - let local_udwf: FFI_WindowUDF = Arc::clone(&original_udwf).into(); + let mut local_udwf: FFI_WindowUDF = Arc::clone(&original_udwf).into(); + local_udwf.library_marker_id = crate::mock_foreign_marker_id; // Convert back to native format - let foreign_udwf: ForeignWindowUDF = (&local_udwf).try_into()?; - let foreign_udwf: WindowUDF = foreign_udwf.into(); + let foreign_udwf: Arc = (&local_udwf).try_into()?; + let foreign_udwf = WindowUDF::new_from_shared_impl(foreign_udwf); assert_eq!(original_name, foreign_udwf.name()); Ok(()) @@ -450,4 +460,28 @@ mod tests { Ok(()) } + + #[test] + fn test_ffi_udwf_local_bypass() -> datafusion_common::Result<()> { + let original_udwf = Arc::new(WindowUDF::from(WindowShift::lag())); + + let mut ffi_udwf = FFI_WindowUDF::from(original_udwf); + + // Verify local libraries can be downcast to their original + let foreign_udwf: Arc = (&ffi_udwf).try_into()?; + assert!(foreign_udwf + .as_any() + .downcast_ref::() + .is_some()); + + // Verify different library markers generate foreign providers + ffi_udwf.library_marker_id = crate::mock_foreign_marker_id; + let foreign_udwf: Arc = (&ffi_udwf).try_into()?; + assert!(foreign_udwf + 
.as_any() + .downcast_ref::() + .is_some()); + + Ok(()) + } } diff --git a/datafusion/ffi/src/udwf/partition_evaluator.rs b/datafusion/ffi/src/udwf/partition_evaluator.rs index 14cf23b919aa..448db1f8ae1a 100644 --- a/datafusion/ffi/src/udwf/partition_evaluator.rs +++ b/datafusion/ffi/src/udwf/partition_evaluator.rs @@ -76,6 +76,11 @@ pub struct FFI_PartitionEvaluator { /// Internal data. This is only to be accessed by the provider of the evaluator. /// A [`ForeignPartitionEvaluator`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_PartitionEvaluator {} @@ -170,9 +175,11 @@ unsafe extern "C" fn get_range_fn_wrapper( } unsafe extern "C" fn release_fn_wrapper(evaluator: &mut FFI_PartitionEvaluator) { - let private_data = - Box::from_raw(evaluator.private_data as *mut PartitionEvaluatorPrivateData); - drop(private_data); + if !evaluator.private_data.is_null() { + let private_data = + Box::from_raw(evaluator.private_data as *mut PartitionEvaluatorPrivateData); + drop(private_data); + } } impl From> for FFI_PartitionEvaluator { @@ -195,6 +202,7 @@ impl From> for FFI_PartitionEvaluator { uses_window_frame, release: release_fn_wrapper, private_data: Box::into_raw(Box::new(private_data)) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -219,9 +227,19 @@ pub struct ForeignPartitionEvaluator { unsafe impl Send for ForeignPartitionEvaluator {} unsafe impl Sync for ForeignPartitionEvaluator {} -impl From for ForeignPartitionEvaluator { - fn from(evaluator: FFI_PartitionEvaluator) -> Self { - Self { evaluator } +impl From for Box { + fn from(mut evaluator: FFI_PartitionEvaluator) -> Self { + if (evaluator.library_marker_id)() == crate::get_library_marker_id() 
{ + unsafe { + let private_data = Box::from_raw( + evaluator.private_data as *mut PartitionEvaluatorPrivateData, + ); + evaluator.private_data = std::ptr::null_mut(); + private_data.evaluator + } + } else { + Box::new(ForeignPartitionEvaluator { evaluator }) + } } } @@ -317,4 +335,54 @@ impl PartitionEvaluator for ForeignPartitionEvaluator { } #[cfg(test)] -mod tests {} +mod tests { + use crate::udwf::partition_evaluator::{ + FFI_PartitionEvaluator, ForeignPartitionEvaluator, + }; + use arrow::array::ArrayRef; + use datafusion::logical_expr::PartitionEvaluator; + + #[derive(Debug)] + struct TestPartitionEvaluator {} + + impl PartitionEvaluator for TestPartitionEvaluator { + fn evaluate_all( + &mut self, + values: &[ArrayRef], + _num_rows: usize, + ) -> datafusion_common::Result { + Ok(values[0].to_owned()) + } + } + + #[test] + fn test_ffi_partition_evaluator_local_bypass_inner() -> datafusion_common::Result<()> + { + let original_accum = TestPartitionEvaluator {}; + let boxed_accum: Box = Box::new(original_accum); + + let ffi_accum: FFI_PartitionEvaluator = boxed_accum.into(); + + // Verify local libraries can be downcast to their original + let foreign_accum: Box = ffi_accum.into(); + unsafe { + let concrete = &*(foreign_accum.as_ref() as *const dyn PartitionEvaluator + as *const TestPartitionEvaluator); + assert!(!concrete.uses_window_frame()); + } + + // Verify different library markers generate foreign accumulator + let original_accum = TestPartitionEvaluator {}; + let boxed_accum: Box = Box::new(original_accum); + let mut ffi_accum: FFI_PartitionEvaluator = boxed_accum.into(); + ffi_accum.library_marker_id = crate::mock_foreign_marker_id; + let foreign_accum: Box = ffi_accum.into(); + unsafe { + let concrete = &*(foreign_accum.as_ref() as *const dyn PartitionEvaluator + as *const ForeignPartitionEvaluator); + assert!(!concrete.uses_window_frame()); + } + + Ok(()) + } +} diff --git a/datafusion/ffi/tests/ffi_catalog.rs b/datafusion/ffi/tests/ffi_catalog.rs 
index b63d8cbd631b..c45a62ee5093 100644 --- a/datafusion/ffi/tests/ffi_catalog.rs +++ b/datafusion/ffi/tests/ffi_catalog.rs @@ -19,10 +19,9 @@ /// when the feature integration-tests is built #[cfg(feature = "integration-tests")] mod tests { + use datafusion::catalog::{CatalogProvider, CatalogProviderList}; use datafusion::prelude::SessionContext; use datafusion_common::DataFusionError; - use datafusion_ffi::catalog_provider::ForeignCatalogProvider; - use datafusion_ffi::catalog_provider_list::ForeignCatalogProviderList; use datafusion_ffi::tests::utils::get_module; use std::sync::Arc; @@ -37,10 +36,10 @@ mod tests { "External catalog provider failed to implement create_catalog" .to_string(), ))?(); - let foreign_catalog: ForeignCatalogProvider = (&ffi_catalog).into(); + let foreign_catalog: Arc = (&ffi_catalog).into(); let ctx = SessionContext::default(); - let _ = ctx.register_catalog("fruit", Arc::new(foreign_catalog)); + let _ = ctx.register_catalog("fruit", foreign_catalog); let df = ctx.table("fruit.apple.purchases").await?; @@ -64,10 +63,11 @@ mod tests { "External catalog provider failed to implement create_catalog_list" .to_string(), ))?(); - let foreign_catalog_list: ForeignCatalogProviderList = (&ffi_catalog_list).into(); + let foreign_catalog_list: Arc = + (&ffi_catalog_list).into(); let ctx = SessionContext::default(); - ctx.register_catalog_list(Arc::new(foreign_catalog_list)); + ctx.register_catalog_list(foreign_catalog_list); let df = ctx.table("blue.apple.purchases").await?; diff --git a/datafusion/ffi/tests/ffi_integration.rs b/datafusion/ffi/tests/ffi_integration.rs index 7b4d1b1e350a..216d6576a821 100644 --- a/datafusion/ffi/tests/ffi_integration.rs +++ b/datafusion/ffi/tests/ffi_integration.rs @@ -19,9 +19,9 @@ /// when the feature integration-tests is built #[cfg(feature = "integration-tests")] mod tests { + use datafusion::catalog::TableProvider; use datafusion::error::{DataFusionError, Result}; use datafusion::prelude::SessionContext; - use 
datafusion_ffi::table_provider::ForeignTableProvider; use datafusion_ffi::tests::create_record_batch; use datafusion_ffi::tests::utils::get_module; use std::sync::Arc; @@ -41,13 +41,13 @@ mod tests { )?(synchronous); // In order to access the table provider within this executable, we need to - // turn it into a `ForeignTableProvider`. - let foreign_table_provider: ForeignTableProvider = (&ffi_table_provider).into(); + // turn it into a `TableProvider`. + let foreign_table_provider: Arc = (&ffi_table_provider).into(); let ctx = SessionContext::new(); // Display the data to show the full cycle works. - ctx.register_table("external_table", Arc::new(foreign_table_provider))?; + ctx.register_table("external_table", foreign_table_provider)?; let df = ctx.table("external_table").await?; let results = df.collect().await?; diff --git a/datafusion/ffi/tests/ffi_udaf.rs b/datafusion/ffi/tests/ffi_udaf.rs index ffd99bac62ec..9d1823ded076 100644 --- a/datafusion/ffi/tests/ffi_udaf.rs +++ b/datafusion/ffi/tests/ffi_udaf.rs @@ -22,11 +22,11 @@ mod tests { use arrow::array::Float64Array; use datafusion::common::record_batch; use datafusion::error::{DataFusionError, Result}; - use datafusion::logical_expr::AggregateUDF; + use datafusion::logical_expr::{AggregateUDF, AggregateUDFImpl}; use datafusion::prelude::{col, SessionContext}; + use std::sync::Arc; use datafusion_ffi::tests::utils::get_module; - use datafusion_ffi::udaf::ForeignAggregateUDF; #[tokio::test] async fn test_ffi_udaf() -> Result<()> { @@ -38,9 +38,9 @@ mod tests { .ok_or(DataFusionError::NotImplemented( "External table provider failed to implement create_udaf".to_string(), ))?(); - let foreign_sum_func: ForeignAggregateUDF = (&ffi_sum_func).try_into()?; + let foreign_sum_func: Arc = (&ffi_sum_func).try_into()?; - let udaf: AggregateUDF = foreign_sum_func.into(); + let udaf = AggregateUDF::new_from_shared_impl(foreign_sum_func); let ctx = SessionContext::default(); let record_batch = record_batch!( @@ -80,9 +80,10 
@@ mod tests { .ok_or(DataFusionError::NotImplemented( "External table provider failed to implement create_udaf".to_string(), ))?(); - let foreign_stddev_func: ForeignAggregateUDF = (&ffi_stddev_func).try_into()?; + let foreign_stddev_func: Arc = + (&ffi_stddev_func).try_into()?; - let udaf: AggregateUDF = foreign_stddev_func.into(); + let udaf = AggregateUDF::new_from_shared_impl(foreign_stddev_func); let ctx = SessionContext::default(); let record_batch = record_batch!( diff --git a/datafusion/ffi/tests/ffi_udf.rs b/datafusion/ffi/tests/ffi_udf.rs index fd6a84bcf5b0..d50739be9975 100644 --- a/datafusion/ffi/tests/ffi_udf.rs +++ b/datafusion/ffi/tests/ffi_udf.rs @@ -19,16 +19,15 @@ /// when the feature integration-tests is built #[cfg(feature = "integration-tests")] mod tests { - use arrow::datatypes::DataType; use datafusion::common::record_batch; use datafusion::error::{DataFusionError, Result}; - use datafusion::logical_expr::ScalarUDF; + use datafusion::logical_expr::{ScalarUDF, ScalarUDFImpl}; use datafusion::prelude::{col, SessionContext}; + use std::sync::Arc; use datafusion_ffi::tests::create_record_batch; use datafusion_ffi::tests::utils::get_module; - use datafusion_ffi::udf::ForeignScalarUDF; /// This test validates that we can load an external module and use a scalar /// udf defined in it via the foreign function interface. 
In this case we are @@ -44,9 +43,9 @@ mod tests { "External table provider failed to implement create_scalar_udf" .to_string(), ))?(); - let foreign_abs_func: ForeignScalarUDF = (&ffi_abs_func).try_into()?; + let foreign_abs_func: Arc = (&ffi_abs_func).try_into()?; - let udf: ScalarUDF = foreign_abs_func.into(); + let udf = ScalarUDF::new_from_shared_impl(foreign_abs_func); let ctx = SessionContext::default(); let df = ctx.read_batch(create_record_batch(-5, 5))?; @@ -82,9 +81,9 @@ mod tests { "External table provider failed to implement create_scalar_udf" .to_string(), ))?(); - let foreign_abs_func: ForeignScalarUDF = (&ffi_abs_func).try_into()?; + let foreign_abs_func: Arc = (&ffi_abs_func).try_into()?; - let udf: ScalarUDF = foreign_abs_func.into(); + let udf = ScalarUDF::new_from_shared_impl(foreign_abs_func); let ctx = SessionContext::default(); let df = ctx.read_batch(create_record_batch(-5, 5))?; diff --git a/datafusion/ffi/tests/ffi_udtf.rs b/datafusion/ffi/tests/ffi_udtf.rs index 8c1c64a092e1..8339b350cd64 100644 --- a/datafusion/ffi/tests/ffi_udtf.rs +++ b/datafusion/ffi/tests/ffi_udtf.rs @@ -23,11 +23,11 @@ mod tests { use std::sync::Arc; use arrow::array::{create_array, ArrayRef}; + use datafusion::catalog::TableFunctionImpl; use datafusion::error::{DataFusionError, Result}; use datafusion::prelude::SessionContext; use datafusion_ffi::tests::utils::get_module; - use datafusion_ffi::udtf::ForeignTableFunction; /// This test validates that we can load an external module and use a scalar /// udf defined in it via the foreign function interface. 
In this case we are @@ -42,12 +42,10 @@ mod tests { "External table function provider failed to implement create_table_function" .to_string(), ))?(); - let foreign_table_func: ForeignTableFunction = ffi_table_func.into(); - - let udtf = Arc::new(foreign_table_func); + let foreign_table_func: Arc = ffi_table_func.into(); let ctx = SessionContext::default(); - ctx.register_udtf("my_range", udtf); + ctx.register_udtf("my_range", foreign_table_func); let result = ctx .sql("SELECT * FROM my_range(5)") diff --git a/datafusion/ffi/tests/ffi_udwf.rs b/datafusion/ffi/tests/ffi_udwf.rs index 18ffd0c5bcb7..e87c65ca8907 100644 --- a/datafusion/ffi/tests/ffi_udwf.rs +++ b/datafusion/ffi/tests/ffi_udwf.rs @@ -22,11 +22,11 @@ mod tests { use arrow::array::{create_array, ArrayRef}; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_expr::expr::Sort; - use datafusion::logical_expr::{col, ExprFunctionExt, WindowUDF}; + use datafusion::logical_expr::{col, ExprFunctionExt, WindowUDF, WindowUDFImpl}; use datafusion::prelude::SessionContext; use datafusion_ffi::tests::create_record_batch; use datafusion_ffi::tests::utils::get_module; - use datafusion_ffi::udwf::ForeignWindowUDF; + use std::sync::Arc; #[tokio::test] async fn test_rank_udwf() -> Result<()> { @@ -39,9 +39,9 @@ mod tests { "External table provider failed to implement create_scalar_udf" .to_string(), ))?(); - let foreign_rank_func: ForeignWindowUDF = (&ffi_rank_func).try_into()?; + let foreign_rank_func: Arc = (&ffi_rank_func).try_into()?; - let udwf: WindowUDF = foreign_rank_func.into(); + let udwf = WindowUDF::new_from_shared_impl(foreign_rank_func); let ctx = SessionContext::default(); let df = ctx.read_batch(create_record_batch(-5, 5))?; diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index e116bfffeda6..e6951baec109 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -152,6 +152,51 @@ 
Instead of silently succeeding. The remove API no longer requires a mutable instance +### FFI crate updates + +Many of the structs in the `datafusion-ffi` crate have been updated to allow easier +conversion to the underlying trait types they represent. This simplifies some code +paths, but also provides an additional improvement in cases where library code goes +through a round trip via the foreign function interface. + +To update your code, suppose you have an `FFI_SchemaProvider` called `ffi_provider` +and you wish to use this as a `SchemaProvider`. In the old approach you would do +something like: + +```rust,ignore + let foreign_provider: ForeignSchemaProvider = ffi_provider.into(); + let foreign_provider = Arc::new(foreign_provider) as Arc<dyn SchemaProvider>; +``` + +This code should now be written as: + +```rust,ignore + let foreign_provider: Arc<dyn SchemaProvider + Send> = ffi_provider.into(); + let foreign_provider = foreign_provider as Arc<dyn SchemaProvider>; +``` + +For the case of user-defined functions, the updates are similar but you +may need to change how you construct the `ScalarUDF`. +Aggregate and window functions follow the same pattern. + +Previously you might have written: + +```rust,ignore + let foreign_udf: ForeignScalarUDF = ffi_udf.try_into()?; + let foreign_udf: ScalarUDF = foreign_udf.into(); +``` + +Instead this should now be: + +```rust,ignore + let foreign_udf: Arc<dyn ScalarUDFImpl> = ffi_udf.try_into()?; + let foreign_udf = ScalarUDF::new_from_shared_impl(foreign_udf); +``` + +Additionally, the FFI structure for scalar UDFs no longer contains a +`return_type` call. This code was not used since the `ForeignScalarUDF` +struct implements `return_field_from_args` instead. + ## DataFusion `51.0.0` ### `arrow` / `parquet` updated to 57.0.0