34 commits
3e7e4af
Add discussion about library marker ID
timsaucer Nov 12, 2025
115c036
Implement library marker ID for catalog provider
timsaucer Nov 12, 2025
d5fbffb
Implement library marker ID for catalog provider list
timsaucer Nov 12, 2025
d9bac32
Implement library marker ID for schema provider
timsaucer Nov 12, 2025
d1fba46
Implement library marker ID for table provider
timsaucer Nov 12, 2025
0985f32
Implement library marker ID for execution plan
timsaucer Nov 12, 2025
ff4c15f
Implement library marker ID for plan properties
timsaucer Nov 12, 2025
e8ec8f2
Implement library marker ID for table functions
timsaucer Nov 12, 2025
6e06111
Implement library marker ID for scalar functions
timsaucer Nov 12, 2025
fd019a5
Implement library marker ID for accumulator
timsaucer Nov 12, 2025
7c04be3
Implement library marker ID for groups accumulator
timsaucer Nov 12, 2025
ede1450
Implement library marker ID for udaf
timsaucer Nov 12, 2025
86c0a3f
Implement library marker ID for partition evaluator
timsaucer Nov 12, 2025
ee05a86
Implement library marker ID for udwf
timsaucer Nov 12, 2025
cd68231
Add unit tests for library marker
timsaucer Nov 13, 2025
b1f7729
Machete
timsaucer Nov 13, 2025
58ff67b
taplo fmt
timsaucer Nov 13, 2025
9dcabe0
Add upgrade text
timsaucer Nov 13, 2025
bc0968b
Minor update to doc
timsaucer Nov 20, 2025
ef8a1e2
Define TaskContextProvider
timsaucer Nov 13, 2025
50c3bb5
Add TaskContext and TaskContextProvider FFI implementations
timsaucer Nov 13, 2025
bf976bd
Expose task_ctx to rest of ffi crate
timsaucer Nov 13, 2025
dcc4bc5
Add task_ctx_provider to catalog provider and list
timsaucer Nov 13, 2025
632d719
Plumb through task_ctx_provider for schema provider
timsaucer Nov 13, 2025
2200d58
add task_ctx_provider to table_provider and remove sessioncontext whe…
timsaucer Nov 13, 2025
351c7fa
Use TaskContextProvider in FFI_ExecutionPlan
timsaucer Nov 13, 2025
c2a2eb4
Plumb TaskContextProvider through to UDAf and execution plans
timsaucer Nov 13, 2025
2250f78
Pass TaskContextProvider through udaf accumulator args
timsaucer Nov 13, 2025
10b6adf
Plump TaskContextProvider through udwf
timsaucer Nov 13, 2025
6e06b3f
Plumb TaskContextProvider through udwf partition evaluator args
timsaucer Nov 13, 2025
5d83f43
Fix scope of ctx on unit tests
timsaucer Nov 13, 2025
38bb271
Add readme information about the task context provider
timsaucer Nov 13, 2025
25a7651
Add readme information about FFI coverage
timsaucer Nov 13, 2025
6d48b25
formatting
timsaucer Nov 13, 2025
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

@@ -21,6 +21,7 @@ use abi_stable::{export_root_module, prefix_type::PrefixTypeTrait};
use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::{common::record_batch, datasource::MemTable};
use datafusion_ffi::execution::FFI_TaskContextProvider;
use datafusion_ffi::table_provider::FFI_TableProvider;
use ffi_module_interface::{TableProviderModule, TableProviderModuleRef};

@@ -34,7 +35,9 @@ fn create_record_batch(start_value: i32, num_values: usize) -> RecordBatch {

/// Here we only wish to create a simple table provider as an example.
/// We create an in-memory table and convert it to its FFI counterpart.
extern "C" fn construct_simple_table_provider() -> FFI_TableProvider {
extern "C" fn construct_simple_table_provider(
task_ctx_provider: FFI_TaskContextProvider,
) -> FFI_TableProvider {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Float64, true),
@@ -50,7 +53,7 @@ extern "C" fn construct_simple_table_provider() -> FFI_TableProvider {

let table_provider = MemTable::try_new(schema, vec![batches]).unwrap();

FFI_TableProvider::new(Arc::new(table_provider), true, None)
FFI_TableProvider::new(Arc::new(table_provider), true, None, task_ctx_provider)
}

#[export_root_module]
@@ -22,6 +22,7 @@ use abi_stable::{
sabi_types::VersionStrings,
StableAbi,
};
use datafusion_ffi::execution::FFI_TaskContextProvider;
use datafusion_ffi::table_provider::FFI_TableProvider;

#[repr(C)]
@@ -34,7 +35,7 @@ use datafusion_ffi::table_provider::FFI_TableProvider;
/// how a user may wish to separate these concerns.
pub struct TableProviderModule {
/// Constructs the table provider
pub create_table: extern "C" fn() -> FFI_TableProvider,
pub create_table: extern "C" fn(FFI_TaskContextProvider) -> FFI_TableProvider,
}

impl RootModule for TableProviderModuleRef {
@@ -24,6 +24,5 @@ publish = false
[dependencies]
abi_stable = "0.11.3"
datafusion = { workspace = true }
datafusion-ffi = { workspace = true }
ffi_module_interface = { path = "../ffi_module_interface" }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
14 changes: 8 additions & 6 deletions datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs
@@ -23,7 +23,8 @@ use datafusion::{
};

use abi_stable::library::{development_utils::compute_library_path, RootModule};
use datafusion_ffi::table_provider::ForeignTableProvider;
use datafusion::datasource::TableProvider;
use datafusion::execution::TaskContextProvider;
use ffi_module_interface::TableProviderModuleRef;

#[tokio::main]
@@ -39,23 +40,24 @@ async fn main() -> Result<()> {
TableProviderModuleRef::load_from_directory(&library_path)
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let ctx = Arc::new(SessionContext::new());
let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn TaskContextProvider>;

// By calling the code below, the table provider will be created within
// the module's code.
let ffi_table_provider =
table_provider_module
.create_table()
.ok_or(DataFusionError::NotImplemented(
"External table provider failed to implement create_table".to_string(),
))?();
))?(task_ctx_provider.into());

// In order to access the table provider within this executable, we need to
// turn it into a `ForeignTableProvider`.
let foreign_table_provider: ForeignTableProvider = (&ffi_table_provider).into();

let ctx = SessionContext::new();
let foreign_table_provider: Arc<dyn TableProvider> = (&ffi_table_provider).into();

// Display the data to show the full cycle works.
ctx.register_table("external_table", Arc::new(foreign_table_provider))?;
ctx.register_table("external_table", foreign_table_provider)?;
let df = ctx.table("external_table").await?;
df.show().await?;

6 changes: 6 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
@@ -1801,6 +1801,12 @@ impl SessionContext {
}
}

impl datafusion_execution::TaskContextProvider for SessionContext {
fn task_ctx(&self) -> Arc<TaskContext> {
SessionContext::task_ctx(self)
}
}

impl FunctionRegistry for SessionContext {
fn udfs(&self) -> HashSet<String> {
self.state.read().udfs()
6 changes: 6 additions & 0 deletions datafusion/core/src/execution/session_state.rs
@@ -286,6 +286,12 @@ impl Session for SessionState {
}
}

impl datafusion_execution::TaskContextProvider for SessionState {
fn task_ctx(&self) -> Arc<TaskContext> {
SessionState::task_ctx(self)
}
}

impl SessionState {
pub(crate) fn resolve_table_ref(
&self,
2 changes: 1 addition & 1 deletion datafusion/execution/src/lib.rs
@@ -49,4 +49,4 @@ pub mod registry {
pub use disk_manager::DiskManager;
pub use registry::FunctionRegistry;
pub use stream::{RecordBatchStream, SendableRecordBatchStream};
pub use task::TaskContext;
pub use task::{TaskContext, TaskContextProvider};
7 changes: 6 additions & 1 deletion datafusion/execution/src/task.rs
@@ -32,7 +32,7 @@ use std::{collections::HashMap, sync::Arc};
/// information.
///
/// [`SessionContext`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct TaskContext {
/// Session Id
session_id: String,
@@ -211,6 +211,11 @@ impl FunctionRegistry for TaskContext {
}
}

/// Produce the [`TaskContext`].
pub trait TaskContextProvider {
fn task_ctx(&self) -> Arc<TaskContext>;
}

#[cfg(test)]
mod tests {
use super::*;
3 changes: 3 additions & 0 deletions datafusion/ffi/Cargo.toml
@@ -48,6 +48,8 @@ async-ffi = { version = "0.5.0", features = ["abi_stable"] }
async-trait = { workspace = true }
datafusion = { workspace = true, default-features = false }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-functions-aggregate-common = { workspace = true }
datafusion-proto = { workspace = true }
datafusion-proto-common = { workspace = true }
@@ -58,6 +60,7 @@ semver = "1.0.27"
tokio = { workspace = true }

[dev-dependencies]
datafusion = { workspace = true, default-features = false, features = ["sql"] }
doc-comment = { workspace = true }

[features]
96 changes: 96 additions & 0 deletions datafusion/ffi/README.md
@@ -101,6 +101,101 @@ In this crate we have a variety of structs which closely mimic the behavior of
their internal counterparts. To see detailed notes about how to use them, see
the example in `FFI_TableProvider`.

## Task Context Provider

Many of the FFI structs in this crate contain an `FFI_TaskContextProvider`.
The purpose of this struct is to hold a _weak_ reference to a method for
accessing the current `TaskContext`. We need this accessor because we use the
`datafusion-proto` crate to serialize and deserialize data across the FFI
boundary, and in particular we need a `TaskContext` to serialize and
deserialize functions.

This becomes difficult because we may need to register multiple user-defined
functions, table or catalog providers, and so on with a `Session`, and each of
these will need the `TaskContext` to do its processing. For this reason we
cannot simply capture the `TaskContext` at the time of registration, because
it would have no knowledge of anything registered afterward.

The `FFI_TaskContextProvider` is built from `TaskContextProvider`, a trait
that provides a method to get the current `TaskContext`. The
`FFI_TaskContextProvider` only holds a `Weak` reference to the
`TaskContextProvider`, because a strong reference could create a circular
dependency at runtime. If you use these methods, it is imperative that your
provider remain valid for the lifetime of the calls. `TaskContextProvider` is
implemented for `SessionContext`, and it is easy to implement for any struct
that implements `Session`.
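
As a rough sketch, a host that keeps its own session type could implement the
trait by delegating to a wrapped `SessionContext` (the `MySession` name below
is hypothetical and used only for illustration):

```rust
use std::sync::Arc;

use datafusion::execution::{TaskContext, TaskContextProvider};
use datafusion::prelude::SessionContext;

/// Hypothetical session wrapper owned by the host application.
struct MySession {
    ctx: SessionContext,
}

impl TaskContextProvider for MySession {
    fn task_ctx(&self) -> Arc<TaskContext> {
        // Delegate to the wrapped SessionContext, which produces the
        // TaskContext for the current session state.
        self.ctx.task_ctx()
    }
}
```

An `Arc<dyn TaskContextProvider>` can then be converted into an
`FFI_TaskContextProvider` with `.into()` when passing it across the FFI
boundary.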

## Library Marker ID

When reviewing the code, you will see that many of the structs in this crate
contain a call to `library_marker_id`. The purpose of this call is to
determine whether a library is accessing _local_ code through the FFI structs.
Consider this example: you have a `primary` program that exposes functions to
create a schema provider, and a `secondary` library that exposes a function to
create a catalog provider. The `secondary` library uses the schema provider of
the `primary` program, so from the point of view of the `secondary` library
the schema provider is foreign code.

Now when we register the `secondary` library with the `primary` program as a
catalog provider and make calls to get a schema, the `secondary` library will
return an FFI-wrapped schema provider back to the `primary` program. In this
case that schema provider is actually code local to the `primary` program,
except that it is wrapped in the FFI layer!

We work around this with the `library_marker_id` calls. Each library defines a
static variable, and `library_marker_id` returns the address of that variable
as a `u64`. This address is guaranteed to be unique for every loaded library
that contains the FFI code. By comparing these `u64` addresses we can
determine whether an FFI struct is local or foreign.
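
Conceptually, the mechanism boils down to something like the following sketch
(the names are illustrative rather than the crate's exact implementation):

```rust
/// A one-byte static; every library that statically links this code gets its
/// own copy at a distinct address.
static LIBRARY_MARKER: u8 = 0;

/// Returns an identifier that is unique to the library this code was
/// compiled into.
pub fn library_marker_id() -> u64 {
    &LIBRARY_MARKER as *const u8 as u64
}

/// A host can compare the marker reported by an FFI struct against its own
/// marker to decide whether the underlying code is local.
fn is_local(remote_marker: u64) -> bool {
    remote_marker == library_marker_id()
}
```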

In our example of the schema provider, if you were to make a call in your
`primary` program to get the schema provider, it would reach out to the
foreign catalog provider, which would send back an `FFI_SchemaProvider`
object. By comparing the `library_marker_id` of this object to that of the
`primary` program, we determine that it is local code, which means it is safe
to access the underlying private data.

## Testing Coverage

Since this library contains a large amount of `unsafe` code, it is important
to ensure proper test coverage. To generate a coverage report, you can use
[tarpaulin] as follows. It is necessary to use the `integration-tests` feature
in order to properly generate coverage.

```shell
cargo tarpaulin --package datafusion-ffi --tests --features integration-tests --out Html
```

While Rust code does not normally need to be checked for memory leaks, this
crate performs manual memory management across the FFI boundary. You can test
for leaks using the generated unit tests; how you run these checks depends on
your OS.

### Linux

On Linux, you can install `cargo-valgrind` (for example, with
`cargo install cargo-valgrind`) and run the tests under Valgrind:

```shell
cargo valgrind test --features integration-tests -p datafusion-ffi
```

### macOS

You can find the generated binaries for your unit tests by building them with
`cargo test --no-run`:

```shell
cargo test --features integration-tests -p datafusion-ffi --no-run
```

This should print the paths to the test binaries. You can then run commands
such as the following; the specific paths will vary.

```shell
leaks --atExit -- target/debug/deps/datafusion_ffi-e77a2604a85a8afe
leaks --atExit -- target/debug/deps/ffi_integration-e91b7127a59b71a7
# ...
```

[apache datafusion]: https://datafusion.apache.org/
[api docs]: http://docs.rs/datafusion-ffi/latest
[rust abi]: https://doc.rust-lang.org/reference/abi.html
@@ -110,3 +205,4 @@ the example in `FFI_TableProvider`.
[bindgen]: https://crates.io/crates/bindgen
[`datafusion-python`]: https://datafusion.apache.org/python/
[datafusion-contrib]: https://github.com/datafusion-contrib
[tarpaulin]: https://crates.io/crates/cargo-tarpaulin