Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,5 @@ publish = false
[dependencies]
abi_stable = "0.11.3"
datafusion = { workspace = true }
datafusion-ffi = { workspace = true }
ffi_module_interface = { path = "../ffi_module_interface" }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use datafusion::{
};

use abi_stable::library::{development_utils::compute_library_path, RootModule};
use datafusion_ffi::table_provider::ForeignTableProvider;
use datafusion::datasource::TableProvider;
use ffi_module_interface::TableProviderModuleRef;

#[tokio::main]
Expand All @@ -49,13 +49,13 @@ async fn main() -> Result<()> {
))?();

// In order to access the table provider within this executable, we need to
// turn it into a `ForeignTableProvider`.
let foreign_table_provider: ForeignTableProvider = (&ffi_table_provider).into();
// turn it into a `TableProvider`.
let foreign_table_provider: Arc<dyn TableProvider> = (&ffi_table_provider).into();

let ctx = SessionContext::new();

// Display the data to show the full cycle works.
ctx.register_table("external_table", Arc::new(foreign_table_provider))?;
ctx.register_table("external_table", foreign_table_provider)?;
let df = ctx.table("external_table").await?;
df.show().await?;

Expand Down
1 change: 1 addition & 0 deletions datafusion/ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ semver = "1.0.27"
tokio = { workspace = true }

[dev-dependencies]
datafusion = { workspace = true, default-features = false, features = ["sql"] }
doc-comment = { workspace = true }

[features]
Expand Down
30 changes: 30 additions & 0 deletions datafusion/ffi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,36 @@ In this crate we have a variety of structs which closely mimic the behavior of
their internal counterparts. To see detailed notes about how to use them, see
the example in `FFI_TableProvider`.

## Library Marker ID

When reviewing the code, you will notice that many of the structs in this crate
contain a `library_marker_id` function pointer. The purpose of calling it is to
determine if a library is accessing _local_ code through the FFI structs.
Consider this example: you have
a `primary` program that exposes functions to create a schema provider. You
have a `secondary` library that exposes a function to create a catalog provider
and the `secondary` library uses the schema provider of the `primary` program.
From the point of view of the `secondary` library, the schema provider is
foreign code.

Now when we register the `secondary` library with the `primary` program as a
catalog provider and we make calls to get a schema, the `secondary` library
will return an FFI-wrapped schema provider back to the `primary` program. In
this case that schema provider is actually local code to the `primary` program
except that it is wrapped in the FFI code!

We work around this with the `library_marker_id` calls. Each call returns the
address, as a `usize`, of a global variable that is created within each library.
This is guaranteed to be unique for every library that contains
FFI code. By comparing these `usize` addresses we can determine if an FFI struct
is local or foreign.

In our example of the schema provider, if you were to make a call in your
primary program to get the schema provider, it would reach out to the foreign
catalog provider and send back an `FFI_SchemaProvider` object. By then
comparing the `library_marker_id` of this object to that of the `primary`
program, we determine it is local code. This means it is safe to access the
underlying private data.

[apache datafusion]: https://datafusion.apache.org/
[api docs]: http://docs.rs/datafusion-ffi/latest
[rust abi]: https://doc.rust-lang.org/reference/abi.html
Expand Down
44 changes: 39 additions & 5 deletions datafusion/ffi/src/catalog_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ pub struct FFI_CatalogProvider {
/// Internal data. This is only to be accessed by the provider of the plan.
/// A [`ForeignCatalogProvider`] should never attempt to access this data.
pub private_data: *mut c_void,

/// Utility to identify when FFI objects are accessed locally through
/// the foreign interface.
pub library_marker_id: extern "C" fn() -> usize,
}

