From b8b7e5835a5f598c161a5933ca92e0e861c55833 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Fri, 5 Jul 2024 17:40:23 -0400 Subject: [PATCH 1/3] Add user_defined_sql_planners(..) to FunctionRegistry --- datafusion/core/src/execution/context/mod.rs | 4 ++++ datafusion/core/src/execution/session_state.rs | 4 ++++ datafusion/execution/src/task.rs | 10 +++++++--- datafusion/expr/src/registry.rs | 7 +++++++ datafusion/proto/src/bytes/mod.rs | 5 +++++ datafusion/proto/src/bytes/registry.rs | 5 +++++ 6 files changed, 32 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 4685f194fe29..4b7e7438b5e2 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1392,6 +1392,10 @@ impl FunctionRegistry for SessionContext { self.state.write().register_function_rewrite(rewrite) } + fn user_defined_sql_planners(&self) -> Vec> { + self.state.read().user_defined_sql_planners() + } + fn register_user_defined_sql_planner( &mut self, user_defined_sql_planner: Arc, diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index a831f92def50..a397a5c959c5 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -1181,6 +1181,10 @@ impl FunctionRegistry for SessionState { Ok(()) } + fn user_defined_sql_planners(&self) -> Vec> { + self.user_defined_sql_planners.clone() + } + fn register_user_defined_sql_planner( &mut self, user_defined_sql_planner: Arc, diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs index c21ce3d21da1..a80b4ffc6f3b 100644 --- a/datafusion/execution/src/task.rs +++ b/datafusion/execution/src/task.rs @@ -20,15 +20,15 @@ use std::{ sync::Arc, }; -use datafusion_common::{plan_datafusion_err, DataFusionError, Result}; -use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; - use crate::{ config::SessionConfig, memory_pool::MemoryPool, registry::FunctionRegistry, runtime_env::{RuntimeConfig, RuntimeEnv}, }; +use datafusion_common::{plan_datafusion_err, DataFusionError, Result}; +use datafusion_expr::planner::UserDefinedSQLPlanner; +use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; /// Task Execution Context /// @@ -191,6 +191,10 @@ impl FunctionRegistry for TaskContext { }); Ok(self.scalar_functions.insert(udf.name().into(), udf)) } + + fn user_defined_sql_planners(&self) -> Vec> { + vec![] + } } #[cfg(test)] diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index c276fe30f897..64616d9b33f7 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -110,6 +110,9 @@ pub trait FunctionRegistry { not_impl_err!("Registering FunctionRewrite") } + /// Set of all registered [`UserDefinedSQLPlanner`]s + fn user_defined_sql_planners(&self) -> Vec>; + /// Registers a new [`UserDefinedSQLPlanner`] with the registry. fn register_user_defined_sql_planner( &mut self, @@ -192,4 +195,8 @@ impl FunctionRegistry for MemoryFunctionRegistry { fn register_udwf(&mut self, udaf: Arc) -> Result>> { Ok(self.udwfs.insert(udaf.name().into(), udaf)) } + + fn user_defined_sql_planners(&self) -> Vec> { + vec![] + } } diff --git a/datafusion/proto/src/bytes/mod.rs b/datafusion/proto/src/bytes/mod.rs index 901aa2455e16..020f60d38992 100644 --- a/datafusion/proto/src/bytes/mod.rs +++ b/datafusion/proto/src/bytes/mod.rs @@ -39,6 +39,7 @@ use std::sync::Arc; use datafusion::execution::registry::FunctionRegistry; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; +use datafusion_expr::planner::UserDefinedSQLPlanner; mod registry; @@ -165,6 +166,10 @@ impl Serializeable for Expr { "register_udwf called in Placeholder Registry!" ) } + + fn user_defined_sql_planners(&self) -> Vec> { + vec![] + } } Expr::from_bytes_with_registry(&bytes, &PlaceHolderRegistry)?; diff --git a/datafusion/proto/src/bytes/registry.rs b/datafusion/proto/src/bytes/registry.rs index 4bf2bb3d7b79..a952b147b2b7 100644 --- a/datafusion/proto/src/bytes/registry.rs +++ b/datafusion/proto/src/bytes/registry.rs @@ -20,6 +20,7 @@ use std::{collections::HashSet, sync::Arc}; use datafusion::execution::registry::FunctionRegistry; use datafusion_common::plan_err; use datafusion_common::Result; +use datafusion_expr::planner::UserDefinedSQLPlanner; use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; /// A default [`FunctionRegistry`] registry that does not resolve any @@ -54,4 +55,8 @@ impl FunctionRegistry for NoRegistry { fn register_udwf(&mut self, udwf: Arc) -> Result>> { plan_err!("No function registry provided to deserialize, so can not deserialize User Defined Window Function '{}'", udwf.inner().name()) } + + fn user_defined_sql_planners(&self) -> Vec> { + vec![] + } } From 6f27fea4425d38d969e44a5ed42c9f85f2caeb61 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Fri, 5 Jul 2024 17:50:26 -0400 Subject: [PATCH 2/3] Adding simple test for user_defined_sql_planners --- .../user_defined/user_defined_scalar_functions.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 5e3c44c039ab..36f5c16fefe2 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -571,6 +571,17 @@ async fn test_user_defined_functions_cast_to_i64() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_user_defined_sql_functions() -> Result<()> { + let ctx = SessionContext::new(); + + let sql_planners = ctx.user_defined_sql_planners(); + + assert!(!sql_planners.is_empty()); + + Ok(()) +} + #[tokio::test] async fn deregister_udf() -> Result<()> { let cast2i64 = ScalarUDF::from(CastToI64UDF::new()); From eaeca462a4e21c0b37a2f1fb21338e4376b31c1c Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Sat, 6 Jul 2024 10:09:47 -0400 Subject: [PATCH 3/3] Renamed user_defined_sql_planners to expr_planners --- datafusion/core/src/execution/context/mod.rs | 4 ++-- datafusion/core/src/execution/session_state.rs | 2 +- .../core/tests/user_defined/user_defined_scalar_functions.rs | 2 +- datafusion/execution/src/task.rs | 2 +- datafusion/expr/src/registry.rs | 4 ++-- datafusion/proto/src/bytes/mod.rs | 2 +- datafusion/proto/src/bytes/registry.rs | 2 +- 7 files changed, 9 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 4b7e7438b5e2..04debf498aa9 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1392,8 +1392,8 @@ impl FunctionRegistry for SessionContext { self.state.write().register_function_rewrite(rewrite) } - fn user_defined_sql_planners(&self) -> Vec> { - self.state.read().user_defined_sql_planners() + fn expr_planners(&self) -> Vec> { + self.state.read().expr_planners() } fn register_user_defined_sql_planner( diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index cbddb091a3cc..ad557b12255c 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -1183,7 +1183,7 @@ impl FunctionRegistry for SessionState { Ok(()) } - fn user_defined_sql_planners(&self) -> Vec> { + fn expr_planners(&self) -> Vec> { self.user_defined_sql_planners.clone() } diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 36f5c16fefe2..ae8a009c6292 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -575,7 +575,7 @@ async fn test_user_defined_functions_cast_to_i64() -> Result<()> { async fn test_user_defined_sql_functions() -> Result<()> { let ctx = SessionContext::new(); - let sql_planners = ctx.user_defined_sql_planners(); + let sql_planners = ctx.expr_planners(); assert!(!sql_planners.is_empty()); diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs index a80b4ffc6f3b..24d61e6a8b72 100644 --- a/datafusion/execution/src/task.rs +++ b/datafusion/execution/src/task.rs @@ -192,7 +192,7 @@ impl FunctionRegistry for TaskContext { Ok(self.scalar_functions.insert(udf.name().into(), udf)) } - fn user_defined_sql_planners(&self) -> Vec> { + fn expr_planners(&self) -> Vec> { vec![] } } diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index 64616d9b33f7..6a27c05bb451 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -111,7 +111,7 @@ pub trait FunctionRegistry { } /// Set of all registered [`UserDefinedSQLPlanner`]s - fn user_defined_sql_planners(&self) -> Vec>; + fn expr_planners(&self) -> Vec>; /// Registers a new [`UserDefinedSQLPlanner`] with the registry. fn register_user_defined_sql_planner( @@ -196,7 +196,7 @@ impl FunctionRegistry for MemoryFunctionRegistry { Ok(self.udwfs.insert(udaf.name().into(), udaf)) } - fn user_defined_sql_planners(&self) -> Vec> { + fn expr_planners(&self) -> Vec> { vec![] } } diff --git a/datafusion/proto/src/bytes/mod.rs b/datafusion/proto/src/bytes/mod.rs index 020f60d38992..83210cb4e41f 100644 --- a/datafusion/proto/src/bytes/mod.rs +++ b/datafusion/proto/src/bytes/mod.rs @@ -167,7 +167,7 @@ impl Serializeable for Expr { ) } - fn user_defined_sql_planners(&self) -> Vec> { + fn expr_planners(&self) -> Vec> { vec![] } } diff --git a/datafusion/proto/src/bytes/registry.rs b/datafusion/proto/src/bytes/registry.rs index a952b147b2b7..075993e2ba76 100644 --- a/datafusion/proto/src/bytes/registry.rs +++ b/datafusion/proto/src/bytes/registry.rs @@ -56,7 +56,7 @@ impl FunctionRegistry for NoRegistry { plan_err!("No function registry provided to deserialize, so can not deserialize User Defined Window Function '{}'", udwf.inner().name()) } - fn user_defined_sql_planners(&self) -> Vec> { + fn expr_planners(&self) -> Vec> { vec![] } }