diff --git a/datafusion/ffi/src/physical_optimizer.rs b/datafusion/ffi/src/physical_optimizer.rs
index 84dc40ce8f46c..4d623aca77e96 100644
--- a/datafusion/ffi/src/physical_optimizer.rs
+++ b/datafusion/ffi/src/physical_optimizer.rs
@@ -21,8 +21,9 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::error::Result;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{PhysicalOptimizerContext, PhysicalOptimizerRule};
 use datafusion_physical_plan::ExecutionPlan;
+use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
 use stabby::string::String as SString;
 use tokio::runtime::Handle;
 
@@ -31,6 +32,99 @@ use crate::execution_plan::FFI_ExecutionPlan;
 use crate::util::FFI_Result;
 use crate::{df_result, sresult_return};
 
+/// A stable struct for sharing [`PhysicalOptimizerContext`] across FFI boundaries.
+///
+/// This provides access to configuration options and an optional statistics registry
+/// for optimizer rules that need extended context.
+#[repr(C)]
+#[derive(Debug)]
+pub struct FFI_PhysicalOptimizerContext {
+    pub config_options:
+        unsafe extern "C" fn(&FFI_PhysicalOptimizerContext) -> FFI_ConfigOptions,
+
+    /// Returns true if a statistics registry is available.
+    pub has_statistics_registry:
+        unsafe extern "C" fn(&FFI_PhysicalOptimizerContext) -> bool,
+
+    /// Release the memory of the private data.
+    pub release: unsafe extern "C" fn(&mut FFI_PhysicalOptimizerContext),
+
+    /// Internal data. Only accessed by the provider.
+    pub private_data: *const c_void,
+}
+
+unsafe impl Send for FFI_PhysicalOptimizerContext {}
+unsafe impl Sync for FFI_PhysicalOptimizerContext {}
+
+struct OptimizerContextPrivateData {
+    config: ConfigOptions,
+    statistics_registry: Option<StatisticsRegistry>,
+}
+
+impl FFI_PhysicalOptimizerContext {
+    pub fn new(context: &dyn PhysicalOptimizerContext) -> Self {
+        let private_data = Box::new(OptimizerContextPrivateData {
+            config: context.config_options().clone(),
+            statistics_registry: context.statistics_registry().cloned(),
+        });
+        let private_data = Box::into_raw(private_data) as *const c_void;
+
+        Self {
+            config_options: context_config_options_fn,
+            has_statistics_registry: context_has_statistics_registry_fn,
+            release: context_release_fn,
+            private_data,
+        }
+    }
+
+    fn inner(&self) -> &OptimizerContextPrivateData {
+        unsafe { &*(self.private_data as *const OptimizerContextPrivateData) }
+    }
+}
+
+impl Drop for FFI_PhysicalOptimizerContext {
+    fn drop(&mut self) {
+        unsafe { (self.release)(self) }
+    }
+}
+
+unsafe extern "C" fn context_config_options_fn(
+    ctx: &FFI_PhysicalOptimizerContext,
+) -> FFI_ConfigOptions {
+    FFI_ConfigOptions::from(&ctx.inner().config)
+}
+
+unsafe extern "C" fn context_has_statistics_registry_fn(
+    ctx: &FFI_PhysicalOptimizerContext,
+) -> bool {
+    ctx.inner().statistics_registry.is_some()
+}
+
+unsafe extern "C" fn context_release_fn(ctx: &mut FFI_PhysicalOptimizerContext) {
+    if !ctx.private_data.is_null() {
+        unsafe {
+            let _ = Box::from_raw(ctx.private_data as *mut OptimizerContextPrivateData);
+        }
+        ctx.private_data = std::ptr::null();
+    }
+}
+
+/// Reconstructed [`PhysicalOptimizerContext`] on the consumer side of FFI.
+struct ForeignOptimizerContext {
+    config: ConfigOptions,
+    statistics_registry: Option<StatisticsRegistry>,
+}
+
+impl PhysicalOptimizerContext for ForeignOptimizerContext {
+    fn config_options(&self) -> &ConfigOptions {
+        &self.config
+    }
+
+    fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
+        self.statistics_registry.as_ref()
+    }
+}
+
 /// A stable struct for sharing [`PhysicalOptimizerRule`] across FFI boundaries.
 #[repr(C)]
 #[derive(Debug)]
@@ -41,6 +135,12 @@ pub struct FFI_PhysicalOptimizerRule {
         config: FFI_ConfigOptions,
     ) -> FFI_Result<FFI_ExecutionPlan>,
 
+    pub optimize_with_context: unsafe extern "C" fn(
+        &Self,
+        plan: &FFI_ExecutionPlan,
+        context: &FFI_PhysicalOptimizerContext,
+    ) -> FFI_Result<FFI_ExecutionPlan>,
+
     pub name: unsafe extern "C" fn(&Self) -> SString,
 
     pub schema_check: unsafe extern "C" fn(&Self) -> bool,
@@ -98,6 +198,32 @@ unsafe extern "C" fn optimize_fn_wrapper(
     FFI_Result::Ok(FFI_ExecutionPlan::new(optimized_plan, runtime))
 }
 
