From 22d3e53bcf27025d672454b6790a7916f2fe2318 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 12 Nov 2025 15:44:32 -0500 Subject: [PATCH 01/22] Add discussion about library marker ID --- datafusion/ffi/README.md | 30 ++++++++++++++++++++++++++++++ datafusion/ffi/src/lib.rs | 18 ++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/datafusion/ffi/README.md b/datafusion/ffi/README.md index 72070984f931..bf7fe2dfb538 100644 --- a/datafusion/ffi/README.md +++ b/datafusion/ffi/README.md @@ -101,6 +101,36 @@ In this crate we have a variety of structs which closely mimic the behavior of their internal counterparts. To see detailed notes about how to use them, see the example in `FFI_TableProvider`. +## Library Marker ID + +When reviewing the code, many of the structs in this crate contain a call to +a `library_maker_id`. The purpose of this call is to determine if a library is +accessing _local_ code through the FFI structs. Consider this example: you have +a `primary` program that exposes functions to create a schema provider. You +have a `secondary` library that exposes a function to create a catalog provider +and the `secondary` library uses the schema provider of the `primary` program. +From the point of view of the `secondary` library, the schema provider is +foreign code. + +Now when we register the `secondary` library with the `primary` program as a +catalog provider and we make calls to get a schema, the `secondary` library +will return a FFI wrapped schema provider back to the `primary` program. In +this case that schema provider is actually local code to the `primary` program +except that it is wrapped in the FFI code! + +We work around this by the `library_marker_id` calls. What this does is it +creates a global variable within each library and returns a `u64` address +of that library. This is guaranteed to be unique for every library that contains +FFI code. By comparing these `u64` addresses we can determine if a FFI struct +is local or foreign. + +In our example of the schema provider, if you were to make a call in your +primary program to get the schema provider, it would reach out to the foreign +catalog provider and send back a `FFI_SchemaProvider` object. By then +comparing the `library_marker_id` of this object to the `primary` program, we +determine it is local code. This means it is safe to access the underlying +private data. + [apache datafusion]: https://datafusion.apache.org/ [api docs]: http://docs.rs/datafusion-ffi/latest [rust abi]: https://doc.rust-lang.org/reference/abi.html diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index 39eb7babd90d..0e03c8aaee71 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -58,5 +58,23 @@ pub extern "C" fn version() -> u64 { version.major } +static LIBRARY_MARKER: u8 = 0; + +/// This utility is used to determine if two FFI structs are within +/// the same library. It is possible that the interplay between +/// foreign and local functions calls create one FFI struct that +/// references another. It is helpful to determine if a foreign +/// struct is truly foreign or in the same library. If we are in the +/// same library, then we can access the underlying types directly. +/// +/// This function works by checking the address of the library +/// marker. Each library that implements the FFI code will have +/// a different address for the marker. By checking the marker +/// address we can determine if a struct is truly Foreign or is +/// actually within the same originating library. +pub extern "C" fn get_library_marker_id() -> u64 { + &LIBRARY_MARKER as *const u8 as u64 +} + #[cfg(doctest)] doc_comment::doctest!("../README.md", readme_example_test); From 31fc07002ae79aa5a13f5ed6608a395ccf7fe2cf Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 12 Nov 2025 15:56:20 -0500 Subject: [PATCH 02/22] Implement library marker ID for catalog provider --- datafusion/ffi/Cargo.toml | 1 + datafusion/ffi/src/catalog_provider.rs | 20 ++++++++++++++++---- datafusion/ffi/src/catalog_provider_list.rs | 2 +- datafusion/ffi/src/lib.rs | 8 ++++++++ datafusion/ffi/tests/ffi_catalog.rs | 6 +++--- 5 files changed, 29 insertions(+), 8 deletions(-) diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml index 3ac08180fb68..2cf040f9bd9c 100644 --- a/datafusion/ffi/Cargo.toml +++ b/datafusion/ffi/Cargo.toml @@ -59,6 +59,7 @@ tokio = { workspace = true } [dev-dependencies] doc-comment = { workspace = true } +datafusion = { workspace = true, default-features = false, features = ["sql"] } [features] integration-tests = [] diff --git a/datafusion/ffi/src/catalog_provider.rs b/datafusion/ffi/src/catalog_provider.rs index d279951783b4..d20e728aca31 100644 --- a/datafusion/ffi/src/catalog_provider.rs +++ b/datafusion/ffi/src/catalog_provider.rs @@ -70,6 +70,10 @@ pub struct FFI_CatalogProvider { /// Internal data. This is only to be accessed by the provider of the plan. /// A [`ForeignCatalogProvider`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. + pub library_marker_id: extern "C" fn() -> u64, } unsafe impl Send for FFI_CatalogProvider {} @@ -169,6 +173,7 @@ unsafe extern "C" fn clone_fn_wrapper( release: release_fn_wrapper, version: super::version, private_data, + library_marker_id: crate::get_library_marker_id, } } @@ -195,6 +200,7 @@ impl FFI_CatalogProvider { release: release_fn_wrapper, version: super::version, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -209,9 +215,14 @@ pub struct ForeignCatalogProvider(pub(crate) FFI_CatalogProvider); unsafe impl Send for ForeignCatalogProvider {} unsafe impl Sync for ForeignCatalogProvider {} -impl From<&FFI_CatalogProvider> for ForeignCatalogProvider { +impl From<&FFI_CatalogProvider> for Arc { fn from(provider: &FFI_CatalogProvider) -> Self { - Self(provider.clone()) + if (provider.library_marker_id)() == crate::get_library_marker_id() { + return Arc::clone(unsafe { provider.inner() }); + } + + Arc::new(ForeignCatalogProvider(provider.clone())) + as Arc } } @@ -298,9 +309,10 @@ mod tests { .unwrap() .is_none()); - let ffi_catalog = FFI_CatalogProvider::new(catalog, None); + let mut ffi_catalog = FFI_CatalogProvider::new(catalog, None); + ffi_catalog.library_marker_id = crate::mock_foreign_marker_id; - let foreign_catalog: ForeignCatalogProvider = (&ffi_catalog).into(); + let foreign_catalog: Arc = (&ffi_catalog).into(); let prior_schema_names = foreign_catalog.schema_names(); assert_eq!(prior_schema_names.len(), 1); diff --git a/datafusion/ffi/src/catalog_provider_list.rs b/datafusion/ffi/src/catalog_provider_list.rs index b09f06d318c1..6cb8ca6e1a16 100644 --- a/datafusion/ffi/src/catalog_provider_list.rs +++ b/datafusion/ffi/src/catalog_provider_list.rs @@ -94,7 +94,7 @@ unsafe extern "C" fn register_catalog_fn_wrapper( ) -> ROption { let runtime = provider.runtime(); let provider = provider.inner(); - let catalog = Arc::new(ForeignCatalogProvider::from(catalog)); + let catalog: Arc = catalog.into(); provider .register_catalog(name.into(), catalog) diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index 0e03c8aaee71..872d6d08fae8 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -76,5 +76,13 @@ pub extern "C" fn get_library_marker_id() -> u64 { &LIBRARY_MARKER as *const u8 as u64 } +/// For unit testing in this crate we need to trick the providers +/// into thinking we have a foreign call. We do this by overwriting +/// their `library_marker_id` function to return a different value. +#[cfg(test)] +pub(crate) extern "C" fn mock_foreign_marker_id() -> u64 { + get_library_marker_id() + 1 +} + #[cfg(doctest)] doc_comment::doctest!("../README.md", readme_example_test); diff --git a/datafusion/ffi/tests/ffi_catalog.rs b/datafusion/ffi/tests/ffi_catalog.rs index b63d8cbd631b..92d29dbecc89 100644 --- a/datafusion/ffi/tests/ffi_catalog.rs +++ b/datafusion/ffi/tests/ffi_catalog.rs @@ -19,9 +19,9 @@ /// when the feature integration-tests is built #[cfg(feature = "integration-tests")] mod tests { + use datafusion::catalog::CatalogProvider; use datafusion::prelude::SessionContext; use datafusion_common::DataFusionError; - use datafusion_ffi::catalog_provider::ForeignCatalogProvider; use datafusion_ffi::catalog_provider_list::ForeignCatalogProviderList; use datafusion_ffi::tests::utils::get_module; use std::sync::Arc; @@ -37,10 +37,10 @@ mod tests { "External catalog provider failed to implement create_catalog" .to_string(), ))?(); - let foreign_catalog: ForeignCatalogProvider = (&ffi_catalog).into(); + let foreign_catalog: Arc = (&ffi_catalog).into(); let ctx = SessionContext::default(); - let _ = ctx.register_catalog("fruit", Arc::new(foreign_catalog)); + let _ = ctx.register_catalog("fruit", foreign_catalog); let df = ctx.table("fruit.apple.purchases").await?; From 54c9884e06f1ddd727dfbecfd9c364ce18f01ba7 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 12 Nov 2025 16:01:14 -0500 Subject: [PATCH 03/22] Implement library marker ID for catalog provider list --- datafusion/ffi/src/catalog_provider_list.rs | 21 +++++++++++++++++---- datafusion/ffi/tests/ffi_catalog.rs | 8 ++++---- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/datafusion/ffi/src/catalog_provider_list.rs b/datafusion/ffi/src/catalog_provider_list.rs index 6cb8ca6e1a16..d2ab31ffb7de 100644 --- a/datafusion/ffi/src/catalog_provider_list.rs +++ b/datafusion/ffi/src/catalog_provider_list.rs @@ -58,6 +58,10 @@ pub struct FFI_CatalogProviderList { /// Internal data. This is only to be accessed by the provider of the plan. /// A [`ForeignCatalogProviderList`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. + pub library_marker_id: extern "C" fn() -> u64, } unsafe impl Send for FFI_CatalogProviderList {} @@ -138,6 +142,7 @@ unsafe extern "C" fn clone_fn_wrapper( release: release_fn_wrapper, version: super::version, private_data, + library_marker_id: crate::get_library_marker_id, } } @@ -163,6 +168,7 @@ impl FFI_CatalogProviderList { release: release_fn_wrapper, version: super::version, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -177,9 +183,14 @@ pub struct ForeignCatalogProviderList(FFI_CatalogProviderList); unsafe impl Send for ForeignCatalogProviderList {} unsafe impl Sync for ForeignCatalogProviderList {} -impl From<&FFI_CatalogProviderList> for ForeignCatalogProviderList { +impl From<&FFI_CatalogProviderList> for Arc { fn from(provider: &FFI_CatalogProviderList) -> Self { - Self(provider.clone()) + if (provider.library_marker_id)() == crate::get_library_marker_id() { + return Arc::clone(unsafe { provider.inner() }); + } + + Arc::new(ForeignCatalogProviderList(provider.clone())) + as Arc } } @@ -248,9 +259,11 @@ mod tests { .register_catalog("prior_catalog".to_owned(), prior_catalog) .is_none()); - let ffi_catalog_list = FFI_CatalogProviderList::new(catalog_list, None); + let mut ffi_catalog_list = FFI_CatalogProviderList::new(catalog_list, None); + ffi_catalog_list.library_marker_id = crate::mock_foreign_marker_id; - let foreign_catalog_list: ForeignCatalogProviderList = (&ffi_catalog_list).into(); + let foreign_catalog_list: Arc = + (&ffi_catalog_list).into(); let prior_catalog_names = foreign_catalog_list.catalog_names(); assert_eq!(prior_catalog_names.len(), 1); diff --git a/datafusion/ffi/tests/ffi_catalog.rs b/datafusion/ffi/tests/ffi_catalog.rs index 92d29dbecc89..c45a62ee5093 100644 --- a/datafusion/ffi/tests/ffi_catalog.rs +++ b/datafusion/ffi/tests/ffi_catalog.rs @@ -19,10 +19,9 @@ /// when the feature integration-tests is built #[cfg(feature = "integration-tests")] mod tests { - use datafusion::catalog::CatalogProvider; + use datafusion::catalog::{CatalogProvider, CatalogProviderList}; use datafusion::prelude::SessionContext; use datafusion_common::DataFusionError; - use datafusion_ffi::catalog_provider_list::ForeignCatalogProviderList; use datafusion_ffi::tests::utils::get_module; use std::sync::Arc; @@ -64,10 +63,11 @@ mod tests { "External catalog provider failed to implement create_catalog_list" .to_string(), ))?(); - let foreign_catalog_list: ForeignCatalogProviderList = (&ffi_catalog_list).into(); + let foreign_catalog_list: Arc = + (&ffi_catalog_list).into(); let ctx = SessionContext::default(); - ctx.register_catalog_list(Arc::new(foreign_catalog_list)); + ctx.register_catalog_list(foreign_catalog_list); let df = ctx.table("blue.apple.purchases").await?; From 7fd172418d66699c5eb1ff7fecbcb38aa691b35b Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 12 Nov 2025 16:08:09 -0500 Subject: [PATCH 04/22] Implement library marker ID for schema provider --- datafusion/ffi/src/catalog_provider.rs | 2 +- datafusion/ffi/src/schema_provider.rs | 20 ++++++++++++++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/datafusion/ffi/src/catalog_provider.rs b/datafusion/ffi/src/catalog_provider.rs index d20e728aca31..ad3b070ee5fb 100644 --- a/datafusion/ffi/src/catalog_provider.rs +++ b/datafusion/ffi/src/catalog_provider.rs @@ -120,7 +120,7 @@ unsafe extern "C" fn register_schema_fn_wrapper( ) -> RResult, RString> { let runtime = provider.runtime(); let provider = provider.inner(); - let schema = Arc::new(ForeignSchemaProvider::from(schema)); + let schema: Arc = schema.into(); let returned_schema = rresult_return!(provider.register_schema(name.as_str(), schema)) diff --git a/datafusion/ffi/src/schema_provider.rs b/datafusion/ffi/src/schema_provider.rs index b5970d5881d6..66d4e289ce64 100644 --- a/datafusion/ffi/src/schema_provider.rs +++ b/datafusion/ffi/src/schema_provider.rs @@ -80,6 +80,10 @@ pub struct FFI_SchemaProvider { /// Internal data. This is only to be accessed by the provider of the plan. /// A [`ForeignSchemaProvider`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. + pub library_marker_id: extern "C" fn() -> u64, } unsafe impl Send for FFI_SchemaProvider {} @@ -191,6 +195,7 @@ unsafe extern "C" fn clone_fn_wrapper( register_table: register_table_fn_wrapper, deregister_table: deregister_table_fn_wrapper, table_exist: table_exist_fn_wrapper, + library_marker_id: crate::get_library_marker_id, } } @@ -220,6 +225,7 @@ impl FFI_SchemaProvider { register_table: register_table_fn_wrapper, deregister_table: deregister_table_fn_wrapper, table_exist: table_exist_fn_wrapper, + library_marker_id: crate::get_library_marker_id, } } } @@ -234,9 +240,14 @@ pub struct ForeignSchemaProvider(pub FFI_SchemaProvider); unsafe impl Send for ForeignSchemaProvider {} unsafe impl Sync for ForeignSchemaProvider {} -impl From<&FFI_SchemaProvider> for ForeignSchemaProvider { +impl From<&FFI_SchemaProvider> for Arc { fn from(provider: &FFI_SchemaProvider) -> Self { - Self(provider.clone()) + if (provider.library_marker_id)() == crate::get_library_marker_id() { + return Arc::clone(unsafe { provider.inner() }); + } + + Arc::new(ForeignSchemaProvider(provider.clone())) + as Arc } } @@ -337,9 +348,10 @@ mod tests { .unwrap() .is_none()); - let ffi_schema_provider = FFI_SchemaProvider::new(schema_provider, None); + let mut ffi_schema_provider = FFI_SchemaProvider::new(schema_provider, None); + ffi_schema_provider.library_marker_id = crate::mock_foreign_marker_id; - let foreign_schema_provider: ForeignSchemaProvider = + let foreign_schema_provider: Arc = (&ffi_schema_provider).into(); let prior_table_names = foreign_schema_provider.table_names(); From b43e808724b02249386895aa80d2541bd24f39ee Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 12 Nov 2025 16:24:30 -0500 Subject: [PATCH 05/22] Implement library marker ID for table provider --- .../ffi/ffi_module_loader/src/main.rs | 6 +- datafusion/ffi/src/schema_provider.rs | 4 +- datafusion/ffi/src/table_provider.rs | 72 +++++++++++-------- datafusion/ffi/src/udtf.rs | 9 +-- datafusion/ffi/tests/ffi_integration.rs | 6 +- 5 files changed, 52 insertions(+), 45 deletions(-) diff --git a/datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs b/datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs index 6e376ca866e8..b0b24367ec2c 100644 --- a/datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs +++ b/datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs @@ -23,7 +23,7 @@ use datafusion::{ }; use abi_stable::library::{development_utils::compute_library_path, RootModule}; -use datafusion_ffi::table_provider::ForeignTableProvider; +use datafusion::datasource::TableProvider; use ffi_module_interface::TableProviderModuleRef; #[tokio::main] @@ -50,12 +50,12 @@ async fn main() -> Result<()> { // In order to access the table provider within this executable, we need to // turn it into a `ForeignTableProvider`. - let foreign_table_provider: ForeignTableProvider = (&ffi_table_provider).into(); + let foreign_table_provider: Arc = (&ffi_table_provider).into(); let ctx = SessionContext::new(); // Display the data to show the full cycle works. - ctx.register_table("external_table", Arc::new(foreign_table_provider))?; + ctx.register_table("external_table", foreign_table_provider)?; let df = ctx.table("external_table").await?; df.show().await?; diff --git a/datafusion/ffi/src/schema_provider.rs b/datafusion/ffi/src/schema_provider.rs index 66d4e289ce64..71c48c09126f 100644 --- a/datafusion/ffi/src/schema_provider.rs +++ b/datafusion/ffi/src/schema_provider.rs @@ -285,9 +285,7 @@ impl SchemaProvider for ForeignSchemaProvider { let table: Option = df_result!((self.0.table)(&self.0, name.into()).await)?.into(); - let table = table.as_ref().map(|t| { - Arc::new(ForeignTableProvider::from(t)) as Arc - }); + let table = table.as_ref().map(>::from); Ok(table) } diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index 890511997a70..f2b2614d18f1 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -156,6 +156,10 @@ pub struct FFI_TableProvider { /// Internal data. This is only to be accessed by the provider of the plan. /// A [`ForeignExecutionPlan`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. + pub library_marker_id: extern "C" fn() -> u64, } unsafe impl Send for FFI_TableProvider {} @@ -166,20 +170,26 @@ struct ProviderPrivateData { runtime: Option, } -unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedSchema { - let private_data = provider.private_data as *const ProviderPrivateData; - let provider = &(*private_data).provider; +impl FFI_TableProvider { + fn inner(&self) -> &Arc { + let private_data = self.private_data as *const ProviderPrivateData; + unsafe { &(*private_data).provider } + } + + fn runtime(&self) -> &Option { + let private_data = self.private_data as *const ProviderPrivateData; + unsafe { &(*private_data).runtime } + } +} - provider.schema().into() +unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedSchema { + provider.inner().schema().into() } unsafe extern "C" fn table_type_fn_wrapper( provider: &FFI_TableProvider, ) -> FFI_TableType { - let private_data = provider.private_data as *const ProviderPrivateData; - let provider = &(*private_data).provider; - - provider.table_type().into() + provider.inner().table_type().into() } fn supports_filters_pushdown_internal( @@ -213,10 +223,7 @@ unsafe extern "C" fn supports_filters_pushdown_fn_wrapper( provider: &FFI_TableProvider, filters_serialized: RVec, ) -> RResult, RString> { - let private_data = provider.private_data as *const ProviderPrivateData; - let provider = &(*private_data).provider; - - supports_filters_pushdown_internal(provider, &filters_serialized) + supports_filters_pushdown_internal(provider.inner(), &filters_serialized) .map_err(|e| e.to_string().into()) .into() } @@ -228,10 +235,9 @@ unsafe extern "C" fn scan_fn_wrapper( filters_serialized: RVec, limit: ROption, ) -> FfiFuture> { - let private_data = provider.private_data as *mut ProviderPrivateData; - let internal_provider = &(*private_data).provider; + let runtime = provider.runtime().clone(); + let internal_provider = Arc::clone(provider.inner()); let session_config = session_config.clone(); - let runtime = &(*private_data).runtime; async move { let config = rresult_return!(ForeignSessionConfig::try_from(&session_config)); @@ -281,11 +287,10 @@ unsafe extern "C" fn insert_into_fn_wrapper( input: &FFI_ExecutionPlan, insert_op: FFI_InsertOp, ) -> FfiFuture> { - let private_data = provider.private_data as *mut ProviderPrivateData; - let internal_provider = &(*private_data).provider; + let runtime = provider.runtime().clone(); + let internal_provider = Arc::clone(provider.inner()); let session_config = session_config.clone(); let input = input.clone(); - let runtime = &(*private_data).runtime; async move { let config = rresult_return!(ForeignSessionConfig::try_from(&session_config)); @@ -320,11 +325,11 @@ unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_TableProvider) { } unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_TableProvider { - let old_private_data = provider.private_data as *const ProviderPrivateData; - let runtime = (*old_private_data).runtime.clone(); + let runtime = provider.runtime().clone(); + let old_provider = Arc::clone(provider.inner()); let private_data = Box::into_raw(Box::new(ProviderPrivateData { - provider: Arc::clone(&(*old_private_data).provider), + provider: old_provider, runtime, })) as *mut c_void; @@ -338,6 +343,7 @@ unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_Table release: release_fn_wrapper, version: super::version, private_data, + library_marker_id: crate::get_library_marker_id, } } @@ -369,6 +375,7 @@ impl FFI_TableProvider { release: release_fn_wrapper, version: super::version, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -383,9 +390,13 @@ pub struct ForeignTableProvider(pub FFI_TableProvider); unsafe impl Send for ForeignTableProvider {} unsafe impl Sync for ForeignTableProvider {} -impl From<&FFI_TableProvider> for ForeignTableProvider { +impl From<&FFI_TableProvider> for Arc { fn from(provider: &FFI_TableProvider) -> Self { - Self(provider.clone()) + if (provider.library_marker_id)() == crate::get_library_marker_id() { + Arc::clone(provider.inner()) as Arc + } else { + Arc::new(ForeignTableProvider(provider.clone())) + } } } @@ -531,11 +542,12 @@ mod tests { let provider = Arc::new(MemTable::try_new(schema, vec![vec![batch1], vec![batch2]])?); - let ffi_provider = FFI_TableProvider::new(provider, true, None); + let mut ffi_provider = FFI_TableProvider::new(provider, true, None); + ffi_provider.library_marker_id = crate::mock_foreign_marker_id; - let foreign_table_provider: ForeignTableProvider = (&ffi_provider).into(); + let foreign_table_provider: Arc = (&ffi_provider).into(); - ctx.register_table("t", Arc::new(foreign_table_provider))?; + ctx.register_table("t", foreign_table_provider)?; let df = ctx.table("t").await?; @@ -575,9 +587,9 @@ mod tests { let ffi_provider = FFI_TableProvider::new(provider, true, None); - let foreign_table_provider: ForeignTableProvider = (&ffi_provider).into(); + let foreign_table_provider: Arc = (&ffi_provider).into(); - ctx.register_table("t", Arc::new(foreign_table_provider))?; + ctx.register_table("t", foreign_table_provider)?; let result = ctx .sql("INSERT INTO t VALUES (128.0);") @@ -621,9 +633,9 @@ mod tests { let ffi_provider = FFI_TableProvider::new(provider, true, None); - let foreign_table_provider: ForeignTableProvider = (&ffi_provider).into(); + let foreign_table_provider: Arc = (&ffi_provider).into(); - ctx.register_table("t", Arc::new(foreign_table_provider))?; + ctx.register_table("t", foreign_table_provider)?; let result = ctx .sql("SELECT COUNT(*) as cnt FROM t") diff --git a/datafusion/ffi/src/udtf.rs b/datafusion/ffi/src/udtf.rs index edd5273c70a8..0abfab55cbb9 100644 --- a/datafusion/ffi/src/udtf.rs +++ b/datafusion/ffi/src/udtf.rs @@ -36,10 +36,7 @@ use datafusion_proto::{ use prost::Message; use tokio::runtime::Handle; -use crate::{ - df_result, rresult_return, - table_provider::{FFI_TableProvider, ForeignTableProvider}, -}; +use crate::{df_result, rresult_return, table_provider::FFI_TableProvider}; /// A stable struct for sharing a [`TableFunctionImpl`] across FFI boundaries. #[repr(C)] @@ -186,9 +183,9 @@ impl TableFunctionImpl for ForeignTableFunction { let table_provider = unsafe { (self.0.call)(&self.0, filters_serialized) }; let table_provider = df_result!(table_provider)?; - let table_provider: ForeignTableProvider = (&table_provider).into(); + let table_provider: Arc = (&table_provider).into(); - Ok(Arc::new(table_provider)) + Ok(table_provider) } } diff --git a/datafusion/ffi/tests/ffi_integration.rs b/datafusion/ffi/tests/ffi_integration.rs index 7b4d1b1e350a..c782a66fc71f 100644 --- a/datafusion/ffi/tests/ffi_integration.rs +++ b/datafusion/ffi/tests/ffi_integration.rs @@ -19,9 +19,9 @@ /// when the feature integration-tests is built #[cfg(feature = "integration-tests")] mod tests { + use datafusion::catalog::TableProvider; use datafusion::error::{DataFusionError, Result}; use datafusion::prelude::SessionContext; - use datafusion_ffi::table_provider::ForeignTableProvider; use datafusion_ffi::tests::create_record_batch; use datafusion_ffi::tests::utils::get_module; use std::sync::Arc; @@ -42,12 +42,12 @@ mod tests { // In order to access the table provider within this executable, we need to // turn it into a `ForeignTableProvider`. - let foreign_table_provider: ForeignTableProvider = (&ffi_table_provider).into(); + let foreign_table_provider: Arc = (&ffi_table_provider).into(); let ctx = SessionContext::new(); // Display the data to show the full cycle works. - ctx.register_table("external_table", Arc::new(foreign_table_provider))?; + ctx.register_table("external_table", foreign_table_provider)?; let df = ctx.table("external_table").await?; let results = df.collect().await?; From d35b03f261d412cffce6cec3668dddf6e93d8a80 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 12 Nov 2025 16:34:18 -0500 Subject: [PATCH 06/22] Implement library marker ID for execution plan --- datafusion/ffi/src/execution_plan.rs | 64 +++++++++++++++++----------- datafusion/ffi/src/table_provider.rs | 15 +++---- 2 files changed, 47 insertions(+), 32 deletions(-) diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index 70c957d8c373..704d910c6f24 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -65,6 +65,10 @@ pub struct FFI_ExecutionPlan { /// Internal data. This is only to be accessed by the provider of the plan. /// A [`ForeignExecutionPlan`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. + pub library_marker_id: extern "C" fn() -> u64, } unsafe impl Send for FFI_ExecutionPlan {} @@ -76,13 +80,17 @@ pub struct ExecutionPlanPrivateData { pub runtime: Option, } +impl FFI_ExecutionPlan { + fn inner(&self) -> &Arc { + let private_data = self.private_data as *const ExecutionPlanPrivateData; + unsafe { &(*private_data).plan } + } +} + unsafe extern "C" fn properties_fn_wrapper( plan: &FFI_ExecutionPlan, ) -> FFI_PlanProperties { - let private_data = plan.private_data as *const ExecutionPlanPrivateData; - let plan = &(*private_data).plan; - - plan.properties().into() + plan.inner().properties().into() } unsafe extern "C" fn children_fn_wrapper( @@ -119,10 +127,7 @@ unsafe extern "C" fn execute_fn_wrapper( } unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString { - let private_data = plan.private_data as *const ExecutionPlanPrivateData; - let plan = &(*private_data).plan; - - plan.name().into() + plan.inner().name().into() } unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) { @@ -168,6 +173,7 @@ impl FFI_ExecutionPlan { clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -218,10 +224,14 @@ impl DisplayAs for ForeignExecutionPlan { } } -impl TryFrom<&FFI_ExecutionPlan> for ForeignExecutionPlan { +impl TryFrom<&FFI_ExecutionPlan> for Arc { type Error = DataFusionError; fn try_from(plan: &FFI_ExecutionPlan) -> Result { + if (plan.library_marker_id)() == crate::get_library_marker_id() { + return Ok(Arc::clone(plan.inner())); + } + unsafe { let name = (plan.name)(plan).into(); @@ -230,16 +240,17 @@ impl TryFrom<&FFI_ExecutionPlan> for ForeignExecutionPlan { let children_rvec = (plan.children)(plan); let children = children_rvec .iter() - .map(ForeignExecutionPlan::try_from) - .map(|child| child.map(|c| Arc::new(c) as Arc)) + .map(>::try_from) .collect::>>()?; - Ok(Self { + let plan = ForeignExecutionPlan { name, plan: plan.clone(), properties, children, - }) + }; + + Ok(Arc::new(plan)) } } } @@ -380,14 +391,15 @@ mod tests { let original_plan = Arc::new(EmptyExec::new(schema)); let original_name = original_plan.name().to_string(); - let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx(), None); + let mut local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx(), None); + local_plan.library_marker_id = crate::mock_foreign_marker_id; - let foreign_plan: ForeignExecutionPlan = (&local_plan).try_into()?; + let foreign_plan: Arc = (&local_plan).try_into()?; assert!(original_name == foreign_plan.name()); let display = datafusion::physical_plan::display::DisplayableExecutionPlan::new( - &foreign_plan, + foreign_plan.as_ref(), ); let buf = display.one_line().to_string(); @@ -407,12 +419,14 @@ mod tests { // Version 1: Adding child to the foreign plan let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema))); - let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None); - let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?); + let mut child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None); + child_local.library_marker_id = crate::mock_foreign_marker_id; + let child_foreign = >::try_from(&child_local)?; let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema))); - let parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None); - let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?); + let mut parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None); + parent_local.library_marker_id = crate::mock_foreign_marker_id; + let parent_foreign = >::try_from(&parent_local)?; assert_eq!(parent_foreign.children().len(), 0); assert_eq!(child_foreign.children().len(), 0); @@ -422,13 +436,15 @@ mod tests { // Version 2: Adding child to the local plan let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema))); - let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None); - let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?); + let mut child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None); + child_local.library_marker_id = crate::mock_foreign_marker_id; + let child_foreign = >::try_from(&child_local)?; let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema))); let parent_plan = parent_plan.with_new_children(vec![child_foreign])?; - let parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None); - let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?); + let mut parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None); + parent_local.library_marker_id = crate::mock_foreign_marker_id; + let parent_foreign = >::try_from(&parent_local)?; assert_eq!(parent_foreign.children().len(), 1); diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index f2b2614d18f1..b1e1c06091cb 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -50,8 +50,7 @@ use crate::{ }; use super::{ - execution_plan::{FFI_ExecutionPlan, ForeignExecutionPlan}, - insert_op::FFI_InsertOp, + execution_plan::FFI_ExecutionPlan, insert_op::FFI_InsertOp, session_config::FFI_SessionConfig, }; use datafusion::error::Result; @@ -154,7 +153,7 @@ pub struct FFI_TableProvider { pub version: unsafe extern "C" fn() -> u64, /// Internal data. This is only to be accessed by the provider of the plan. - /// A [`ForeignExecutionPlan`] should never attempt to access this data. + /// A [`ForeignTableProvider`] should never attempt to access this data. pub private_data: *mut c_void, /// Utility to identify when FFI objects are accessed locally through @@ -300,7 +299,7 @@ unsafe extern "C" fn insert_into_fn_wrapper( .build(); let ctx = SessionContext::new_with_state(session); - let input = rresult_return!(ForeignExecutionPlan::try_from(&input).map(Arc::new)); + let input = rresult_return!(>::try_from(&input)); let insert_op = InsertOp::from(insert_op); @@ -449,10 +448,10 @@ impl TableProvider for ForeignTableProvider { ) .await; - ForeignExecutionPlan::try_from(&df_result!(maybe_plan)?)? + >::try_from(&df_result!(maybe_plan)?)? }; - Ok(Arc::new(plan)) + Ok(plan) } /// Tests whether the table provider can make use of a filter expression @@ -502,10 +501,10 @@ impl TableProvider for ForeignTableProvider { let maybe_plan = (self.0.insert_into)(&self.0, &session_config, &input, insert_op).await; - ForeignExecutionPlan::try_from(&df_result!(maybe_plan)?)? + >::try_from(&df_result!(maybe_plan)?)? }; - Ok(Arc::new(plan)) + Ok(plan) } } From 135955259d6ea7482ada0b0658211cd33527d254 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 12 Nov 2025 16:38:49 -0500 Subject: [PATCH 07/22] Implement library marker ID for plan properties --- datafusion/ffi/src/plan_properties.rs | 46 +++++++++++++++------------ 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/datafusion/ffi/src/plan_properties.rs b/datafusion/ffi/src/plan_properties.rs index 48c2698a58c7..bf68da1d01b4 100644 --- a/datafusion/ffi/src/plan_properties.rs +++ b/datafusion/ffi/src/plan_properties.rs @@ -75,21 +75,31 @@ pub struct FFI_PlanProperties { /// Internal data. This is only to be accessed by the provider of the plan. /// The foreign library should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. + pub library_marker_id: extern "C" fn() -> u64, } struct PlanPropertiesPrivateData { props: PlanProperties, } +impl FFI_PlanProperties { + fn inner(&self) -> &PlanProperties { + let private_data = self.private_data as *const PlanPropertiesPrivateData; + unsafe { &(*private_data).props } + } +} + unsafe extern "C" fn output_partitioning_fn_wrapper( properties: &FFI_PlanProperties, ) -> RResult, RString> { - let private_data = properties.private_data as *const PlanPropertiesPrivateData; - let props = &(*private_data).props; - let codec = DefaultPhysicalExtensionCodec {}; - let partitioning_data = - rresult_return!(serialize_partitioning(props.output_partitioning(), &codec)); + let partitioning_data = rresult_return!(serialize_partitioning( + properties.inner().output_partitioning(), + &codec + )); let output_partitioning = partitioning_data.encode_to_vec(); ROk(output_partitioning.into()) @@ -98,27 +108,20 @@ unsafe extern "C" fn output_partitioning_fn_wrapper( unsafe extern "C" fn emission_type_fn_wrapper( properties: &FFI_PlanProperties, ) -> FFI_EmissionType { - let private_data = properties.private_data as *const PlanPropertiesPrivateData; - let props = &(*private_data).props; - props.emission_type.into() + properties.inner().emission_type.into() } unsafe extern "C" fn boundedness_fn_wrapper( properties: &FFI_PlanProperties, ) -> FFI_Boundedness { - let private_data = properties.private_data as *const PlanPropertiesPrivateData; - let props = &(*private_data).props; - props.boundedness.into() + properties.inner().boundedness.into() } unsafe extern "C" fn output_ordering_fn_wrapper( properties: &FFI_PlanProperties, ) -> RResult, RString> { - let private_data = properties.private_data as *const PlanPropertiesPrivateData; - let props = &(*private_data).props; - let codec = DefaultPhysicalExtensionCodec {}; - let output_ordering = match props.output_ordering() { + let output_ordering = match properties.inner().output_ordering() { Some(ordering) => { let physical_sort_expr_nodes = rresult_return!( serialize_physical_sort_exprs(ordering.to_owned(), &codec) @@ -135,10 +138,7 @@ unsafe extern "C" fn output_ordering_fn_wrapper( } unsafe extern "C" fn schema_fn_wrapper(properties: &FFI_PlanProperties) -> WrappedSchema { - let private_data = properties.private_data as *const PlanPropertiesPrivateData; - let props = &(*private_data).props; - - let schema: SchemaRef = Arc::clone(props.eq_properties.schema()); + let schema: SchemaRef = Arc::clone(properties.inner().eq_properties.schema()); schema.into() } @@ -168,6 +168,7 @@ impl From<&PlanProperties> for FFI_PlanProperties { schema: schema_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -176,6 +177,10 @@ impl TryFrom for PlanProperties { type Error = DataFusionError; fn try_from(ffi_props: FFI_PlanProperties) -> Result { + if (ffi_props.library_marker_id)() == crate::get_library_marker_id() { + return Ok(ffi_props.inner().clone()); + } + let ffi_schema = unsafe { (ffi_props.schema)(&ffi_props) }; let schema = (&ffi_schema.0).try_into()?; @@ -321,7 +326,8 @@ mod tests { Boundedness::Bounded, ); - let local_props_ptr = FFI_PlanProperties::from(&original_props); + let mut local_props_ptr = FFI_PlanProperties::from(&original_props); + local_props_ptr.library_marker_id = crate::mock_foreign_marker_id; let foreign_props: PlanProperties = local_props_ptr.try_into()?; From b978312aefc4610af29cdd976e989541ff5087c6 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 12 Nov 2025 16:42:56 -0500 Subject: [PATCH 08/22] Implement library marker ID for table functions --- datafusion/ffi/src/udtf.rs | 19 +++++++++++++++---- datafusion/ffi/tests/ffi_udtf.rs | 8 +++----- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/datafusion/ffi/src/udtf.rs b/datafusion/ffi/src/udtf.rs index 0abfab55cbb9..37f1b6821af9 100644 --- a/datafusion/ffi/src/udtf.rs +++ b/datafusion/ffi/src/udtf.rs @@ -60,6 +60,10 @@ pub struct FFI_TableFunction { /// Internal data. This is only to be accessed by the provider of the udtf. /// A [`ForeignTableFunction`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. + pub library_marker_id: extern "C" fn() -> u64, } unsafe impl Send for FFI_TableFunction {} @@ -128,6 +132,7 @@ impl FFI_TableFunction { clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -144,6 +149,7 @@ impl From> for FFI_TableFunction { clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -166,9 +172,13 @@ pub struct ForeignTableFunction(FFI_TableFunction); unsafe impl Send for ForeignTableFunction {} unsafe impl Sync for ForeignTableFunction {} -impl From for ForeignTableFunction { +impl From for Arc { fn from(value: FFI_TableFunction) -> Self { - Self(value) + if (value.library_marker_id)() == crate::get_library_marker_id() { + Arc::clone(value.inner()) + } else { + Arc::new(ForeignTableFunction(value)) + } } } @@ -285,10 +295,11 @@ mod tests { async fn test_round_trip_udtf() -> Result<()> { let original_udtf = Arc::new(TestUDTF {}) as Arc; - let local_udtf: FFI_TableFunction = + let mut local_udtf: FFI_TableFunction = FFI_TableFunction::new(Arc::clone(&original_udtf), None); + local_udtf.library_marker_id = crate::mock_foreign_marker_id; - let foreign_udf: ForeignTableFunction = local_udtf.into(); + let foreign_udf: Arc = local_udtf.into(); let table = foreign_udf.call(&[lit(6_u64), lit("one"), lit(2.0), lit(3_u64)])?; diff --git a/datafusion/ffi/tests/ffi_udtf.rs b/datafusion/ffi/tests/ffi_udtf.rs index 8c1c64a092e1..8339b350cd64 100644 --- a/datafusion/ffi/tests/ffi_udtf.rs +++ b/datafusion/ffi/tests/ffi_udtf.rs @@ -23,11 +23,11 @@ mod tests { use std::sync::Arc; use arrow::array::{create_array, ArrayRef}; + use datafusion::catalog::TableFunctionImpl; use datafusion::error::{DataFusionError, Result}; use datafusion::prelude::SessionContext; use datafusion_ffi::tests::utils::get_module; - use datafusion_ffi::udtf::ForeignTableFunction; /// This test validates that we can load an external module and use a scalar /// udf defined in it via the foreign function interface. In this case we are @@ -42,12 +42,10 @@ mod tests { "External table function provider failed to implement create_table_function" .to_string(), ))?(); - let foreign_table_func: ForeignTableFunction = ffi_table_func.into(); - - let udtf = Arc::new(foreign_table_func); + let foreign_table_func: Arc = ffi_table_func.into(); let ctx = SessionContext::default(); - ctx.register_udtf("my_range", udtf); + ctx.register_udtf("my_range", foreign_table_func); let result = ctx .sql("SELECT * FROM my_range(5)") From 54e25175ccc3d9375e65693bc77177cbb92f1b33 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 12 Nov 2025 16:49:03 -0500 Subject: [PATCH 09/22] Implement library marker ID for scalar functions --- datafusion/ffi/src/udf/mod.rs | 63 +++++++++++++++++++-------------- datafusion/ffi/tests/ffi_udf.rs | 13 ++++--- 2 files changed, 42 insertions(+), 34 deletions(-) diff --git a/datafusion/ffi/src/udf/mod.rs b/datafusion/ffi/src/udf/mod.rs index 5e59cfc5ecb0..ebd601fdb753 100644 --- a/datafusion/ffi/src/udf/mod.rs +++ b/datafusion/ffi/src/udf/mod.rs @@ -114,6 +114,10 @@ pub struct FFI_ScalarUDF { /// Internal data. This is only to be accessed by the provider of the udf. /// A [`ForeignScalarUDF`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. + pub library_marker_id: extern "C" fn() -> u64, } unsafe impl Send for FFI_ScalarUDF {} @@ -123,16 +127,21 @@ pub struct ScalarUDFPrivateData { pub udf: Arc, } +impl FFI_ScalarUDF { + fn inner(&self) -> &Arc { + let private_data = self.private_data as *const ScalarUDFPrivateData; + unsafe { &(*private_data).udf } + } +} + unsafe extern "C" fn return_type_fn_wrapper( udf: &FFI_ScalarUDF, arg_types: RVec, ) -> RResult { - let private_data = udf.private_data as *const ScalarUDFPrivateData; - let udf = &(*private_data).udf; - let arg_types = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types)); let return_type = udf + .inner() .return_type(&arg_types) .and_then(|v| FFI_ArrowSchema::try_from(v).map_err(DataFusionError::from)) .map(WrappedSchema); @@ -144,13 +153,11 @@ unsafe extern "C" fn return_field_from_args_fn_wrapper( udf: &FFI_ScalarUDF, args: FFI_ReturnFieldArgs, ) -> RResult { - let private_data = udf.private_data as *const ScalarUDFPrivateData; - let udf = &(*private_data).udf; - let args: ForeignReturnFieldArgsOwned = rresult_return!((&args).try_into()); let args_ref: ForeignReturnFieldArgs = (&args).into(); let return_type = udf + .inner() .return_field_from_args((&args_ref).into()) .and_then(|f| FFI_ArrowSchema::try_from(&f).map_err(DataFusionError::from)) .map(WrappedSchema); @@ -162,12 +169,10 @@ unsafe extern "C" fn coerce_types_fn_wrapper( udf: &FFI_ScalarUDF, arg_types: RVec, ) -> RResult, RString> { - let private_data = udf.private_data as *const ScalarUDFPrivateData; - let udf = &(*private_data).udf; - let arg_types = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types)); - let return_types = rresult_return!(data_types_with_scalar_udf(&arg_types, udf)); + let return_types = + rresult_return!(data_types_with_scalar_udf(&arg_types, udf.inner())); rresult!(vec_datatype_to_rvec_wrapped(&return_types)) } @@ -179,9 +184,6 @@ unsafe extern "C" fn invoke_with_args_fn_wrapper( number_rows: usize, return_field: WrappedSchema, ) -> RResult { - let private_data = udf.private_data as *const ScalarUDFPrivateData; - let udf = &(*private_data).udf; - let args = args .into_iter() .map(|arr| { @@ -213,6 +215,7 @@ unsafe extern "C" fn invoke_with_args_fn_wrapper( }; let result = rresult_return!(udf + .inner() .invoke_with_args(args) .and_then(|r| r.to_array(number_rows))); @@ -263,6 +266,7 @@ impl From> for FFI_ScalarUDF { clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -321,21 +325,25 @@ impl Hash for ForeignScalarUDF { } } -impl TryFrom<&FFI_ScalarUDF> for ForeignScalarUDF { +impl TryFrom<&FFI_ScalarUDF> for Arc { type Error = DataFusionError; fn try_from(udf: &FFI_ScalarUDF) -> Result { - let name = udf.name.to_owned().into(); - let signature = Signature::user_defined((&udf.volatility).into()); - - let aliases = udf.aliases.iter().map(|s| s.to_string()).collect(); - - Ok(Self { - name, - udf: udf.clone(), - aliases, - signature, - }) + if (udf.library_marker_id)() == crate::get_library_marker_id() { + Ok(Arc::clone(udf.inner().inner())) + } else { + let name = udf.name.to_owned().into(); + let signature = Signature::user_defined((&udf.volatility).into()); + + let aliases = udf.aliases.iter().map(|s| s.to_string()).collect(); + + Ok(Arc::new(ForeignScalarUDF { + name, + udf: udf.clone(), + aliases, + signature, + })) + } } } @@ -455,9 +463,10 @@ mod tests { let original_udf = datafusion::functions::math::abs::AbsFunc::new(); let original_udf = Arc::new(ScalarUDF::from(original_udf)); - let local_udf: FFI_ScalarUDF = Arc::clone(&original_udf).into(); + let mut local_udf: FFI_ScalarUDF = Arc::clone(&original_udf).into(); + local_udf.library_marker_id = crate::mock_foreign_marker_id; - let foreign_udf: ForeignScalarUDF = (&local_udf).try_into()?; + let foreign_udf: Arc = (&local_udf).try_into()?; assert_eq!(original_udf.name(), foreign_udf.name()); diff --git a/datafusion/ffi/tests/ffi_udf.rs b/datafusion/ffi/tests/ffi_udf.rs index fd6a84bcf5b0..d50739be9975 100644 --- a/datafusion/ffi/tests/ffi_udf.rs +++ b/datafusion/ffi/tests/ffi_udf.rs @@ -19,16 +19,15 @@ /// when the feature integration-tests is built #[cfg(feature = "integration-tests")] mod tests { - use arrow::datatypes::DataType; use datafusion::common::record_batch; use datafusion::error::{DataFusionError, Result}; - use datafusion::logical_expr::ScalarUDF; + use datafusion::logical_expr::{ScalarUDF, ScalarUDFImpl}; use datafusion::prelude::{col, SessionContext}; + use std::sync::Arc; use datafusion_ffi::tests::create_record_batch; use datafusion_ffi::tests::utils::get_module; - use datafusion_ffi::udf::ForeignScalarUDF; /// This test validates that we can load an external module and use a scalar /// udf defined in it via the foreign function interface. In this case we are @@ -44,9 +43,9 @@ mod tests { "External table provider failed to implement create_scalar_udf" .to_string(), ))?(); - let foreign_abs_func: ForeignScalarUDF = (&ffi_abs_func).try_into()?; + let foreign_abs_func: Arc = (&ffi_abs_func).try_into()?; - let udf: ScalarUDF = foreign_abs_func.into(); + let udf = ScalarUDF::new_from_shared_impl(foreign_abs_func); let ctx = SessionContext::default(); let df = ctx.read_batch(create_record_batch(-5, 5))?; @@ -82,9 +81,9 @@ mod tests { "External table provider failed to implement create_scalar_udf" .to_string(), ))?(); - let foreign_abs_func: ForeignScalarUDF = (&ffi_abs_func).try_into()?; + let foreign_abs_func: Arc = (&ffi_abs_func).try_into()?; - let udf: ScalarUDF = foreign_abs_func.into(); + let udf = ScalarUDF::new_from_shared_impl(foreign_abs_func); let ctx = SessionContext::default(); let df = ctx.read_batch(create_record_batch(-5, 5))?; From 91e87a0868ae4f1862bc53b45b6ea27908e94894 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 12 Nov 2025 16:57:46 -0500 Subject: [PATCH 10/22] Implement library marker ID for accumulator --- datafusion/ffi/src/udaf/accumulator.rs | 41 +++++++++++++++++++------- datafusion/ffi/src/udaf/mod.rs | 12 ++++---- 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/datafusion/ffi/src/udaf/accumulator.rs b/datafusion/ffi/src/udaf/accumulator.rs index 80b872159f48..d528454ca59f 100644 --- a/datafusion/ffi/src/udaf/accumulator.rs +++ b/datafusion/ffi/src/udaf/accumulator.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::{ffi::c_void, ops::Deref}; - use abi_stable::{ std_types::{RResult, RString, RVec}, StableAbi, @@ -28,6 +26,8 @@ use datafusion::{ scalar::ScalarValue, }; use prost::Message; +use std::ptr::null_mut; +use std::{ffi::c_void, ops::Deref}; use crate::{arrow_wrappers::WrappedArray, df_result, rresult, rresult_return}; @@ -70,6 +70,10 @@ pub struct FFI_Accumulator { /// Internal data. This is only to be accessed by the provider of the accumulator. /// A [`ForeignAccumulator`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. + pub library_marker_id: extern "C" fn() -> u64, } unsafe impl Send for FFI_Accumulator {} @@ -173,9 +177,11 @@ unsafe extern "C" fn retract_batch_fn_wrapper( } unsafe extern "C" fn release_fn_wrapper(accumulator: &mut FFI_Accumulator) { - let private_data = - Box::from_raw(accumulator.private_data as *mut AccumulatorPrivateData); - drop(private_data); + if !accumulator.private_data.is_null() { + let private_data = + Box::from_raw(accumulator.private_data as *mut AccumulatorPrivateData); + drop(private_data); + } } impl From> for FFI_Accumulator { @@ -193,6 +199,7 @@ impl From> for FFI_Accumulator { supports_retract_batch, release: release_fn_wrapper, private_data: Box::into_raw(Box::new(private_data)) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -217,9 +224,20 @@ pub struct ForeignAccumulator { unsafe impl Send for ForeignAccumulator {} unsafe impl Sync for ForeignAccumulator {} -impl From for ForeignAccumulator { - fn from(accumulator: FFI_Accumulator) -> Self { - Self { accumulator } +impl From for Box { + fn from(mut accumulator: FFI_Accumulator) -> Self { + if (accumulator.library_marker_id)() == crate::get_library_marker_id() { + unsafe { + let private_data = Box::from_raw( + accumulator.private_data as *mut AccumulatorPrivateData, + ); + // We must set this to null to avoid a double free + accumulator.private_data = null_mut(); + private_data.accumulator + } + } else { + Box::new(ForeignAccumulator { accumulator }) + } } } @@ -313,7 +331,7 @@ mod tests { scalar::ScalarValue, }; - use super::{FFI_Accumulator, ForeignAccumulator}; + use super::FFI_Accumulator; #[test] fn test_foreign_avg_accumulator() -> Result<()> { @@ -322,8 +340,9 @@ mod tests { let original_supports_retract = original_accum.supports_retract_batch(); let boxed_accum: Box = Box::new(original_accum); - let ffi_accum: FFI_Accumulator = boxed_accum.into(); - let mut foreign_accum: ForeignAccumulator = ffi_accum.into(); + let mut ffi_accum: FFI_Accumulator = boxed_accum.into(); + ffi_accum.library_marker_id = crate::mock_foreign_marker_id; + let mut foreign_accum: Box = ffi_accum.into(); // Send in an array to average. There are 5 values and it should average to 30.0 let values = create_array!(Float64, vec![10., 20., 30., 40., 50.]); diff --git a/datafusion/ffi/src/udaf/mod.rs b/datafusion/ffi/src/udaf/mod.rs index ce5611590b67..c506f41318e7 100644 --- a/datafusion/ffi/src/udaf/mod.rs +++ b/datafusion/ffi/src/udaf/mod.rs @@ -19,7 +19,7 @@ use abi_stable::{ std_types::{ROption, RResult, RStr, RString, RVec}, StableAbi, }; -use accumulator::{FFI_Accumulator, ForeignAccumulator}; +use accumulator::FFI_Accumulator; use accumulator_args::{FFI_AccumulatorArgs, ForeignAccumulatorArgs}; use arrow::datatypes::{DataType, Field}; use arrow::ffi::FFI_ArrowSchema; @@ -453,9 +453,8 @@ impl AggregateUDFImpl for ForeignAggregateUDF { fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { let args = acc_args.try_into()?; unsafe { - df_result!((self.udaf.accumulator)(&self.udaf, args)).map(|accum| { - Box::new(ForeignAccumulator::from(accum)) as Box - }) + df_result!((self.udaf.accumulator)(&self.udaf, args)) + .map(>::from) } } @@ -536,9 +535,8 @@ impl AggregateUDFImpl for ForeignAggregateUDF { ) -> Result> { let args = args.try_into()?; unsafe { - df_result!((self.udaf.create_sliding_accumulator)(&self.udaf, args)).map( - |accum| Box::new(ForeignAccumulator::from(accum)) as Box, - ) + df_result!((self.udaf.create_sliding_accumulator)(&self.udaf, args)) + .map(>::from) } } From 48ad82513c28d0f132a66bd0b3bdd2b858964815 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 12 Nov 2025 17:02:17 -0500 Subject: [PATCH 11/22] Implement library marker ID for groups accumulator --- datafusion/ffi/src/udaf/groups_accumulator.rs | 40 ++++++++++++++----- datafusion/ffi/src/udaf/mod.rs | 10 ++--- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/datafusion/ffi/src/udaf/groups_accumulator.rs b/datafusion/ffi/src/udaf/groups_accumulator.rs index 58a18c69db7c..967f20ec00d5 100644 --- a/datafusion/ffi/src/udaf/groups_accumulator.rs +++ b/datafusion/ffi/src/udaf/groups_accumulator.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::{ffi::c_void, ops::Deref, sync::Arc}; - use crate::{ arrow_wrappers::{WrappedArray, WrappedSchema}, df_result, rresult, rresult_return, @@ -34,6 +32,8 @@ use datafusion::{ error::{DataFusionError, Result}, logical_expr::{EmitTo, GroupsAccumulator}, }; +use std::ptr::null_mut; +use std::{ffi::c_void, ops::Deref, sync::Arc}; /// A stable struct for sharing [`GroupsAccumulator`] across FFI boundaries. /// For an explanation of each field, see the corresponding function @@ -86,6 +86,10 @@ pub struct FFI_GroupsAccumulator { /// Internal data. This is only to be accessed by the provider of the accumulator. /// A [`ForeignGroupsAccumulator`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. + pub library_marker_id: extern "C" fn() -> u64, } unsafe impl Send for FFI_GroupsAccumulator {} @@ -215,9 +219,11 @@ unsafe extern "C" fn convert_to_state_fn_wrapper( } unsafe extern "C" fn release_fn_wrapper(accumulator: &mut FFI_GroupsAccumulator) { - let private_data = - Box::from_raw(accumulator.private_data as *mut GroupsAccumulatorPrivateData); - drop(private_data); + if !accumulator.private_data.is_null() { + let private_data = + Box::from_raw(accumulator.private_data as *mut GroupsAccumulatorPrivateData); + drop(private_data); + } } impl From> for FFI_GroupsAccumulator { @@ -236,6 +242,7 @@ impl From> for FFI_GroupsAccumulator { release: release_fn_wrapper, private_data: Box::into_raw(Box::new(private_data)) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -260,9 +267,19 @@ pub struct ForeignGroupsAccumulator { unsafe impl Send for ForeignGroupsAccumulator {} unsafe impl Sync for ForeignGroupsAccumulator {} -impl From for ForeignGroupsAccumulator { - fn from(accumulator: FFI_GroupsAccumulator) -> Self { - Self { accumulator } +impl From for Box { + fn from(mut accumulator: FFI_GroupsAccumulator) -> Self { + if (accumulator.library_marker_id)() == crate::get_library_marker_id() { + unsafe { + let private_data = Box::from_raw( + accumulator.private_data as *mut GroupsAccumulatorPrivateData, + ); + accumulator.private_data = null_mut(); + private_data.accumulator + } + } else { + Box::new(ForeignGroupsAccumulator { accumulator }) + } } } @@ -436,14 +453,15 @@ mod tests { }; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator; - use super::{FFI_EmitTo, FFI_GroupsAccumulator, ForeignGroupsAccumulator}; + use super::{FFI_EmitTo, FFI_GroupsAccumulator}; #[test] fn test_foreign_avg_accumulator() -> Result<()> { let boxed_accum: Box = Box::new(BooleanGroupsAccumulator::new(|a, b| a && b, true)); - let ffi_accum: FFI_GroupsAccumulator = boxed_accum.into(); - let mut foreign_accum: ForeignGroupsAccumulator = ffi_accum.into(); + let mut ffi_accum: FFI_GroupsAccumulator = boxed_accum.into(); + ffi_accum.library_marker_id = crate::mock_foreign_marker_id; + let mut foreign_accum: Box = ffi_accum.into(); // Send in an array to evaluate. We want a mean of 30 and standard deviation of 4. let values = create_array!(Boolean, vec![true, true, true, false, true, true]); diff --git a/datafusion/ffi/src/udaf/mod.rs b/datafusion/ffi/src/udaf/mod.rs index c506f41318e7..de9dda2071fb 100644 --- a/datafusion/ffi/src/udaf/mod.rs +++ b/datafusion/ffi/src/udaf/mod.rs @@ -39,7 +39,7 @@ use datafusion::{ }; use datafusion_common::exec_datafusion_err; use datafusion_proto_common::from_proto::parse_proto_fields_to_fields; -use groups_accumulator::{FFI_GroupsAccumulator, ForeignGroupsAccumulator}; +use groups_accumulator::FFI_GroupsAccumulator; use std::hash::{Hash, Hasher}; use std::{ffi::c_void, sync::Arc}; @@ -516,12 +516,8 @@ impl AggregateUDFImpl for ForeignAggregateUDF { let args = FFI_AccumulatorArgs::try_from(args)?; unsafe { - df_result!((self.udaf.create_groups_accumulator)(&self.udaf, args)).map( - |accum| { - Box::new(ForeignGroupsAccumulator::from(accum)) - as Box - }, - ) + df_result!((self.udaf.create_groups_accumulator)(&self.udaf, args)) + .map(>::from) } } From 59aa6152b5488e5f120fba39724d70ebeb89790f Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 12 Nov 2025 17:27:14 -0500 Subject: [PATCH 12/22] Implement library marker ID for udaf --- datafusion/ffi/src/udaf/mod.rs | 37 +++++++++++++++++++++----------- datafusion/ffi/tests/ffi_udaf.rs | 13 +++++------ 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/datafusion/ffi/src/udaf/mod.rs b/datafusion/ffi/src/udaf/mod.rs index de9dda2071fb..6fa326ce61e3 100644 --- a/datafusion/ffi/src/udaf/mod.rs +++ b/datafusion/ffi/src/udaf/mod.rs @@ -145,6 +145,10 @@ pub struct FFI_AggregateUDF { /// Internal data. This is only to be accessed by the provider of the udaf. /// A [`ForeignAggregateUDF`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. + pub library_marker_id: extern "C" fn() -> u64, } unsafe impl Send for FFI_AggregateUDF {} @@ -361,6 +365,7 @@ impl From> for FFI_AggregateUDF { clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -400,18 +405,22 @@ impl Hash for ForeignAggregateUDF { } } -impl TryFrom<&FFI_AggregateUDF> for ForeignAggregateUDF { +impl TryFrom<&FFI_AggregateUDF> for Arc { type Error = DataFusionError; fn try_from(udaf: &FFI_AggregateUDF) -> Result { + if (udaf.library_marker_id)() == crate::get_library_marker_id() { + return Ok(Arc::clone(unsafe { udaf.inner().inner() })); + } + let signature = Signature::user_defined((&udaf.volatility).into()); let aliases = udaf.aliases.iter().map(|s| s.to_string()).collect(); - Ok(Self { + Ok(Arc::new(ForeignAggregateUDF { udaf: udaf.clone(), signature, aliases, - }) + })) } } @@ -548,10 +557,10 @@ impl AggregateUDFImpl for ForeignAggregateUDF { .into_option(); let result = result - .map(|func| ForeignAggregateUDF::try_from(&func)) + .map(|func| >::try_from(&func)) .transpose()?; - Ok(result.map(|func| Arc::new(func) as Arc)) + Ok(result) } } @@ -655,10 +664,11 @@ mod tests { ) -> Result { let original_udaf = Arc::new(AggregateUDF::from(original_udaf)); - let local_udaf: FFI_AggregateUDF = Arc::clone(&original_udaf).into(); + let mut local_udaf: FFI_AggregateUDF = Arc::clone(&original_udaf).into(); + local_udaf.library_marker_id = crate::mock_foreign_marker_id; - let foreign_udaf: ForeignAggregateUDF = (&local_udaf).try_into()?; - Ok(foreign_udaf.into()) + let foreign_udaf: Arc = (&local_udaf).try_into()?; + Ok(AggregateUDF::new_from_shared_impl(foreign_udaf)) } #[test] @@ -668,11 +678,12 @@ mod tests { let original_udaf = Arc::new(AggregateUDF::from(original_udaf)); // Convert to FFI format - let local_udaf: FFI_AggregateUDF = Arc::clone(&original_udaf).into(); + let mut local_udaf: FFI_AggregateUDF = Arc::clone(&original_udaf).into(); + local_udaf.library_marker_id = crate::mock_foreign_marker_id; // Convert back to native format - let foreign_udaf: ForeignAggregateUDF = (&local_udaf).try_into()?; - let foreign_udaf: AggregateUDF = foreign_udaf.into(); + let foreign_udaf: Arc = (&local_udaf).try_into()?; + let foreign_udaf = AggregateUDF::new_from_shared_impl(foreign_udaf); assert_eq!(original_name, foreign_udaf.name()); Ok(()) @@ -725,8 +736,8 @@ mod tests { let local_udaf: FFI_AggregateUDF = Arc::clone(&original_udaf).into(); // Convert back to native format - let foreign_udaf: ForeignAggregateUDF = (&local_udaf).try_into()?; - let foreign_udaf: AggregateUDF = foreign_udaf.into(); + let foreign_udaf: Arc = (&local_udaf).try_into()?; + let foreign_udaf = AggregateUDF::new_from_shared_impl(foreign_udaf); let metadata: HashMap = [("a_key".to_string(), "a_value".to_string())] diff --git a/datafusion/ffi/tests/ffi_udaf.rs b/datafusion/ffi/tests/ffi_udaf.rs index ffd99bac62ec..9d1823ded076 100644 --- a/datafusion/ffi/tests/ffi_udaf.rs +++ b/datafusion/ffi/tests/ffi_udaf.rs @@ -22,11 +22,11 @@ mod tests { use arrow::array::Float64Array; use datafusion::common::record_batch; use datafusion::error::{DataFusionError, Result}; - use datafusion::logical_expr::AggregateUDF; + use datafusion::logical_expr::{AggregateUDF, AggregateUDFImpl}; use datafusion::prelude::{col, SessionContext}; + use std::sync::Arc; use datafusion_ffi::tests::utils::get_module; - use datafusion_ffi::udaf::ForeignAggregateUDF; #[tokio::test] async fn test_ffi_udaf() -> Result<()> { @@ -38,9 +38,9 @@ mod tests { .ok_or(DataFusionError::NotImplemented( "External table provider failed to implement create_udaf".to_string(), ))?(); - let foreign_sum_func: ForeignAggregateUDF = (&ffi_sum_func).try_into()?; + let foreign_sum_func: Arc = (&ffi_sum_func).try_into()?; - let udaf: AggregateUDF = foreign_sum_func.into(); + let udaf = AggregateUDF::new_from_shared_impl(foreign_sum_func); let ctx = SessionContext::default(); let record_batch = record_batch!( @@ -80,9 +80,10 @@ mod tests { .ok_or(DataFusionError::NotImplemented( "External table provider failed to implement create_udaf".to_string(), ))?(); - let foreign_stddev_func: ForeignAggregateUDF = (&ffi_stddev_func).try_into()?; + let foreign_stddev_func: Arc = + (&ffi_stddev_func).try_into()?; - let udaf: AggregateUDF = foreign_stddev_func.into(); + let udaf = AggregateUDF::new_from_shared_impl(foreign_stddev_func); let ctx = SessionContext::default(); let record_batch = record_batch!( From ddaa9d0ba316f0adab35788dcf72a0bd85b94cb4 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 12 Nov 2025 17:30:53 -0500 Subject: [PATCH 13/22] Implement library marker ID for partition evaluator --- datafusion/ffi/src/udwf/mod.rs | 7 ++--- .../ffi/src/udwf/partition_evaluator.rs | 29 +++++++++++++++---- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs index 9f56e2d4788b..87c63cf36e10 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -39,7 +39,7 @@ use datafusion::{ logical_expr::{Signature, WindowUDF, WindowUDFImpl}, }; use datafusion_common::exec_err; -use partition_evaluator::{FFI_PartitionEvaluator, ForeignPartitionEvaluator}; +use partition_evaluator::FFI_PartitionEvaluator; use partition_evaluator_args::{ FFI_PartitionEvaluatorArgs, ForeignPartitionEvaluatorArgs, }; @@ -322,10 +322,7 @@ impl WindowUDFImpl for ForeignWindowUDF { (self.udf.partition_evaluator)(&self.udf, args) }; - df_result!(evaluator).map(|evaluator| { - Box::new(ForeignPartitionEvaluator::from(evaluator)) - as Box - }) + df_result!(evaluator).map(>::from) } fn field(&self, field_args: WindowUDFFieldArgs) -> Result { diff --git a/datafusion/ffi/src/udwf/partition_evaluator.rs b/datafusion/ffi/src/udwf/partition_evaluator.rs index 14cf23b919aa..4ec33c7b08d5 100644 --- a/datafusion/ffi/src/udwf/partition_evaluator.rs +++ b/datafusion/ffi/src/udwf/partition_evaluator.rs @@ -76,6 +76,10 @@ pub struct FFI_PartitionEvaluator { /// Internal data. This is only to be accessed by the provider of the evaluator. /// A [`ForeignPartitionEvaluator`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. + pub library_marker_id: extern "C" fn() -> u64, } unsafe impl Send for FFI_PartitionEvaluator {} @@ -170,9 +174,11 @@ unsafe extern "C" fn get_range_fn_wrapper( } unsafe extern "C" fn release_fn_wrapper(evaluator: &mut FFI_PartitionEvaluator) { - let private_data = - Box::from_raw(evaluator.private_data as *mut PartitionEvaluatorPrivateData); - drop(private_data); + if !evaluator.private_data.is_null() { + let private_data = + Box::from_raw(evaluator.private_data as *mut PartitionEvaluatorPrivateData); + drop(private_data); + } } impl From> for FFI_PartitionEvaluator { @@ -195,6 +201,7 @@ impl From> for FFI_PartitionEvaluator { uses_window_frame, release: release_fn_wrapper, private_data: Box::into_raw(Box::new(private_data)) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -219,9 +226,19 @@ pub struct ForeignPartitionEvaluator { unsafe impl Send for ForeignPartitionEvaluator {} unsafe impl Sync for ForeignPartitionEvaluator {} -impl From for ForeignPartitionEvaluator { - fn from(evaluator: FFI_PartitionEvaluator) -> Self { - Self { evaluator } +impl From for Box { + fn from(mut evaluator: FFI_PartitionEvaluator) -> Self { + if (evaluator.library_marker_id)() == crate::get_library_marker_id() { + unsafe { + let private_data = Box::from_raw( + evaluator.private_data as *mut PartitionEvaluatorPrivateData, + ); + evaluator.private_data = std::ptr::null_mut(); + private_data.evaluator + } + } else { + Box::new(ForeignPartitionEvaluator { evaluator }) + } } } From 880baab019b48d03341ad13246877855258c1677 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 12 Nov 2025 17:34:58 -0500 Subject: [PATCH 14/22] Implement library marker ID for udwf --- datafusion/ffi/src/udwf/mod.rs | 50 ++++++++++++++++++++------------ datafusion/ffi/tests/ffi_udwf.rs | 8 ++--- 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs index 87c63cf36e10..e4819b5fa7bf 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -105,6 +105,10 @@ pub struct FFI_WindowUDF { /// Internal data. This is only to be accessed by the provider of the udf. /// A [`ForeignWindowUDF`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. + pub library_marker_id: extern "C" fn() -> u64, } unsafe impl Send for FFI_WindowUDF {} @@ -201,6 +205,7 @@ unsafe extern "C" fn clone_fn_wrapper(udwf: &FFI_WindowUDF) -> FFI_WindowUDF { clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } @@ -230,6 +235,7 @@ impl From> for FFI_WindowUDF { clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, } } } @@ -270,21 +276,25 @@ impl Hash for ForeignWindowUDF { } } -impl TryFrom<&FFI_WindowUDF> for ForeignWindowUDF { +impl TryFrom<&FFI_WindowUDF> for Arc { type Error = DataFusionError; fn try_from(udf: &FFI_WindowUDF) -> Result { - let name = udf.name.to_owned().into(); - let signature = Signature::user_defined((&udf.volatility).into()); - - let aliases = udf.aliases.iter().map(|s| s.to_string()).collect(); - - Ok(Self { - name, - udf: udf.clone(), - aliases, - signature, - }) + if (udf.library_marker_id)() == crate::get_library_marker_id() { + Ok(Arc::clone(unsafe { udf.inner().inner() })) + } else { + let name = udf.name.to_owned().into(); + let signature = Signature::user_defined((&udf.volatility).into()); + + let aliases = udf.aliases.iter().map(|s| s.to_string()).collect(); + + Ok(Arc::new(ForeignWindowUDF { + name, + udf: udf.clone(), + aliases, + signature, + })) + } } } @@ -384,7 +394,7 @@ impl From<&FFI_SortOptions> for SortOptions { #[cfg(feature = "integration-tests")] mod tests { use crate::tests::create_record_batch; - use crate::udwf::{FFI_WindowUDF, ForeignWindowUDF}; + use crate::udwf::FFI_WindowUDF; use arrow::array::{create_array, ArrayRef}; use datafusion::functions_window::lead_lag::{lag_udwf, WindowShift}; use datafusion::logical_expr::expr::Sort; @@ -397,10 +407,11 @@ mod tests { ) -> datafusion::common::Result { let original_udwf = Arc::new(WindowUDF::from(original_udwf)); - let local_udwf: FFI_WindowUDF = Arc::clone(&original_udwf).into(); + let mut local_udwf: FFI_WindowUDF = Arc::clone(&original_udwf).into(); + local_udwf.library_marker_id = crate::mock_foreign_marker_id; - let foreign_udwf: ForeignWindowUDF = (&local_udwf).try_into()?; - Ok(foreign_udwf.into()) + let foreign_udwf: Arc = (&local_udwf).try_into()?; + Ok(WindowUDF::new_from_shared_impl(foreign_udwf)) } #[test] @@ -409,11 +420,12 @@ mod tests { let original_name = original_udwf.name().to_owned(); // Convert to FFI format - let local_udwf: FFI_WindowUDF = Arc::clone(&original_udwf).into(); + let mut local_udwf: FFI_WindowUDF = Arc::clone(&original_udwf).into(); + local_udwf.library_marker_id = crate::mock_foreign_marker_id; // Convert back to native format - let foreign_udwf: ForeignWindowUDF = (&local_udwf).try_into()?; - let foreign_udwf: WindowUDF = foreign_udwf.into(); + let foreign_udwf: Arc = (&local_udwf).try_into()?; + let foreign_udwf = WindowUDF::new_from_shared_impl(foreign_udwf); assert_eq!(original_name, foreign_udwf.name()); Ok(()) diff --git a/datafusion/ffi/tests/ffi_udwf.rs b/datafusion/ffi/tests/ffi_udwf.rs index 18ffd0c5bcb7..e87c65ca8907 100644 --- a/datafusion/ffi/tests/ffi_udwf.rs +++ b/datafusion/ffi/tests/ffi_udwf.rs @@ -22,11 +22,11 @@ mod tests { use arrow::array::{create_array, ArrayRef}; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_expr::expr::Sort; - use datafusion::logical_expr::{col, ExprFunctionExt, WindowUDF}; + use datafusion::logical_expr::{col, ExprFunctionExt, WindowUDF, WindowUDFImpl}; use datafusion::prelude::SessionContext; use datafusion_ffi::tests::create_record_batch; use datafusion_ffi::tests::utils::get_module; - use datafusion_ffi::udwf::ForeignWindowUDF; + use std::sync::Arc; #[tokio::test] async fn test_rank_udwf() -> Result<()> { @@ -39,9 +39,9 @@ mod tests { "External table provider failed to implement create_scalar_udf" .to_string(), ))?(); - let foreign_rank_func: ForeignWindowUDF = (&ffi_rank_func).try_into()?; + let foreign_rank_func: Arc = (&ffi_rank_func).try_into()?; - let udwf: WindowUDF = foreign_rank_func.into(); + let udwf = WindowUDF::new_from_shared_impl(foreign_rank_func); let ctx = SessionContext::default(); let df = ctx.read_batch(create_record_batch(-5, 5))?; From 16802e94326875fa656c3b09d368281985a886c1 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 13 Nov 2025 07:30:01 -0500 Subject: [PATCH 15/22] Add unit tests for library marker --- datafusion/ffi/src/catalog_provider.rs | 22 ++++++ datafusion/ffi/src/catalog_provider_list.rs | 24 +++++++ datafusion/ffi/src/execution_plan.rs | 31 +++++++-- datafusion/ffi/src/plan_properties.rs | 30 ++++++-- datafusion/ffi/src/schema_provider.rs | 25 ++++++- datafusion/ffi/src/table_provider.rs | 68 +++++++++++-------- datafusion/ffi/src/udaf/accumulator.rs | 34 +++++++++- datafusion/ffi/src/udaf/groups_accumulator.rs | 36 +++++++++- datafusion/ffi/src/udaf/mod.rs | 25 ++++++- datafusion/ffi/src/udf/mod.rs | 57 +++++++--------- datafusion/ffi/src/udtf.rs | 22 +++++- datafusion/ffi/src/udwf/mod.rs | 26 ++++++- .../ffi/src/udwf/partition_evaluator.rs | 52 +++++++++++++- 13 files changed, 369 insertions(+), 83 deletions(-) diff --git a/datafusion/ffi/src/catalog_provider.rs b/datafusion/ffi/src/catalog_provider.rs index ad3b070ee5fb..c9a6d9946483 100644 --- a/datafusion/ffi/src/catalog_provider.rs +++ b/datafusion/ffi/src/catalog_provider.rs @@ -347,4 +347,26 @@ mod tests { let returned_schema = foreign_catalog.schema("second_schema"); assert!(returned_schema.is_some()); } + + #[test] + fn test_ffi_catalog_provider_local_bypass() { + let catalog = Arc::new(MemoryCatalogProvider::new()); + + let mut ffi_catalog = FFI_CatalogProvider::new(catalog, None); + + // Verify local libraries can be downcast to their original + let foreign_catalog: Arc = (&ffi_catalog).into(); + assert!(foreign_catalog + .as_any() + .downcast_ref::() + .is_some()); + + // Verify different library markers generate foreign providers + ffi_catalog.library_marker_id = crate::mock_foreign_marker_id; + let foreign_catalog: Arc = (&ffi_catalog).into(); + assert!(foreign_catalog + .as_any() + .downcast_ref::() + .is_some()); + } } diff --git a/datafusion/ffi/src/catalog_provider_list.rs b/datafusion/ffi/src/catalog_provider_list.rs index d2ab31ffb7de..db0e33ab51c9 100644 --- a/datafusion/ffi/src/catalog_provider_list.rs +++ b/datafusion/ffi/src/catalog_provider_list.rs @@ -293,4 +293,28 @@ mod tests { let returned_catalog = foreign_catalog_list.catalog("second_catalog"); assert!(returned_catalog.is_some()); } + + #[test] + fn test_ffi_catalog_provider_list_local_bypass() { + let catalog_list = Arc::new(MemoryCatalogProviderList::new()); + + let mut ffi_catalog_list = FFI_CatalogProviderList::new(catalog_list, None); + + // Verify local libraries can be downcast to their original + let foreign_catalog_list: Arc = + (&ffi_catalog_list).into(); + assert!(foreign_catalog_list + .as_any() + .downcast_ref::() + .is_some()); + + // Verify different library markers generate foreign providers + ffi_catalog_list.library_marker_id = crate::mock_foreign_marker_id; + let foreign_catalog_list: Arc = + (&ffi_catalog_list).into(); + assert!(foreign_catalog_list + .as_any() + .downcast_ref::() + .is_some()); + } } diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index 704d910c6f24..bad206f36c1e 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -269,10 +269,7 @@ impl ExecutionPlan for ForeignExecutionPlan { } fn children(&self) -> Vec<&Arc> { - self.children - .iter() - .map(|p| p as &Arc) - .collect() + self.children.iter().collect() } fn with_new_children( @@ -301,6 +298,7 @@ impl ExecutionPlan for ForeignExecutionPlan { #[cfg(test)] mod tests { + use super::*; use arrow::datatypes::{DataType, Field, Schema}; use datafusion::{ physical_plan::{ @@ -310,8 +308,6 @@ mod tests { prelude::SessionContext, }; - use super::*; - #[derive(Debug)] pub struct EmptyExec { props: PlanProperties, @@ -450,4 +446,27 @@ mod tests { Ok(()) } + + #[test] + fn test_ffi_execution_plan_local_bypass() { + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); + let ctx = SessionContext::new(); + + let plan = Arc::new(EmptyExec::new(schema)); + + let mut ffi_plan = FFI_ExecutionPlan::new(plan, ctx.task_ctx(), None); + + // Verify local libraries can be downcast to their original + let foreign_plan: Arc = (&ffi_plan).try_into().unwrap(); + assert!(foreign_plan.as_any().downcast_ref::().is_some()); + + // Verify different library markers generate foreign providers + ffi_plan.library_marker_id = crate::mock_foreign_marker_id; + let foreign_plan: Arc = (&ffi_plan).try_into().unwrap(); + assert!(foreign_plan + .as_any() + .downcast_ref::() + .is_some()); + } } diff --git a/datafusion/ffi/src/plan_properties.rs b/datafusion/ffi/src/plan_properties.rs index bf68da1d01b4..e8b45cab06ab 100644 --- a/datafusion/ffi/src/plan_properties.rs +++ b/datafusion/ffi/src/plan_properties.rs @@ -309,8 +309,7 @@ mod tests { use super::*; - #[test] - fn test_round_trip_ffi_plan_properties() -> Result<()> { + fn create_test_props() -> Result { use arrow::datatypes::{DataType, Field, Schema}; let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); @@ -319,13 +318,17 @@ mod tests { let _ = eqp.reorder([PhysicalSortExpr::new_default( datafusion::physical_plan::expressions::col("a", &schema)?, )]); - let original_props = PlanProperties::new( + Ok(PlanProperties::new( eqp, Partitioning::RoundRobinBatch(3), EmissionType::Incremental, Boundedness::Bounded, - ); + )) + } + #[test] + fn test_round_trip_ffi_plan_properties() -> Result<()> { + let original_props = create_test_props()?; let mut local_props_ptr = FFI_PlanProperties::from(&original_props); local_props_ptr.library_marker_id = crate::mock_foreign_marker_id; @@ -335,4 +338,23 @@ mod tests { Ok(()) } + + #[test] + fn test_ffi_execution_plan_local_bypass() -> Result<()> { + let props = create_test_props()?; + + let ffi_plan = FFI_PlanProperties::from(&props); + + // Verify local libraries + let foreign_plan: PlanProperties = ffi_plan.try_into()?; + assert_eq!(format!("{foreign_plan:?}"), format!("{:?}", foreign_plan)); + + // Verify different library markers still can produce identical properties + let mut ffi_plan = FFI_PlanProperties::from(&props); + ffi_plan.library_marker_id = crate::mock_foreign_marker_id; + let foreign_plan: PlanProperties = ffi_plan.try_into()?; + assert_eq!(format!("{foreign_plan:?}"), format!("{:?}", foreign_plan)); + + Ok(()) + } } diff --git a/datafusion/ffi/src/schema_provider.rs b/datafusion/ffi/src/schema_provider.rs index 71c48c09126f..c10ee0d282ca 100644 --- a/datafusion/ffi/src/schema_provider.rs +++ b/datafusion/ffi/src/schema_provider.rs @@ -328,11 +328,10 @@ impl SchemaProvider for ForeignSchemaProvider { #[cfg(test)] mod tests { + use super::*; use arrow::datatypes::Schema; use datafusion::{catalog::MemorySchemaProvider, datasource::empty::EmptyTable}; - use super::*; - fn empty_table() -> Arc { Arc::new(EmptyTable::new(Arc::new(Schema::empty()))) } @@ -392,4 +391,26 @@ mod tests { assert!(returned_schema.is_some()); assert!(foreign_schema_provider.table_exist("second_table")); } + + #[test] + fn test_ffi_schema_provider_local_bypass() { + let schema_provider = Arc::new(MemorySchemaProvider::new()); + + let mut ffi_schema = FFI_SchemaProvider::new(schema_provider, None); + + // Verify local libraries can be downcast to their original + let foreign_schema: Arc = (&ffi_schema).into(); + assert!(foreign_schema + .as_any() + .downcast_ref::() + .is_some()); + + // Verify different library markers generate foreign providers + ffi_schema.library_marker_id = crate::mock_foreign_marker_id; + let foreign_schema: Arc = (&ffi_schema).into(); + assert!(foreign_schema + .as_any() + .downcast_ref::() + .is_some()); + } } diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index b1e1c06091cb..5f051f7f5db0 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -510,13 +510,11 @@ impl TableProvider for ForeignTableProvider { #[cfg(test)] mod tests { + use super::*; use arrow::datatypes::Schema; use datafusion::prelude::{col, lit}; - use super::*; - - #[tokio::test] - async fn test_round_trip_ffi_table_provider_scan() -> Result<()> { + fn create_test_table_provider() -> Result> { use arrow::datatypes::Field; use datafusion::arrow::{ array::Float32Array, datatypes::DataType, record_batch::RecordBatch, @@ -536,10 +534,16 @@ mod tests { vec![Arc::new(Float32Array::from(vec![64.0]))], )?; - let ctx = SessionContext::new(); + Ok(Arc::new(MemTable::try_new( + schema, + vec![vec![batch1], vec![batch2]], + )?)) + } - let provider = - Arc::new(MemTable::try_new(schema, vec![vec![batch1], vec![batch2]])?); + #[tokio::test] + async fn test_round_trip_ffi_table_provider_scan() -> Result<()> { + let provider = create_test_table_provider()?; + let ctx = SessionContext::new(); let mut ffi_provider = FFI_TableProvider::new(provider, true, None); ffi_provider.library_marker_id = crate::mock_foreign_marker_id; @@ -560,31 +564,11 @@ mod tests { #[tokio::test] async fn test_round_trip_ffi_table_provider_insert_into() -> Result<()> { - use arrow::datatypes::Field; - use datafusion::arrow::{ - array::Float32Array, datatypes::DataType, record_batch::RecordBatch, - }; - use datafusion::datasource::MemTable; - - let schema = - Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); - - // define data in two partitions - let batch1 = RecordBatch::try_new( - Arc::clone(&schema), - vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))], - )?; - let batch2 = RecordBatch::try_new( - Arc::clone(&schema), - vec![Arc::new(Float32Array::from(vec![64.0]))], - )?; - + let provider = create_test_table_provider()?; let ctx = SessionContext::new(); - let provider = - Arc::new(MemTable::try_new(schema, vec![vec![batch1], vec![batch2]])?); - - let ffi_provider = FFI_TableProvider::new(provider, true, None); + let mut ffi_provider = FFI_TableProvider::new(provider, true, None); + ffi_provider.library_marker_id = crate::mock_foreign_marker_id; let foreign_table_provider: Arc = (&ffi_provider).into(); @@ -652,4 +636,28 @@ mod tests { assert_batches_eq!(expected, &result); Ok(()) } + + #[test] + fn test_ffi_table_provider_local_bypass() -> Result<()> { + let table_provider = create_test_table_provider()?; + + let mut ffi_table = FFI_TableProvider::new(table_provider, false, None); + + // Verify local libraries can be downcast to their original + let foreign_table: Arc = (&ffi_table).into(); + assert!(foreign_table + .as_any() + .downcast_ref::() + .is_some()); + + // Verify different library markers generate foreign providers + ffi_table.library_marker_id = crate::mock_foreign_marker_id; + let foreign_table: Arc = (&ffi_table).into(); + assert!(foreign_table + .as_any() + .downcast_ref::() + .is_some()); + + Ok(()) + } } diff --git a/datafusion/ffi/src/udaf/accumulator.rs b/datafusion/ffi/src/udaf/accumulator.rs index d528454ca59f..227b04902aa2 100644 --- a/datafusion/ffi/src/udaf/accumulator.rs +++ b/datafusion/ffi/src/udaf/accumulator.rs @@ -324,6 +324,7 @@ impl Accumulator for ForeignAccumulator { #[cfg(test)] mod tests { + use super::{FFI_Accumulator, ForeignAccumulator}; use arrow::array::{make_array, Array}; use datafusion::{ common::create_array, error::Result, @@ -331,8 +332,6 @@ mod tests { scalar::ScalarValue, }; - use super::FFI_Accumulator; - #[test] fn test_foreign_avg_accumulator() -> Result<()> { let original_accum = AvgAccumulator::default(); @@ -382,4 +381,35 @@ mod tests { Ok(()) } + + #[test] + fn test_ffi_accumulator_local_bypass() -> Result<()> { + let original_accum = AvgAccumulator::default(); + let boxed_accum: Box = Box::new(original_accum); + let original_size = boxed_accum.size(); + + let ffi_accum: FFI_Accumulator = boxed_accum.into(); + + // Verify local libraries can be downcast to their original + let foreign_accum: Box = ffi_accum.into(); + unsafe { + let concrete = &*(foreign_accum.as_ref() as *const dyn Accumulator + as *const AvgAccumulator); + assert_eq!(original_size, concrete.size()); + } + + // Verify different library markers generate foreign accumulator + let original_accum = AvgAccumulator::default(); + let boxed_accum: Box = Box::new(original_accum); + let mut ffi_accum: FFI_Accumulator = boxed_accum.into(); + ffi_accum.library_marker_id = crate::mock_foreign_marker_id; + let foreign_accum: Box = ffi_accum.into(); + unsafe { + let concrete = &*(foreign_accum.as_ref() as *const dyn Accumulator + as *const ForeignAccumulator); + assert_eq!(original_size, concrete.size()); + } + + Ok(()) + } } diff --git a/datafusion/ffi/src/udaf/groups_accumulator.rs b/datafusion/ffi/src/udaf/groups_accumulator.rs index 967f20ec00d5..82095383efb2 100644 --- a/datafusion/ffi/src/udaf/groups_accumulator.rs +++ b/datafusion/ffi/src/udaf/groups_accumulator.rs @@ -445,15 +445,16 @@ impl From for EmitTo { #[cfg(test)] mod tests { + use super::{FFI_EmitTo, FFI_GroupsAccumulator, ForeignGroupsAccumulator}; use arrow::array::{make_array, Array, BooleanArray}; + use datafusion::functions_aggregate::stddev::StddevGroupsAccumulator; use datafusion::{ common::create_array, error::Result, logical_expr::{EmitTo, GroupsAccumulator}, }; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator; - - use super::{FFI_EmitTo, FFI_GroupsAccumulator}; + use datafusion_functions_aggregate_common::stats::StatsType; #[test] fn test_foreign_avg_accumulator() -> Result<()> { @@ -528,4 +529,35 @@ mod tests { Ok(()) } + + #[test] + fn test_ffi_groups_accumulator_local_bypass_inner() -> Result<()> { + let original_accum = StddevGroupsAccumulator::new(StatsType::Population); + let boxed_accum: Box = Box::new(original_accum); + let original_size = boxed_accum.size(); + + let ffi_accum: FFI_GroupsAccumulator = boxed_accum.into(); + + // Verify local libraries can be downcast to their original + let foreign_accum: Box = ffi_accum.into(); + unsafe { + let concrete = &*(foreign_accum.as_ref() as *const dyn GroupsAccumulator + as *const StddevGroupsAccumulator); + assert_eq!(original_size, concrete.size()); + } + + // Verify different library markers generate foreign accumulator + let original_accum = StddevGroupsAccumulator::new(StatsType::Population); + let boxed_accum: Box = Box::new(original_accum); + let mut ffi_accum: FFI_GroupsAccumulator = boxed_accum.into(); + ffi_accum.library_marker_id = crate::mock_foreign_marker_id; + let foreign_accum: Box = ffi_accum.into(); + unsafe { + let concrete = &*(foreign_accum.as_ref() as *const dyn GroupsAccumulator + as *const ForeignGroupsAccumulator); + assert_eq!(original_size, concrete.size()); + } + + Ok(()) + } } diff --git a/datafusion/ffi/src/udaf/mod.rs b/datafusion/ffi/src/udaf/mod.rs index 6fa326ce61e3..78dc27af8b22 100644 --- a/datafusion/ffi/src/udaf/mod.rs +++ b/datafusion/ffi/src/udaf/mod.rs @@ -616,6 +616,7 @@ impl From for FFI_AggregateOrderSensitivity { #[cfg(test)] mod tests { + use super::*; use arrow::datatypes::Schema; use datafusion::{ common::create_array, functions_aggregate::sum::Sum, @@ -625,8 +626,6 @@ mod tests { use std::any::Any; use std::collections::HashMap; - use super::*; - #[derive(Default, Debug, Hash, Eq, PartialEq)] struct SumWithCopiedMetadata { inner: Sum, @@ -820,4 +819,26 @@ mod tests { test_round_trip_order_sensitivity(AggregateOrderSensitivity::SoftRequirement); test_round_trip_order_sensitivity(AggregateOrderSensitivity::Beneficial); } + + #[test] + fn test_ffi_udaf_local_bypass() -> Result<()> { + let original_udaf = Sum::new(); + let original_udaf = Arc::new(AggregateUDF::from(original_udaf)); + + let mut ffi_udaf = FFI_AggregateUDF::from(original_udaf); + + // Verify local libraries can be downcast to their original + let foreign_udaf: Arc = (&ffi_udaf).try_into()?; + assert!(foreign_udaf.as_any().downcast_ref::().is_some()); + + // Verify different library markers generate foreign providers + ffi_udaf.library_marker_id = crate::mock_foreign_marker_id; + let foreign_udaf: Arc = (&ffi_udaf).try_into()?; + assert!(foreign_udaf + .as_any() + .downcast_ref::() + .is_some()); + + Ok(()) + } } diff --git a/datafusion/ffi/src/udf/mod.rs b/datafusion/ffi/src/udf/mod.rs index ebd601fdb753..4900ca821be4 100644 --- a/datafusion/ffi/src/udf/mod.rs +++ b/datafusion/ffi/src/udf/mod.rs @@ -44,6 +44,7 @@ use datafusion::{ ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, }, }; +use datafusion_common::internal_err; use return_type_args::{ FFI_ReturnFieldArgs, ForeignReturnFieldArgs, ForeignReturnFieldArgsOwned, }; @@ -66,13 +67,6 @@ pub struct FFI_ScalarUDF { /// FFI equivalent to the `volatility` of a [`ScalarUDF`] pub volatility: FFI_Volatility, - /// Determines the return type of the underlying [`ScalarUDF`] based on the - /// argument types. - pub return_type: unsafe extern "C" fn( - udf: &Self, - arg_types: RVec, - ) -> RResult, - /// Determines the return info of the underlying [`ScalarUDF`]. Either this /// or return_type may be implemented on a UDF. pub return_field_from_args: unsafe extern "C" fn( @@ -134,21 +128,6 @@ impl FFI_ScalarUDF { } } -unsafe extern "C" fn return_type_fn_wrapper( - udf: &FFI_ScalarUDF, - arg_types: RVec, -) -> RResult { - let arg_types = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types)); - - let return_type = udf - .inner() - .return_type(&arg_types) - .and_then(|v| FFI_ArrowSchema::try_from(v).map_err(DataFusionError::from)) - .map(WrappedSchema); - - rresult!(return_type) -} - unsafe extern "C" fn return_field_from_args_fn_wrapper( udf: &FFI_ScalarUDF, args: FFI_ReturnFieldArgs, @@ -260,7 +239,6 @@ impl From> for FFI_ScalarUDF { volatility, short_circuits, invoke_with_args: invoke_with_args_fn_wrapper, - return_type: return_type_fn_wrapper, return_field_from_args: return_field_from_args_fn_wrapper, coerce_types: coerce_types_fn_wrapper, clone: clone_fn_wrapper, @@ -360,14 +338,8 @@ impl ScalarUDFImpl for ForeignScalarUDF { &self.signature } - fn return_type(&self, arg_types: &[DataType]) -> Result { - let arg_types = vec_datatype_to_rvec_wrapped(arg_types)?; - - let result = unsafe { (self.udf.return_type)(&self.udf, arg_types) }; - - let result = df_result!(result); - - result.and_then(|r| (&r.0).try_into().map_err(DataFusionError::from)) + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("ForeignScalarUDF implements return_field_from_args instead.") } fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { @@ -472,4 +444,27 @@ mod tests { Ok(()) } + + #[test] + fn test_ffi_udf_local_bypass() -> Result<()> { + use datafusion::functions::math::abs::AbsFunc; + let original_udf = AbsFunc::new(); + let original_udf = Arc::new(ScalarUDF::from(original_udf)); + + let mut ffi_udf = FFI_ScalarUDF::from(original_udf); + + // Verify local libraries can be downcast to their original + let foreign_udf: Arc = (&ffi_udf).try_into()?; + assert!(foreign_udf.as_any().downcast_ref::().is_some()); + + // Verify different library markers generate foreign providers + ffi_udf.library_marker_id = crate::mock_foreign_marker_id; + let foreign_udf: Arc = (&ffi_udf).try_into()?; + assert!(foreign_udf + .as_any() + .downcast_ref::() + .is_some()); + + Ok(()) + } } diff --git a/datafusion/ffi/src/udtf.rs b/datafusion/ffi/src/udtf.rs index 37f1b6821af9..1439a18401fa 100644 --- a/datafusion/ffi/src/udtf.rs +++ b/datafusion/ffi/src/udtf.rs @@ -201,18 +201,18 @@ impl TableFunctionImpl for ForeignTableFunction { #[cfg(test)] mod tests { + use super::*; use arrow::{ array::{ record_batch, ArrayRef, Float64Array, RecordBatch, StringArray, UInt64Array, }, datatypes::{DataType, Field, Schema}, }; + use datafusion::logical_expr::ptr_eq::arc_ptr_eq; use datafusion::{ catalog::MemTable, common::exec_err, prelude::lit, scalar::ScalarValue, }; - use super::*; - #[derive(Debug)] struct TestUDTF {} @@ -325,4 +325,22 @@ mod tests { Ok(()) } + + #[test] + fn test_ffi_udtf_local_bypass() -> Result<()> { + let original_udtf = Arc::new(TestUDTF {}) as Arc; + + let mut ffi_udtf = FFI_TableFunction::from(Arc::clone(&original_udtf)); + + // Verify local libraries can be downcast to their original + let foreign_udtf: Arc = ffi_udtf.clone().into(); + assert!(arc_ptr_eq(&original_udtf, &foreign_udtf)); + + // Verify different library markers generate foreign providers + ffi_udtf.library_marker_id = crate::mock_foreign_marker_id; + let foreign_udtf: Arc = ffi_udtf.into(); + assert!(!arc_ptr_eq(&original_udtf, &foreign_udtf)); + + Ok(()) + } } diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs index e4819b5fa7bf..74c68f3e81fe 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -394,7 +394,7 @@ impl From<&FFI_SortOptions> for SortOptions { #[cfg(feature = "integration-tests")] mod tests { use crate::tests::create_record_batch; - use crate::udwf::FFI_WindowUDF; + use crate::udwf::{FFI_WindowUDF, ForeignWindowUDF}; use arrow::array::{create_array, ArrayRef}; use datafusion::functions_window::lead_lag::{lag_udwf, WindowShift}; use datafusion::logical_expr::expr::Sort; @@ -459,4 +459,28 @@ mod tests { Ok(()) } + + #[test] + fn test_ffi_udwf_local_bypass() -> datafusion_common::Result<()> { + let original_udwf = Arc::new(WindowUDF::from(WindowShift::lag())); + + let mut ffi_udwf = FFI_WindowUDF::from(original_udwf); + + // Verify local libraries can be downcast to their original + let foreign_udwf: Arc = (&ffi_udwf).try_into()?; + assert!(foreign_udwf + .as_any() + .downcast_ref::() + .is_some()); + + // Verify different library markers generate foreign providers + ffi_udwf.library_marker_id = crate::mock_foreign_marker_id; + let foreign_udwf: Arc = (&ffi_udwf).try_into()?; + assert!(foreign_udwf + .as_any() + .downcast_ref::() + .is_some()); + + Ok(()) + } } diff --git a/datafusion/ffi/src/udwf/partition_evaluator.rs b/datafusion/ffi/src/udwf/partition_evaluator.rs index 4ec33c7b08d5..2f7784753009 100644 --- a/datafusion/ffi/src/udwf/partition_evaluator.rs +++ b/datafusion/ffi/src/udwf/partition_evaluator.rs @@ -334,4 +334,54 @@ impl PartitionEvaluator for ForeignPartitionEvaluator { } #[cfg(test)] -mod tests {} +mod tests { + use crate::udwf::partition_evaluator::{ + FFI_PartitionEvaluator, ForeignPartitionEvaluator, + }; + use arrow::array::ArrayRef; + use datafusion::logical_expr::PartitionEvaluator; + + #[derive(Debug)] + struct TestPartitionEvaluator {} + + impl PartitionEvaluator for TestPartitionEvaluator { + fn evaluate_all( + &mut self, + values: &[ArrayRef], + _num_rows: usize, + ) -> datafusion_common::Result { + Ok(values[0].to_owned()) + } + } + + #[test] + fn test_ffi_partition_evaluator_local_bypass_inner() -> datafusion_common::Result<()> + { + let original_accum = TestPartitionEvaluator {}; + let boxed_accum: Box = Box::new(original_accum); + + let ffi_accum: FFI_PartitionEvaluator = boxed_accum.into(); + + // Verify local libraries can be downcast to their original + let foreign_accum: Box = ffi_accum.into(); + unsafe { + let concrete = &*(foreign_accum.as_ref() as *const dyn PartitionEvaluator + as *const TestPartitionEvaluator); + assert!(!concrete.uses_window_frame()); + } + + // Verify different library markers generate foreign accumulator + let original_accum = TestPartitionEvaluator {}; + let boxed_accum: Box = Box::new(original_accum); + let mut ffi_accum: FFI_PartitionEvaluator = boxed_accum.into(); + ffi_accum.library_marker_id = crate::mock_foreign_marker_id; + let foreign_accum: Box = ffi_accum.into(); + unsafe { + let concrete = &*(foreign_accum.as_ref() as *const dyn PartitionEvaluator + as *const ForeignPartitionEvaluator); + assert!(!concrete.uses_window_frame()); + } + + Ok(()) + } +} From 0f616440c182f619fd905b8675b99f7e99433c6f Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 13 Nov 2025 08:08:31 -0500 Subject: [PATCH 16/22] Machete --- Cargo.lock | 1 - datafusion-examples/examples/ffi/ffi_module_loader/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e0f811f16f1b..20aabb996662 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3042,7 +3042,6 @@ version = "0.1.0" dependencies = [ "abi_stable", "datafusion", - "datafusion-ffi", "ffi_module_interface", "tokio", ] diff --git a/datafusion-examples/examples/ffi/ffi_module_loader/Cargo.toml b/datafusion-examples/examples/ffi/ffi_module_loader/Cargo.toml index 028a366aab1c..1f68bb1bb1be 100644 --- a/datafusion-examples/examples/ffi/ffi_module_loader/Cargo.toml +++ b/datafusion-examples/examples/ffi/ffi_module_loader/Cargo.toml @@ -24,6 +24,5 @@ publish = false [dependencies] abi_stable = "0.11.3" datafusion = { workspace = true } -datafusion-ffi = { workspace = true } ffi_module_interface = { path = "../ffi_module_interface" } tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] } From 823f3c9a43f2455c46e7fab4e41c219ba0fa1239 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 13 Nov 2025 08:54:25 -0500 Subject: [PATCH 17/22] taplo fmt --- datafusion/ffi/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml index 2cf040f9bd9c..b797804731f4 100644 --- a/datafusion/ffi/Cargo.toml +++ b/datafusion/ffi/Cargo.toml @@ -58,8 +58,8 @@ semver = "1.0.27" tokio = { workspace = true } [dev-dependencies] -doc-comment = { workspace = true } datafusion = { workspace = true, default-features = false, features = ["sql"] } +doc-comment = { workspace = true } [features] integration-tests = [] From 7629a6a6e9fb3504943eefa02ca6677747340a32 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 13 Nov 2025 10:35:58 -0500 Subject: [PATCH 18/22] Add upgrade text --- docs/source/library-user-guide/upgrading.md | 41 +++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index e116bfffeda6..f62095ab56a0 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -152,6 +152,47 @@ Instead of silently succeeding. The remove API no longer requires a mutable instance +### FFI object conversion + +Many of the structs in the `datafusion-ffi` crate have been updated to allow easier +conversion to the underlying trait types they represent. This simplifies some code +paths, but also provides an additional improvement in cases where library code goes +through a round trip via the foreign function interface. + +To update your code, suppose you have a `FFI_SchemaProvider` called `ffi_provider` +and you wish to use this as a `SchemaProvider`. In the old approach you would do +something like: + +```rust + let foreign_provider: ForeignSchemaProvider = provider.into(); + let foreign_provider = Arc::new(foreign_provider) as Arc; +``` + +This code should now be written as: + +```rust + let foreign_provider: Arc = provider.into(); + let foreign_provider = foreign_provider as Arc; +``` + +For the case of user defined functions, the updates are similar but you +may need to change the way you call the creation of the `ScalarUDF`. +Aggregate and window functions follow the same pattern. + +Previously you may write: + +```rust + let foreign_udf: ForeignScalarUDF = ffi_udf.try_into()?; + let foreign_udf: ScalarUDF = foreign_udf.into(); +``` + +Instead this should now be: + +```rust + let foreign_udf: Arc = ffi_udf.try_into()?; + let foreign_udf = ScalarUDF::new_from_shared_impl(foreign_udf); +``` + ## DataFusion `51.0.0` ### `arrow` / `parquet` updated to 57.0.0 From 35cf83326341bb4735629a0047167340d64ffe59 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 20 Nov 2025 13:04:37 -0500 Subject: [PATCH 19/22] Minor update to doc --- docs/source/library-user-guide/upgrading.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index f62095ab56a0..af0e75b80523 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -163,14 +163,14 @@ To update your code, suppose you have a `FFI_SchemaProvider` called `ffi_provide and you wish to use this as a `SchemaProvider`. In the old approach you would do something like: -```rust +```rust,ignore let foreign_provider: ForeignSchemaProvider = provider.into(); let foreign_provider = Arc::new(foreign_provider) as Arc; ``` This code should now be written as: -```rust +```rust,ignore let foreign_provider: Arc = provider.into(); let foreign_provider = foreign_provider as Arc; ``` @@ -181,14 +181,14 @@ Aggregate and window functions follow the same pattern. Previously you may write: -```rust +```rust,ignore let foreign_udf: ForeignScalarUDF = ffi_udf.try_into()?; let foreign_udf: ScalarUDF = foreign_udf.into(); ``` Instead this should now be: -```rust +```rust,ignore let foreign_udf: Arc = ffi_udf.try_into()?; let foreign_udf = ScalarUDF::new_from_shared_impl(foreign_udf); ``` From 5fcc800e3e3f7b2a0c6c0ddb34ad0327ae6b277e Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 21 Nov 2025 09:59:01 -0500 Subject: [PATCH 20/22] Update to review comments, mostly documentation and switchin library marker id to usize --- .../examples/ffi/ffi_module_loader/src/main.rs | 2 +- datafusion/ffi/README.md | 2 +- datafusion/ffi/src/catalog_provider.rs | 2 +- datafusion/ffi/src/catalog_provider_list.rs | 2 +- datafusion/ffi/src/execution_plan.rs | 2 +- datafusion/ffi/src/lib.rs | 6 +++--- datafusion/ffi/src/plan_properties.rs | 6 +++--- datafusion/ffi/src/schema_provider.rs | 2 +- datafusion/ffi/src/table_provider.rs | 2 +- datafusion/ffi/src/udaf/accumulator.rs | 2 +- datafusion/ffi/src/udaf/groups_accumulator.rs | 2 +- datafusion/ffi/src/udaf/mod.rs | 2 +- datafusion/ffi/src/udf/mod.rs | 5 ++--- datafusion/ffi/src/udtf.rs | 2 +- datafusion/ffi/src/udwf/mod.rs | 2 +- datafusion/ffi/src/udwf/partition_evaluator.rs | 2 +- datafusion/ffi/tests/ffi_integration.rs | 2 +- docs/source/library-user-guide/upgrading.md | 4 ++-- 18 files changed, 24 insertions(+), 25 deletions(-) diff --git a/datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs b/datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs index b0b24367ec2c..170c7c3eff8a 100644 --- a/datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs +++ b/datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs @@ -49,7 +49,7 @@ async fn main() -> Result<()> { ))?(); // In order to access the table provider within this executable, we need to - // turn it into a `ForeignTableProvider`. + // turn it into a `TableProvider`. let foreign_table_provider: Arc = (&ffi_table_provider).into(); let ctx = SessionContext::new(); diff --git a/datafusion/ffi/README.md b/datafusion/ffi/README.md index bf7fe2dfb538..3c7595da6f49 100644 --- a/datafusion/ffi/README.md +++ b/datafusion/ffi/README.md @@ -104,7 +104,7 @@ the example in `FFI_TableProvider`. ## Library Marker ID When reviewing the code, many of the structs in this crate contain a call to -a `library_maker_id`. The purpose of this call is to determine if a library is +a `library_marker_id`. The purpose of this call is to determine if a library is accessing _local_ code through the FFI structs. Consider this example: you have a `primary` program that exposes functions to create a schema provider. You have a `secondary` library that exposes a function to create a catalog provider diff --git a/datafusion/ffi/src/catalog_provider.rs b/datafusion/ffi/src/catalog_provider.rs index c9a6d9946483..cf19d1cecf9c 100644 --- a/datafusion/ffi/src/catalog_provider.rs +++ b/datafusion/ffi/src/catalog_provider.rs @@ -73,7 +73,7 @@ pub struct FFI_CatalogProvider { /// Utility to identify when FFI objects are accessed locally through /// the foreign interface. - pub library_marker_id: extern "C" fn() -> u64, + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_CatalogProvider {} diff --git a/datafusion/ffi/src/catalog_provider_list.rs b/datafusion/ffi/src/catalog_provider_list.rs index db0e33ab51c9..aa5a75c2ec58 100644 --- a/datafusion/ffi/src/catalog_provider_list.rs +++ b/datafusion/ffi/src/catalog_provider_list.rs @@ -61,7 +61,7 @@ pub struct FFI_CatalogProviderList { /// Utility to identify when FFI objects are accessed locally through /// the foreign interface. - pub library_marker_id: extern "C" fn() -> u64, + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_CatalogProviderList {} diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index bad206f36c1e..328df551d4a5 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -68,7 +68,7 @@ pub struct FFI_ExecutionPlan { /// Utility to identify when FFI objects are accessed locally through /// the foreign interface. - pub library_marker_id: extern "C" fn() -> u64, + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_ExecutionPlan {} diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index 872d6d08fae8..d64e29ac0999 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -72,15 +72,15 @@ static LIBRARY_MARKER: u8 = 0; /// a different address for the marker. By checking the marker /// address we can determine if a struct is truly Foreign or is /// actually within the same originating library. -pub extern "C" fn get_library_marker_id() -> u64 { - &LIBRARY_MARKER as *const u8 as u64 +pub extern "C" fn get_library_marker_id() -> usize { + &LIBRARY_MARKER as *const u8 as usize } /// For unit testing in this crate we need to trick the providers /// into thinking we have a foreign call. We do this by overwriting /// their `library_marker_id` function to return a different value. #[cfg(test)] -pub(crate) extern "C" fn mock_foreign_marker_id() -> u64 { +pub(crate) extern "C" fn mock_foreign_marker_id() -> usize { get_library_marker_id() + 1 } diff --git a/datafusion/ffi/src/plan_properties.rs b/datafusion/ffi/src/plan_properties.rs index e8b45cab06ab..8aa856bf38f7 100644 --- a/datafusion/ffi/src/plan_properties.rs +++ b/datafusion/ffi/src/plan_properties.rs @@ -78,7 +78,7 @@ pub struct FFI_PlanProperties { /// Utility to identify when FFI objects are accessed locally through /// the foreign interface. - pub library_marker_id: extern "C" fn() -> u64, + pub library_marker_id: extern "C" fn() -> usize, } struct PlanPropertiesPrivateData { @@ -347,13 +347,13 @@ mod tests { // Verify local libraries let foreign_plan: PlanProperties = ffi_plan.try_into()?; - assert_eq!(format!("{foreign_plan:?}"), format!("{:?}", foreign_plan)); + assert_eq!(format!("{foreign_plan:?}"), format!("{props:?}")); // Verify different library markers still can produce identical properties let mut ffi_plan = FFI_PlanProperties::from(&props); ffi_plan.library_marker_id = crate::mock_foreign_marker_id; let foreign_plan: PlanProperties = ffi_plan.try_into()?; - assert_eq!(format!("{foreign_plan:?}"), format!("{:?}", foreign_plan)); + assert_eq!(format!("{foreign_plan:?}"), format!("{props:?}")); Ok(()) } diff --git a/datafusion/ffi/src/schema_provider.rs b/datafusion/ffi/src/schema_provider.rs index c10ee0d282ca..8dbd6eac7fb7 100644 --- a/datafusion/ffi/src/schema_provider.rs +++ b/datafusion/ffi/src/schema_provider.rs @@ -83,7 +83,7 @@ pub struct FFI_SchemaProvider { /// Utility to identify when FFI objects are accessed locally through /// the foreign interface. - pub library_marker_id: extern "C" fn() -> u64, + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_SchemaProvider {} diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index 5f051f7f5db0..f62666e916c3 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -158,7 +158,7 @@ pub struct FFI_TableProvider { /// Utility to identify when FFI objects are accessed locally through /// the foreign interface. - pub library_marker_id: extern "C" fn() -> u64, + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_TableProvider {} diff --git a/datafusion/ffi/src/udaf/accumulator.rs b/datafusion/ffi/src/udaf/accumulator.rs index 227b04902aa2..bbdc4bb7b227 100644 --- a/datafusion/ffi/src/udaf/accumulator.rs +++ b/datafusion/ffi/src/udaf/accumulator.rs @@ -73,7 +73,7 @@ pub struct FFI_Accumulator { /// Utility to identify when FFI objects are accessed locally through /// the foreign interface. - pub library_marker_id: extern "C" fn() -> u64, + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_Accumulator {} diff --git a/datafusion/ffi/src/udaf/groups_accumulator.rs b/datafusion/ffi/src/udaf/groups_accumulator.rs index 82095383efb2..d581fa446b06 100644 --- a/datafusion/ffi/src/udaf/groups_accumulator.rs +++ b/datafusion/ffi/src/udaf/groups_accumulator.rs @@ -89,7 +89,7 @@ pub struct FFI_GroupsAccumulator { /// Utility to identify when FFI objects are accessed locally through /// the foreign interface. - pub library_marker_id: extern "C" fn() -> u64, + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_GroupsAccumulator {} diff --git a/datafusion/ffi/src/udaf/mod.rs b/datafusion/ffi/src/udaf/mod.rs index 78dc27af8b22..bd1bfa064243 100644 --- a/datafusion/ffi/src/udaf/mod.rs +++ b/datafusion/ffi/src/udaf/mod.rs @@ -148,7 +148,7 @@ pub struct FFI_AggregateUDF { /// Utility to identify when FFI objects are accessed locally through /// the foreign interface. - pub library_marker_id: extern "C" fn() -> u64, + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_AggregateUDF {} diff --git a/datafusion/ffi/src/udf/mod.rs b/datafusion/ffi/src/udf/mod.rs index 4900ca821be4..90141af9d05d 100644 --- a/datafusion/ffi/src/udf/mod.rs +++ b/datafusion/ffi/src/udf/mod.rs @@ -67,8 +67,7 @@ pub struct FFI_ScalarUDF { /// FFI equivalent to the `volatility` of a [`ScalarUDF`] pub volatility: FFI_Volatility, - /// Determines the return info of the underlying [`ScalarUDF`]. Either this - /// or return_type may be implemented on a UDF. + /// Determines the return info of the underlying [`ScalarUDF`]. pub return_field_from_args: unsafe extern "C" fn( udf: &Self, args: FFI_ReturnFieldArgs, @@ -111,7 +110,7 @@ pub struct FFI_ScalarUDF { /// Utility to identify when FFI objects are accessed locally through /// the foreign interface. - pub library_marker_id: extern "C" fn() -> u64, + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_ScalarUDF {} diff --git a/datafusion/ffi/src/udtf.rs b/datafusion/ffi/src/udtf.rs index 1439a18401fa..0810aacaa87f 100644 --- a/datafusion/ffi/src/udtf.rs +++ b/datafusion/ffi/src/udtf.rs @@ -63,7 +63,7 @@ pub struct FFI_TableFunction { /// Utility to identify when FFI objects are accessed locally through /// the foreign interface. - pub library_marker_id: extern "C" fn() -> u64, + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_TableFunction {} diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs index 74c68f3e81fe..f7468095bc5f 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -108,7 +108,7 @@ pub struct FFI_WindowUDF { /// Utility to identify when FFI objects are accessed locally through /// the foreign interface. - pub library_marker_id: extern "C" fn() -> u64, + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_WindowUDF {} diff --git a/datafusion/ffi/src/udwf/partition_evaluator.rs b/datafusion/ffi/src/udwf/partition_evaluator.rs index 2f7784753009..e0791a74243e 100644 --- a/datafusion/ffi/src/udwf/partition_evaluator.rs +++ b/datafusion/ffi/src/udwf/partition_evaluator.rs @@ -79,7 +79,7 @@ pub struct FFI_PartitionEvaluator { /// Utility to identify when FFI objects are accessed locally through /// the foreign interface. - pub library_marker_id: extern "C" fn() -> u64, + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_PartitionEvaluator {} diff --git a/datafusion/ffi/tests/ffi_integration.rs b/datafusion/ffi/tests/ffi_integration.rs index c782a66fc71f..216d6576a821 100644 --- a/datafusion/ffi/tests/ffi_integration.rs +++ b/datafusion/ffi/tests/ffi_integration.rs @@ -41,7 +41,7 @@ mod tests { )?(synchronous); // In order to access the table provider within this executable, we need to - // turn it into a `ForeignTableProvider`. + // turn it into a `TableProvider`. let foreign_table_provider: Arc = (&ffi_table_provider).into(); let ctx = SessionContext::new(); diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index af0e75b80523..ddd6f5f1389c 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -164,14 +164,14 @@ and you wish to use this as a `SchemaProvider`. In the old approach you would do something like: ```rust,ignore - let foreign_provider: ForeignSchemaProvider = provider.into(); + let foreign_provider: ForeignSchemaProvider = ffi_provider.into(); let foreign_provider = Arc::new(foreign_provider) as Arc; ``` This code should now be written as: ```rust,ignore - let foreign_provider: Arc = provider.into(); + let foreign_provider: Arc = ffi_provider.into(); let foreign_provider = foreign_provider as Arc; ``` From 11d907b22d0e26463f68882d77c4ce503abdcd78 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 22 Nov 2025 11:04:20 -0500 Subject: [PATCH 21/22] Documentation updates --- datafusion/ffi/README.md | 43 ++++++++++++++++++- datafusion/ffi/src/catalog_provider.rs | 3 +- datafusion/ffi/src/catalog_provider_list.rs | 3 +- datafusion/ffi/src/execution_plan.rs | 3 +- datafusion/ffi/src/lib.rs | 2 + datafusion/ffi/src/plan_properties.rs | 3 +- datafusion/ffi/src/schema_provider.rs | 3 +- datafusion/ffi/src/table_provider.rs | 3 +- datafusion/ffi/src/udaf/accumulator.rs | 3 +- datafusion/ffi/src/udaf/groups_accumulator.rs | 3 +- datafusion/ffi/src/udaf/mod.rs | 3 +- datafusion/ffi/src/udf/mod.rs | 3 +- datafusion/ffi/src/udtf.rs | 3 +- datafusion/ffi/src/udwf/mod.rs | 3 +- .../ffi/src/udwf/partition_evaluator.rs | 3 +- docs/source/library-user-guide/upgrading.md | 6 ++- 16 files changed, 74 insertions(+), 16 deletions(-) diff --git a/datafusion/ffi/README.md b/datafusion/ffi/README.md index 3c7595da6f49..cd6c4a809e49 100644 --- a/datafusion/ffi/README.md +++ b/datafusion/ffi/README.md @@ -101,6 +101,37 @@ In this crate we have a variety of structs which closely mimic the behavior of their internal counterparts. To see detailed notes about how to use them, see the example in `FFI_TableProvider`. +## Memory Management + +One of the advantages of Rust the ownership model, which means programmers +_usually_ do not need to worry about memory management. When interacting with +foreign code, this is not necessarily true. If you review the structures in +this crate, you will find that many of them implement the `Drop` trait and +perform a foreign call. + +Suppose we have a `FFI_CatalogProvider`, for example. This struct is safe to +pass across the FFI boundary, so it may be owned be either the library that +produces the underlying `CatalogProvider` or by another library that consumes +it. If we look closer at the `FFI_CatalogProvider`, it has a pointer to +some private data. That private data is only accessible on the producer's +side. If you attempt to access it on the consumer's side, you may get +segmentation faults or other bad behavior. Within that private data is the +actual `Arc` must be freed, but if the +`FFI_CatalogProvider` is only owned on the consumer's side, we have no way +to access the private data and free it. + +To account for this most structs in this crate have a `release` method that +is used to clean up any privately held data. This calls into the producer's +side, regardless of if it is called on either the local or foreign side. +Most of the structs in this crate carry atomic reference counts to the +underlying data, and this is straight forward. Some structs like the +`FFI_Accumulator` contain an inner `Box`. The reason for +this is that we need to be able to mutably acccess these based on the +`Accumulator` trait definition. For these we have slightly more complicated +release code based on whether it is being dropped on the local or foreign side. +Traits that use a `Box<>` for their underlying data also cannot implement +`Clone`. + ## Library Marker ID When reviewing the code, many of the structs in this crate contain a call to @@ -119,9 +150,9 @@ this case that schema provider is actually local code to the `primary` program except that it is wrapped in the FFI code! We work around this by the `library_marker_id` calls. What this does is it -creates a global variable within each library and returns a `u64` address +creates a global variable within each library and returns a `usize` address of that library. This is guaranteed to be unique for every library that contains -FFI code. By comparing these `u64` addresses we can determine if a FFI struct +FFI code. By comparing these `usize` addresses we can determine if a FFI struct is local or foreign. In our example of the schema provider, if you were to make a call in your @@ -131,6 +162,14 @@ comparing the `library_marker_id` of this object to the `primary` program, we determine it is local code. This means it is safe to access the underlying private data. +Users of the FFI code should not need to access these function. If you are +implementing a new FFI struct, then it is recommended that you follow the +established patterns for converting from FFI struct into the underlying +traits. Specifically you should use `crate::get_library_marker_id` and in +your unit tests you should override this with +`crate::mock_foreign_marker_id` to force your test to create the foreign +variant of your struct. + [apache datafusion]: https://datafusion.apache.org/ [api docs]: http://docs.rs/datafusion-ffi/latest [rust abi]: https://doc.rust-lang.org/reference/abi.html diff --git a/datafusion/ffi/src/catalog_provider.rs b/datafusion/ffi/src/catalog_provider.rs index cf19d1cecf9c..00e8dc315811 100644 --- a/datafusion/ffi/src/catalog_provider.rs +++ b/datafusion/ffi/src/catalog_provider.rs @@ -72,7 +72,8 @@ pub struct FFI_CatalogProvider { pub private_data: *mut c_void, /// Utility to identify when FFI objects are accessed locally through - /// the foreign interface. + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. pub library_marker_id: extern "C" fn() -> usize, } diff --git a/datafusion/ffi/src/catalog_provider_list.rs b/datafusion/ffi/src/catalog_provider_list.rs index aa5a75c2ec58..429897269470 100644 --- a/datafusion/ffi/src/catalog_provider_list.rs +++ b/datafusion/ffi/src/catalog_provider_list.rs @@ -60,7 +60,8 @@ pub struct FFI_CatalogProviderList { pub private_data: *mut c_void, /// Utility to identify when FFI objects are accessed locally through - /// the foreign interface. + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. pub library_marker_id: extern "C" fn() -> usize, } diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index 328df551d4a5..d76dcd8dd0c9 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -67,7 +67,8 @@ pub struct FFI_ExecutionPlan { pub private_data: *mut c_void, /// Utility to identify when FFI objects are accessed locally through - /// the foreign interface. + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. pub library_marker_id: extern "C" fn() -> usize, } diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index d64e29ac0999..c85d16f5db33 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -72,6 +72,8 @@ static LIBRARY_MARKER: u8 = 0; /// a different address for the marker. By checking the marker /// address we can determine if a struct is truly Foreign or is /// actually within the same originating library. +/// +/// See the crate's `README.md` for additional information. pub extern "C" fn get_library_marker_id() -> usize { &LIBRARY_MARKER as *const u8 as usize } diff --git a/datafusion/ffi/src/plan_properties.rs b/datafusion/ffi/src/plan_properties.rs index 8aa856bf38f7..0b8177a41242 100644 --- a/datafusion/ffi/src/plan_properties.rs +++ b/datafusion/ffi/src/plan_properties.rs @@ -77,7 +77,8 @@ pub struct FFI_PlanProperties { pub private_data: *mut c_void, /// Utility to identify when FFI objects are accessed locally through - /// the foreign interface. + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. pub library_marker_id: extern "C" fn() -> usize, } diff --git a/datafusion/ffi/src/schema_provider.rs b/datafusion/ffi/src/schema_provider.rs index 8dbd6eac7fb7..d6feeb6b8fb3 100644 --- a/datafusion/ffi/src/schema_provider.rs +++ b/datafusion/ffi/src/schema_provider.rs @@ -82,7 +82,8 @@ pub struct FFI_SchemaProvider { pub private_data: *mut c_void, /// Utility to identify when FFI objects are accessed locally through - /// the foreign interface. + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. pub library_marker_id: extern "C" fn() -> usize, } diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index f62666e916c3..10b44a147fa0 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -157,7 +157,8 @@ pub struct FFI_TableProvider { pub private_data: *mut c_void, /// Utility to identify when FFI objects are accessed locally through - /// the foreign interface. + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. pub library_marker_id: extern "C" fn() -> usize, } diff --git a/datafusion/ffi/src/udaf/accumulator.rs b/datafusion/ffi/src/udaf/accumulator.rs index bbdc4bb7b227..8626d1a42679 100644 --- a/datafusion/ffi/src/udaf/accumulator.rs +++ b/datafusion/ffi/src/udaf/accumulator.rs @@ -72,7 +72,8 @@ pub struct FFI_Accumulator { pub private_data: *mut c_void, /// Utility to identify when FFI objects are accessed locally through - /// the foreign interface. + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. pub library_marker_id: extern "C" fn() -> usize, } diff --git a/datafusion/ffi/src/udaf/groups_accumulator.rs b/datafusion/ffi/src/udaf/groups_accumulator.rs index d581fa446b06..b088804678b9 100644 --- a/datafusion/ffi/src/udaf/groups_accumulator.rs +++ b/datafusion/ffi/src/udaf/groups_accumulator.rs @@ -88,7 +88,8 @@ pub struct FFI_GroupsAccumulator { pub private_data: *mut c_void, /// Utility to identify when FFI objects are accessed locally through - /// the foreign interface. + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. pub library_marker_id: extern "C" fn() -> usize, } diff --git a/datafusion/ffi/src/udaf/mod.rs b/datafusion/ffi/src/udaf/mod.rs index bd1bfa064243..a416753c371b 100644 --- a/datafusion/ffi/src/udaf/mod.rs +++ b/datafusion/ffi/src/udaf/mod.rs @@ -147,7 +147,8 @@ pub struct FFI_AggregateUDF { pub private_data: *mut c_void, /// Utility to identify when FFI objects are accessed locally through - /// the foreign interface. + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. pub library_marker_id: extern "C" fn() -> usize, } diff --git a/datafusion/ffi/src/udf/mod.rs b/datafusion/ffi/src/udf/mod.rs index 90141af9d05d..b90cc267e0bd 100644 --- a/datafusion/ffi/src/udf/mod.rs +++ b/datafusion/ffi/src/udf/mod.rs @@ -109,7 +109,8 @@ pub struct FFI_ScalarUDF { pub private_data: *mut c_void, /// Utility to identify when FFI objects are accessed locally through - /// the foreign interface. + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. pub library_marker_id: extern "C" fn() -> usize, } diff --git a/datafusion/ffi/src/udtf.rs b/datafusion/ffi/src/udtf.rs index 0810aacaa87f..e603b9234c33 100644 --- a/datafusion/ffi/src/udtf.rs +++ b/datafusion/ffi/src/udtf.rs @@ -62,7 +62,8 @@ pub struct FFI_TableFunction { pub private_data: *mut c_void, /// Utility to identify when FFI objects are accessed locally through - /// the foreign interface. + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. pub library_marker_id: extern "C" fn() -> usize, } diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs index f7468095bc5f..d961ffa5b59b 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -107,7 +107,8 @@ pub struct FFI_WindowUDF { pub private_data: *mut c_void, /// Utility to identify when FFI objects are accessed locally through - /// the foreign interface. + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. pub library_marker_id: extern "C" fn() -> usize, } diff --git a/datafusion/ffi/src/udwf/partition_evaluator.rs b/datafusion/ffi/src/udwf/partition_evaluator.rs index e0791a74243e..448db1f8ae1a 100644 --- a/datafusion/ffi/src/udwf/partition_evaluator.rs +++ b/datafusion/ffi/src/udwf/partition_evaluator.rs @@ -78,7 +78,8 @@ pub struct FFI_PartitionEvaluator { pub private_data: *mut c_void, /// Utility to identify when FFI objects are accessed locally through - /// the foreign interface. + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. pub library_marker_id: extern "C" fn() -> usize, } diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index ddd6f5f1389c..e6951baec109 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -152,7 +152,7 @@ Instead of silently succeeding. The remove API no longer requires a mutable instance -### FFI object conversion +### FFI crate updates Many of the structs in the `datafusion-ffi` crate have been updated to allow easier conversion to the underlying trait types they represent. This simplifies some code @@ -193,6 +193,10 @@ Instead this should now be: let foreign_udf = ScalarUDF::new_from_shared_impl(foreign_udf); ``` +Additionally, the FFI structure for Scalar UDF's no longer contains a +`return_type` call. This code was not used since the `ForeignScalarUDF` +struct implements the `return_field_from_args` instead. + ## DataFusion `51.0.0` ### `arrow` / `parquet` updated to 57.0.0 From 407c1d155f1198c23e210cabdbc1a35bd79941c0 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 22 Nov 2025 11:08:48 -0500 Subject: [PATCH 22/22] Spell checker --- datafusion/ffi/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/ffi/README.md b/datafusion/ffi/README.md index cd6c4a809e49..afbda95d45e2 100644 --- a/datafusion/ffi/README.md +++ b/datafusion/ffi/README.md @@ -103,7 +103,7 @@ the example in `FFI_TableProvider`. ## Memory Management -One of the advantages of Rust the ownership model, which means programmers +One of the advantages of Rust the ownership model, which means programmers _usually_ do not need to worry about memory management. When interacting with foreign code, this is not necessarily true. If you review the structures in this crate, you will find that many of them implement the `Drop` trait and @@ -126,7 +126,7 @@ side, regardless of if it is called on either the local or foreign side. Most of the structs in this crate carry atomic reference counts to the underlying data, and this is straight forward. Some structs like the `FFI_Accumulator` contain an inner `Box`. The reason for -this is that we need to be able to mutably acccess these based on the +this is that we need to be able to mutably access these based on the `Accumulator` trait definition. For these we have slightly more complicated release code based on whether it is being dropped on the local or foreign side. Traits that use a `Box<>` for their underlying data also cannot implement