Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 249 additions & 2 deletions datafusion/ffi/src/physical_optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ use std::sync::Arc;
use async_trait::async_trait;
use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::{PhysicalOptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
use stabby::string::String as SString;
use tokio::runtime::Handle;

Expand All @@ -31,6 +32,99 @@ use crate::execution_plan::FFI_ExecutionPlan;
use crate::util::FFI_Result;
use crate::{df_result, sresult_return};

/// A stable struct for sharing [`PhysicalOptimizerContext`] across FFI boundaries.
///
/// The struct is a table of `extern "C"` callbacks plus an opaque pointer to
/// provider-owned data. Through the callbacks a consumer can obtain the
/// configuration options and learn whether a statistics registry is present;
/// no callback hands out the registry itself.
///
/// The struct is `#[repr(C)]`, so the declaration order of the fields below is
/// part of its layout.
#[repr(C)]
#[derive(Debug)]
pub struct FFI_PhysicalOptimizerContext {
/// Returns the configuration options of this context as an
/// [`FFI_ConfigOptions`] value.
pub config_options:
unsafe extern "C" fn(&FFI_PhysicalOptimizerContext) -> FFI_ConfigOptions,

/// Returns true if a statistics registry is available.
pub has_statistics_registry:
unsafe extern "C" fn(&FFI_PhysicalOptimizerContext) -> bool,

/// Release the memory of the private data.
pub release: unsafe extern "C" fn(&mut FFI_PhysicalOptimizerContext),

/// Internal data. Only accessed by the provider.
pub private_data: *const c_void,
}

// The raw `private_data` pointer suppresses the automatic `Send`/`Sync`
// impls, so they are asserted by hand here.
// NOTE(review): this is only sound if the boxed private data (a
// `ConfigOptions` and an `Option<StatisticsRegistry>`) is itself
// `Send + Sync` — TODO confirm.
unsafe impl Send for FFI_PhysicalOptimizerContext {}
unsafe impl Sync for FFI_PhysicalOptimizerContext {}

/// Provider-side payload stored behind
/// `FFI_PhysicalOptimizerContext::private_data`.
///
/// `FFI_PhysicalOptimizerContext::new` fills it with clones taken from the
/// wrapped [`PhysicalOptimizerContext`].
struct OptimizerContextPrivateData {
/// Cloned configuration options of the wrapped context.
config: ConfigOptions,
/// Cloned statistics registry of the wrapped context, if it had one.
statistics_registry: Option<StatisticsRegistry>,
}

impl FFI_PhysicalOptimizerContext {
    /// Builds an FFI view of `context`.
    ///
    /// The configuration options and the statistics registry (when present)
    /// are cloned into a heap allocation whose raw pointer is stored in
    /// `private_data`; the callback fields are set to this module's
    /// `context_*_fn` functions.
    pub fn new(context: &dyn PhysicalOptimizerContext) -> Self {
        let config = context.config_options().clone();
        let statistics_registry = context.statistics_registry().cloned();
        let boxed = Box::new(OptimizerContextPrivateData {
            config,
            statistics_registry,
        });

        Self {
            config_options: context_config_options_fn,
            has_statistics_registry: context_has_statistics_registry_fn,
            release: context_release_fn,
            // Ownership moves into the raw pointer here.
            private_data: Box::into_raw(boxed) as *const c_void,
        }
    }

    /// Reinterprets `private_data` as the payload type written by [`Self::new`].
    fn inner(&self) -> &OptimizerContextPrivateData {
        let payload = self.private_data as *const OptimizerContextPrivateData;
        // SAFETY (review): relies on `private_data` being the non-null pointer
        // produced by `Self::new` in this same library — TODO confirm for
        // every caller.
        unsafe { &*payload }
    }
}

impl Drop for FFI_PhysicalOptimizerContext {
    /// Delegates cleanup to the struct's own `release` callback.
    fn drop(&mut self) {
        // Function pointers are `Copy`; reading it out first keeps the call
        // expression free of a borrow of `self`.
        let release = self.release;
        unsafe { release(self) }
    }
}

/// `config_options` callback: converts the stored [`ConfigOptions`] into an
/// [`FFI_ConfigOptions`] value.
unsafe extern "C" fn context_config_options_fn(
    ctx: &FFI_PhysicalOptimizerContext,
) -> FFI_ConfigOptions {
    let payload = ctx.inner();
    FFI_ConfigOptions::from(&payload.config)
}

unsafe extern "C" fn context_has_statistics_registry_fn(
ctx: &FFI_PhysicalOptimizerContext,
) -> bool {
ctx.inner().statistics_registry.is_some()
}

/// `release` callback: frees the boxed payload and nulls `private_data`.
///
/// A null `private_data` is left untouched, so calling this a second time on
/// the same struct does nothing.
unsafe extern "C" fn context_release_fn(ctx: &mut FFI_PhysicalOptimizerContext) {
    if ctx.private_data.is_null() {
        return;
    }

    let payload = ctx.private_data as *mut OptimizerContextPrivateData;
    // SAFETY (review): assumes `private_data` came from `Box::into_raw` in
    // `FFI_PhysicalOptimizerContext::new` and has not been freed yet.
    drop(unsafe { Box::from_raw(payload) });
    ctx.private_data = std::ptr::null();
}

/// Reconstructed [`PhysicalOptimizerContext`] on the consumer side of FFI.
///
/// Holds owned values so that it can hand out references through the
/// [`PhysicalOptimizerContext`] trait.
struct ForeignOptimizerContext {
/// Configuration options returned by `config_options()`.
config: ConfigOptions,
/// Statistics registry returned by `statistics_registry()`, if any.
statistics_registry: Option<StatisticsRegistry>,
}

impl PhysicalOptimizerContext for ForeignOptimizerContext {
    /// Borrows the owned configuration options.
    fn config_options(&self) -> &ConfigOptions {
        let Self { config, .. } = self;
        config
    }

    /// Borrows the owned statistics registry, or `None` when there is none.
    fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
        let Self {
            statistics_registry,
            ..
        } = self;
        statistics_registry.as_ref()
    }
}

