From 604ec93703378236aaf4428a180e2935b41636a3 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 12 Nov 2025 15:27:04 -0500 Subject: [PATCH 1/2] Initial commit for catalog provider list FFI work --- datafusion/ffi/src/catalog_provider.rs | 2 +- datafusion/ffi/src/catalog_provider_list.rs | 283 ++++++++++++++++++++ datafusion/ffi/src/lib.rs | 1 + datafusion/ffi/src/tests/catalog.rs | 54 ++++ datafusion/ffi/src/tests/mod.rs | 6 + datafusion/ffi/tests/ffi_catalog.rs | 82 ++++++ datafusion/ffi/tests/ffi_integration.rs | 27 -- 7 files changed, 427 insertions(+), 28 deletions(-) create mode 100644 datafusion/ffi/src/catalog_provider_list.rs create mode 100644 datafusion/ffi/tests/ffi_catalog.rs diff --git a/datafusion/ffi/src/catalog_provider.rs b/datafusion/ffi/src/catalog_provider.rs index 65dcab34f17d..d279951783b4 100644 --- a/datafusion/ffi/src/catalog_provider.rs +++ b/datafusion/ffi/src/catalog_provider.rs @@ -204,7 +204,7 @@ impl FFI_CatalogProvider { /// defined on this struct must only use the stable functions provided in /// FFI_CatalogProvider to interact with the foreign table provider. #[derive(Debug)] -pub struct ForeignCatalogProvider(FFI_CatalogProvider); +pub struct ForeignCatalogProvider(pub(crate) FFI_CatalogProvider); unsafe impl Send for ForeignCatalogProvider {} unsafe impl Sync for ForeignCatalogProvider {} diff --git a/datafusion/ffi/src/catalog_provider_list.rs b/datafusion/ffi/src/catalog_provider_list.rs new file mode 100644 index 000000000000..0cbbcc93136a --- /dev/null +++ b/datafusion/ffi/src/catalog_provider_list.rs @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, ffi::c_void, sync::Arc}; + +use abi_stable::{ + std_types::{ROption, RString, RVec}, + StableAbi, +}; +use datafusion::catalog::{CatalogProvider, CatalogProviderList}; +use tokio::runtime::Handle; + +use crate::catalog_provider::{FFI_CatalogProvider, ForeignCatalogProvider}; + +/// A stable struct for sharing [`CatalogProviderList`] across FFI boundaries. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_CatalogProviderList { + /// Register a catalog + pub register_catalog: unsafe extern "C" fn( + &Self, + name: RString, + catalog: &FFI_CatalogProvider, + ) -> ROption, + + /// List of existing catalogs + pub catalog_names: unsafe extern "C" fn(&Self) -> RVec, + + /// Access a catalog + pub catalog: + unsafe extern "C" fn(&Self, name: RString) -> ROption, + + /// Used to create a clone on the provider of the execution plan. This should + /// only need to be called by the receiver of the plan. + pub clone: unsafe extern "C" fn(plan: &Self) -> Self, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(arg: &mut Self), + + /// Return the major DataFusion version number of this provider. + pub version: unsafe extern "C" fn() -> u64, + + /// Internal data. This is only to be accessed by the provider of the plan. + /// A [`ForeignCatalogProviderList`] should never attempt to access this data. + pub private_data: *mut c_void, +} + +unsafe impl Send for FFI_CatalogProviderList {} +unsafe impl Sync for FFI_CatalogProviderList {} + +struct ProviderPrivateData { + provider: Arc, + runtime: Option, +} + +impl FFI_CatalogProviderList { + unsafe fn inner(&self) -> &Arc { + let private_data = self.private_data as *const ProviderPrivateData; + &(*private_data).provider + } + + unsafe fn runtime(&self) -> Option { + let private_data = self.private_data as *const ProviderPrivateData; + (*private_data).runtime.clone() + } +} + +unsafe extern "C" fn catalog_names_fn_wrapper( + provider: &FFI_CatalogProviderList, +) -> RVec { + let names = provider.inner().catalog_names(); + names.into_iter().map(|s| s.into()).collect() +} + +unsafe extern "C" fn register_catalog_fn_wrapper( + provider: &FFI_CatalogProviderList, + name: RString, + catalog: &FFI_CatalogProvider, +) -> ROption { + let runtime = provider.runtime(); + let provider = provider.inner(); + let catalog = Arc::new(ForeignCatalogProvider::from(catalog)); + + provider + .register_catalog(name.into(), catalog) + .map(|catalog| FFI_CatalogProvider::new(catalog, runtime)) + .into() +} + +unsafe extern "C" fn catalog_fn_wrapper( + provider: &FFI_CatalogProviderList, + name: RString, +) -> ROption { + let runtime = provider.runtime(); + let provider = provider.inner(); + provider + .catalog(name.as_str()) + .map(|catalog| FFI_CatalogProvider::new(catalog, runtime)) + .into() +} + +unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_CatalogProviderList) { + let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData); + drop(private_data); +} + +unsafe extern "C" fn clone_fn_wrapper( + provider: &FFI_CatalogProviderList, +) -> FFI_CatalogProviderList { + let old_private_data = provider.private_data as *const ProviderPrivateData; + let runtime = (*old_private_data).runtime.clone(); + + let private_data = Box::into_raw(Box::new(ProviderPrivateData { + provider: Arc::clone(&(*old_private_data).provider), + runtime, + })) as *mut c_void; + + FFI_CatalogProviderList { + register_catalog: register_catalog_fn_wrapper, + catalog_names: catalog_names_fn_wrapper, + catalog: catalog_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data, + } +} + +impl Drop for FFI_CatalogProviderList { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +impl FFI_CatalogProviderList { + /// Creates a new [`FFI_CatalogProviderList`]. + pub fn new( + provider: Arc, + runtime: Option, + ) -> Self { + let private_data = Box::new(ProviderPrivateData { provider, runtime }); + + Self { + register_catalog: register_catalog_fn_wrapper, + catalog_names: catalog_names_fn_wrapper, + catalog: catalog_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data: Box::into_raw(private_data) as *mut c_void, + } + } +} + +/// This wrapper struct exists on the receiver side of the FFI interface, so it has +/// no guarantees about being able to access the data in `private_data`. Any functions +/// defined on this struct must only use the stable functions provided in +/// FFI_CatalogProviderList to interact with the foreign table provider. +#[derive(Debug)] +pub struct ForeignCatalogProviderList(FFI_CatalogProviderList); + +unsafe impl Send for ForeignCatalogProviderList {} +unsafe impl Sync for ForeignCatalogProviderList {} + +impl From<&FFI_CatalogProviderList> for ForeignCatalogProviderList { + fn from(provider: &FFI_CatalogProviderList) -> Self { + Self(provider.clone()) + } +} + +impl Clone for FFI_CatalogProviderList { + fn clone(&self) -> Self { + unsafe { (self.clone)(self) } + } +} + +impl CatalogProviderList for ForeignCatalogProviderList { + fn as_any(&self) -> &dyn Any { + self + } + + fn register_catalog( + &self, + name: String, + catalog: Arc, + ) -> Option> { + unsafe { + let catalog = match catalog.as_any().downcast_ref::() + { + Some(s) => &s.0, + None => &FFI_CatalogProvider::new(catalog, None), + }; + + (self.0.register_catalog)(&self.0, name.into(), catalog) + .map(|s| Arc::new(ForeignCatalogProvider(s)) as Arc) + .into() + } + } + + fn catalog_names(&self) -> Vec { + unsafe { + (self.0.catalog_names)(&self.0) + .into_iter() + .map(Into::into) + .collect() + } + } + + fn catalog(&self, name: &str) -> Option> { + unsafe { + (self.0.catalog)(&self.0, name.into()) + .map(|catalog| { + Arc::new(ForeignCatalogProvider(catalog)) as Arc + }) + .into() + } + } +} + +#[cfg(test)] +mod tests { + use datafusion::catalog::{MemoryCatalogProvider, MemoryCatalogProviderList}; + + use super::*; + + #[test] + fn test_round_trip_ffi_catalog_provider_list() { + let prior_catalog = Arc::new(MemoryCatalogProvider::new()); + + let catalog_list = Arc::new(MemoryCatalogProviderList::new()); + assert!(catalog_list + .as_ref() + .register_catalog("prior_catalog".to_owned(), prior_catalog) + .is_none()); + + let ffi_catalog_list = FFI_CatalogProviderList::new(catalog_list, None); + + let foreign_catalog_list: ForeignCatalogProviderList = (&ffi_catalog_list).into(); + + let prior_catalog_names = foreign_catalog_list.catalog_names(); + assert_eq!(prior_catalog_names.len(), 1); + assert_eq!(prior_catalog_names[0], "prior_catalog"); + + // Replace an existing catalog with one of the same name + let returned_catalog = foreign_catalog_list.register_catalog( + "prior_catalog".to_owned(), + Arc::new(MemoryCatalogProvider::new()), + ); + assert!(returned_catalog.is_some()); + assert_eq!(foreign_catalog_list.catalog_names().len(), 1); + + // Add a new catalog + let returned_catalog = foreign_catalog_list.register_catalog( + "second_catalog".to_owned(), + Arc::new(MemoryCatalogProvider::new()), + ); + assert!(returned_catalog.is_none()); + assert_eq!(foreign_catalog_list.catalog_names().len(), 2); + + // Retrieve non-existent catalog + let returned_catalog = foreign_catalog_list.catalog("non_existent_catalog"); + assert!(returned_catalog.is_none()); + + // Retrieve valid catalog + let returned_catalog = foreign_catalog_list.catalog("second_catalog"); + assert!(returned_catalog.is_some()); + } +} diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index 0c2340e8ce7b..a8094057773b 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -26,6 +26,7 @@ pub mod arrow_wrappers; pub mod catalog_provider; +pub mod catalog_provider_list; pub mod execution_plan; pub mod insert_op; pub mod plan_properties; diff --git a/datafusion/ffi/src/tests/catalog.rs b/datafusion/ffi/src/tests/catalog.rs index f4293adb41b9..e934a01cdb60 100644 --- a/datafusion/ffi/src/tests/catalog.rs +++ b/datafusion/ffi/src/tests/catalog.rs @@ -28,8 +28,10 @@ use std::{any::Any, fmt::Debug, sync::Arc}; use crate::catalog_provider::FFI_CatalogProvider; +use crate::catalog_provider_list::FFI_CatalogProviderList; use arrow::datatypes::Schema; use async_trait::async_trait; +use datafusion::catalog::{CatalogProviderList, MemoryCatalogProviderList}; use datafusion::{ catalog::{ CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider, @@ -181,3 +183,55 @@ pub(crate) extern "C" fn create_catalog_provider() -> FFI_CatalogProvider { let catalog_provider = Arc::new(FixedCatalogProvider::default()); FFI_CatalogProvider::new(catalog_provider, None) } + +/// This catalog provider list is intended only for unit tests. It prepopulates with one +/// catalog and only allows for catalogs named after four colors. +#[derive(Debug)] +pub struct FixedCatalogProviderList { + inner: MemoryCatalogProviderList, +} + +impl Default for FixedCatalogProviderList { + fn default() -> Self { + let inner = MemoryCatalogProviderList::new(); + + let _ = inner.register_catalog( + "blue".to_owned(), + Arc::new(FixedCatalogProvider::default()), + ); + + Self { inner } + } +} + +impl CatalogProviderList for FixedCatalogProviderList { + fn as_any(&self) -> &dyn Any { + self + } + + fn catalog_names(&self) -> Vec { + self.inner.catalog_names() + } + + fn catalog(&self, name: &str) -> Option> { + self.inner.catalog(name) + } + + fn register_catalog( + &self, + name: String, + catalog: Arc, + ) -> Option> { + if !["blue", "red", "green", "yellow"].contains(&name.as_str()) { + log::warn!("FixedCatalogProviderList only provides four schemas: blue, red, green, yellow"); + return None; + } + + self.inner.register_catalog(name, catalog) + } +} + +pub(crate) extern "C" fn create_catalog_provider_list() -> FFI_CatalogProviderList { + let catalog_provider_list = Arc::new(FixedCatalogProviderList::default()); + FFI_CatalogProviderList::new(catalog_provider_list, None) +} diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs index 816086c32041..d9b4a61579e9 100644 --- a/datafusion/ffi/src/tests/mod.rs +++ b/datafusion/ffi/src/tests/mod.rs @@ -34,6 +34,8 @@ use crate::udaf::FFI_AggregateUDF; use crate::udwf::FFI_WindowUDF; use super::{table_provider::FFI_TableProvider, udf::FFI_ScalarUDF}; +use crate::catalog_provider_list::FFI_CatalogProviderList; +use crate::tests::catalog::create_catalog_provider_list; use arrow::array::RecordBatch; use async_provider::create_async_table_provider; use datafusion::{ @@ -62,6 +64,9 @@ pub struct ForeignLibraryModule { /// Construct an opinionated catalog provider pub create_catalog: extern "C" fn() -> FFI_CatalogProvider, + /// Construct an opinionated catalog provider list + pub create_catalog_list: extern "C" fn() -> FFI_CatalogProviderList, + /// Constructs the table provider pub create_table: extern "C" fn(synchronous: bool) -> FFI_TableProvider, @@ -123,6 +128,7 @@ extern "C" fn construct_table_provider(synchronous: bool) -> FFI_TableProvider { pub fn get_foreign_library_module() -> ForeignLibraryModuleRef { ForeignLibraryModule { create_catalog: create_catalog_provider, + create_catalog_list: create_catalog_provider_list, create_table: construct_table_provider, create_scalar_udf: create_ffi_abs_func, create_nullary_udf: create_ffi_random_func, diff --git a/datafusion/ffi/tests/ffi_catalog.rs b/datafusion/ffi/tests/ffi_catalog.rs new file mode 100644 index 000000000000..a264440cd784 --- /dev/null +++ b/datafusion/ffi/tests/ffi_catalog.rs @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Add an additional module here for convenience to scope this to only +/// when the feature integration-tests is built +#[cfg(feature = "integration-tests")] +mod tests { + use datafusion::prelude::SessionContext; + use datafusion_common::DataFusionError; + use datafusion_ffi::catalog_provider::ForeignCatalogProvider; + use datafusion_ffi::catalog_provider_list::ForeignCatalogProviderList; + use datafusion_ffi::tests::utils::get_module; + use std::sync::Arc; + + #[tokio::test] + async fn test_catalog() -> datafusion_common::Result<()> { + let module = get_module()?; + + let ffi_catalog = + module + .create_catalog() + .ok_or(DataFusionError::NotImplemented( + "External catalog provider failed to implement create_catalog" + .to_string(), + ))?(); + let foreign_catalog: ForeignCatalogProvider = (&ffi_catalog).into(); + + let ctx = SessionContext::default(); + let _ = ctx.register_catalog("fruit", Arc::new(foreign_catalog)); + + let df = ctx.table("fruit.apple.purchases").await?; + + let results = df.collect().await?; + + assert_eq!(results.len(), 2); + let num_rows: usize = results.into_iter().map(|rb| rb.num_rows()).sum(); + assert_eq!(num_rows, 5); + + Ok(()) + } + + #[tokio::test] + async fn test_catalog_list() -> datafusion_common::Result<()> { + let module = get_module()?; + + let ffi_catalog_list = + module + .create_catalog_list() + .ok_or(DataFusionError::NotImplemented( + "External catalog provider failed to implement create_catalog" + .to_string(), + ))?(); + let foreign_catalog_list: ForeignCatalogProviderList = (&ffi_catalog_list).into(); + + let ctx = SessionContext::default(); + ctx.register_catalog_list(Arc::new(foreign_catalog_list)); + + let df = ctx.table("blue.apple.purchases").await?; + + let results = df.collect().await?; + + assert_eq!(results.len(), 2); + let num_rows: usize = results.into_iter().map(|rb| rb.num_rows()).sum(); + assert_eq!(num_rows, 5); + + Ok(()) + } +} diff --git a/datafusion/ffi/tests/ffi_integration.rs b/datafusion/ffi/tests/ffi_integration.rs index eb53e76bfb9b..7b4d1b1e350a 100644 --- a/datafusion/ffi/tests/ffi_integration.rs +++ b/datafusion/ffi/tests/ffi_integration.rs @@ -21,7 +21,6 @@ mod tests { use datafusion::error::{DataFusionError, Result}; use datafusion::prelude::SessionContext; - use datafusion_ffi::catalog_provider::ForeignCatalogProvider; use datafusion_ffi::table_provider::ForeignTableProvider; use datafusion_ffi::tests::create_record_batch; use datafusion_ffi::tests::utils::get_module; @@ -69,30 +68,4 @@ mod tests { async fn sync_test_table_provider() -> Result<()> { test_table_provider(true).await } - - #[tokio::test] - async fn test_catalog() -> Result<()> { - let module = get_module()?; - - let ffi_catalog = - module - .create_catalog() - .ok_or(DataFusionError::NotImplemented( - "External catalog provider failed to implement create_catalog" - .to_string(), - ))?(); - let foreign_catalog: ForeignCatalogProvider = (&ffi_catalog).into(); - - let ctx = SessionContext::default(); - let _ = ctx.register_catalog("fruit", Arc::new(foreign_catalog)); - - let df = ctx.table("fruit.apple.purchases").await?; - - let results = df.collect().await?; - - assert!(!results.is_empty()); - assert!(results[0].num_rows() != 0); - - Ok(()) - } } From ade3094585c3399a8f3da0bfb0aa1928ebd6058a Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 13 Nov 2025 08:25:17 -0500 Subject: [PATCH 2/2] apply naming changes from PR review --- datafusion/ffi/src/catalog_provider_list.rs | 6 +++--- datafusion/ffi/src/tests/catalog.rs | 7 +++---- datafusion/ffi/tests/ffi_catalog.rs | 2 +- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/datafusion/ffi/src/catalog_provider_list.rs b/datafusion/ffi/src/catalog_provider_list.rs index 0cbbcc93136a..b09f06d318c1 100644 --- a/datafusion/ffi/src/catalog_provider_list.rs +++ b/datafusion/ffi/src/catalog_provider_list.rs @@ -45,8 +45,8 @@ pub struct FFI_CatalogProviderList { pub catalog: unsafe extern "C" fn(&Self, name: RString) -> ROption, - /// Used to create a clone on the provider of the execution plan. This should - /// only need to be called by the receiver of the plan. + /// Used to create a clone on the provider. This should only need to be called + /// by the receiver of the plan. pub clone: unsafe extern "C" fn(plan: &Self) -> Self, /// Release the memory of the private data when it is no longer being used. @@ -170,7 +170,7 @@ impl FFI_CatalogProviderList { /// This wrapper struct exists on the receiver side of the FFI interface, so it has /// no guarantees about being able to access the data in `private_data`. Any functions /// defined on this struct must only use the stable functions provided in -/// FFI_CatalogProviderList to interact with the foreign table provider. +/// FFI_CatalogProviderList to interact with the foreign catalog provider list. #[derive(Debug)] pub struct ForeignCatalogProviderList(FFI_CatalogProviderList); diff --git a/datafusion/ffi/src/tests/catalog.rs b/datafusion/ffi/src/tests/catalog.rs index e934a01cdb60..b6efbdf726e0 100644 --- a/datafusion/ffi/src/tests/catalog.rs +++ b/datafusion/ffi/src/tests/catalog.rs @@ -31,11 +31,10 @@ use crate::catalog_provider::FFI_CatalogProvider; use crate::catalog_provider_list::FFI_CatalogProviderList; use arrow::datatypes::Schema; use async_trait::async_trait; -use datafusion::catalog::{CatalogProviderList, MemoryCatalogProviderList}; use datafusion::{ catalog::{ - CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider, - TableProvider, + CatalogProvider, CatalogProviderList, MemoryCatalogProvider, + MemoryCatalogProviderList, MemorySchemaProvider, SchemaProvider, TableProvider, }, common::exec_err, datasource::MemTable, @@ -223,7 +222,7 @@ impl CatalogProviderList for FixedCatalogProviderList { catalog: Arc, ) -> Option> { if !["blue", "red", "green", "yellow"].contains(&name.as_str()) { - log::warn!("FixedCatalogProviderList only provides four schemas: blue, red, green, yellow"); + log::warn!("FixedCatalogProviderList only provides four catalogs: blue, red, green, yellow"); return None; } diff --git a/datafusion/ffi/tests/ffi_catalog.rs b/datafusion/ffi/tests/ffi_catalog.rs index a264440cd784..b63d8cbd631b 100644 --- a/datafusion/ffi/tests/ffi_catalog.rs +++ b/datafusion/ffi/tests/ffi_catalog.rs @@ -61,7 +61,7 @@ mod tests { module .create_catalog_list() .ok_or(DataFusionError::NotImplemented( - "External catalog provider failed to implement create_catalog" + "External catalog provider failed to implement create_catalog_list" .to_string(), ))?(); let foreign_catalog_list: ForeignCatalogProviderList = (&ffi_catalog_list).into();