Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ pub enum DataFusionError {
/// to multiple receivers. For example, when the source of a repartition
/// errors and the error is propagated to multiple consumers.
Shared(Arc<DataFusionError>),
/// An error that originated during a foreign function interface call.
/// Transferring errors across the FFI boundary is difficult, so the original
/// error will be converted to a string.
Ffi(String),
}

#[macro_export]
Expand Down Expand Up @@ -413,6 +417,7 @@ impl Error for DataFusionError {
// can't be executed.
DataFusionError::Collection(errs) => errs.first().map(|e| e as &dyn Error),
DataFusionError::Shared(e) => Some(e.as_ref()),
DataFusionError::Ffi(_) => None,
}
}
}
Expand Down Expand Up @@ -544,6 +549,7 @@ impl DataFusionError {
errs.first().expect("cannot construct DataFusionError::Collection with 0 errors, but got one such case").error_prefix()
}
DataFusionError::Shared(_) => "",
DataFusionError::Ffi(_) => "FFI error: ",
}
}

Expand Down Expand Up @@ -596,6 +602,7 @@ impl DataFusionError {
.expect("cannot construct DataFusionError::Collection with 0 errors")
.message(),
DataFusionError::Shared(ref desc) => Cow::Owned(desc.to_string()),
DataFusionError::Ffi(ref desc) => Cow::Owned(desc.to_string()),
}
}

Expand Down Expand Up @@ -969,6 +976,9 @@ make_error!(substrait_err, substrait_datafusion_err, Substrait);
// Exposes a macro to create `DataFusionError::ResourcesExhausted` with optional backtrace
make_error!(resources_err, resources_datafusion_err, ResourcesExhausted);

// Exposes a macro to create `DataFusionError::Ffi` with optional backtrace
make_error!(ffi_err, ffi_datafusion_err, Ffi);

// Exposes a macro to create `DataFusionError::SQL` with optional backtrace
#[macro_export]
macro_rules! sql_datafusion_err {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ pub use utils::project_schema;
// https://github.com/rust-lang/rust/pull/52234#issuecomment-976702997
#[doc(hidden)]
pub use error::{
_config_datafusion_err, _exec_datafusion_err, _internal_datafusion_err,
_not_impl_datafusion_err, _plan_datafusion_err, _resources_datafusion_err,
_substrait_datafusion_err,
_config_datafusion_err, _exec_datafusion_err, _ffi_datafusion_err,
_internal_datafusion_err, _not_impl_datafusion_err, _plan_datafusion_err,
_resources_datafusion_err, _substrait_datafusion_err,
};

// The HashMap and HashSet implementations that should be used as the uniform defaults
Expand Down
3 changes: 3 additions & 0 deletions datafusion/ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ async-ffi = { version = "0.5.0", features = ["abi_stable"] }
async-trait = { workspace = true }
datafusion = { workspace = true, default-features = false }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-functions-aggregate-common = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
Comment on lines +51 to +54
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this and the following PRs I am introducing more of these crates, even when they are re-exported in datafusion core crate so that it will have a smaller PR when we remove the core crate at the end of this work epic.

datafusion-proto = { workspace = true }
datafusion-proto-common = { workspace = true }
futures = { workspace = true }
Expand Down
19 changes: 19 additions & 0 deletions datafusion/ffi/src/arrow_wrappers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use arrow::{
error::ArrowError,
ffi::{from_ffi, to_ffi, FFI_ArrowArray, FFI_ArrowSchema},
};
use datafusion_common::{DataFusionError, ScalarValue};
use log::error;

/// This is a wrapper struct around FFI_ArrowSchema simply to indicate
Expand Down Expand Up @@ -95,3 +96,21 @@ impl TryFrom<&ArrayRef> for WrappedArray {
Ok(WrappedArray { array, schema })
}
}

impl TryFrom<&ScalarValue> for WrappedArray {
type Error = DataFusionError;

fn try_from(value: &ScalarValue) -> Result<Self, Self::Error> {
let array = value.to_array()?;
WrappedArray::try_from(&array).map_err(Into::into)
}
}

impl TryFrom<WrappedArray> for ScalarValue {
type Error = DataFusionError;

fn try_from(value: WrappedArray) -> Result<Self, Self::Error> {
let array: ArrayRef = value.try_into()?;
ScalarValue::try_from_array(array.as_ref(), 0)
}
}
31 changes: 17 additions & 14 deletions datafusion/ffi/src/catalog_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::{
schema_provider::{FFI_SchemaProvider, ForeignSchemaProvider},
};

use crate::util::FFIResult;
use datafusion::error::Result;

/// A stable struct for sharing [`CatalogProvider`] across FFI boundaries.
Expand All @@ -43,19 +44,19 @@ pub struct FFI_CatalogProvider {
name: RString,
) -> ROption<FFI_SchemaProvider>,

pub register_schema:
unsafe extern "C" fn(
provider: &Self,
name: RString,
schema: &FFI_SchemaProvider,
) -> RResult<ROption<FFI_SchemaProvider>, RString>,
pub register_schema: unsafe extern "C" fn(
provider: &Self,
name: RString,
schema: &FFI_SchemaProvider,
)
-> FFIResult<ROption<FFI_SchemaProvider>>,

pub deregister_schema:
unsafe extern "C" fn(
provider: &Self,
name: RString,
cascade: bool,
) -> RResult<ROption<FFI_SchemaProvider>, RString>,
pub deregister_schema: unsafe extern "C" fn(
provider: &Self,
name: RString,
cascade: bool,
)
-> FFIResult<ROption<FFI_SchemaProvider>>,

