diff --git a/Cargo.lock b/Cargo.lock index 4a896ff25d2e..80bfc4edf793 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2275,7 +2275,10 @@ dependencies = [ "async-trait", "datafusion", "datafusion-common", + "datafusion-expr", "datafusion-functions-aggregate-common", + "datafusion-physical-expr", + "datafusion-physical-expr-common", "datafusion-proto", "datafusion-proto-common", "doc-comment", diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index 6a5cb31fe3fb..01894247e27f 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -171,6 +171,10 @@ pub enum DataFusionError { /// to multiple receivers. For example, when the source of a repartition /// errors and the error is propagated to multiple consumers. Shared(Arc), + /// An error that originated during a foreign function interface call. + /// Transferring errors across the FFI boundary is difficult, so the original + /// error will be converted to a string. + Ffi(String), } #[macro_export] @@ -413,6 +417,7 @@ impl Error for DataFusionError { // can't be executed. DataFusionError::Collection(errs) => errs.first().map(|e| e as &dyn Error), DataFusionError::Shared(e) => Some(e.as_ref()), + DataFusionError::Ffi(_) => None, } } } @@ -544,6 +549,7 @@ impl DataFusionError { errs.first().expect("cannot construct DataFusionError::Collection with 0 errors, but got one such case").error_prefix() } DataFusionError::Shared(_) => "", + DataFusionError::Ffi(_) => "FFI error: ", } } @@ -596,6 +602,7 @@ impl DataFusionError { .expect("cannot construct DataFusionError::Collection with 0 errors") .message(), DataFusionError::Shared(ref desc) => Cow::Owned(desc.to_string()), + DataFusionError::Ffi(ref desc) => Cow::Owned(desc.to_string()), } } @@ -969,6 +976,9 @@ make_error!(substrait_err, substrait_datafusion_err, Substrait); // Exposes a macro to create `DataFusionError::ResourcesExhausted` with optional backtrace make_error!(resources_err, resources_datafusion_err, ResourcesExhausted); +// Exposes a macro to create `DataFusionError::Ffi` with optional backtrace +make_error!(ffi_err, ffi_datafusion_err, Ffi); + // Exposes a macro to create `DataFusionError::SQL` with optional backtrace #[macro_export] macro_rules! sql_datafusion_err { diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 15538e68d980..f67eeac17177 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -103,9 +103,9 @@ pub use utils::project_schema; // https://github.com/rust-lang/rust/pull/52234#issuecomment-976702997 #[doc(hidden)] pub use error::{ - _config_datafusion_err, _exec_datafusion_err, _internal_datafusion_err, - _not_impl_datafusion_err, _plan_datafusion_err, _resources_datafusion_err, - _substrait_datafusion_err, + _config_datafusion_err, _exec_datafusion_err, _ffi_datafusion_err, + _internal_datafusion_err, _not_impl_datafusion_err, _plan_datafusion_err, + _resources_datafusion_err, _substrait_datafusion_err, }; // The HashMap and HashSet implementations that should be used as the uniform defaults diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml index b797804731f4..126b59a16880 100644 --- a/datafusion/ffi/Cargo.toml +++ b/datafusion/ffi/Cargo.toml @@ -48,7 +48,10 @@ async-ffi = { version = "0.5.0", features = ["abi_stable"] } async-trait = { workspace = true } datafusion = { workspace = true, default-features = false } datafusion-common = { workspace = true } +datafusion-expr = { workspace = true } datafusion-functions-aggregate-common = { workspace = true } +datafusion-physical-expr = { workspace = true } +datafusion-physical-expr-common = { workspace = true } datafusion-proto = { workspace = true } datafusion-proto-common = { workspace = true } futures = { workspace = true } diff --git a/datafusion/ffi/src/arrow_wrappers.rs b/datafusion/ffi/src/arrow_wrappers.rs index c60a055bafec..df3f1e3b4c49 100644 --- a/datafusion/ffi/src/arrow_wrappers.rs +++ b/datafusion/ffi/src/arrow_wrappers.rs @@ -24,6 +24,7 @@ use arrow::{ error::ArrowError, ffi::{from_ffi, to_ffi, FFI_ArrowArray, FFI_ArrowSchema}, }; +use datafusion_common::{DataFusionError, ScalarValue}; use log::error; /// This is a wrapper struct around FFI_ArrowSchema simply to indicate @@ -95,3 +96,21 @@ impl TryFrom<&ArrayRef> for WrappedArray { Ok(WrappedArray { array, schema }) } } + +impl TryFrom<&ScalarValue> for WrappedArray { + type Error = DataFusionError; + + fn try_from(value: &ScalarValue) -> Result { + let array = value.to_array()?; + WrappedArray::try_from(&array).map_err(Into::into) + } +} + +impl TryFrom for ScalarValue { + type Error = DataFusionError; + + fn try_from(value: WrappedArray) -> Result { + let array: ArrayRef = value.try_into()?; + ScalarValue::try_from_array(array.as_ref(), 0) + } +} diff --git a/datafusion/ffi/src/catalog_provider.rs b/datafusion/ffi/src/catalog_provider.rs index 00e8dc315811..8c3e4cbc15a1 100644 --- a/datafusion/ffi/src/catalog_provider.rs +++ b/datafusion/ffi/src/catalog_provider.rs @@ -29,6 +29,7 @@ use crate::{ schema_provider::{FFI_SchemaProvider, ForeignSchemaProvider}, }; +use crate::util::FFIResult; use datafusion::error::Result; /// A stable struct for sharing [`CatalogProvider`] across FFI boundaries. @@ -43,19 +44,19 @@ pub struct FFI_CatalogProvider { name: RString, ) -> ROption, - pub register_schema: - unsafe extern "C" fn( - provider: &Self, - name: RString, - schema: &FFI_SchemaProvider, - ) -> RResult, RString>, + pub register_schema: unsafe extern "C" fn( + provider: &Self, + name: RString, + schema: &FFI_SchemaProvider, + ) + -> FFIResult>, - pub deregister_schema: - unsafe extern "C" fn( - provider: &Self, - name: RString, - cascade: bool, - ) -> RResult, RString>, + pub deregister_schema: unsafe extern "C" fn( + provider: &Self, + name: RString, + cascade: bool, + ) + -> FFIResult>, /// Used to create a clone on the provider of the execution plan. This should /// only need to be called by the receiver of the plan. @@ -118,7 +119,7 @@ unsafe extern "C" fn register_schema_fn_wrapper( provider: &FFI_CatalogProvider, name: RString, schema: &FFI_SchemaProvider, -) -> RResult, RString> { +) -> FFIResult> { let runtime = provider.runtime(); let provider = provider.inner(); let schema: Arc = schema.into(); @@ -135,7 +136,7 @@ unsafe extern "C" fn deregister_schema_fn_wrapper( provider: &FFI_CatalogProvider, name: RString, cascade: bool, -) -> RResult, RString> { +) -> FFIResult> { let runtime = provider.runtime(); let provider = provider.inner(); @@ -150,8 +151,10 @@ unsafe extern "C" fn deregister_schema_fn_wrapper( } unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_CatalogProvider) { + debug_assert!(!provider.private_data.is_null()); let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData); drop(private_data); + provider.private_data = std::ptr::null_mut(); } unsafe extern "C" fn clone_fn_wrapper( diff --git a/datafusion/ffi/src/catalog_provider_list.rs b/datafusion/ffi/src/catalog_provider_list.rs index 429897269470..102e9e4ef56d 100644 --- a/datafusion/ffi/src/catalog_provider_list.rs +++ b/datafusion/ffi/src/catalog_provider_list.rs @@ -120,8 +120,10 @@ unsafe extern "C" fn catalog_fn_wrapper( } unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_CatalogProviderList) { + debug_assert!(!provider.private_data.is_null()); let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData); drop(private_data); + provider.private_data = std::ptr::null_mut(); } unsafe extern "C" fn clone_fn_wrapper( diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index d76dcd8dd0c9..023869a6c494 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -18,7 +18,7 @@ use std::{ffi::c_void, pin::Pin, sync::Arc}; use abi_stable::{ - std_types::{RResult, RString, RVec}, + std_types::{RString, RVec}, StableAbi, }; use datafusion::{ @@ -29,6 +29,7 @@ use datafusion::{ use datafusion::{error::Result, physical_plan::DisplayFormatType}; use tokio::runtime::Handle; +use crate::util::FFIResult; use crate::{ df_result, plan_properties::FFI_PlanProperties, record_batch_stream::FFI_RecordBatchStream, rresult, @@ -53,7 +54,7 @@ pub struct FFI_ExecutionPlan { pub execute: unsafe extern "C" fn( plan: &Self, partition: usize, - ) -> RResult, + ) -> FFIResult, /// Used to create a clone on the provider of the execution plan. This should /// only need to be called by the receiver of the plan. @@ -116,7 +117,7 @@ unsafe extern "C" fn children_fn_wrapper( unsafe extern "C" fn execute_fn_wrapper( plan: &FFI_ExecutionPlan, partition: usize, -) -> RResult { +) -> FFIResult { let private_data = plan.private_data as *const ExecutionPlanPrivateData; let plan = &(*private_data).plan; let ctx = &(*private_data).context; @@ -132,8 +133,10 @@ unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString { } unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) { + debug_assert!(!plan.private_data.is_null()); let private_data = Box::from_raw(plan.private_data as *mut ExecutionPlanPrivateData); drop(private_data); + plan.private_data = std::ptr::null_mut(); } unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan { diff --git a/datafusion/ffi/src/expr/columnar_value.rs b/datafusion/ffi/src/expr/columnar_value.rs new file mode 100644 index 000000000000..13f745f6de8d --- /dev/null +++ b/datafusion/ffi/src/expr/columnar_value.rs @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use abi_stable::StableAbi; +use datafusion_common::{DataFusionError, ScalarValue}; +use datafusion_expr::ColumnarValue; + +use crate::arrow_wrappers::WrappedArray; + +/// A stable struct for sharing [`ColumnarValue`] across FFI boundaries. +/// Scalar values are passed as an Arrow array of length 1. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub enum FFI_ColumnarValue { + Array(WrappedArray), + Scalar(WrappedArray), +} + +impl TryFrom for FFI_ColumnarValue { + type Error = DataFusionError; + fn try_from(value: ColumnarValue) -> Result { + Ok(match value { + ColumnarValue::Array(v) => { + FFI_ColumnarValue::Array(WrappedArray::try_from(&v)?) + } + ColumnarValue::Scalar(v) => { + FFI_ColumnarValue::Scalar(WrappedArray::try_from(&v)?) + } + }) + } +} + +impl TryFrom for ColumnarValue { + type Error = DataFusionError; + fn try_from(value: FFI_ColumnarValue) -> Result { + Ok(match value { + FFI_ColumnarValue::Array(v) => ColumnarValue::Array(v.try_into()?), + FFI_ColumnarValue::Scalar(v) => { + ColumnarValue::Scalar(ScalarValue::try_from(v)?) + } + }) + } +} + +#[cfg(test)] +mod tests { + use arrow::array::create_array; + use datafusion_common::{DataFusionError, ScalarValue}; + use datafusion_expr::ColumnarValue; + + use crate::expr::columnar_value::FFI_ColumnarValue; + + #[test] + fn ffi_columnar_value_round_trip() -> Result<(), DataFusionError> { + let array = create_array!(Int32, [1, 2, 3, 4, 5]); + + for original in [ + ColumnarValue::Array(array), + ColumnarValue::Scalar(ScalarValue::Int32(Some(1))), + ] { + let ffi_variant = FFI_ColumnarValue::try_from(original.clone())?; + + let returned_value = ColumnarValue::try_from(ffi_variant)?; + + assert_eq!(format!("{returned_value:?}"), format!("{original:?}")); + } + + Ok(()) + } +} diff --git a/datafusion/ffi/src/expr/distribution.rs b/datafusion/ffi/src/expr/distribution.rs new file mode 100644 index 000000000000..c35bb4c0df18 --- /dev/null +++ b/datafusion/ffi/src/expr/distribution.rs @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow_wrappers::WrappedArray; +use crate::expr::interval::FFI_Interval; +use abi_stable::StableAbi; +use datafusion_common::DataFusionError; +use datafusion_expr::statistics::{ + BernoulliDistribution, Distribution, ExponentialDistribution, GaussianDistribution, + GenericDistribution, UniformDistribution, +}; + +/// A stable struct for sharing [`Distribution`] across FFI boundaries. +/// See ['Distribution'] for the meaning of each variant. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +#[expect(clippy::large_enum_variant)] +pub enum FFI_Distribution { + Uniform(FFI_UniformDistribution), + Exponential(FFI_ExponentialDistribution), + Gaussian(FFI_GaussianDistribution), + Bernoulli(FFI_BernoulliDistribution), + Generic(FFI_GenericDistribution), +} + +impl TryFrom<&Distribution> for FFI_Distribution { + type Error = DataFusionError; + fn try_from(value: &Distribution) -> Result { + match value { + Distribution::Uniform(d) => Ok(FFI_Distribution::Uniform(d.try_into()?)), + Distribution::Exponential(d) => { + Ok(FFI_Distribution::Exponential(d.try_into()?)) + } + Distribution::Gaussian(d) => Ok(FFI_Distribution::Gaussian(d.try_into()?)), + Distribution::Bernoulli(d) => Ok(FFI_Distribution::Bernoulli(d.try_into()?)), + Distribution::Generic(d) => Ok(FFI_Distribution::Generic(d.try_into()?)), + } + } +} + +impl TryFrom for Distribution { + type Error = DataFusionError; + fn try_from(value: FFI_Distribution) -> Result { + match value { + FFI_Distribution::Uniform(d) => d.try_into(), + FFI_Distribution::Exponential(d) => d.try_into(), + FFI_Distribution::Gaussian(d) => d.try_into(), + FFI_Distribution::Bernoulli(d) => d.try_into(), + FFI_Distribution::Generic(d) => d.try_into(), + } + } +} + +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_UniformDistribution { + interval: FFI_Interval, +} + +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_ExponentialDistribution { + rate: WrappedArray, + offset: WrappedArray, + positive_tail: bool, +} + +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_GaussianDistribution { + mean: WrappedArray, + variance: WrappedArray, +} + +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_BernoulliDistribution { + p: WrappedArray, +} + +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_GenericDistribution { + mean: WrappedArray, + median: WrappedArray, + variance: WrappedArray, + range: FFI_Interval, +} + +impl TryFrom<&UniformDistribution> for FFI_UniformDistribution { + type Error = DataFusionError; + fn try_from(value: &UniformDistribution) -> Result { + Ok(Self { + interval: value.range().try_into()?, + }) + } +} + +impl TryFrom<&ExponentialDistribution> for FFI_ExponentialDistribution { + type Error = DataFusionError; + fn try_from(value: &ExponentialDistribution) -> Result { + let rate = value.rate().try_into()?; + let offset = value.offset().try_into()?; + + Ok(Self { + rate, + offset, + positive_tail: value.positive_tail(), + }) + } +} + +impl TryFrom<&GaussianDistribution> for FFI_GaussianDistribution { + type Error = DataFusionError; + fn try_from(value: &GaussianDistribution) -> Result { + let mean = value.mean().try_into()?; + let variance = value.variance().try_into()?; + + Ok(Self { mean, variance }) + } +} + +impl TryFrom<&BernoulliDistribution> for FFI_BernoulliDistribution { + type Error = DataFusionError; + fn try_from(value: &BernoulliDistribution) -> Result { + let p = value.p_value().try_into()?; + + Ok(Self { p }) + } +} + +impl TryFrom<&GenericDistribution> for FFI_GenericDistribution { + type Error = DataFusionError; + fn try_from(value: &GenericDistribution) -> Result { + let mean = value.mean().try_into()?; + let median = value.median().try_into()?; + let variance = value.variance().try_into()?; + + Ok(Self { + mean, + median, + variance, + range: value.range().try_into()?, + }) + } +} + +impl TryFrom for Distribution { + type Error = DataFusionError; + fn try_from(value: FFI_UniformDistribution) -> Result { + let interval = value.interval.try_into()?; + Distribution::new_uniform(interval) + } +} + +impl TryFrom for Distribution { + type Error = DataFusionError; + fn try_from(value: FFI_ExponentialDistribution) -> Result { + let rate = value.rate.try_into()?; + let offset = value.offset.try_into()?; + + Distribution::new_exponential(rate, offset, value.positive_tail) + } +} + +impl TryFrom for Distribution { + type Error = DataFusionError; + fn try_from(value: FFI_GaussianDistribution) -> Result { + let mean = value.mean.try_into()?; + let variance = value.variance.try_into()?; + + Distribution::new_gaussian(mean, variance) + } +} + +impl TryFrom for Distribution { + type Error = DataFusionError; + fn try_from(value: FFI_BernoulliDistribution) -> Result { + let p = value.p.try_into()?; + + Distribution::new_bernoulli(p) + } +} + +impl TryFrom for Distribution { + type Error = DataFusionError; + fn try_from(value: FFI_GenericDistribution) -> Result { + let mean = value.mean.try_into()?; + let median = value.median.try_into()?; + let variance = value.variance.try_into()?; + let range = value.range.try_into()?; + + Distribution::new_generic(mean, median, variance, range) + } +} diff --git a/datafusion/ffi/src/expr/expr_properties.rs b/datafusion/ffi/src/expr/expr_properties.rs new file mode 100644 index 000000000000..a31bd34faa1c --- /dev/null +++ b/datafusion/ffi/src/expr/expr_properties.rs @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use abi_stable::StableAbi; +use arrow_schema::SortOptions; +use datafusion_common::DataFusionError; +use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; + +use crate::expr::interval::FFI_Interval; + +/// A stable struct for sharing [`ExprProperties`] across FFI boundaries. +/// See [`ExprProperties`] for the meaning of each field. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_ExprProperties { + sort_properties: FFI_SortProperties, + range: FFI_Interval, + preserves_lex_ordering: bool, +} + +impl TryFrom<&ExprProperties> for FFI_ExprProperties { + type Error = DataFusionError; + fn try_from(value: &ExprProperties) -> Result { + let sort_properties = (&value.sort_properties).into(); + let range = value.range.clone().try_into()?; + + Ok(FFI_ExprProperties { + sort_properties, + range, + preserves_lex_ordering: value.preserves_lex_ordering, + }) + } +} + +impl TryFrom for ExprProperties { + type Error = DataFusionError; + fn try_from(value: FFI_ExprProperties) -> Result { + let sort_properties = (&value.sort_properties).into(); + let range = value.range.try_into()?; + Ok(ExprProperties { + sort_properties, + range, + preserves_lex_ordering: value.preserves_lex_ordering, + }) + } +} + +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub enum FFI_SortProperties { + Ordered(FFI_SortOptions), + Unordered, + Singleton, +} + +impl From<&SortProperties> for FFI_SortProperties { + fn from(value: &SortProperties) -> Self { + match value { + SortProperties::Unordered => FFI_SortProperties::Unordered, + SortProperties::Singleton => FFI_SortProperties::Singleton, + SortProperties::Ordered(o) => FFI_SortProperties::Ordered(o.into()), + } + } +} + +impl From<&FFI_SortProperties> for SortProperties { + fn from(value: &FFI_SortProperties) -> Self { + match value { + FFI_SortProperties::Unordered => SortProperties::Unordered, + FFI_SortProperties::Singleton => SortProperties::Singleton, + FFI_SortProperties::Ordered(o) => SortProperties::Ordered(o.into()), + } + } +} + +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_SortOptions { + pub descending: bool, + pub nulls_first: bool, +} + +impl From<&SortOptions> for FFI_SortOptions { + fn from(value: &SortOptions) -> Self { + Self { + descending: value.descending, + nulls_first: value.nulls_first, + } + } +} + +impl From<&FFI_SortOptions> for SortOptions { + fn from(value: &FFI_SortOptions) -> Self { + Self { + descending: value.descending, + nulls_first: value.nulls_first, + } + } +} diff --git a/datafusion/ffi/src/expr/interval.rs b/datafusion/ffi/src/expr/interval.rs new file mode 100644 index 000000000000..d636ded6ebdb --- /dev/null +++ b/datafusion/ffi/src/expr/interval.rs @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow_wrappers::WrappedArray; +use abi_stable::StableAbi; +use datafusion_common::DataFusionError; +use datafusion_expr::interval_arithmetic::Interval; + +/// A stable struct for sharing [`Interval`] across FFI boundaries. +/// See [`Interval`] for the meaning of each field. Scalar values +/// are passed as Arrow arrays of length 1. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_Interval { + lower: WrappedArray, + upper: WrappedArray, +} + +impl TryFrom<&Interval> for FFI_Interval { + type Error = DataFusionError; + fn try_from(value: &Interval) -> Result { + let upper = value.upper().try_into()?; + let lower = value.lower().try_into()?; + + Ok(FFI_Interval { upper, lower }) + } +} +impl TryFrom for FFI_Interval { + type Error = DataFusionError; + fn try_from(value: Interval) -> Result { + FFI_Interval::try_from(&value) + } +} + +impl TryFrom for Interval { + type Error = DataFusionError; + fn try_from(value: FFI_Interval) -> Result { + let upper = value.upper.try_into()?; + let lower = value.lower.try_into()?; + + Interval::try_new(lower, upper) + } +} diff --git a/datafusion/ffi/src/expr/mod.rs b/datafusion/ffi/src/expr/mod.rs new file mode 100644 index 000000000000..e11d52a2a1e5 --- /dev/null +++ b/datafusion/ffi/src/expr/mod.rs @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod columnar_value; +pub mod distribution; +pub mod expr_properties; +pub mod interval; diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index 7fb46a482080..dc68a742d768 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -31,7 +31,9 @@ pub mod arrow_wrappers; pub mod catalog_provider; pub mod catalog_provider_list; pub mod execution_plan; +pub mod expr; pub mod insert_op; +pub mod physical_expr; pub mod plan_properties; pub mod record_batch_stream; pub mod schema_provider; diff --git a/datafusion/ffi/src/physical_expr/mod.rs b/datafusion/ffi/src/physical_expr/mod.rs new file mode 100644 index 000000000000..720d83ce9a64 --- /dev/null +++ b/datafusion/ffi/src/physical_expr/mod.rs @@ -0,0 +1,962 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub(crate) mod partitioning; +pub(crate) mod sort; + +use std::{ + any::Any, + ffi::c_void, + fmt::{Display, Formatter}, + hash::{DefaultHasher, Hash, Hasher}, + sync::Arc, +}; + +use abi_stable::{ + std_types::{ROption, RResult, RString, RVec}, + StableAbi, +}; +use arrow::{ + array::{ArrayRef, BooleanArray, RecordBatch}, + datatypes::SchemaRef, +}; +use arrow_schema::{ffi::FFI_ArrowSchema, DataType, Field, FieldRef, Schema}; +use datafusion_common::{ffi_datafusion_err, Result}; +use datafusion_expr::{ + interval_arithmetic::Interval, sort_properties::ExprProperties, + statistics::Distribution, ColumnarValue, +}; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr_common::physical_expr::fmt_sql; + +use crate::{ + arrow_wrappers::{WrappedArray, WrappedSchema}, + df_result, + expr::{ + columnar_value::FFI_ColumnarValue, distribution::FFI_Distribution, + expr_properties::FFI_ExprProperties, interval::FFI_Interval, + }, + record_batch_stream::{record_batch_to_wrapped_array, wrapped_array_to_record_batch}, + rresult, rresult_return, + util::FFIResult, +}; + +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_PhysicalExpr { + pub data_type: unsafe extern "C" fn( + &Self, + input_schema: WrappedSchema, + ) -> FFIResult, + + pub nullable: + unsafe extern "C" fn(&Self, input_schema: WrappedSchema) -> FFIResult, + + pub evaluate: + unsafe extern "C" fn(&Self, batch: WrappedArray) -> FFIResult, + + pub return_field: unsafe extern "C" fn( + &Self, + input_schema: WrappedSchema, + ) -> FFIResult, + + pub evaluate_selection: unsafe extern "C" fn( + &Self, + batch: WrappedArray, + selection: WrappedArray, + ) -> FFIResult, + + pub children: unsafe extern "C" fn(&Self) -> RVec, + + pub new_with_children: + unsafe extern "C" fn(&Self, children: &RVec) -> FFIResult, + + pub evaluate_bounds: unsafe extern "C" fn( + &Self, + children: RVec, + ) -> FFIResult, + + pub propagate_constraints: + unsafe extern "C" fn( + &Self, + interval: FFI_Interval, + children: RVec, + ) -> FFIResult>>, + + pub evaluate_statistics: unsafe extern "C" fn( + &Self, + children: RVec, + ) -> FFIResult, + + pub propagate_statistics: + unsafe extern "C" fn( + &Self, + parent: FFI_Distribution, + children: RVec, + ) -> FFIResult>>, + + pub get_properties: unsafe extern "C" fn( + &Self, + children: RVec, + ) -> FFIResult, + + pub fmt_sql: unsafe extern "C" fn(&Self) -> FFIResult, + + pub snapshot: unsafe extern "C" fn(&Self) -> FFIResult>, + + pub snapshot_generation: unsafe extern "C" fn(&Self) -> u64, + + pub is_volatile_node: unsafe extern "C" fn(&Self) -> bool, + + // Display trait + pub display: unsafe extern "C" fn(&Self) -> RString, + + // Hash trait + pub hash: unsafe extern "C" fn(&Self) -> u64, + + /// Used to create a clone on the provider of the execution plan. This should + /// only need to be called by the receiver of the plan. + pub clone: unsafe extern "C" fn(plan: &Self) -> Self, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(arg: &mut Self), + + /// Return the major DataFusion version number of this provider. + pub version: unsafe extern "C" fn() -> u64, + + /// Internal data. This is only to be accessed by the provider of the plan. + /// A [`ForeignPhysicalExpr`] should never attempt to access this data. + pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. + pub library_marker_id: extern "C" fn() -> usize, +} + +unsafe impl Send for FFI_PhysicalExpr {} +unsafe impl Sync for FFI_PhysicalExpr {} + +impl FFI_PhysicalExpr { + fn inner(&self) -> &Arc { + unsafe { + let private_data = self.private_data as *const PhysicalExprPrivateData; + &(*private_data).expr + } + } +} + +struct PhysicalExprPrivateData { + expr: Arc, +} + +unsafe extern "C" fn data_type_fn_wrapper( + expr: &FFI_PhysicalExpr, + input_schema: WrappedSchema, +) -> FFIResult { + let expr = expr.inner(); + let schema: SchemaRef = input_schema.into(); + let data_type = expr + .data_type(&schema) + .and_then(|dt| FFI_ArrowSchema::try_from(dt).map_err(Into::into)) + .map(WrappedSchema); + rresult!(data_type) +} + +unsafe extern "C" fn nullable_fn_wrapper( + expr: &FFI_PhysicalExpr, + input_schema: WrappedSchema, +) -> FFIResult { + let expr = expr.inner(); + let schema: SchemaRef = input_schema.into(); + rresult!(expr.nullable(&schema)) +} + +unsafe extern "C" fn evaluate_fn_wrapper( + expr: &FFI_PhysicalExpr, + batch: WrappedArray, +) -> FFIResult { + let batch = rresult_return!(wrapped_array_to_record_batch(batch)); + rresult!(expr + .inner() + .evaluate(&batch) + .and_then(FFI_ColumnarValue::try_from)) +} + +unsafe extern "C" fn return_field_fn_wrapper( + expr: &FFI_PhysicalExpr, + input_schema: WrappedSchema, +) -> FFIResult { + let expr = expr.inner(); + let schema: SchemaRef = input_schema.into(); + rresult!(expr + .return_field(&schema) + .and_then(|f| FFI_ArrowSchema::try_from(&f).map_err(Into::into)) + .map(WrappedSchema)) +} + +unsafe extern "C" fn evaluate_selection_fn_wrapper( + expr: &FFI_PhysicalExpr, + batch: WrappedArray, + selection: WrappedArray, +) -> FFIResult { + let batch = rresult_return!(wrapped_array_to_record_batch(batch)); + let selection: ArrayRef = rresult_return!(selection.try_into()); + let selection = rresult_return!(selection + .as_any() + .downcast_ref::() + .ok_or(ffi_datafusion_err!("Unexpected selection array type"))); + rresult!(expr + .inner() + .evaluate_selection(&batch, selection) + .and_then(FFI_ColumnarValue::try_from)) +} + +unsafe extern "C" fn children_fn_wrapper( + expr: &FFI_PhysicalExpr, +) -> RVec { + let expr = expr.inner(); + let children = expr.children(); + children + .into_iter() + .map(|child| FFI_PhysicalExpr::from(Arc::clone(child))) + .collect() +} + +unsafe extern "C" fn new_with_children_fn_wrapper( + expr: &FFI_PhysicalExpr, + children: &RVec, +) -> FFIResult { + let expr = Arc::clone(expr.inner()); + let children = children.iter().map(Into::into).collect::>(); + rresult!(expr.with_new_children(children).map(FFI_PhysicalExpr::from)) +} + +unsafe extern "C" fn evaluate_bounds_fn_wrapper( + expr: &FFI_PhysicalExpr, + children: RVec, +) -> FFIResult { + let expr = expr.inner(); + let children = rresult_return!(children + .into_iter() + .map(Interval::try_from) + .collect::>>()); + let children_borrowed = children.iter().collect::>(); + + rresult!(expr + .evaluate_bounds(&children_borrowed) + .and_then(FFI_Interval::try_from)) +} + +unsafe extern "C" fn propagate_constraints_fn_wrapper( + expr: &FFI_PhysicalExpr, + interval: FFI_Interval, + children: RVec, +) -> FFIResult>> { + let expr = expr.inner(); + let interval = rresult_return!(Interval::try_from(interval)); + let children = rresult_return!(children + .into_iter() + .map(Interval::try_from) + .collect::>>()); + let children_borrowed = children.iter().collect::>(); + + let result = + rresult_return!(expr.propagate_constraints(&interval, &children_borrowed)); + + let result = rresult_return!(result + .map(|intervals| intervals + .into_iter() + .map(FFI_Interval::try_from) + .collect::>>()) + .transpose()); + + RResult::ROk(result.into()) +} + +unsafe extern "C" fn evaluate_statistics_fn_wrapper( + expr: &FFI_PhysicalExpr, + children: RVec, +) -> FFIResult { + let expr = expr.inner(); + let children = rresult_return!(children + .into_iter() + .map(Distribution::try_from) + .collect::>>()); + let children_borrowed = children.iter().collect::>(); + rresult!(expr + .evaluate_statistics(&children_borrowed) + .and_then(|dist| FFI_Distribution::try_from(&dist))) +} + +unsafe extern "C" fn propagate_statistics_fn_wrapper( + expr: &FFI_PhysicalExpr, + parent: FFI_Distribution, + children: RVec, +) -> FFIResult>> { + let expr = expr.inner(); + let parent = rresult_return!(Distribution::try_from(parent)); + let children = rresult_return!(children + .into_iter() + .map(Distribution::try_from) + .collect::>>()); + let children_borrowed = children.iter().collect::>(); + + let result = rresult_return!(expr.propagate_statistics(&parent, &children_borrowed)); + let result = rresult_return!(result + .map(|dists| dists + .iter() + .map(FFI_Distribution::try_from) + .collect::>>()) + .transpose()); + + RResult::ROk(result.into()) +} + +unsafe extern "C" fn get_properties_fn_wrapper( + expr: &FFI_PhysicalExpr, + children: RVec, +) -> FFIResult { + let expr = expr.inner(); + let children = rresult_return!(children + .into_iter() + .map(ExprProperties::try_from) + .collect::>>()); + rresult!(expr + .get_properties(&children) + .and_then(|p| FFI_ExprProperties::try_from(&p))) +} + +unsafe extern "C" fn fmt_sql_fn_wrapper(expr: &FFI_PhysicalExpr) -> FFIResult { + let expr = expr.inner(); + let result = fmt_sql(expr.as_ref()).to_string(); + RResult::ROk(result.into()) +} + +unsafe extern "C" fn snapshot_fn_wrapper( + expr: &FFI_PhysicalExpr, +) -> FFIResult> { + let expr = expr.inner(); + rresult!(expr + .snapshot() + .map(|snapshot| snapshot.map(FFI_PhysicalExpr::from).into())) +} + +unsafe extern "C" fn snapshot_generation_fn_wrapper(expr: &FFI_PhysicalExpr) -> u64 { + let expr = expr.inner(); + expr.snapshot_generation() +} + +unsafe extern "C" fn is_volatile_node_fn_wrapper(expr: &FFI_PhysicalExpr) -> bool { + let expr = expr.inner(); + expr.is_volatile_node() +} +unsafe extern "C" fn display_fn_wrapper(expr: &FFI_PhysicalExpr) -> RString { + let expr = expr.inner(); + format!("{expr}").into() +} + +unsafe extern "C" fn hash_fn_wrapper(expr: &FFI_PhysicalExpr) -> u64 { + let expr = expr.inner(); + let mut hasher = DefaultHasher::new(); + expr.hash(&mut hasher); + hasher.finish() +} + +unsafe extern "C" fn release_fn_wrapper(expr: &mut FFI_PhysicalExpr) { + debug_assert!(!expr.private_data.is_null()); + let private_data = Box::from_raw(expr.private_data as *mut PhysicalExprPrivateData); + drop(private_data); + expr.private_data = std::ptr::null_mut(); +} + +unsafe extern "C" fn clone_fn_wrapper(expr: &FFI_PhysicalExpr) -> FFI_PhysicalExpr { + let old_private_data = expr.private_data as *const PhysicalExprPrivateData; + + let private_data = Box::into_raw(Box::new(PhysicalExprPrivateData { + expr: Arc::clone(&(*old_private_data).expr), + })) as *mut c_void; + + FFI_PhysicalExpr { + data_type: data_type_fn_wrapper, + nullable: nullable_fn_wrapper, + evaluate: evaluate_fn_wrapper, + return_field: return_field_fn_wrapper, + evaluate_selection: evaluate_selection_fn_wrapper, + children: children_fn_wrapper, + new_with_children: new_with_children_fn_wrapper, + evaluate_bounds: evaluate_bounds_fn_wrapper, + propagate_constraints: propagate_constraints_fn_wrapper, + evaluate_statistics: evaluate_statistics_fn_wrapper, + propagate_statistics: propagate_statistics_fn_wrapper, + get_properties: get_properties_fn_wrapper, + fmt_sql: fmt_sql_fn_wrapper, + snapshot: snapshot_fn_wrapper, + snapshot_generation: snapshot_generation_fn_wrapper, + is_volatile_node: is_volatile_node_fn_wrapper, + display: display_fn_wrapper, + hash: hash_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data, + library_marker_id: crate::get_library_marker_id, + } +} + +impl Drop for FFI_PhysicalExpr { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +impl From> for FFI_PhysicalExpr { + /// Creates a new [`FFI_PhysicalExpr`]. + fn from(expr: Arc) -> Self { + let private_data = Box::new(PhysicalExprPrivateData { expr }); + + Self { + data_type: data_type_fn_wrapper, + nullable: nullable_fn_wrapper, + evaluate: evaluate_fn_wrapper, + return_field: return_field_fn_wrapper, + evaluate_selection: evaluate_selection_fn_wrapper, + children: children_fn_wrapper, + new_with_children: new_with_children_fn_wrapper, + evaluate_bounds: evaluate_bounds_fn_wrapper, + propagate_constraints: propagate_constraints_fn_wrapper, + evaluate_statistics: evaluate_statistics_fn_wrapper, + propagate_statistics: propagate_statistics_fn_wrapper, + get_properties: get_properties_fn_wrapper, + fmt_sql: fmt_sql_fn_wrapper, + snapshot: snapshot_fn_wrapper, + snapshot_generation: snapshot_generation_fn_wrapper, + is_volatile_node: is_volatile_node_fn_wrapper, + display: display_fn_wrapper, + hash: hash_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, + } + } +} + +/// This wrapper struct exists on the receiver side of the FFI interface, so it has +/// no guarantees about being able to access the data in `private_data`. Any functions +/// defined on this struct must only use the stable functions provided in +/// FFI_PhysicalExpr to interact with the expression. +#[derive(Debug)] +pub struct ForeignPhysicalExpr { + expr: FFI_PhysicalExpr, + children: Vec>, +} + +unsafe impl Send for ForeignPhysicalExpr {} +unsafe impl Sync for ForeignPhysicalExpr {} + +impl From<&FFI_PhysicalExpr> for Arc { + fn from(ffi_expr: &FFI_PhysicalExpr) -> Self { + if (ffi_expr.library_marker_id)() == crate::get_library_marker_id() { + Arc::clone(ffi_expr.inner()) + } else { + let children = unsafe { + (ffi_expr.children)(ffi_expr) + .into_iter() + .map(|expr| >::from(&expr)) + .collect() + }; + + Arc::new(ForeignPhysicalExpr { + expr: ffi_expr.clone(), + children, + }) + } + } +} + +impl Clone for FFI_PhysicalExpr { + fn clone(&self) -> Self { + unsafe { (self.clone)(self) } + } +} + +impl PhysicalExpr for ForeignPhysicalExpr { + fn as_any(&self) -> &dyn Any { + self + } + + fn data_type(&self, input_schema: &Schema) -> Result { + unsafe { + let schema = WrappedSchema::from(Arc::new(input_schema.clone())); + df_result!((self.expr.data_type)(&self.expr, schema)) + .and_then(|d| DataType::try_from(&d.0).map_err(Into::into)) + } + } + + fn nullable(&self, input_schema: &Schema) -> Result { + unsafe { + let schema = WrappedSchema::from(Arc::new(input_schema.clone())); + df_result!((self.expr.nullable)(&self.expr, schema)) + } + } + + fn evaluate(&self, batch: &RecordBatch) -> Result { + unsafe { + let batch = df_result!(record_batch_to_wrapped_array(batch.clone()))?; + df_result!((self.expr.evaluate)(&self.expr, batch)) + .and_then(ColumnarValue::try_from) + } + } + + fn return_field(&self, input_schema: &Schema) -> Result { + unsafe { + let schema = WrappedSchema::from(Arc::new(input_schema.clone())); + let result = df_result!((self.expr.return_field)(&self.expr, schema))?; + Field::try_from(&result.0).map(Arc::new).map_err(Into::into) + } + } + + fn evaluate_selection( + &self, + batch: &RecordBatch, + selection: &BooleanArray, + ) -> Result { + unsafe { + let batch = df_result!(record_batch_to_wrapped_array(batch.clone()))?; + // This is not ideal - we are cloning the selection array + // This is not terrible since it will be a small array. + // The other alternative is to modify the trait signature. + let selection: ArrayRef = Arc::new(selection.clone()); + let selection = WrappedArray::try_from(&selection)?; + df_result!((self.expr.evaluate_selection)(&self.expr, batch, selection)) + .and_then(ColumnarValue::try_from) + } + } + + fn children(&self) -> Vec<&Arc> { + self.children.iter().collect() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + unsafe { + let children = children.into_iter().map(FFI_PhysicalExpr::from).collect(); + df_result!((self.expr.new_with_children)(&self.expr, &children) + .map(|expr| >::from(&expr))) + } + } + + fn evaluate_bounds(&self, children: &[&Interval]) -> Result { + unsafe { + let children = children + .iter() + .map(|interval| FFI_Interval::try_from(*interval)) + .collect::>>()?; + df_result!((self.expr.evaluate_bounds)(&self.expr, children)) + .and_then(Interval::try_from) + } + } + + fn propagate_constraints( + &self, + interval: &Interval, + children: &[&Interval], + ) -> Result>> { + unsafe { + let interval = interval.try_into()?; + let children = children + .iter() + .map(|interval| FFI_Interval::try_from(*interval)) + .collect::>>()?; + let result = df_result!((self.expr.propagate_constraints)( + &self.expr, interval, children + ))?; + + let result: Option<_> = result + .map(|intervals| { + intervals + .into_iter() + .map(Interval::try_from) + .collect::>>() + }) + .into(); + result.transpose() + } + } + + fn evaluate_statistics(&self, children: &[&Distribution]) -> Result { + unsafe { + let children = children + .iter() + .map(|dist| FFI_Distribution::try_from(*dist)) + .collect::>>()?; + + let result = + df_result!((self.expr.evaluate_statistics)(&self.expr, children))?; + Distribution::try_from(result) + } + } + + fn propagate_statistics( + &self, + parent: &Distribution, + children: &[&Distribution], + ) -> Result>> { + unsafe { + let parent = FFI_Distribution::try_from(parent)?; + let children = children + .iter() + .map(|dist| FFI_Distribution::try_from(*dist)) + .collect::>>()?; + let result = df_result!((self.expr.propagate_statistics)( + &self.expr, parent, children + ))?; + + let result: Option>> = result + .map(|dists| { + dists + .into_iter() + .map(Distribution::try_from) + .collect::>>() + }) + .into(); + + result.transpose() + } + } + + fn get_properties(&self, children: &[ExprProperties]) -> Result { + unsafe { + let children = children + .iter() + .map(FFI_ExprProperties::try_from) + .collect::>>()?; + df_result!((self.expr.get_properties)(&self.expr, children)) + .and_then(ExprProperties::try_from) + } + } + + fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + unsafe { + match (self.expr.fmt_sql)(&self.expr) { + RResult::ROk(sql) => write!(f, "{sql}"), + RResult::RErr(_) => Err(std::fmt::Error), + } + } + } + + fn snapshot(&self) -> Result>> { + unsafe { + let result = df_result!((self.expr.snapshot)(&self.expr))?; + Ok(result + .map(|expr| >::from(&expr)) + .into()) + } + } + + fn snapshot_generation(&self) -> u64 { + unsafe { (self.expr.snapshot_generation)(&self.expr) } + } + + fn is_volatile_node(&self) -> bool { + unsafe { (self.expr.is_volatile_node)(&self.expr) } + } +} + +impl Eq for ForeignPhysicalExpr {} +impl PartialEq for ForeignPhysicalExpr { + fn eq(&self, other: &Self) -> bool { + // FFI_PhysicalExpr cannot be compared, so identity equality is the best we can do. + std::ptr::eq(self, other) + } +} +impl Hash for ForeignPhysicalExpr { + fn hash(&self, state: &mut H) { + let value = unsafe { (self.expr.hash)(&self.expr) }; + value.hash(state) + } +} + +impl Display for ForeignPhysicalExpr { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let display = unsafe { (self.expr.display)(&self.expr) }; + write!(f, "{display}") + } +} + +#[cfg(test)] +mod tests { + use std::{ + hash::{DefaultHasher, Hash, Hasher}, + sync::Arc, + }; + + use arrow::array::{record_batch, BooleanArray, RecordBatch}; + use datafusion_common::{tree_node::DynTreeNode, DataFusionError, ScalarValue}; + use datafusion_expr::{interval_arithmetic::Interval, statistics::Distribution}; + use datafusion_physical_expr::expressions::{Column, NegativeExpr, NotExpr}; + use datafusion_physical_expr_common::physical_expr::{fmt_sql, PhysicalExpr}; + + use crate::physical_expr::FFI_PhysicalExpr; + + fn create_test_expr() -> (Arc, Arc) { + let original = Arc::new(Column::new("a", 0)) as Arc; + let mut ffi_expr = FFI_PhysicalExpr::from(Arc::clone(&original)); + ffi_expr.library_marker_id = crate::mock_foreign_marker_id; + + let foreign_expr: Arc = (&ffi_expr).into(); + + (original, foreign_expr) + } + + fn test_record_batch() -> RecordBatch { + record_batch!(("a", Int32, [1, 2, 3])).unwrap() + } + + #[test] + fn ffi_physical_expr_fields() -> Result<(), DataFusionError> { + let (original, foreign_expr) = create_test_expr(); + let schema = test_record_batch().schema(); + + // Verify the mock marker worked, otherwise tests to follow are not useful + assert_ne!(original.as_ref(), foreign_expr.as_ref()); + + assert_eq!( + original.return_field(&schema)?, + foreign_expr.return_field(&schema)? + ); + + assert_eq!( + original.data_type(&schema)?, + foreign_expr.data_type(&schema)? + ); + assert_eq!(original.nullable(&schema)?, foreign_expr.nullable(&schema)?); + + Ok(()) + } + #[test] + fn ffi_physical_expr_evaluate() -> Result<(), DataFusionError> { + let (original, foreign_expr) = create_test_expr(); + let rb = test_record_batch(); + + assert_eq!( + original.evaluate(&rb)?.to_array(3)?.as_ref(), + foreign_expr.evaluate(&rb)?.to_array(3)?.as_ref() + ); + + Ok(()) + } + #[test] + fn ffi_physical_expr_selection() -> Result<(), DataFusionError> { + let (original, foreign_expr) = create_test_expr(); + let rb = test_record_batch(); + + let selection = BooleanArray::from(vec![true, false, true]); + + assert_eq!( + original + .evaluate_selection(&rb, &selection)? + .to_array(3)? + .as_ref(), + foreign_expr + .evaluate_selection(&rb, &selection)? + .to_array(3)? + .as_ref() + ); + Ok(()) + } + + #[test] + fn ffi_physical_expr_with_children() -> Result<(), DataFusionError> { + let (original, _) = create_test_expr(); + let not_expr = + Arc::new(NotExpr::new(Arc::clone(&original))) as Arc; + let mut ffi_not = FFI_PhysicalExpr::from(not_expr); + ffi_not.library_marker_id = crate::mock_foreign_marker_id; + let foreign_not: Arc = (&ffi_not).into(); + + let replacement = Arc::new(Column::new("b", 1)) as Arc; + let updated = + Arc::clone(&foreign_not).with_new_children(vec![Arc::clone(&replacement)])?; + assert_eq!( + format!("{updated:?}").as_str(), + "NotExpr { arg: Column { name: \"b\", index: 1 } }" + ); + + let updated = foreign_not + .with_new_arc_children(Arc::clone(&foreign_not), vec![replacement])?; + assert_eq!(format!("{updated}").as_str(), "NOT b@1"); + + Ok(()) + } + + fn create_test_negative_expr() -> (Arc, Arc) { + let (original, _) = create_test_expr(); + + let negative_expr = + Arc::new(NegativeExpr::new(Arc::clone(&original))) as Arc; + let mut ffi_neg = FFI_PhysicalExpr::from(Arc::clone(&negative_expr)); + ffi_neg.library_marker_id = crate::mock_foreign_marker_id; + let foreign_neg: Arc = (&ffi_neg).into(); + + (negative_expr, foreign_neg) + } + + #[test] + fn ffi_physical_expr_bounds() -> Result<(), DataFusionError> { + let (negative_expr, foreign_neg) = create_test_negative_expr(); + + let interval = + Interval::try_new(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(10)))?; + let left = negative_expr.evaluate_bounds(&[&interval])?; + let right = foreign_neg.evaluate_bounds(&[&interval])?; + + assert_eq!(left, right); + + Ok(()) + } + + #[test] + fn ffi_physical_expr_constraints() -> Result<(), DataFusionError> { + let (negative_expr, foreign_neg) = create_test_negative_expr(); + + let interval = + Interval::try_new(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(10)))?; + + let child = + Interval::try_new(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(10)))?; + let left = negative_expr.propagate_constraints(&interval, &[&child])?; + let right = foreign_neg.propagate_constraints(&interval, &[&child])?; + + assert_eq!(left, right); + Ok(()) + } + + #[test] + fn ffi_physical_expr_statistics() -> Result<(), DataFusionError> { + let (negative_expr, foreign_neg) = create_test_negative_expr(); + let interval = + Interval::try_new(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(10)))?; + + for distribution in [ + Distribution::new_uniform(interval.clone())?, + Distribution::new_exponential( + ScalarValue::Int32(Some(10)), + ScalarValue::Int32(Some(10)), + true, + )?, + Distribution::new_gaussian( + ScalarValue::Int32(Some(10)), + ScalarValue::Int32(Some(10)), + )?, + Distribution::new_generic( + ScalarValue::Int32(Some(10)), + ScalarValue::Int32(Some(10)), + ScalarValue::Int32(Some(10)), + interval, + )?, + ] { + let left = negative_expr.evaluate_statistics(&[&distribution])?; + let right = foreign_neg.evaluate_statistics(&[&distribution])?; + + assert_eq!(left, right); + + let left = + negative_expr.propagate_statistics(&distribution, &[&distribution])?; + let right = + foreign_neg.propagate_statistics(&distribution, &[&distribution])?; + + assert_eq!(left, right); + } + Ok(()) + } + + #[test] + fn ffi_physical_expr_properties() -> Result<(), DataFusionError> { + let (original, foreign_expr) = create_test_expr(); + + let left = original.get_properties(&[])?; + let right = foreign_expr.get_properties(&[])?; + + assert_eq!(left.sort_properties, right.sort_properties); + assert_eq!(left.range, right.range); + + Ok(()) + } + + #[test] + fn ffi_physical_formatting() { + let (original, foreign_expr) = create_test_expr(); + + let left = format!("{}", fmt_sql(original.as_ref())); + let right = format!("{}", fmt_sql(foreign_expr.as_ref())); + assert_eq!(left, right); + } + + #[test] + fn ffi_physical_expr_snapshots() -> Result<(), DataFusionError> { + let (original, foreign_expr) = create_test_expr(); + + let left = original.snapshot()?; + let right = foreign_expr.snapshot()?; + assert_eq!(left, right); + + assert_eq!( + original.snapshot_generation(), + foreign_expr.snapshot_generation() + ); + + Ok(()) + } + + #[test] + fn ffi_physical_expr_volatility() { + let (original, foreign_expr) = create_test_expr(); + assert_eq!(original.is_volatile_node(), foreign_expr.is_volatile_node()); + } + + #[test] + fn ffi_physical_expr_hash() { + let (_, foreign_1) = create_test_expr(); + let (_, foreign_2) = create_test_expr(); + + assert_ne!(&foreign_1, &foreign_2); + + let mut hasher = DefaultHasher::new(); + foreign_1.as_ref().hash(&mut hasher); + let hash_1 = hasher.finish(); + + let mut hasher = DefaultHasher::new(); + foreign_2.as_ref().hash(&mut hasher); + let hash_2 = hasher.finish(); + + // We cannot compare a local object and a foreign object + // so create two foreign objects that *should* be identical + // even though they were created differently. + assert_eq!(hash_1, hash_2); + } + + #[test] + fn ffi_physical_expr_display() { + let (original, foreign_expr) = create_test_expr(); + assert_eq!(format!("{original}"), format!("{foreign_expr}")); + } +} diff --git a/datafusion/ffi/src/physical_expr/partitioning.rs b/datafusion/ffi/src/physical_expr/partitioning.rs new file mode 100644 index 000000000000..32ca1f4835ea --- /dev/null +++ b/datafusion/ffi/src/physical_expr/partitioning.rs @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use abi_stable::{std_types::RVec, StableAbi}; +use datafusion_physical_expr::Partitioning; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + +use crate::physical_expr::FFI_PhysicalExpr; + +/// A stable struct for sharing [`Partitioning`] across FFI boundaries. +/// See ['Partitioning'] for the meaning of each variant. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub enum FFI_Partitioning { + RoundRobinBatch(usize), + Hash(RVec, usize), + UnknownPartitioning(usize), +} + +impl From<&Partitioning> for FFI_Partitioning { + fn from(value: &Partitioning) -> Self { + match value { + Partitioning::RoundRobinBatch(size) => Self::RoundRobinBatch(*size), + Partitioning::Hash(exprs, size) => { + let exprs = exprs + .iter() + .map(Arc::clone) + .map(FFI_PhysicalExpr::from) + .collect(); + Self::Hash(exprs, *size) + } + Partitioning::UnknownPartitioning(size) => Self::UnknownPartitioning(*size), + } + } +} + +impl From<&FFI_Partitioning> for Partitioning { + fn from(value: &FFI_Partitioning) -> Self { + match value { + FFI_Partitioning::RoundRobinBatch(size) => { + Partitioning::RoundRobinBatch(*size) + } + FFI_Partitioning::Hash(exprs, size) => { + let exprs = exprs.iter().map(>::from).collect(); + Self::Hash(exprs, *size) + } + FFI_Partitioning::UnknownPartitioning(size) => { + Self::UnknownPartitioning(*size) + } + } + } +} + +#[cfg(test)] +mod tests { + use datafusion_physical_expr::{expressions::lit, Partitioning}; + + use crate::physical_expr::partitioning::FFI_Partitioning; + + #[test] + fn round_trip_ffi_partitioning() { + for partitioning in [ + Partitioning::RoundRobinBatch(10), + Partitioning::Hash(vec![lit(1)], 10), + Partitioning::UnknownPartitioning(10), + ] { + let ffi_partitioning: FFI_Partitioning = (&partitioning).into(); + let returned: Partitioning = (&ffi_partitioning).into(); + + if let Partitioning::UnknownPartitioning(return_size) = returned { + let Partitioning::UnknownPartitioning(original_size) = partitioning + else { + panic!("Expected unknown partitioning") + }; + assert_eq!(return_size, original_size); + } else { + assert_eq!(partitioning, returned); + } + } + } +} diff --git a/datafusion/ffi/src/physical_expr/sort.rs b/datafusion/ffi/src/physical_expr/sort.rs new file mode 100644 index 000000000000..8c5df9f4aef8 --- /dev/null +++ b/datafusion/ffi/src/physical_expr/sort.rs @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use abi_stable::StableAbi; +use arrow_schema::SortOptions; +use datafusion_physical_expr::PhysicalSortExpr; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + +use crate::expr::expr_properties::FFI_SortOptions; +use crate::physical_expr::FFI_PhysicalExpr; + +/// A stable struct for sharing [`PhysicalSortExpr`] across FFI boundaries. +/// See [`PhysicalSortExpr`] for the meaning of each field. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_PhysicalSortExpr { + expr: FFI_PhysicalExpr, + options: FFI_SortOptions, +} + +impl From<&PhysicalSortExpr> for FFI_PhysicalSortExpr { + fn from(value: &PhysicalSortExpr) -> Self { + let expr = FFI_PhysicalExpr::from(value.clone().expr); + let options = FFI_SortOptions::from(&value.options); + + Self { expr, options } + } +} + +impl From<&FFI_PhysicalSortExpr> for PhysicalSortExpr { + fn from(value: &FFI_PhysicalSortExpr) -> Self { + let expr: Arc = (&value.expr).into(); + let options = SortOptions::from(&value.options); + + Self { expr, options } + } +} + +#[cfg(test)] +mod tests { + use crate::physical_expr::sort::FFI_PhysicalSortExpr; + use arrow_schema::SortOptions; + use datafusion_physical_expr::expressions::Column; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; + use std::sync::Arc; + + #[test] + fn ffi_sort_expr_round_trip() { + let col_expr = Arc::new(Column::new("a", 0)) as Arc; + let expr = PhysicalSortExpr::new(col_expr, SortOptions::default()); + + let ffi_expr = FFI_PhysicalSortExpr::from(&expr); + let foreign_expr = PhysicalSortExpr::from(&ffi_expr); + + assert_eq!(expr, foreign_expr); + } +} diff --git a/datafusion/ffi/src/plan_properties.rs b/datafusion/ffi/src/plan_properties.rs index 0b8177a41242..40ac82c39db3 100644 --- a/datafusion/ffi/src/plan_properties.rs +++ b/datafusion/ffi/src/plan_properties.rs @@ -18,10 +18,7 @@ use std::{ffi::c_void, sync::Arc}; use abi_stable::{ - std_types::{ - RResult::{self, ROk}, - RString, RVec, - }, + std_types::{RResult::ROk, RVec}, StableAbi, }; use arrow::datatypes::SchemaRef; @@ -44,6 +41,7 @@ use datafusion_proto::{ }; use prost::Message; +use crate::util::FFIResult; use crate::{arrow_wrappers::WrappedSchema, df_result, rresult_return}; /// A stable struct for sharing [`PlanProperties`] across FFI boundaries. @@ -53,8 +51,7 @@ use crate::{arrow_wrappers::WrappedSchema, df_result, rresult_return}; pub struct FFI_PlanProperties { /// The output partitioning is a [`Partitioning`] protobuf message serialized /// into bytes to pass across the FFI boundary. - pub output_partitioning: - unsafe extern "C" fn(plan: &Self) -> RResult, RString>, + pub output_partitioning: unsafe extern "C" fn(plan: &Self) -> FFIResult>, /// Return the emission type of the plan. pub emission_type: unsafe extern "C" fn(plan: &Self) -> FFI_EmissionType, @@ -64,7 +61,7 @@ pub struct FFI_PlanProperties { /// The output ordering is a [`PhysicalSortExprNodeCollection`] protobuf message /// serialized into bytes to pass across the FFI boundary. - pub output_ordering: unsafe extern "C" fn(plan: &Self) -> RResult, RString>, + pub output_ordering: unsafe extern "C" fn(plan: &Self) -> FFIResult>, /// Return the schema of the plan. pub schema: unsafe extern "C" fn(plan: &Self) -> WrappedSchema, @@ -95,7 +92,7 @@ impl FFI_PlanProperties { unsafe extern "C" fn output_partitioning_fn_wrapper( properties: &FFI_PlanProperties, -) -> RResult, RString> { +) -> FFIResult> { let codec = DefaultPhysicalExtensionCodec {}; let partitioning_data = rresult_return!(serialize_partitioning( properties.inner().output_partitioning(), @@ -120,7 +117,7 @@ unsafe extern "C" fn boundedness_fn_wrapper( unsafe extern "C" fn output_ordering_fn_wrapper( properties: &FFI_PlanProperties, -) -> RResult, RString> { +) -> FFIResult> { let codec = DefaultPhysicalExtensionCodec {}; let output_ordering = match properties.inner().output_ordering() { Some(ordering) => { @@ -144,9 +141,11 @@ unsafe extern "C" fn schema_fn_wrapper(properties: &FFI_PlanProperties) -> Wrapp } unsafe extern "C" fn release_fn_wrapper(props: &mut FFI_PlanProperties) { + debug_assert!(!props.private_data.is_null()); let private_data = Box::from_raw(props.private_data as *mut PlanPropertiesPrivateData); drop(private_data); + props.private_data = std::ptr::null_mut(); } impl Drop for FFI_PlanProperties { diff --git a/datafusion/ffi/src/record_batch_stream.rs b/datafusion/ffi/src/record_batch_stream.rs index 1739235d1703..b515125f9aeb 100644 --- a/datafusion/ffi/src/record_batch_stream.rs +++ b/datafusion/ffi/src/record_batch_stream.rs @@ -18,7 +18,7 @@ use std::{ffi::c_void, task::Poll}; use abi_stable::{ - std_types::{ROption, RResult, RString}, + std_types::{ROption, RResult}, StableAbi, }; use arrow::array::{Array, RecordBatch}; @@ -32,10 +32,11 @@ use datafusion::{ error::DataFusionError, execution::{RecordBatchStream, SendableRecordBatchStream}, }; -use datafusion_common::{exec_datafusion_err, exec_err}; +use datafusion_common::{ffi_datafusion_err, ffi_err}; use futures::{Stream, TryStreamExt}; use tokio::runtime::Handle; +use crate::util::FFIResult; use crate::{ arrow_wrappers::{WrappedArray, WrappedSchema}, rresult, @@ -49,11 +50,10 @@ use crate::{ pub struct FFI_RecordBatchStream { /// This mirrors the `poll_next` of [`RecordBatchStream`] but does so /// in a FFI safe manner. - pub poll_next: - unsafe extern "C" fn( - stream: &Self, - cx: &mut FfiContext, - ) -> FfiPoll>>, + pub poll_next: unsafe extern "C" fn( + stream: &Self, + cx: &mut FfiContext, + ) -> FfiPoll>>, /// Return the schema of the record batch pub schema: unsafe extern "C" fn(stream: &Self) -> WrappedSchema, @@ -102,14 +102,16 @@ unsafe extern "C" fn schema_fn_wrapper(stream: &FFI_RecordBatchStream) -> Wrappe } unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_RecordBatchStream) { + debug_assert!(!provider.private_data.is_null()); let private_data = Box::from_raw(provider.private_data as *mut RecordBatchStreamPrivateData); drop(private_data); + provider.private_data = std::ptr::null_mut(); } -fn record_batch_to_wrapped_array( +pub(crate) fn record_batch_to_wrapped_array( record_batch: RecordBatch, -) -> RResult { +) -> FFIResult { let struct_array = StructArray::from(record_batch); rresult!( to_ffi(&struct_array.to_data()).map(|(array, schema)| WrappedArray { @@ -122,7 +124,7 @@ fn record_batch_to_wrapped_array( // probably want to use pub unsafe fn from_ffi(array: FFI_ArrowArray, schema: &FFI_ArrowSchema) -> Result { fn maybe_record_batch_to_wrapped_stream( record_batch: Option>, -) -> ROption> { +) -> ROption> { match record_batch { Some(Ok(record_batch)) => { ROption::RSome(record_batch_to_wrapped_array(record_batch)) @@ -135,7 +137,7 @@ fn maybe_record_batch_to_wrapped_stream( unsafe extern "C" fn poll_next_fn_wrapper( stream: &FFI_RecordBatchStream, cx: &mut FfiContext, -) -> FfiPoll>> { +) -> FfiPoll>> { let private_data = stream.private_data as *mut RecordBatchStreamPrivateData; let stream = &mut (*private_data).rbs; @@ -157,14 +159,14 @@ impl RecordBatchStream for FFI_RecordBatchStream { } } -fn wrapped_array_to_record_batch(array: WrappedArray) -> Result { +pub(crate) fn wrapped_array_to_record_batch(array: WrappedArray) -> Result { let array_data = unsafe { from_ffi(array.array, &array.schema.0).map_err(DataFusionError::from)? }; let array = make_array(array_data); let struct_array = array .as_any() .downcast_ref::() - .ok_or_else(|| exec_datafusion_err!( + .ok_or_else(|| ffi_datafusion_err!( "Unexpected array type during record batch collection in FFI_RecordBatchStream - expected StructArray" ))?; @@ -172,13 +174,13 @@ fn wrapped_array_to_record_batch(array: WrappedArray) -> Result { } fn maybe_wrapped_array_to_record_batch( - array: ROption>, + array: ROption>, ) -> Option> { match array { ROption::RSome(RResult::ROk(wrapped_array)) => { Some(wrapped_array_to_record_batch(wrapped_array)) } - ROption::RSome(RResult::RErr(e)) => Some(exec_err!("FFI error: {e}")), + ROption::RSome(RResult::RErr(e)) => Some(ffi_err!("{e}")), ROption::RNone => None, } } @@ -198,7 +200,7 @@ impl Stream for FFI_RecordBatchStream { Poll::Ready(maybe_wrapped_array_to_record_batch(array)) } FfiPoll::Pending => Poll::Pending, - FfiPoll::Panicked => Poll::Ready(Some(exec_err!( + FfiPoll::Panicked => Poll::Ready(Some(ffi_err!( "Panic occurred during poll_next on FFI_RecordBatchStream" ))), } diff --git a/datafusion/ffi/src/schema_provider.rs b/datafusion/ffi/src/schema_provider.rs index d6feeb6b8fb3..eafdb2e69464 100644 --- a/datafusion/ffi/src/schema_provider.rs +++ b/datafusion/ffi/src/schema_provider.rs @@ -34,6 +34,7 @@ use crate::{ table_provider::{FFI_TableProvider, ForeignTableProvider}, }; +use crate::util::FFIResult; use datafusion::error::Result; /// A stable struct for sharing [`SchemaProvider`] across FFI boundaries. @@ -48,22 +49,21 @@ pub struct FFI_SchemaProvider { pub table: unsafe extern "C" fn( provider: &Self, name: RString, - ) -> FfiFuture< - RResult, RString>, - >, - - pub register_table: - unsafe extern "C" fn( - provider: &Self, - name: RString, - table: FFI_TableProvider, - ) -> RResult, RString>, - - pub deregister_table: - unsafe extern "C" fn( - provider: &Self, - name: RString, - ) -> RResult, RString>, + ) + -> FfiFuture>>, + + pub register_table: unsafe extern "C" fn( + provider: &Self, + name: RString, + table: FFI_TableProvider, + ) + -> FFIResult>, + + pub deregister_table: unsafe extern "C" fn( + provider: &Self, + name: RString, + ) + -> FFIResult>, pub table_exist: unsafe extern "C" fn(provider: &Self, name: RString) -> bool, @@ -119,7 +119,7 @@ unsafe extern "C" fn table_names_fn_wrapper( unsafe extern "C" fn table_fn_wrapper( provider: &FFI_SchemaProvider, name: RString, -) -> FfiFuture, RString>> { +) -> FfiFuture>> { let runtime = provider.runtime(); let provider = Arc::clone(provider.inner()); @@ -137,7 +137,7 @@ unsafe extern "C" fn register_table_fn_wrapper( provider: &FFI_SchemaProvider, name: RString, table: FFI_TableProvider, -) -> RResult, RString> { +) -> FFIResult> { let runtime = provider.runtime(); let provider = provider.inner(); @@ -152,7 +152,7 @@ unsafe extern "C" fn register_table_fn_wrapper( unsafe extern "C" fn deregister_table_fn_wrapper( provider: &FFI_SchemaProvider, name: RString, -) -> RResult, RString> { +) -> FFIResult> { let runtime = provider.runtime(); let provider = provider.inner(); @@ -170,8 +170,10 @@ unsafe extern "C" fn table_exist_fn_wrapper( } unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_SchemaProvider) { + debug_assert!(!provider.private_data.is_null()); let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData); drop(private_data); + provider.private_data = std::ptr::null_mut(); } unsafe extern "C" fn clone_fn_wrapper( diff --git a/datafusion/ffi/src/session_config.rs b/datafusion/ffi/src/session_config.rs index a07b66c60196..ae28d3ac472d 100644 --- a/datafusion/ffi/src/session_config.rs +++ b/datafusion/ffi/src/session_config.rs @@ -78,9 +78,11 @@ unsafe extern "C" fn config_options_fn_wrapper( } unsafe extern "C" fn release_fn_wrapper(config: &mut FFI_SessionConfig) { + debug_assert!(!config.private_data.is_null()); let private_data = Box::from_raw(config.private_data as *mut SessionConfigPrivateData); drop(private_data); + config.private_data = std::ptr::null_mut(); } unsafe extern "C" fn clone_fn_wrapper(config: &FFI_SessionConfig) -> FFI_SessionConfig { diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index 10b44a147fa0..bc98eab518ae 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -18,7 +18,7 @@ use std::{any::Any, ffi::c_void, sync::Arc}; use abi_stable::{ - std_types::{ROption, RResult, RString, RVec}, + std_types::{ROption, RResult, RVec}, StableAbi, }; use arrow::datatypes::SchemaRef; @@ -53,6 +53,7 @@ use super::{ execution_plan::FFI_ExecutionPlan, insert_op::FFI_InsertOp, session_config::FFI_SessionConfig, }; +use crate::util::FFIResult; use datafusion::error::Result; /// A stable struct for sharing [`TableProvider`] across FFI boundaries. @@ -118,7 +119,7 @@ pub struct FFI_TableProvider { projections: RVec, filters_serialized: RVec, limit: ROption, - ) -> FfiFuture>, + ) -> FfiFuture>, /// Return the type of table. See [`TableType`] for options. pub table_type: unsafe extern "C" fn(provider: &Self) -> FFI_TableType, @@ -130,17 +131,15 @@ pub struct FFI_TableProvider { unsafe extern "C" fn( provider: &FFI_TableProvider, filters_serialized: RVec, - ) - -> RResult, RString>, + ) -> FFIResult>, >, - pub insert_into: - unsafe extern "C" fn( - provider: &Self, - session_config: &FFI_SessionConfig, - input: &FFI_ExecutionPlan, - insert_op: FFI_InsertOp, - ) -> FfiFuture>, + pub insert_into: unsafe extern "C" fn( + provider: &Self, + session_config: &FFI_SessionConfig, + input: &FFI_ExecutionPlan, + insert_op: FFI_InsertOp, + ) -> FfiFuture>, /// Used to create a clone on the provider of the execution plan. This should /// only need to be called by the receiver of the plan. @@ -222,7 +221,7 @@ fn supports_filters_pushdown_internal( unsafe extern "C" fn supports_filters_pushdown_fn_wrapper( provider: &FFI_TableProvider, filters_serialized: RVec, -) -> RResult, RString> { +) -> FFIResult> { supports_filters_pushdown_internal(provider.inner(), &filters_serialized) .map_err(|e| e.to_string().into()) .into() @@ -234,7 +233,7 @@ unsafe extern "C" fn scan_fn_wrapper( projections: RVec, filters_serialized: RVec, limit: ROption, -) -> FfiFuture> { +) -> FfiFuture> { let runtime = provider.runtime().clone(); let internal_provider = Arc::clone(provider.inner()); let session_config = session_config.clone(); @@ -286,7 +285,7 @@ unsafe extern "C" fn insert_into_fn_wrapper( session_config: &FFI_SessionConfig, input: &FFI_ExecutionPlan, insert_op: FFI_InsertOp, -) -> FfiFuture> { +) -> FfiFuture> { let runtime = provider.runtime().clone(); let internal_provider = Arc::clone(provider.inner()); let session_config = session_config.clone(); @@ -320,8 +319,10 @@ unsafe extern "C" fn insert_into_fn_wrapper( } unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_TableProvider) { + debug_assert!(!provider.private_data.is_null()); let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData); drop(private_data); + provider.private_data = std::ptr::null_mut(); } unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_TableProvider { diff --git a/datafusion/ffi/src/udaf/accumulator.rs b/datafusion/ffi/src/udaf/accumulator.rs index 6ea58f75a8d3..c0ab0bdd5b7e 100644 --- a/datafusion/ffi/src/udaf/accumulator.rs +++ b/datafusion/ffi/src/udaf/accumulator.rs @@ -16,7 +16,7 @@ // under the License. use abi_stable::{ - std_types::{RResult, RString, RVec}, + std_types::{RResult, RVec}, StableAbi, }; use arrow::{array::ArrayRef, error::ArrowError}; @@ -29,6 +29,7 @@ use prost::Message; use std::ptr::null_mut; use std::{ffi::c_void, ops::Deref}; +use crate::util::FFIResult; use crate::{arrow_wrappers::WrappedArray, df_result, rresult, rresult_return}; /// A stable struct for sharing [`Accumulator`] across FFI boundaries. @@ -41,26 +42,24 @@ pub struct FFI_Accumulator { pub update_batch: unsafe extern "C" fn( accumulator: &mut Self, values: RVec, - ) -> RResult<(), RString>, + ) -> FFIResult<()>, // Evaluate and return a ScalarValues as protobuf bytes - pub evaluate: - unsafe extern "C" fn(accumulator: &mut Self) -> RResult, RString>, + pub evaluate: unsafe extern "C" fn(accumulator: &mut Self) -> FFIResult>, pub size: unsafe extern "C" fn(accumulator: &Self) -> usize, - pub state: - unsafe extern "C" fn(accumulator: &mut Self) -> RResult>, RString>, + pub state: unsafe extern "C" fn(accumulator: &mut Self) -> FFIResult>>, pub merge_batch: unsafe extern "C" fn( accumulator: &mut Self, states: RVec, - ) -> RResult<(), RString>, + ) -> FFIResult<()>, pub retract_batch: unsafe extern "C" fn( accumulator: &mut Self, values: RVec, - ) -> RResult<(), RString>, + ) -> FFIResult<()>, pub supports_retract_batch: bool, @@ -101,7 +100,7 @@ impl FFI_Accumulator { unsafe extern "C" fn update_batch_fn_wrapper( accumulator: &mut FFI_Accumulator, values: RVec, -) -> RResult<(), RString> { +) -> FFIResult<()> { let accumulator = accumulator.inner_mut(); let values_arrays = values @@ -115,7 +114,7 @@ unsafe extern "C" fn update_batch_fn_wrapper( unsafe extern "C" fn evaluate_fn_wrapper( accumulator: &mut FFI_Accumulator, -) -> RResult, RString> { +) -> FFIResult> { let accumulator = accumulator.inner_mut(); let scalar_result = rresult_return!(accumulator.evaluate()); @@ -131,7 +130,7 @@ unsafe extern "C" fn size_fn_wrapper(accumulator: &FFI_Accumulator) -> usize { unsafe extern "C" fn state_fn_wrapper( accumulator: &mut FFI_Accumulator, -) -> RResult>, RString> { +) -> FFIResult>> { let accumulator = accumulator.inner_mut(); let state = rresult_return!(accumulator.state()); @@ -151,7 +150,7 @@ unsafe extern "C" fn state_fn_wrapper( unsafe extern "C" fn merge_batch_fn_wrapper( accumulator: &mut FFI_Accumulator, states: RVec, -) -> RResult<(), RString> { +) -> FFIResult<()> { let accumulator = accumulator.inner_mut(); let states = rresult_return!(states @@ -165,7 +164,7 @@ unsafe extern "C" fn merge_batch_fn_wrapper( unsafe extern "C" fn retract_batch_fn_wrapper( accumulator: &mut FFI_Accumulator, values: RVec, -) -> RResult<(), RString> { +) -> FFIResult<()> { let accumulator = accumulator.inner_mut(); let values_arrays = values @@ -182,6 +181,7 @@ unsafe extern "C" fn release_fn_wrapper(accumulator: &mut FFI_Accumulator) { let private_data = Box::from_raw(accumulator.private_data as *mut AccumulatorPrivateData); drop(private_data); + accumulator.private_data = null_mut(); } } diff --git a/datafusion/ffi/src/udaf/accumulator_args.rs b/datafusion/ffi/src/udaf/accumulator_args.rs index 6ac0a0b21d2d..d79ecba14c68 100644 --- a/datafusion/ffi/src/udaf/accumulator_args.rs +++ b/datafusion/ffi/src/udaf/accumulator_args.rs @@ -30,7 +30,7 @@ use datafusion::{ physical_expr::{PhysicalExpr, PhysicalSortExpr}, prelude::SessionContext, }; -use datafusion_common::exec_datafusion_err; +use datafusion_common::ffi_datafusion_err; use datafusion_proto::{ physical_plan::{ from_proto::{parse_physical_exprs, parse_physical_sort_exprs}, @@ -114,7 +114,7 @@ impl TryFrom for ForeignAccumulatorArgs { value.physical_expr_def.as_ref(), ) .map_err(|e| { - exec_datafusion_err!("Failed to decode PhysicalAggregateExprNode: {e}") + ffi_datafusion_err!("Failed to decode PhysicalAggregateExprNode: {e}") })?; let return_field = Arc::new((&value.return_field.0).try_into()?); diff --git a/datafusion/ffi/src/udaf/groups_accumulator.rs b/datafusion/ffi/src/udaf/groups_accumulator.rs index 29f6a135c3d3..69113d8128c3 100644 --- a/datafusion/ffi/src/udaf/groups_accumulator.rs +++ b/datafusion/ffi/src/udaf/groups_accumulator.rs @@ -15,12 +15,13 @@ // specific language governing permissions and limitations // under the License. +use crate::util::FFIResult; use crate::{ arrow_wrappers::{WrappedArray, WrappedSchema}, df_result, rresult, rresult_return, }; use abi_stable::{ - std_types::{ROption, RResult, RString, RVec}, + std_types::{ROption, RVec}, StableAbi, }; use arrow::{ @@ -48,20 +49,20 @@ pub struct FFI_GroupsAccumulator { group_indices: RVec, opt_filter: ROption, total_num_groups: usize, - ) -> RResult<(), RString>, + ) -> FFIResult<()>, // Evaluate and return a ScalarValues as protobuf bytes pub evaluate: unsafe extern "C" fn( accumulator: &mut Self, emit_to: FFI_EmitTo, - ) -> RResult, + ) -> FFIResult, pub size: unsafe extern "C" fn(accumulator: &Self) -> usize, pub state: unsafe extern "C" fn( accumulator: &mut Self, emit_to: FFI_EmitTo, - ) -> RResult, RString>, + ) -> FFIResult>, pub merge_batch: unsafe extern "C" fn( accumulator: &mut Self, @@ -69,14 +70,13 @@ pub struct FFI_GroupsAccumulator { group_indices: RVec, opt_filter: ROption, total_num_groups: usize, - ) -> RResult<(), RString>, + ) -> FFIResult<()>, pub convert_to_state: unsafe extern "C" fn( accumulator: &Self, values: RVec, opt_filter: ROption, - ) - -> RResult, RString>, + ) -> FFIResult>, pub supports_convert_to_state: bool, @@ -136,7 +136,7 @@ unsafe extern "C" fn update_batch_fn_wrapper( group_indices: RVec, opt_filter: ROption, total_num_groups: usize, -) -> RResult<(), RString> { +) -> FFIResult<()> { let accumulator = accumulator.inner_mut(); let values = rresult_return!(process_values(values)); let group_indices: Vec = group_indices.into_iter().collect(); @@ -153,7 +153,7 @@ unsafe extern "C" fn update_batch_fn_wrapper( unsafe extern "C" fn evaluate_fn_wrapper( accumulator: &mut FFI_GroupsAccumulator, emit_to: FFI_EmitTo, -) -> RResult { +) -> FFIResult { let accumulator = accumulator.inner_mut(); let result = rresult_return!(accumulator.evaluate(emit_to.into())); @@ -169,7 +169,7 @@ unsafe extern "C" fn size_fn_wrapper(accumulator: &FFI_GroupsAccumulator) -> usi unsafe extern "C" fn state_fn_wrapper( accumulator: &mut FFI_GroupsAccumulator, emit_to: FFI_EmitTo, -) -> RResult, RString> { +) -> FFIResult> { let accumulator = accumulator.inner_mut(); let state = rresult_return!(accumulator.state(emit_to.into())); @@ -185,7 +185,7 @@ unsafe extern "C" fn merge_batch_fn_wrapper( group_indices: RVec, opt_filter: ROption, total_num_groups: usize, -) -> RResult<(), RString> { +) -> FFIResult<()> { let accumulator = accumulator.inner_mut(); let values = rresult_return!(process_values(values)); let group_indices: Vec = group_indices.into_iter().collect(); @@ -203,7 +203,7 @@ unsafe extern "C" fn convert_to_state_fn_wrapper( accumulator: &FFI_GroupsAccumulator, values: RVec, opt_filter: ROption, -) -> RResult, RString> { +) -> FFIResult> { let accumulator = accumulator.inner(); let values = rresult_return!(process_values(values)); let opt_filter = rresult_return!(process_opt_filter(opt_filter)); @@ -221,6 +221,7 @@ unsafe extern "C" fn release_fn_wrapper(accumulator: &mut FFI_GroupsAccumulator) let private_data = Box::from_raw(accumulator.private_data as *mut GroupsAccumulatorPrivateData); drop(private_data); + accumulator.private_data = null_mut(); } } diff --git a/datafusion/ffi/src/udaf/mod.rs b/datafusion/ffi/src/udaf/mod.rs index a416753c371b..ec7ebd29e571 100644 --- a/datafusion/ffi/src/udaf/mod.rs +++ b/datafusion/ffi/src/udaf/mod.rs @@ -37,13 +37,15 @@ use datafusion::{ error::Result, logical_expr::{AggregateUDF, AggregateUDFImpl, Signature}, }; -use datafusion_common::exec_datafusion_err; +use datafusion_common::ffi_datafusion_err; use datafusion_proto_common::from_proto::parse_proto_fields_to_fields; use groups_accumulator::FFI_GroupsAccumulator; use std::hash::{Hash, Hasher}; use std::{ffi::c_void, sync::Arc}; -use crate::util::{rvec_wrapped_to_vec_fieldref, vec_fieldref_to_rvec_wrapped}; +use crate::util::{ + rvec_wrapped_to_vec_fieldref, vec_fieldref_to_rvec_wrapped, FFIResult, +}; use crate::{ arrow_wrappers::WrappedSchema, df_result, rresult, rresult_return, @@ -75,7 +77,7 @@ pub struct FFI_AggregateUDF { pub return_field: unsafe extern "C" fn( udaf: &Self, arg_fields: RVec, - ) -> RResult, + ) -> FFIResult, /// FFI equivalent to the `is_nullable` of a [`AggregateUDF`] pub is_nullable: bool, @@ -88,14 +90,14 @@ pub struct FFI_AggregateUDF { pub accumulator: unsafe extern "C" fn( udaf: &FFI_AggregateUDF, args: FFI_AccumulatorArgs, - ) -> RResult, + ) -> FFIResult, /// FFI equivalent to [`AggregateUDF::create_sliding_accumulator`] - pub create_sliding_accumulator: - unsafe extern "C" fn( - udaf: &FFI_AggregateUDF, - args: FFI_AccumulatorArgs, - ) -> RResult, + pub create_sliding_accumulator: unsafe extern "C" fn( + udaf: &FFI_AggregateUDF, + args: FFI_AccumulatorArgs, + ) + -> FFIResult, /// FFI equivalent to [`AggregateUDF::state_fields`] #[allow(clippy::type_complexity)] @@ -106,21 +108,21 @@ pub struct FFI_AggregateUDF { return_field: WrappedSchema, ordering_fields: RVec>, is_distinct: bool, - ) -> RResult>, RString>, + ) -> FFIResult>>, /// FFI equivalent to [`AggregateUDF::create_groups_accumulator`] pub create_groups_accumulator: unsafe extern "C" fn( udaf: &FFI_AggregateUDF, args: FFI_AccumulatorArgs, - ) -> RResult, + ) -> FFIResult, /// FFI equivalent to [`AggregateUDF::with_beneficial_ordering`] pub with_beneficial_ordering: unsafe extern "C" fn( udaf: &FFI_AggregateUDF, beneficial_ordering: bool, - ) -> RResult, RString>, + ) -> FFIResult>, /// FFI equivalent to [`AggregateUDF::order_sensitivity`] pub order_sensitivity: @@ -133,7 +135,7 @@ pub struct FFI_AggregateUDF { pub coerce_types: unsafe extern "C" fn( udf: &Self, arg_types: RVec, - ) -> RResult, RString>, + ) -> FFIResult>, /// Used to create a clone on the provider of the udaf. This should /// only need to be called by the receiver of the udaf. @@ -169,7 +171,7 @@ impl FFI_AggregateUDF { unsafe extern "C" fn return_field_fn_wrapper( udaf: &FFI_AggregateUDF, arg_fields: RVec, -) -> RResult { +) -> FFIResult { let udaf = udaf.inner(); let arg_fields = rresult_return!(rvec_wrapped_to_vec_fieldref(&arg_fields)); @@ -187,7 +189,7 @@ unsafe extern "C" fn return_field_fn_wrapper( unsafe extern "C" fn accumulator_fn_wrapper( udaf: &FFI_AggregateUDF, args: FFI_AccumulatorArgs, -) -> RResult { +) -> FFIResult { let udaf = udaf.inner(); let accumulator_args = &rresult_return!(ForeignAccumulatorArgs::try_from(args)); @@ -200,7 +202,7 @@ unsafe extern "C" fn accumulator_fn_wrapper( unsafe extern "C" fn create_sliding_accumulator_fn_wrapper( udaf: &FFI_AggregateUDF, args: FFI_AccumulatorArgs, -) -> RResult { +) -> FFIResult { let udaf = udaf.inner(); let accumulator_args = &rresult_return!(ForeignAccumulatorArgs::try_from(args)); @@ -213,7 +215,7 @@ unsafe extern "C" fn create_sliding_accumulator_fn_wrapper( unsafe extern "C" fn create_groups_accumulator_fn_wrapper( udaf: &FFI_AggregateUDF, args: FFI_AccumulatorArgs, -) -> RResult { +) -> FFIResult { let udaf = udaf.inner(); let accumulator_args = &rresult_return!(ForeignAccumulatorArgs::try_from(args)); @@ -240,7 +242,7 @@ unsafe extern "C" fn groups_accumulator_supported_fn_wrapper( unsafe extern "C" fn with_beneficial_ordering_fn_wrapper( udaf: &FFI_AggregateUDF, beneficial_ordering: bool, -) -> RResult, RString> { +) -> FFIResult> { let udaf = udaf.inner().as_ref().clone(); let result = rresult_return!(udaf.with_beneficial_ordering(beneficial_ordering)); @@ -260,7 +262,7 @@ unsafe extern "C" fn state_fields_fn_wrapper( return_field: WrappedSchema, ordering_fields: RVec>, is_distinct: bool, -) -> RResult>, RString> { +) -> FFIResult>> { let udaf = udaf.inner(); let input_fields = &rresult_return!(rvec_wrapped_to_vec_fieldref(&input_fields)); @@ -307,7 +309,7 @@ unsafe extern "C" fn order_sensitivity_fn_wrapper( unsafe extern "C" fn coerce_types_fn_wrapper( udaf: &FFI_AggregateUDF, arg_types: RVec, -) -> RResult, RString> { +) -> FFIResult> { let udaf = udaf.inner(); let arg_types = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types)); @@ -326,8 +328,10 @@ unsafe extern "C" fn coerce_types_fn_wrapper( } unsafe extern "C" fn release_fn_wrapper(udaf: &mut FFI_AggregateUDF) { + debug_assert!(!udaf.private_data.is_null()); let private_data = Box::from_raw(udaf.private_data as *mut AggregateUDFPrivateData); drop(private_data); + udaf.private_data = std::ptr::null_mut(); } unsafe extern "C" fn clone_fn_wrapper(udaf: &FFI_AggregateUDF) -> FFI_AggregateUDF { @@ -497,13 +501,13 @@ impl AggregateUDFImpl for ForeignAggregateUDF { .into_iter() .map(|field_bytes| { datafusion_proto_common::Field::decode(field_bytes.as_ref()) - .map_err(|e| exec_datafusion_err!("{e}")) + .map_err(|e| ffi_datafusion_err!("{e}")) }) .collect::>>()?; parse_proto_fields_to_fields(fields.iter()) .map(|fields| fields.into_iter().map(Arc::new).collect()) - .map_err(|e| exec_datafusion_err!("{e}")) + .map_err(|e| ffi_datafusion_err!("{e}")) } } diff --git a/datafusion/ffi/src/udf/mod.rs b/datafusion/ffi/src/udf/mod.rs index b90cc267e0bd..dcb6afb6fd1a 100644 --- a/datafusion/ffi/src/udf/mod.rs +++ b/datafusion/ffi/src/udf/mod.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::util::FFIResult; use crate::{ arrow_wrappers::{WrappedArray, WrappedSchema}, df_result, rresult, rresult_return, @@ -71,8 +72,7 @@ pub struct FFI_ScalarUDF { pub return_field_from_args: unsafe extern "C" fn( udf: &Self, args: FFI_ReturnFieldArgs, - ) - -> RResult, + ) -> FFIResult, /// Execute the underlying [`ScalarUDF`] and return the result as a `FFI_ArrowArray` /// within an AbiStable wrapper. @@ -83,7 +83,7 @@ pub struct FFI_ScalarUDF { arg_fields: RVec, num_rows: usize, return_field: WrappedSchema, - ) -> RResult, + ) -> FFIResult, /// See [`ScalarUDFImpl`] for details on short_circuits pub short_circuits: bool, @@ -95,7 +95,7 @@ pub struct FFI_ScalarUDF { pub coerce_types: unsafe extern "C" fn( udf: &Self, arg_types: RVec, - ) -> RResult, RString>, + ) -> FFIResult>, /// Used to create a clone on the provider of the udf. This should /// only need to be called by the receiver of the udf. @@ -131,7 +131,7 @@ impl FFI_ScalarUDF { unsafe extern "C" fn return_field_from_args_fn_wrapper( udf: &FFI_ScalarUDF, args: FFI_ReturnFieldArgs, -) -> RResult { +) -> FFIResult { let args: ForeignReturnFieldArgsOwned = rresult_return!((&args).try_into()); let args_ref: ForeignReturnFieldArgs = (&args).into(); @@ -147,7 +147,7 @@ unsafe extern "C" fn return_field_from_args_fn_wrapper( unsafe extern "C" fn coerce_types_fn_wrapper( udf: &FFI_ScalarUDF, arg_types: RVec, -) -> RResult, RString> { +) -> FFIResult> { let arg_types = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types)); let return_types = @@ -162,7 +162,7 @@ unsafe extern "C" fn invoke_with_args_fn_wrapper( arg_fields: RVec, number_rows: usize, return_field: WrappedSchema, -) -> RResult { +) -> FFIResult { let args = args .into_iter() .map(|arr| { @@ -207,8 +207,10 @@ unsafe extern "C" fn invoke_with_args_fn_wrapper( } unsafe extern "C" fn release_fn_wrapper(udf: &mut FFI_ScalarUDF) { + debug_assert!(!udf.private_data.is_null()); let private_data = Box::from_raw(udf.private_data as *mut ScalarUDFPrivateData); drop(private_data); + udf.private_data = std::ptr::null_mut(); } unsafe extern "C" fn clone_fn_wrapper(udf: &FFI_ScalarUDF) -> FFI_ScalarUDF { diff --git a/datafusion/ffi/src/udf/return_type_args.rs b/datafusion/ffi/src/udf/return_type_args.rs index c437c9537be6..c9396b797183 100644 --- a/datafusion/ffi/src/udf/return_type_args.rs +++ b/datafusion/ffi/src/udf/return_type_args.rs @@ -21,7 +21,7 @@ use abi_stable::{ }; use arrow_schema::FieldRef; use datafusion::{ - common::exec_datafusion_err, error::DataFusionError, logical_expr::ReturnFieldArgs, + common::ffi_datafusion_err, error::DataFusionError, logical_expr::ReturnFieldArgs, scalar::ScalarValue, }; @@ -91,7 +91,7 @@ impl TryFrom<&FFI_ReturnFieldArgs> for ForeignReturnFieldArgsOwned { let maybe_arg = maybe_arg.as_ref().map(|arg| { let proto_value = datafusion_proto::protobuf::ScalarValue::decode(arg.as_ref()) - .map_err(|err| exec_datafusion_err!("{}", err))?; + .map_err(|err| ffi_datafusion_err!("{}", err))?; let scalar_value: ScalarValue = (&proto_value).try_into()?; Ok(scalar_value) }); diff --git a/datafusion/ffi/src/udtf.rs b/datafusion/ffi/src/udtf.rs index e603b9234c33..f15429595801 100644 --- a/datafusion/ffi/src/udtf.rs +++ b/datafusion/ffi/src/udtf.rs @@ -18,7 +18,7 @@ use std::{ffi::c_void, sync::Arc}; use abi_stable::{ - std_types::{RResult, RString, RVec}, + std_types::{RResult, RVec}, StableAbi, }; @@ -36,6 +36,7 @@ use datafusion_proto::{ use prost::Message; use tokio::runtime::Handle; +use crate::util::FFIResult; use crate::{df_result, rresult_return, table_provider::FFI_TableProvider}; /// A stable struct for sharing a [`TableFunctionImpl`] across FFI boundaries. @@ -45,10 +46,8 @@ use crate::{df_result, rresult_return, table_provider::FFI_TableProvider}; pub struct FFI_TableFunction { /// Equivalent to the `call` function of the TableFunctionImpl. /// The arguments are Expr passed as protobuf encoded bytes. - pub call: unsafe extern "C" fn( - udtf: &Self, - args: RVec, - ) -> RResult, + pub call: + unsafe extern "C" fn(udtf: &Self, args: RVec) -> FFIResult, /// Used to create a clone on the provider of the udtf. This should /// only need to be called by the receiver of the udtf. @@ -90,7 +89,7 @@ impl FFI_TableFunction { unsafe extern "C" fn call_fn_wrapper( udtf: &FFI_TableFunction, args: RVec, -) -> RResult { +) -> FFIResult { let runtime = udtf.runtime(); let udtf = udtf.inner(); @@ -107,8 +106,10 @@ unsafe extern "C" fn call_fn_wrapper( } unsafe extern "C" fn release_fn_wrapper(udtf: &mut FFI_TableFunction) { + debug_assert!(!udtf.private_data.is_null()); let private_data = Box::from_raw(udtf.private_data as *mut TableFunctionPrivateData); drop(private_data); + udtf.private_data = std::ptr::null_mut(); } unsafe extern "C" fn clone_fn_wrapper(udtf: &FFI_TableFunction) -> FFI_TableFunction { diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs index d961ffa5b59b..0d90c90ce181 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -15,46 +15,47 @@ // specific language governing permissions and limitations // under the License. +use std::{ + ffi::c_void, + hash::{Hash, Hasher}, + sync::Arc, +}; + use abi_stable::{ std_types::{ROption, RResult, RString, RVec}, StableAbi, }; -use arrow::datatypes::Schema; use arrow::{ compute::SortOptions, - datatypes::{DataType, SchemaRef}, + datatypes::{DataType, Schema, SchemaRef}, }; use arrow_schema::{Field, FieldRef}; -use datafusion::logical_expr::LimitEffect; -use datafusion::physical_expr::PhysicalExpr; use datafusion::{ - error::DataFusionError, + error::{DataFusionError, Result}, logical_expr::{ function::WindowUDFFieldArgs, type_coercion::functions::fields_with_window_udf, - PartitionEvaluator, + LimitEffect, PartitionEvaluator, Signature, WindowUDF, WindowUDFImpl, }, + physical_expr::PhysicalExpr, }; -use datafusion::{ - error::Result, - logical_expr::{Signature, WindowUDF, WindowUDFImpl}, -}; -use datafusion_common::exec_err; +use datafusion_common::ffi_err; use partition_evaluator::FFI_PartitionEvaluator; use partition_evaluator_args::{ FFI_PartitionEvaluatorArgs, ForeignPartitionEvaluatorArgs, }; -use std::hash::{Hash, Hasher}; -use std::{ffi::c_void, sync::Arc}; mod partition_evaluator; mod partition_evaluator_args; mod range; -use crate::util::{rvec_wrapped_to_vec_fieldref, vec_fieldref_to_rvec_wrapped}; +use crate::util::FFIResult; use crate::{ arrow_wrappers::WrappedSchema, df_result, rresult, rresult_return, - util::{rvec_wrapped_to_vec_datatype, vec_datatype_to_rvec_wrapped}, + util::{ + rvec_wrapped_to_vec_datatype, rvec_wrapped_to_vec_fieldref, + vec_datatype_to_rvec_wrapped, vec_fieldref_to_rvec_wrapped, + }, volatility::FFI_Volatility, }; @@ -72,17 +73,17 @@ pub struct FFI_WindowUDF { /// FFI equivalent to the `volatility` of a [`WindowUDF`] pub volatility: FFI_Volatility, - pub partition_evaluator: - unsafe extern "C" fn( - udwf: &Self, - args: FFI_PartitionEvaluatorArgs, - ) -> RResult, + pub partition_evaluator: unsafe extern "C" fn( + udwf: &Self, + args: FFI_PartitionEvaluatorArgs, + ) + -> FFIResult, pub field: unsafe extern "C" fn( udwf: &Self, input_types: RVec, display_name: RString, - ) -> RResult, + ) -> FFIResult, /// Performs type coercion. To simply this interface, all UDFs are treated as having /// user defined signatures, which will in turn call coerce_types to be called. This @@ -91,7 +92,7 @@ pub struct FFI_WindowUDF { pub coerce_types: unsafe extern "C" fn( udf: &Self, arg_types: RVec, - ) -> RResult, RString>, + ) -> FFIResult>, pub sort_options: ROption, @@ -129,7 +130,7 @@ impl FFI_WindowUDF { unsafe extern "C" fn partition_evaluator_fn_wrapper( udwf: &FFI_WindowUDF, args: FFI_PartitionEvaluatorArgs, -) -> RResult { +) -> FFIResult { let inner = udwf.inner(); let args = rresult_return!(ForeignPartitionEvaluatorArgs::try_from(args)); @@ -143,7 +144,7 @@ unsafe extern "C" fn field_fn_wrapper( udwf: &FFI_WindowUDF, input_fields: RVec, display_name: RString, -) -> RResult { +) -> FFIResult { let inner = udwf.inner(); let input_fields = rresult_return!(rvec_wrapped_to_vec_fieldref(&input_fields)); @@ -161,7 +162,7 @@ unsafe extern "C" fn field_fn_wrapper( unsafe extern "C" fn coerce_types_fn_wrapper( udwf: &FFI_WindowUDF, arg_types: RVec, -) -> RResult, RString> { +) -> FFIResult> { let inner = udwf.inner(); let arg_fields = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types)) @@ -180,8 +181,10 @@ unsafe extern "C" fn coerce_types_fn_wrapper( } unsafe extern "C" fn release_fn_wrapper(udwf: &mut FFI_WindowUDF) { + debug_assert!(!udwf.private_data.is_null()); let private_data = Box::from_raw(udwf.private_data as *mut WindowUDFPrivateData); drop(private_data); + udwf.private_data = std::ptr::null_mut(); } unsafe extern "C" fn clone_fn_wrapper(udwf: &FFI_WindowUDF) -> FFI_WindowUDF { @@ -347,7 +350,7 @@ impl WindowUDFImpl for ForeignWindowUDF { let schema: SchemaRef = schema.into(); match schema.fields().is_empty() { - true => exec_err!( + true => ffi_err!( "Unable to retrieve field in WindowUDF via FFI - schema has no fields" ), false => Ok(schema.field(0).to_owned().into()), @@ -394,15 +397,20 @@ impl From<&FFI_SortOptions> for SortOptions { #[cfg(test)] #[cfg(feature = "integration-tests")] mod tests { - use crate::tests::create_record_batch; - use crate::udwf::{FFI_WindowUDF, ForeignWindowUDF}; - use arrow::array::{create_array, ArrayRef}; - use datafusion::functions_window::lead_lag::{lag_udwf, WindowShift}; - use datafusion::logical_expr::expr::Sort; - use datafusion::logical_expr::{col, ExprFunctionExt, WindowUDF, WindowUDFImpl}; - use datafusion::prelude::SessionContext; use std::sync::Arc; + use arrow::array::{create_array, ArrayRef}; + use datafusion::{ + functions_window::lead_lag::{lag_udwf, WindowShift}, + logical_expr::{col, expr::Sort, ExprFunctionExt, WindowUDF, WindowUDFImpl}, + prelude::SessionContext, + }; + + use crate::{ + tests::create_record_batch, + udwf::{FFI_WindowUDF, ForeignWindowUDF}, + }; + fn create_test_foreign_udwf( original_udwf: impl WindowUDFImpl + 'static, ) -> datafusion::common::Result { diff --git a/datafusion/ffi/src/udwf/partition_evaluator.rs b/datafusion/ffi/src/udwf/partition_evaluator.rs index 8217dbb800da..57cc8f73aa93 100644 --- a/datafusion/ffi/src/udwf/partition_evaluator.rs +++ b/datafusion/ffi/src/udwf/partition_evaluator.rs @@ -17,11 +17,11 @@ use std::{ffi::c_void, ops::Range}; +use super::range::FFI_Range; +use crate::util::FFIResult; use crate::{arrow_wrappers::WrappedArray, df_result, rresult, rresult_return}; -use abi_stable::{ - std_types::{RResult, RString, RVec}, - StableAbi, -}; +use abi_stable::std_types::RResult; +use abi_stable::{std_types::RVec, StableAbi}; use arrow::{array::ArrayRef, error::ArrowError}; use datafusion::{ error::{DataFusionError, Result}, @@ -30,8 +30,6 @@ use datafusion::{ }; use prost::Message; -use super::range::FFI_Range; - /// A stable struct for sharing [`PartitionEvaluator`] across FFI boundaries. /// For an explanation of each field, see the corresponding function /// defined in [`PartitionEvaluator`]. @@ -43,26 +41,25 @@ pub struct FFI_PartitionEvaluator { evaluator: &mut Self, values: RVec, num_rows: usize, - ) -> RResult, + ) -> FFIResult, pub evaluate: unsafe extern "C" fn( evaluator: &mut Self, values: RVec, range: FFI_Range, - ) -> RResult, RString>, + ) -> FFIResult>, pub evaluate_all_with_rank: unsafe extern "C" fn( evaluator: &Self, num_rows: usize, ranks_in_partition: RVec, - ) - -> RResult, + ) -> FFIResult, pub get_range: unsafe extern "C" fn( evaluator: &Self, idx: usize, n_rows: usize, - ) -> RResult, + ) -> FFIResult, pub is_causal: bool, @@ -106,7 +103,7 @@ unsafe extern "C" fn evaluate_all_fn_wrapper( evaluator: &mut FFI_PartitionEvaluator, values: RVec, num_rows: usize, -) -> RResult { +) -> FFIResult { let inner = evaluator.inner_mut(); let values_arrays = values @@ -126,7 +123,7 @@ unsafe extern "C" fn evaluate_fn_wrapper( evaluator: &mut FFI_PartitionEvaluator, values: RVec, range: FFI_Range, -) -> RResult, RString> { +) -> FFIResult> { let inner = evaluator.inner_mut(); let values_arrays = values @@ -148,7 +145,7 @@ unsafe extern "C" fn evaluate_all_with_rank_fn_wrapper( evaluator: &FFI_PartitionEvaluator, num_rows: usize, ranks_in_partition: RVec, -) -> RResult { +) -> FFIResult { let inner = evaluator.inner(); let ranks_in_partition = ranks_in_partition @@ -167,7 +164,7 @@ unsafe extern "C" fn get_range_fn_wrapper( evaluator: &FFI_PartitionEvaluator, idx: usize, n_rows: usize, -) -> RResult { +) -> FFIResult { let inner = evaluator.inner(); let range = inner.get_range(idx, n_rows).map(FFI_Range::from); @@ -179,6 +176,7 @@ unsafe extern "C" fn release_fn_wrapper(evaluator: &mut FFI_PartitionEvaluator) let private_data = Box::from_raw(evaluator.private_data as *mut PartitionEvaluatorPrivateData); drop(private_data); + evaluator.private_data = std::ptr::null_mut(); } } diff --git a/datafusion/ffi/src/udwf/partition_evaluator_args.rs b/datafusion/ffi/src/udwf/partition_evaluator_args.rs index cd2641256437..93090af6f773 100644 --- a/datafusion/ffi/src/udwf/partition_evaluator_args.rs +++ b/datafusion/ffi/src/udwf/partition_evaluator_args.rs @@ -31,7 +31,7 @@ use datafusion::{ physical_plan::{expressions::Column, PhysicalExpr}, prelude::SessionContext, }; -use datafusion_common::exec_datafusion_err; +use datafusion_common::ffi_datafusion_err; use datafusion_proto::{ physical_plan::{ from_proto::parse_physical_expr, to_proto::serialize_physical_exprs, @@ -146,7 +146,7 @@ impl TryFrom for ForeignPartitionEvaluatorArgs { .into_iter() .map(|input_expr_bytes| PhysicalExprNode::decode(input_expr_bytes.as_ref())) .collect::, prost::DecodeError>>() - .map_err(|e| exec_datafusion_err!("Failed to decode PhysicalExprNode: {e}"))? + .map_err(|e| ffi_datafusion_err!("Failed to decode PhysicalExprNode: {e}"))? .iter() .map(|expr_node| { parse_physical_expr(expr_node, &default_ctx.task_ctx(), &schema, &codec) diff --git a/datafusion/ffi/src/util.rs b/datafusion/ffi/src/util.rs index 151464dc9745..640da7c04292 100644 --- a/datafusion/ffi/src/util.rs +++ b/datafusion/ffi/src/util.rs @@ -16,12 +16,19 @@ // under the License. use crate::arrow_wrappers::WrappedSchema; -use abi_stable::std_types::RVec; +use abi_stable::std_types::{RResult, RString, RVec}; use arrow::datatypes::Field; use arrow::{datatypes::DataType, ffi::FFI_ArrowSchema}; use arrow_schema::FieldRef; use std::sync::Arc; +/// Convenience type for results passed through the FFI boundary. Since the +/// `DataFusionError` enum is complex and little value is gained from creating +/// a FFI safe variant of it, we convert errors to strings when passing results +/// back. These are converted back and forth using the `df_result`, `rresult`, +/// and `rresult_return` macros. +pub type FFIResult = RResult; + /// This macro is a helpful conversion utility to convert from an abi_stable::RResult to a /// DataFusion result. #[macro_export] @@ -29,8 +36,8 @@ macro_rules! df_result { ( $x:expr ) => { match $x { abi_stable::std_types::RResult::ROk(v) => Ok(v), - abi_stable::std_types::RResult::RErr(e) => { - datafusion_common::exec_err!("FFI error: {}", e) + abi_stable::std_types::RResult::RErr(err) => { + datafusion_common::ffi_err!("{err}") } } }; @@ -118,10 +125,11 @@ pub fn rvec_wrapped_to_vec_datatype( #[cfg(test)] mod tests { + use crate::util::FFIResult; use abi_stable::std_types::{RResult, RString}; use datafusion::error::DataFusionError; - fn wrap_result(result: Result) -> RResult { + fn wrap_result(result: Result) -> FFIResult { RResult::ROk(rresult_return!(result)) } @@ -130,9 +138,9 @@ mod tests { const VALID_VALUE: &str = "valid_value"; const ERROR_VALUE: &str = "error_value"; - let ok_r_result: RResult = + let ok_r_result: FFIResult = RResult::ROk(VALID_VALUE.to_string().into()); - let err_r_result: RResult = + let err_r_result: FFIResult = RResult::RErr(ERROR_VALUE.to_string().into()); let returned_ok_result = df_result!(ok_r_result); @@ -143,12 +151,12 @@ mod tests { assert!(returned_err_result.is_err()); assert!( returned_err_result.unwrap_err().strip_backtrace() - == format!("Execution error: FFI error: {ERROR_VALUE}") + == format!("FFI error: {ERROR_VALUE}") ); let ok_result: Result = Ok(VALID_VALUE.to_string()); let err_result: Result = - datafusion_common::exec_err!("{ERROR_VALUE}"); + datafusion_common::ffi_err!("{ERROR_VALUE}"); let returned_ok_r_result = wrap_result(ok_result); assert!(returned_ok_r_result == RResult::ROk(VALID_VALUE.into())); @@ -157,6 +165,6 @@ mod tests { assert!(returned_err_r_result.is_err()); assert!(returned_err_r_result .unwrap_err() - .starts_with(format!("Execution error: {ERROR_VALUE}").as_str())); + .starts_with(format!("FFI error: {ERROR_VALUE}").as_str())); } }