/// A stable struct for sharing [`PhysicalOptimizerRule`] across FFI boundaries.
#[repr(C)]
#[derive(Debug)]
Expand All @@ -41,6 +135,12 @@ pub struct FFI_PhysicalOptimizerRule {
config: FFI_ConfigOptions,
) -> FFI_Result<FFI_ExecutionPlan>,

pub optimize_with_context: unsafe extern "C" fn(
&Self,
plan: &FFI_ExecutionPlan,
context: &FFI_PhysicalOptimizerContext,
) -> FFI_Result<FFI_ExecutionPlan>,

pub name: unsafe extern "C" fn(&Self) -> SString,

pub schema_check: unsafe extern "C" fn(&Self) -> bool,
Expand Down Expand Up @@ -98,6 +198,38 @@ unsafe extern "C" fn optimize_fn_wrapper(
FFI_Result::Ok(FFI_ExecutionPlan::new(optimized_plan, runtime))
}

/// `optimize_with_context` callback of [`FFI_PhysicalOptimizerRule`].
///
/// Converts the FFI plan and FFI context back into native values, runs the
/// wrapped rule's `optimize_with_context`, and wraps the resulting plan in a
/// new [`FFI_ExecutionPlan`]. Any conversion or rule error is returned early
/// through `sresult_return!`.
unsafe extern "C" fn optimize_with_context_fn_wrapper(
    rule: &FFI_PhysicalOptimizerRule,
    plan: &FFI_ExecutionPlan,
    context: &FFI_PhysicalOptimizerContext,
) -> FFI_Result<FFI_ExecutionPlan> {
    let runtime = rule.runtime();
    let inner = rule.inner();
    let plan: Arc<dyn ExecutionPlan> = sresult_return!(plan.try_into());

    // The configuration travels through the context's callback table.
    let ffi_config = unsafe { (context.config_options)(context) };
    let config = sresult_return!(ConfigOptions::try_from(ffi_config));

    // NOTE(review): the registry is NOT fetched through a callback. It is
    // read by casting `context.private_data` to this library's
    // `OptimizerContextPrivateData`, although that field is documented as
    // "Only accessed by the provider". Confirm this is sound when the context
    // was created by a different library than the one running this wrapper.
    let registry_present = unsafe { (context.has_statistics_registry)(context) };
    let statistics_registry = registry_present.then(|| {
        context
            .inner()
            .statistics_registry
            .clone()
            .unwrap_or_default()
    });

    let foreign_ctx = ForeignOptimizerContext {
        config,
        statistics_registry,
    };
    let optimized_plan = sresult_return!(inner.optimize_with_context(plan, &foreign_ctx));

    FFI_Result::Ok(FFI_ExecutionPlan::new(optimized_plan, runtime))
}