+unsafe extern "C" fn optimize_with_context_fn_wrapper(
+    rule: &FFI_PhysicalOptimizerRule,
+    plan: &FFI_ExecutionPlan,
+    context: &FFI_PhysicalOptimizerContext,
+) -> FFI_Result<FFI_ExecutionPlan> {
+    let runtime = rule.runtime();
+    let inner = rule.inner();
+    let plan: Arc<dyn ExecutionPlan> = sresult_return!(plan.try_into());
+    let config = sresult_return!(ConfigOptions::try_from(unsafe {
+        (context.config_options)(context)
+    }));
+    // `context.private_data` is owned by the library that built the context, so
+    // it must not be dereferenced on this side of the boundary. Only the exported
+    // callbacks are used; the registry contents are not transported over FFI, so
+    // a default registry stands in whenever the caller reports having one.
+    let has_registry = unsafe { (context.has_statistics_registry)(context) };
+    let registry = has_registry.then(StatisticsRegistry::default);
+    let foreign_ctx = ForeignOptimizerContext {
+        config,
+        statistics_registry: registry,
+    };
+    let optimized_plan = sresult_return!(inner.optimize_with_context(plan, &foreign_ctx));
+
+    FFI_Result::Ok(FFI_ExecutionPlan::new(optimized_plan, runtime))
+}
+
 unsafe extern "C" fn name_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> SString {
     let rule = rule.inner();
     rule.name().into()
@@ -127,6 +253,7 @@ unsafe extern "C" fn clone_fn_wrapper(
 
     FFI_PhysicalOptimizerRule {
         optimize: optimize_fn_wrapper,
+        optimize_with_context: optimize_with_context_fn_wrapper,
         name: name_fn_wrapper,
         schema_check: schema_check_fn_wrapper,
         clone: clone_fn_wrapper,
@@ -160,6 +287,7 @@ impl FFI_PhysicalOptimizerRule {
 
         Self {
             optimize: optimize_fn_wrapper,
+            optimize_with_context: optimize_with_context_fn_wrapper,
             name: name_fn_wrapper,
             schema_check: schema_check_fn_wrapper,
             clone: clone_fn_wrapper,
@@ -220,6 +348,24 @@ impl PhysicalOptimizerRule for ForeignPhysicalOptimizerRule {
         (&optimized_plan).try_into()
     }
 
+    fn optimize_with_context(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: &dyn PhysicalOptimizerContext,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let ffi_context = FFI_PhysicalOptimizerContext::new(context);
+        let plan = FFI_ExecutionPlan::new(plan, None);
+
+        let optimized_plan = unsafe {
+            df_result!((self.rule.optimize_with_context)(
+                &self.rule,
+                &plan,
+                &ffi_context
+            ))?
+        };
+        (&optimized_plan).try_into()
+    }
+
     fn name(&self) -> &str {
         &self.name
     }
@@ -236,8 +382,11 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use datafusion_common::config::ConfigOptions;
     use datafusion_common::error::Result;
-    use datafusion_physical_optimizer::PhysicalOptimizerRule;
+    use datafusion_physical_optimizer::{
+        ConfigOnlyContext, PhysicalOptimizerContext, PhysicalOptimizerRule,
+    };
     use datafusion_physical_plan::ExecutionPlan;
+    use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
 
     use super::*;
     use crate::execution_plan::tests::EmptyExec;
@@ -265,6 +414,39 @@ mod tests {
         }
     }
 
+    /// A rule that returns an error from `optimize` but succeeds when
+    /// called via `optimize_with_context`, proving the context path is taken.
+    #[derive(Debug)]
+    struct ContextAwareRule;
+
+    impl PhysicalOptimizerRule for ContextAwareRule {
+        fn optimize(
+            &self,
+            _plan: Arc<dyn ExecutionPlan>,
+            _config: &ConfigOptions,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            Err(datafusion_common::DataFusionError::Plan(
+                "optimize should not be called directly".to_string(),
+            ))
+        }
+
+        fn optimize_with_context(
+            &self,
+            plan: Arc<dyn ExecutionPlan>,
+            _context: &dyn PhysicalOptimizerContext,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            Ok(plan)
+        }
+
+        fn name(&self) -> &str {
+            "context_aware_rule"
+        }
+
+        fn schema_check(&self) -> bool {
+            true
+        }
+    }
+
     fn create_test_plan() -> Arc<dyn ExecutionPlan> {
         let schema =
             Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
@@ -374,4 +556,63 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_optimize_with_context_round_trip() -> Result<()> {
+        let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+            Arc::new(ContextAwareRule);
+
+        let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
+        ffi_rule.library_marker_id = crate::mock_foreign_marker_id;
+
+        let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+            (&ffi_rule).into();
+
+        let plan = create_test_plan();
+        let config = ConfigOptions::new();
+        let context = ConfigOnlyContext::new(&config);
+
+        let optimized = foreign_rule.optimize_with_context(plan, &context)?;
+        assert_eq!(optimized.name(), "empty-exec");
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_optimize_with_context_with_registry() -> Result<()> {
+        let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+            Arc::new(ContextAwareRule);
+
+        let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
+        ffi_rule.library_marker_id = crate::mock_foreign_marker_id;
+
+        let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+            (&ffi_rule).into();
+
+        struct ContextWithRegistry {
+            config: ConfigOptions,
+            registry: StatisticsRegistry,
+        }
+
+        impl PhysicalOptimizerContext for ContextWithRegistry {
+            fn config_options(&self) -> &ConfigOptions {
+                &self.config
+            }
+
+            fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
+                Some(&self.registry)
+            }
+        }
+
+        let ctx = ContextWithRegistry {
+            config: ConfigOptions::new(),
+            registry: StatisticsRegistry::default_with_builtin_providers(),
+        };
+
+        let plan = create_test_plan();
+        let optimized = foreign_rule.optimize_with_context(plan, &ctx)?;
+        assert_eq!(optimized.name(), "empty-exec");
+
+        Ok(())
+    }
 }