/// Used to create a clone on the provider of the execution plan. This should
/// only need to be called by the receiver of the plan.
Expand Down Expand Up @@ -118,7 +119,7 @@ unsafe extern "C" fn register_schema_fn_wrapper(
provider: &FFI_CatalogProvider,
name: RString,
schema: &FFI_SchemaProvider,
) -> RResult<ROption<FFI_SchemaProvider>, RString> {
) -> FFIResult<ROption<FFI_SchemaProvider>> {
let runtime = provider.runtime();
let provider = provider.inner();
let schema: Arc<dyn SchemaProvider + Send> = schema.into();
Expand All @@ -135,7 +136,7 @@ unsafe extern "C" fn deregister_schema_fn_wrapper(
provider: &FFI_CatalogProvider,
name: RString,
cascade: bool,
) -> RResult<ROption<FFI_SchemaProvider>, RString> {
) -> FFIResult<ROption<FFI_SchemaProvider>> {
let runtime = provider.runtime();
let provider = provider.inner();

Expand All @@ -150,8 +151,10 @@ unsafe extern "C" fn deregister_schema_fn_wrapper(
}

unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_CatalogProvider) {
debug_assert!(!provider.private_data.is_null());
let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData);
drop(private_data);
provider.private_data = std::ptr::null_mut();
}

unsafe extern "C" fn clone_fn_wrapper(
Expand Down
2 changes: 2 additions & 0 deletions datafusion/ffi/src/catalog_provider_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,10 @@ unsafe extern "C" fn catalog_fn_wrapper(
}

unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_CatalogProviderList) {
debug_assert!(!provider.private_data.is_null());
let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData);
drop(private_data);
provider.private_data = std::ptr::null_mut();
}

unsafe extern "C" fn clone_fn_wrapper(
Expand Down
9 changes: 6 additions & 3 deletions datafusion/ffi/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::{ffi::c_void, pin::Pin, sync::Arc};

use abi_stable::{
std_types::{RResult, RString, RVec},
std_types::{RString, RVec},
StableAbi,
};
use datafusion::{
Expand All @@ -29,6 +29,7 @@ use datafusion::{
use datafusion::{error::Result, physical_plan::DisplayFormatType};
use tokio::runtime::Handle;

use crate::util::FFIResult;
use crate::{
df_result, plan_properties::FFI_PlanProperties,
record_batch_stream::FFI_RecordBatchStream, rresult,
Expand All @@ -53,7 +54,7 @@ pub struct FFI_ExecutionPlan {
pub execute: unsafe extern "C" fn(
plan: &Self,
partition: usize,
) -> RResult<FFI_RecordBatchStream, RString>,
) -> FFIResult<FFI_RecordBatchStream>,

/// Used to create a clone on the provider of the execution plan. This should
/// only need to be called by the receiver of the plan.
Expand Down Expand Up @@ -116,7 +117,7 @@ unsafe extern "C" fn children_fn_wrapper(
unsafe extern "C" fn execute_fn_wrapper(
plan: &FFI_ExecutionPlan,
partition: usize,
) -> RResult<FFI_RecordBatchStream, RString> {
) -> FFIResult<FFI_RecordBatchStream> {
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
let plan = &(*private_data).plan;
let ctx = &(*private_data).context;
Expand All @@ -132,8 +133,10 @@ unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString {
}

unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
debug_assert!(!plan.private_data.is_null());
let private_data = Box::from_raw(plan.private_data as *mut ExecutionPlanPrivateData);
drop(private_data);
plan.private_data = std::ptr::null_mut();
}

unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan {
Expand Down
85 changes: 85 additions & 0 deletions datafusion/ffi/src/expr/columnar_value.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use abi_stable::StableAbi;
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_expr::ColumnarValue;

use crate::arrow_wrappers::WrappedArray;

/// A stable struct for sharing [`ColumnarValue`] across FFI boundaries.
/// Scalar values are passed as an Arrow array of length 1.
#[repr(C)]
#[derive(Debug, StableAbi)]
#[allow(non_camel_case_types)]
pub enum FFI_ColumnarValue {
Array(WrappedArray),
Scalar(WrappedArray),
}

impl TryFrom<ColumnarValue> for FFI_ColumnarValue {
type Error = DataFusionError;
fn try_from(value: ColumnarValue) -> Result<Self, Self::Error> {
Ok(match value {
ColumnarValue::Array(v) => {
FFI_ColumnarValue::Array(WrappedArray::try_from(&v)?)
}
ColumnarValue::Scalar(v) => {
FFI_ColumnarValue::Scalar(WrappedArray::try_from(&v)?)
}
})
}
}

impl TryFrom<FFI_ColumnarValue> for ColumnarValue {
type Error = DataFusionError;
fn try_from(value: FFI_ColumnarValue) -> Result<Self, Self::Error> {
Ok(match value {
FFI_ColumnarValue::Array(v) => ColumnarValue::Array(v.try_into()?),
FFI_ColumnarValue::Scalar(v) => {
ColumnarValue::Scalar(ScalarValue::try_from(v)?)
}
})
}
}

#[cfg(test)]
mod tests {
use arrow::array::create_array;
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_expr::ColumnarValue;

use crate::expr::columnar_value::FFI_ColumnarValue;

#[test]
fn ffi_columnar_value_round_trip() -> Result<(), DataFusionError> {
let array = create_array!(Int32, [1, 2, 3, 4, 5]);

for original in [
ColumnarValue::Array(array),
ColumnarValue::Scalar(ScalarValue::Int32(Some(1))),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we could also consider a null scalar since that is a bit of an edge case

let original = ScalarValue::Int32(None);

] {
let ffi_variant = FFI_ColumnarValue::try_from(original.clone())?;

let returned_value = ColumnarValue::try_from(ffi_variant)?;

assert_eq!(format!("{returned_value:?}"), format!("{original:?}"));
}

Ok(())
}
}
Loading