unsafe extern "C" fn name_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> SString {
let rule = rule.inner();
rule.name().into()
Expand Down Expand Up @@ -127,6 +259,7 @@ unsafe extern "C" fn clone_fn_wrapper(

FFI_PhysicalOptimizerRule {
optimize: optimize_fn_wrapper,
optimize_with_context: optimize_with_context_fn_wrapper,
name: name_fn_wrapper,
schema_check: schema_check_fn_wrapper,
clone: clone_fn_wrapper,
Expand Down Expand Up @@ -160,6 +293,7 @@ impl FFI_PhysicalOptimizerRule {

Self {
optimize: optimize_fn_wrapper,
optimize_with_context: optimize_with_context_fn_wrapper,
name: name_fn_wrapper,
schema_check: schema_check_fn_wrapper,
clone: clone_fn_wrapper,
Expand Down Expand Up @@ -220,6 +354,24 @@ impl PhysicalOptimizerRule for ForeignPhysicalOptimizerRule {
(&optimized_plan).try_into()
}

/// Runs the foreign rule's `optimize_with_context` callback.
///
/// `context` and `plan` are wrapped in their FFI representations, passed to
/// the callback, and the returned FFI plan is converted back into an
/// `Arc<dyn ExecutionPlan>`.
///
/// # Errors
///
/// Returns the error reported by the callback, or the error from converting
/// the returned plan.
fn optimize_with_context(
    &self,
    plan: Arc<dyn ExecutionPlan>,
    context: &dyn PhysicalOptimizerContext,
) -> Result<Arc<dyn ExecutionPlan>> {
    let ffi_context = FFI_PhysicalOptimizerContext::new(context);
    let ffi_plan = FFI_ExecutionPlan::new(plan, None);

    let callback = self.rule.optimize_with_context;
    let optimized_plan =
        unsafe { df_result!(callback(&self.rule, &ffi_plan, &ffi_context))? };

    (&optimized_plan).try_into()
}

/// Returns the rule name stored on this wrapper.
fn name(&self) -> &str {
    let name: &str = &self.name;
    name
}
Expand All @@ -236,8 +388,11 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::{
ConfigOnlyContext, PhysicalOptimizerContext, PhysicalOptimizerRule,
};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::operator_statistics::StatisticsRegistry;

use super::*;
use crate::execution_plan::tests::EmptyExec;
Expand Down Expand Up @@ -265,6 +420,39 @@ mod tests {
}
}

/// A rule that returns an error from `optimize` but succeeds when
/// called via `optimize_with_context`, proving the context path is taken.
///
/// `optimize_with_context` hands the input plan back unchanged, so a caller
/// can tell the two entry points apart by whether the call succeeds.
#[derive(Debug)]
struct ContextAwareRule;

impl PhysicalOptimizerRule for ContextAwareRule {
    /// Always fails with a plan error; this entry point must not be reached.
    fn optimize(
        &self,
        _plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let message = String::from("optimize should not be called directly");
        Err(datafusion_common::DataFusionError::Plan(message))
    }

    /// Returns the input plan unchanged.
    fn optimize_with_context(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _context: &dyn PhysicalOptimizerContext,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(plan)
    }

    /// Fixed rule name.
    fn name(&self) -> &str {
        "context_aware_rule"
    }

    /// Always reports `true`.
    fn schema_check(&self) -> bool {
        true
    }
}

fn create_test_plan() -> Arc<dyn ExecutionPlan> {
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
Expand Down Expand Up @@ -374,4 +562,63 @@ mod tests {

Ok(())
}

/// Sends a plan through `optimize_with_context` on an FFI-wrapped
/// `ContextAwareRule` using a config-only context, and checks that the plan
/// that comes back is still named `"empty-exec"`.
#[test]
fn test_optimize_with_context_round_trip() -> Result<()> {
    let wrapped: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
        Arc::new(ContextAwareRule);

    let mut ffi_rule = FFI_PhysicalOptimizerRule::new(wrapped, None);
    // Presumably makes the wrapper look like it came from another library so
    // the conversion below yields the foreign wrapper — TODO confirm.
    ffi_rule.library_marker_id = crate::mock_foreign_marker_id;

    let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
        (&ffi_rule).into();

    let input_plan = create_test_plan();
    let config = ConfigOptions::new();
    let context = ConfigOnlyContext::new(&config);

    // `ContextAwareRule::optimize` always errors, so `?` succeeding here
    // means the context-aware entry point was the one that ran.
    let optimized = foreign_rule.optimize_with_context(input_plan, &context)?;
    assert_eq!(optimized.name(), "empty-exec");

    Ok(())
}

/// Same round trip as the config-only test, but with a context whose
/// `statistics_registry()` returns `Some`.
#[test]
fn test_optimize_with_context_with_registry() -> Result<()> {
    /// Test context that always exposes a statistics registry.
    struct ContextWithRegistry {
        config: ConfigOptions,
        registry: StatisticsRegistry,
    }

    impl PhysicalOptimizerContext for ContextWithRegistry {
        fn config_options(&self) -> &ConfigOptions {
            &self.config
        }

        fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
            Some(&self.registry)
        }
    }

    let wrapped: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
        Arc::new(ContextAwareRule);

    let mut ffi_rule = FFI_PhysicalOptimizerRule::new(wrapped, None);
    // Presumably makes the wrapper look like it came from another library so
    // the conversion below yields the foreign wrapper — TODO confirm.
    ffi_rule.library_marker_id = crate::mock_foreign_marker_id;

    let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
        (&ffi_rule).into();

    let config = ConfigOptions::new();
    let registry = StatisticsRegistry::default_with_builtin_providers();
    let ctx = ContextWithRegistry { config, registry };

    let input_plan = create_test_plan();
    let optimized = foreign_rule.optimize_with_context(input_plan, &ctx)?;
    assert_eq!(optimized.name(), "empty-exec");

    Ok(())
}
}
Loading