-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support Serde for ScalarUDF in Physical Expressions #9436
Changes from all commits
9ab42e2
8d7c866
ee9c36f
6ecfb11
6b154e0
ecde242
b33656a
1738529
93d9750
376905b
d8d1eb2
623e56c
1dfce4a
767b53d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,6 +44,7 @@ use arrow_array::Array; | |
use datafusion_common::{exec_err, Result, ScalarValue}; | ||
use datafusion_expr::execution_props::ExecutionProps; | ||
pub use datafusion_expr::FuncMonotonicity; | ||
use datafusion_expr::ScalarFunctionDefinition; | ||
use datafusion_expr::{ | ||
type_coercion::functions::data_types, BuiltinScalarFunction, ColumnarValue, | ||
ScalarFunctionImplementation, | ||
|
@@ -57,7 +58,7 @@ pub fn create_physical_expr( | |
fun: &BuiltinScalarFunction, | ||
input_phy_exprs: &[Arc<dyn PhysicalExpr>], | ||
input_schema: &Schema, | ||
execution_props: &ExecutionProps, | ||
_execution_props: &ExecutionProps, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. weird, seems like it never needed this param There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it doesn't need this param now, but before it needs this for BuiltInFunction to create physical function for some Date function like to_date. Since these functions has been ported, it will not need this param anymore. |
||
) -> Result<Arc<dyn PhysicalExpr>> { | ||
let input_expr_types = input_phy_exprs | ||
.iter() | ||
|
@@ -69,14 +70,12 @@ pub fn create_physical_expr( | |
|
||
let data_type = fun.return_type(&input_expr_types)?; | ||
|
||
let fun_expr: ScalarFunctionImplementation = | ||
create_physical_fun(fun, execution_props)?; | ||
|
||
let monotonicity = fun.monotonicity(); | ||
|
||
let fun_def = ScalarFunctionDefinition::BuiltIn(*fun); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here I just pass the |
||
Ok(Arc::new(ScalarFunctionExpr::new( | ||
&format!("{fun}"), | ||
fun_expr, | ||
fun_def, | ||
input_phy_exprs.to_vec(), | ||
data_type, | ||
monotonicity, | ||
|
@@ -195,7 +194,6 @@ where | |
/// Create a physical scalar function. | ||
pub fn create_physical_fun( | ||
fun: &BuiltinScalarFunction, | ||
_execution_props: &ExecutionProps, | ||
) -> Result<ScalarFunctionImplementation> { | ||
Ok(match fun { | ||
// math functions | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,22 +34,22 @@ use std::fmt::{self, Debug, Formatter}; | |
use std::hash::{Hash, Hasher}; | ||
use std::sync::Arc; | ||
|
||
use crate::functions::out_ordering; | ||
use crate::functions::{create_physical_fun, out_ordering}; | ||
use crate::physical_expr::{down_cast_any_ref, physical_exprs_equal}; | ||
use crate::sort_properties::SortProperties; | ||
use crate::PhysicalExpr; | ||
|
||
use arrow::datatypes::{DataType, Schema}; | ||
use arrow::record_batch::RecordBatch; | ||
use datafusion_common::Result; | ||
use datafusion_common::{internal_err, Result}; | ||
use datafusion_expr::{ | ||
expr_vec_fmt, BuiltinScalarFunction, ColumnarValue, FuncMonotonicity, | ||
ScalarFunctionImplementation, | ||
ScalarFunctionDefinition, | ||
}; | ||
|
||
/// Physical expression of a scalar function | ||
pub struct ScalarFunctionExpr { | ||
fun: ScalarFunctionImplementation, | ||
fun: ScalarFunctionDefinition, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, this should be |
||
name: String, | ||
args: Vec<Arc<dyn PhysicalExpr>>, | ||
return_type: DataType, | ||
|
@@ -79,7 +79,7 @@ impl ScalarFunctionExpr { | |
/// Create a new Scalar function | ||
pub fn new( | ||
name: &str, | ||
fun: ScalarFunctionImplementation, | ||
fun: ScalarFunctionDefinition, | ||
args: Vec<Arc<dyn PhysicalExpr>>, | ||
return_type: DataType, | ||
monotonicity: Option<FuncMonotonicity>, | ||
|
@@ -96,7 +96,7 @@ impl ScalarFunctionExpr { | |
} | ||
|
||
/// Get the scalar function implementation | ||
pub fn fun(&self) -> &ScalarFunctionImplementation { | ||
pub fn fun(&self) -> &ScalarFunctionDefinition { | ||
&self.fun | ||
} | ||
|
||
|
@@ -172,8 +172,18 @@ impl PhysicalExpr for ScalarFunctionExpr { | |
}; | ||
|
||
// evaluate the function | ||
let fun = self.fun.as_ref(); | ||
(fun)(&inputs) | ||
match self.fun { | ||
ScalarFunctionDefinition::BuiltIn(ref fun) => { | ||
let fun = create_physical_fun(fun)?; | ||
(fun)(&inputs) | ||
} | ||
ScalarFunctionDefinition::UDF(ref fun) => fun.invoke(&inputs), | ||
ScalarFunctionDefinition::Name(_) => { | ||
internal_err!( | ||
"Name function must be resolved to one of the other variants prior to physical planning" | ||
) | ||
} | ||
} | ||
} | ||
|
||
fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> { | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since here
ScalarFunctionExpr
needs aScalarFunctionDefinition
, I created one for each. Don't know whether there's a better way to do it.