unsafe impl Send for FFI_CatalogProvider {}
Expand Down Expand Up @@ -116,7 +120,7 @@ unsafe extern "C" fn register_schema_fn_wrapper(
) -> RResult<ROption<FFI_SchemaProvider>, RString> {
let runtime = provider.runtime();
let provider = provider.inner();
let schema = Arc::new(ForeignSchemaProvider::from(schema));
let schema: Arc<dyn SchemaProvider + Send> = schema.into();

let returned_schema =
rresult_return!(provider.register_schema(name.as_str(), schema))
Expand Down Expand Up @@ -169,6 +173,7 @@ unsafe extern "C" fn clone_fn_wrapper(
release: release_fn_wrapper,
version: super::version,
private_data,
library_marker_id: crate::get_library_marker_id,
}
}

Expand All @@ -195,6 +200,7 @@ impl FFI_CatalogProvider {
release: release_fn_wrapper,
version: super::version,
private_data: Box::into_raw(private_data) as *mut c_void,
library_marker_id: crate::get_library_marker_id,
}
}
}
Expand All @@ -209,9 +215,14 @@ pub struct ForeignCatalogProvider(pub(crate) FFI_CatalogProvider);
unsafe impl Send for ForeignCatalogProvider {}
unsafe impl Sync for ForeignCatalogProvider {}

impl From<&FFI_CatalogProvider> for ForeignCatalogProvider {
impl From<&FFI_CatalogProvider> for Arc<dyn CatalogProvider + Send> {
fn from(provider: &FFI_CatalogProvider) -> Self {
Self(provider.clone())
if (provider.library_marker_id)() == crate::get_library_marker_id() {
return Arc::clone(unsafe { provider.inner() });
}

Arc::new(ForeignCatalogProvider(provider.clone()))
as Arc<dyn CatalogProvider + Send>
}
}

Expand Down Expand Up @@ -298,9 +309,10 @@ mod tests {
.unwrap()
.is_none());

let ffi_catalog = FFI_CatalogProvider::new(catalog, None);
let mut ffi_catalog = FFI_CatalogProvider::new(catalog, None);
ffi_catalog.library_marker_id = crate::mock_foreign_marker_id;

let foreign_catalog: ForeignCatalogProvider = (&ffi_catalog).into();
let foreign_catalog: Arc<dyn CatalogProvider + Send> = (&ffi_catalog).into();

let prior_schema_names = foreign_catalog.schema_names();
assert_eq!(prior_schema_names.len(), 1);
Expand Down Expand Up @@ -335,4 +347,26 @@ mod tests {
let returned_schema = foreign_catalog.schema("second_schema");
assert!(returned_schema.is_some());
}

/// Converting an `FFI_CatalogProvider` must hand back the original provider
/// when its library marker matches this library, and a
/// `ForeignCatalogProvider` wrapper when the marker differs.
#[test]
fn test_ffi_catalog_provider_local_bypass() {
    let memory_catalog = Arc::new(MemoryCatalogProvider::new());
    let mut ffi_catalog = FFI_CatalogProvider::new(memory_catalog, None);

    // Marker left untouched: the conversion should expose the concrete
    // in-memory catalog, so the downcast succeeds.
    let local_view: Arc<dyn CatalogProvider + Send> = (&ffi_catalog).into();
    let as_memory = local_view.as_any().downcast_ref::<MemoryCatalogProvider>();
    assert!(as_memory.is_some());

    // Swap in the mock marker so the same struct is treated as foreign and
    // the conversion wraps it instead.
    ffi_catalog.library_marker_id = crate::mock_foreign_marker_id;
    let foreign_view: Arc<dyn CatalogProvider + Send> = (&ffi_catalog).into();
    let as_foreign = foreign_view.as_any().downcast_ref::<ForeignCatalogProvider>();
    assert!(as_foreign.is_some());
}
}
47 changes: 42 additions & 5 deletions datafusion/ffi/src/catalog_provider_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ pub struct FFI_CatalogProviderList {
/// Internal data. This is only to be accessed by the provider of the plan.
/// A [`ForeignCatalogProviderList`] should never attempt to access this data.
pub private_data: *mut c_void,

/// Utility to identify when FFI objects are accessed locally through
/// the foreign interface.
pub library_marker_id: extern "C" fn() -> usize,
}

