diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs index 49ca79a00496..1e423c367cd8 100644 --- a/datafusion/src/physical_plan/functions.rs +++ b/datafusion/src/physical_plan/functions.rs @@ -512,39 +512,35 @@ macro_rules! invoke_if_unicode_expressions_feature_flag { }; } -/// Create a physical (function) expression. -/// This function errors when `args`' can't be coerced to a valid argument type of the function. -pub fn create_physical_expr( +/// Create a physical scalar function. +pub fn create_physical_fun( fun: &BuiltinScalarFunction, - args: &[Arc], - input_schema: &Schema, ctx_state: &ExecutionContextState, -) -> Result> { - let fun_expr: ScalarFunctionImplementation = Arc::new(match fun { +) -> Result { + Ok(match fun { // math functions - BuiltinScalarFunction::Abs => math_expressions::abs, - BuiltinScalarFunction::Acos => math_expressions::acos, - BuiltinScalarFunction::Asin => math_expressions::asin, - BuiltinScalarFunction::Atan => math_expressions::atan, - BuiltinScalarFunction::Ceil => math_expressions::ceil, - BuiltinScalarFunction::Cos => math_expressions::cos, - BuiltinScalarFunction::Exp => math_expressions::exp, - BuiltinScalarFunction::Floor => math_expressions::floor, - BuiltinScalarFunction::Log => math_expressions::log10, - BuiltinScalarFunction::Ln => math_expressions::ln, - BuiltinScalarFunction::Log10 => math_expressions::log10, - BuiltinScalarFunction::Log2 => math_expressions::log2, - BuiltinScalarFunction::Random => math_expressions::random, - BuiltinScalarFunction::Round => math_expressions::round, - BuiltinScalarFunction::Signum => math_expressions::signum, - BuiltinScalarFunction::Sin => math_expressions::sin, - BuiltinScalarFunction::Sqrt => math_expressions::sqrt, - BuiltinScalarFunction::Tan => math_expressions::tan, - BuiltinScalarFunction::Trunc => math_expressions::trunc, - + BuiltinScalarFunction::Abs => Arc::new(math_expressions::abs), + BuiltinScalarFunction::Acos => Arc::new(math_expressions::acos), + BuiltinScalarFunction::Asin => Arc::new(math_expressions::asin), + BuiltinScalarFunction::Atan => Arc::new(math_expressions::atan), + BuiltinScalarFunction::Ceil => Arc::new(math_expressions::ceil), + BuiltinScalarFunction::Cos => Arc::new(math_expressions::cos), + BuiltinScalarFunction::Exp => Arc::new(math_expressions::exp), + BuiltinScalarFunction::Floor => Arc::new(math_expressions::floor), + BuiltinScalarFunction::Log => Arc::new(math_expressions::log10), + BuiltinScalarFunction::Ln => Arc::new(math_expressions::ln), + BuiltinScalarFunction::Log10 => Arc::new(math_expressions::log10), + BuiltinScalarFunction::Log2 => Arc::new(math_expressions::log2), + BuiltinScalarFunction::Random => Arc::new(math_expressions::random), + BuiltinScalarFunction::Round => Arc::new(math_expressions::round), + BuiltinScalarFunction::Signum => Arc::new(math_expressions::signum), + BuiltinScalarFunction::Sin => Arc::new(math_expressions::sin), + BuiltinScalarFunction::Sqrt => Arc::new(math_expressions::sqrt), + BuiltinScalarFunction::Tan => Arc::new(math_expressions::tan), + BuiltinScalarFunction::Trunc => Arc::new(math_expressions::trunc), // string functions - BuiltinScalarFunction::Array => array_expressions::array, - BuiltinScalarFunction::Ascii => |args| match args[0].data_type() { + BuiltinScalarFunction::Array => Arc::new(array_expressions::array), + BuiltinScalarFunction::Ascii => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { make_scalar_function(string_expressions::ascii::)(args) } @@ -555,8 +551,8 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function ascii", other, ))), - }, - BuiltinScalarFunction::BitLength => |args| match &args[0] { + }), + BuiltinScalarFunction::BitLength => Arc::new(|args| match &args[0] { ColumnarValue::Array(v) => Ok(ColumnarValue::Array(bit_length(v.as_ref())?)), ColumnarValue::Scalar(v) => match v { ScalarValue::Utf8(v) => Ok(ColumnarValue::Scalar(ScalarValue::Int32( @@ -567,8 +563,8 @@ pub fn create_physical_expr( )), _ => unreachable!(), }, - }, - BuiltinScalarFunction::Btrim => |args| match args[0].data_type() { + }), + BuiltinScalarFunction::Btrim => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { make_scalar_function(string_expressions::btrim::)(args) } @@ -579,55 +575,47 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function btrim", other, ))), - }, - BuiltinScalarFunction::CharacterLength => |args| match args[0].data_type() { - DataType::Utf8 => { - let func = invoke_if_unicode_expressions_feature_flag!( - character_length, - Int32Type, - "character_length" - ); - make_scalar_function(func)(args) - } - DataType::LargeUtf8 => { - let func = invoke_if_unicode_expressions_feature_flag!( - character_length, - Int64Type, - "character_length" - ); - make_scalar_function(func)(args) - } - other => Err(DataFusionError::Internal(format!( - "Unsupported data type {:?} for function character_length", - other, - ))), - }, + }), + BuiltinScalarFunction::CharacterLength => { + Arc::new(|args| match args[0].data_type() { + DataType::Utf8 => { + let func = invoke_if_unicode_expressions_feature_flag!( + character_length, + Int32Type, + "character_length" + ); + make_scalar_function(func)(args) + } + DataType::LargeUtf8 => { + let func = invoke_if_unicode_expressions_feature_flag!( + character_length, + Int64Type, + "character_length" + ); + make_scalar_function(func)(args) + } + other => Err(DataFusionError::Internal(format!( + "Unsupported data type {:?} for function character_length", + other, + ))), + }) + } BuiltinScalarFunction::Chr => { - |args| make_scalar_function(string_expressions::chr)(args) + Arc::new(|args| make_scalar_function(string_expressions::chr)(args)) } - BuiltinScalarFunction::Concat => string_expressions::concat, + BuiltinScalarFunction::Concat => Arc::new(string_expressions::concat), BuiltinScalarFunction::ConcatWithSeparator => { - |args| make_scalar_function(string_expressions::concat_ws)(args) + Arc::new(|args| make_scalar_function(string_expressions::concat_ws)(args)) } - BuiltinScalarFunction::DatePart => datetime_expressions::date_part, - BuiltinScalarFunction::DateTrunc => datetime_expressions::date_trunc, + BuiltinScalarFunction::DatePart => Arc::new(datetime_expressions::date_part), + BuiltinScalarFunction::DateTrunc => Arc::new(datetime_expressions::date_trunc), BuiltinScalarFunction::Now => { // bind value for now at plan time - let fun_expr = Arc::new(datetime_expressions::make_now( + Arc::new(datetime_expressions::make_now( ctx_state.execution_props.query_execution_start_time, - )); - - // TODO refactor code to not return here, but instead fall through below - let args = vec![]; - let arg_types = vec![]; // has no args - return Ok(Arc::new(ScalarFunctionExpr::new( - &format!("{}", fun), - fun_expr, - args, - &return_type(fun, &arg_types)?, - ))); + )) } - BuiltinScalarFunction::InitCap => |args| match args[0].data_type() { + BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { make_scalar_function(string_expressions::initcap::)(args) } @@ -638,8 +626,8 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function initcap", other, ))), - }, - BuiltinScalarFunction::Left => |args| match args[0].data_type() { + }), + BuiltinScalarFunction::Left => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { let func = invoke_if_unicode_expressions_feature_flag!(left, i32, "left"); make_scalar_function(func)(args) @@ -652,9 +640,9 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function left", other, ))), - }, - BuiltinScalarFunction::Lower => string_expressions::lower, - BuiltinScalarFunction::Lpad => |args| match args[0].data_type() { + }), + BuiltinScalarFunction::Lower => Arc::new(string_expressions::lower), + BuiltinScalarFunction::Lpad => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { let func = invoke_if_unicode_expressions_feature_flag!(lpad, i32, "lpad"); make_scalar_function(func)(args) @@ -667,8 +655,8 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function lpad", other, ))), - }, - BuiltinScalarFunction::Ltrim => |args| match args[0].data_type() { + }), + BuiltinScalarFunction::Ltrim => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { make_scalar_function(string_expressions::ltrim::)(args) } @@ -679,12 +667,12 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function ltrim", other, ))), - }, + }), BuiltinScalarFunction::MD5 => { - invoke_if_crypto_expressions_feature_flag!(md5, "md5") + Arc::new(invoke_if_crypto_expressions_feature_flag!(md5, "md5")) } - BuiltinScalarFunction::NullIf => nullif_func, - BuiltinScalarFunction::OctetLength => |args| match &args[0] { + BuiltinScalarFunction::NullIf => Arc::new(nullif_func), + BuiltinScalarFunction::OctetLength => Arc::new(|args| match &args[0] { ColumnarValue::Array(v) => Ok(ColumnarValue::Array(length(v.as_ref())?)), ColumnarValue::Scalar(v) => match v { ScalarValue::Utf8(v) => Ok(ColumnarValue::Scalar(ScalarValue::Int32( @@ -695,52 +683,56 @@ pub fn create_physical_expr( )), _ => unreachable!(), }, - }, - BuiltinScalarFunction::RegexpMatch => |args| match args[0].data_type() { - DataType::Utf8 => { - let func = invoke_if_regex_expressions_feature_flag!( - regexp_match, - i32, - "regexp_match" - ); - make_scalar_function(func)(args) - } - DataType::LargeUtf8 => { - let func = invoke_if_regex_expressions_feature_flag!( - regexp_match, - i64, - "regexp_match" - ); - make_scalar_function(func)(args) - } - other => Err(DataFusionError::Internal(format!( - "Unsupported data type {:?} for function regexp_match", - other - ))), - }, - BuiltinScalarFunction::RegexpReplace => |args| match args[0].data_type() { - DataType::Utf8 => { - let func = invoke_if_regex_expressions_feature_flag!( - regexp_replace, - i32, - "regexp_replace" - ); - make_scalar_function(func)(args) - } - DataType::LargeUtf8 => { - let func = invoke_if_regex_expressions_feature_flag!( - regexp_replace, - i64, - "regexp_replace" - ); - make_scalar_function(func)(args) - } - other => Err(DataFusionError::Internal(format!( - "Unsupported data type {:?} for function regexp_replace", - other, - ))), - }, - BuiltinScalarFunction::Repeat => |args| match args[0].data_type() { + }), + BuiltinScalarFunction::RegexpMatch => { + Arc::new(|args| match args[0].data_type() { + DataType::Utf8 => { + let func = invoke_if_regex_expressions_feature_flag!( + regexp_match, + i32, + "regexp_match" + ); + make_scalar_function(func)(args) + } + DataType::LargeUtf8 => { + let func = invoke_if_regex_expressions_feature_flag!( + regexp_match, + i64, + "regexp_match" + ); + make_scalar_function(func)(args) + } + other => Err(DataFusionError::Internal(format!( + "Unsupported data type {:?} for function regexp_match", + other + ))), + }) + } + BuiltinScalarFunction::RegexpReplace => { + Arc::new(|args| match args[0].data_type() { + DataType::Utf8 => { + let func = invoke_if_regex_expressions_feature_flag!( + regexp_replace, + i32, + "regexp_replace" + ); + make_scalar_function(func)(args) + } + DataType::LargeUtf8 => { + let func = invoke_if_regex_expressions_feature_flag!( + regexp_replace, + i64, + "regexp_replace" + ); + make_scalar_function(func)(args) + } + other => Err(DataFusionError::Internal(format!( + "Unsupported data type {:?} for function regexp_replace", + other, + ))), + }) + } + BuiltinScalarFunction::Repeat => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { make_scalar_function(string_expressions::repeat::)(args) } @@ -751,8 +743,8 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function repeat", other, ))), - }, - BuiltinScalarFunction::Replace => |args| match args[0].data_type() { + }), + BuiltinScalarFunction::Replace => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { make_scalar_function(string_expressions::replace::)(args) } @@ -763,8 +755,8 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function replace", other, ))), - }, - BuiltinScalarFunction::Reverse => |args| match args[0].data_type() { + }), + BuiltinScalarFunction::Reverse => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { let func = invoke_if_unicode_expressions_feature_flag!(reverse, i32, "reverse"); @@ -779,8 +771,8 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function reverse", other, ))), - }, - BuiltinScalarFunction::Right => |args| match args[0].data_type() { + }), + BuiltinScalarFunction::Right => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { let func = invoke_if_unicode_expressions_feature_flag!(right, i32, "right"); @@ -795,8 +787,8 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function right", other, ))), - }, - BuiltinScalarFunction::Rpad => |args| match args[0].data_type() { + }), + BuiltinScalarFunction::Rpad => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { let func = invoke_if_unicode_expressions_feature_flag!(rpad, i32, "rpad"); make_scalar_function(func)(args) @@ -809,8 +801,8 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function rpad", other, ))), - }, - BuiltinScalarFunction::Rtrim => |args| match args[0].data_type() { + }), + BuiltinScalarFunction::Rtrim => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { make_scalar_function(string_expressions::rtrim::)(args) } @@ -821,20 +813,20 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function rtrim", other, ))), - }, + }), BuiltinScalarFunction::SHA224 => { - invoke_if_crypto_expressions_feature_flag!(sha224, "sha224") + Arc::new(invoke_if_crypto_expressions_feature_flag!(sha224, "sha224")) } BuiltinScalarFunction::SHA256 => { - invoke_if_crypto_expressions_feature_flag!(sha256, "sha256") + Arc::new(invoke_if_crypto_expressions_feature_flag!(sha256, "sha256")) } BuiltinScalarFunction::SHA384 => { - invoke_if_crypto_expressions_feature_flag!(sha384, "sha384") + Arc::new(invoke_if_crypto_expressions_feature_flag!(sha384, "sha384")) } BuiltinScalarFunction::SHA512 => { - invoke_if_crypto_expressions_feature_flag!(sha512, "sha512") + Arc::new(invoke_if_crypto_expressions_feature_flag!(sha512, "sha512")) } - BuiltinScalarFunction::SplitPart => |args| match args[0].data_type() { + BuiltinScalarFunction::SplitPart => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { make_scalar_function(string_expressions::split_part::)(args) } @@ -845,8 +837,8 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function split_part", other, ))), - }, - BuiltinScalarFunction::StartsWith => |args| match args[0].data_type() { + }), + BuiltinScalarFunction::StartsWith => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { make_scalar_function(string_expressions::starts_with::)(args) } @@ -857,8 +849,8 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function starts_with", other, ))), - }, - BuiltinScalarFunction::Strpos => |args| match args[0].data_type() { + }), + BuiltinScalarFunction::Strpos => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { let func = invoke_if_unicode_expressions_feature_flag!( strpos, Int32Type, "strpos" @@ -875,8 +867,8 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function strpos", other, ))), - }, - BuiltinScalarFunction::Substr => |args| match args[0].data_type() { + }), + BuiltinScalarFunction::Substr => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { let func = invoke_if_unicode_expressions_feature_flag!(substr, i32, "substr"); @@ -891,8 +883,8 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function substr", other, ))), - }, - BuiltinScalarFunction::ToHex => |args| match args[0].data_type() { + }), + BuiltinScalarFunction::ToHex => Arc::new(|args| match args[0].data_type() { DataType::Int32 => { make_scalar_function(string_expressions::to_hex::)(args) } @@ -903,9 +895,11 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function to_hex", other, ))), - }, - BuiltinScalarFunction::ToTimestamp => datetime_expressions::to_timestamp, - BuiltinScalarFunction::Translate => |args| match args[0].data_type() { + }), + BuiltinScalarFunction::ToTimestamp => { + Arc::new(datetime_expressions::to_timestamp) + } + BuiltinScalarFunction::Translate => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { let func = invoke_if_unicode_expressions_feature_flag!( translate, @@ -926,8 +920,8 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function translate", other, ))), - }, - BuiltinScalarFunction::Trim => |args| match args[0].data_type() { + }), + BuiltinScalarFunction::Trim => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { make_scalar_function(string_expressions::btrim::)(args) } @@ -938,10 +932,20 @@ pub fn create_physical_expr( "Unsupported data type {:?} for function trim", other, ))), - }, - BuiltinScalarFunction::Upper => string_expressions::upper, - }); - // coerce + }), + BuiltinScalarFunction::Upper => Arc::new(string_expressions::upper), + }) +} + +/// Create a physical (function) expression. +/// This function errors when `args`' can't be coerced to a valid argument type of the function. +pub fn create_physical_expr( + fun: &BuiltinScalarFunction, + args: &[Arc], + input_schema: &Schema, + ctx_state: &ExecutionContextState, +) -> Result> { + let fun_expr = create_physical_fun(fun, ctx_state)?; let args = coerce(args, input_schema, &signature(fun))?; let arg_types = args