unsafe impl Send for FFI_CatalogProviderList {}
Expand Down Expand Up @@ -94,7 +98,7 @@ unsafe extern "C" fn register_catalog_fn_wrapper(
) -> ROption<FFI_CatalogProvider> {
let runtime = provider.runtime();
let provider = provider.inner();
let catalog = Arc::new(ForeignCatalogProvider::from(catalog));
let catalog: Arc<dyn CatalogProvider + Send> = catalog.into();

provider
.register_catalog(name.into(), catalog)
Expand Down Expand Up @@ -138,6 +142,7 @@ unsafe extern "C" fn clone_fn_wrapper(
release: release_fn_wrapper,
version: super::version,
private_data,
library_marker_id: crate::get_library_marker_id,
}
}

Expand All @@ -163,6 +168,7 @@ impl FFI_CatalogProviderList {
release: release_fn_wrapper,
version: super::version,
private_data: Box::into_raw(private_data) as *mut c_void,
library_marker_id: crate::get_library_marker_id,
}
}
}
Expand All @@ -177,9 +183,14 @@ pub struct ForeignCatalogProviderList(FFI_CatalogProviderList);
unsafe impl Send for ForeignCatalogProviderList {}
unsafe impl Sync for ForeignCatalogProviderList {}

impl From<&FFI_CatalogProviderList> for ForeignCatalogProviderList {
impl From<&FFI_CatalogProviderList> for Arc<dyn CatalogProviderList + Send> {
fn from(provider: &FFI_CatalogProviderList) -> Self {
Self(provider.clone())
if (provider.library_marker_id)() == crate::get_library_marker_id() {
return Arc::clone(unsafe { provider.inner() });
}

Arc::new(ForeignCatalogProviderList(provider.clone()))
as Arc<dyn CatalogProviderList + Send>
}
}

Expand Down Expand Up @@ -248,9 +259,11 @@ mod tests {
.register_catalog("prior_catalog".to_owned(), prior_catalog)
.is_none());

let ffi_catalog_list = FFI_CatalogProviderList::new(catalog_list, None);
let mut ffi_catalog_list = FFI_CatalogProviderList::new(catalog_list, None);
ffi_catalog_list.library_marker_id = crate::mock_foreign_marker_id;

let foreign_catalog_list: ForeignCatalogProviderList = (&ffi_catalog_list).into();
let foreign_catalog_list: Arc<dyn CatalogProviderList + Send> =
(&ffi_catalog_list).into();

let prior_catalog_names = foreign_catalog_list.catalog_names();
assert_eq!(prior_catalog_names.len(), 1);
Expand Down Expand Up @@ -280,4 +293,28 @@ mod tests {
let returned_catalog = foreign_catalog_list.catalog("second_catalog");
assert!(returned_catalog.is_some());
}

/// Converting an `FFI_CatalogProviderList` must hand back the original list
/// when its library marker matches this library, and a
/// `ForeignCatalogProviderList` wrapper when the marker differs.
#[test]
fn test_ffi_catalog_provider_list_local_bypass() {
    let memory_catalog_list = Arc::new(MemoryCatalogProviderList::new());
    let mut ffi_catalog_list = FFI_CatalogProviderList::new(memory_catalog_list, None);

    // Marker left untouched: the conversion should expose the concrete
    // in-memory catalog list, so the downcast succeeds.
    let local_view: Arc<dyn CatalogProviderList + Send> = (&ffi_catalog_list).into();
    let as_memory = local_view.as_any().downcast_ref::<MemoryCatalogProviderList>();
    assert!(as_memory.is_some());

    // Swap in the mock marker so the same struct is treated as foreign and
    // the conversion wraps it instead.
    ffi_catalog_list.library_marker_id = crate::mock_foreign_marker_id;
    let foreign_view: Arc<dyn CatalogProviderList + Send> = (&ffi_catalog_list).into();
    let as_foreign = foreign_view.as_any().downcast_ref::<ForeignCatalogProviderList>();
    assert!(as_foreign.is_some());
}
}
Loading