diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index a0b180bf4020..0582fe0aed20 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -473,6 +473,11 @@ impl SessionConfig { self.options.execution.enforce_batch_size_in_joins } + pub fn with_enable_ansi_mode(mut self, enable_ansi_mode: bool) -> Self { + self.options_mut().execution.enable_ansi_mode = enable_ansi_mode; + self + } + /// Convert configuration options to name-value pairs with values /// converted to strings. /// diff --git a/datafusion/functions/src/math/abs.rs b/datafusion/functions/src/math/abs.rs index 35d0f3eccf57..c7af424a120a 100644 --- a/datafusion/functions/src/math/abs.rs +++ b/datafusion/functions/src/math/abs.rs @@ -50,6 +50,7 @@ macro_rules! make_abs_function { }}; } +#[macro_export] macro_rules! make_try_abs_function { ($ARRAY_TYPE:ident) => {{ |input: &ArrayRef| { @@ -62,7 +63,8 @@ macro_rules! make_try_abs_function { x )) }) - })?; + }) + .and_then(|v| Ok(v.with_data_type(input.data_type().clone())))?; // maintain decimal's precision and scale Ok(Arc::new(res) as ArrayRef) } }}; diff --git a/datafusion/spark/src/function/math/abs.rs b/datafusion/spark/src/function/math/abs.rs index f48f8964c28c..5602d3d3b061 100644 --- a/datafusion/spark/src/function/math/abs.rs +++ b/datafusion/spark/src/function/math/abs.rs @@ -17,12 +17,14 @@ use arrow::array::*; use arrow::datatypes::DataType; +use arrow::error::ArrowError; use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue}; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; use datafusion_functions::{ - downcast_named_arg, make_abs_function, make_wrapping_abs_function, + downcast_named_arg, make_abs_function, make_try_abs_function, + make_wrapping_abs_function, }; use std::any::Any; use std::sync::Arc; @@ -33,8 +35,9 @@ use std::sync::Arc; /// Returns the absolute value of input /// Returns NULL if input is NULL, returns NaN if input is NaN. /// -/// TODOs: +/// Differences with DataFusion abs: /// - Spark's ANSI-compliant dialect, when off (i.e. `spark.sql.ansi.enabled=false`), taking absolute value on the minimal value of a signed integer returns the value as is. DataFusion's abs throws "DataFusion error: Arrow error: Compute error" on arithmetic overflow +/// TODOs: /// - Spark's abs also supports ANSI interval types: YearMonthIntervalType and DayTimeIntervalType. DataFusion's abs doesn't. /// #[derive(Debug, PartialEq, Eq, Hash)] @@ -74,19 +77,39 @@ impl ScalarUDFImpl for SparkAbs { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - spark_abs(&args.args) + spark_abs(&args.args, args.config_options.execution.enable_ansi_mode) } } macro_rules! scalar_compute_op { - ($INPUT:ident, $SCALAR_TYPE:ident) => {{ - let result = $INPUT.wrapping_abs(); + ($ENABLE_ANSI_MODE:expr, $INPUT:ident, $SCALAR_TYPE:ident) => {{ + let result = if $ENABLE_ANSI_MODE { + $INPUT.checked_abs().ok_or_else(|| { + ArrowError::ComputeError(format!( + "{} overflow on abs({:?})", + stringify!($SCALAR_TYPE), + $INPUT + )) + })? + } else { + $INPUT.wrapping_abs() + }; Ok(ColumnarValue::Scalar(ScalarValue::$SCALAR_TYPE(Some( result, )))) }}; - ($INPUT:ident, $PRECISION:expr, $SCALE:expr, $SCALAR_TYPE:ident) => {{ - let result = $INPUT.wrapping_abs(); + ($ENABLE_ANSI_MODE:expr, $INPUT:ident, $PRECISION:expr, $SCALE:expr, $SCALAR_TYPE:ident) => {{ + let result = if $ENABLE_ANSI_MODE { + $INPUT.checked_abs().ok_or_else(|| { + ArrowError::ComputeError(format!( + "{} overflow on abs({:?})", + stringify!($SCALAR_TYPE), + $INPUT + )) + })? + } else { + $INPUT.wrapping_abs() + }; Ok(ColumnarValue::Scalar(ScalarValue::$SCALAR_TYPE( Some(result), $PRECISION, @@ -95,7 +118,10 @@ macro_rules! scalar_compute_op { }}; } -pub fn spark_abs(args: &[ColumnarValue]) -> Result { +pub fn spark_abs( + args: &[ColumnarValue], + enable_ansi_mode: bool, +) -> Result { if args.len() != 1 { return internal_err!("abs takes exactly 1 argument, but got: {}", args.len()); } @@ -108,19 +134,35 @@ pub fn spark_abs(args: &[ColumnarValue]) -> Result Ok(args[0].clone()), DataType::Int8 => { - let abs_fun = make_wrapping_abs_function!(Int8Array); + let abs_fun = if enable_ansi_mode { + make_try_abs_function!(Int8Array) + } else { + make_wrapping_abs_function!(Int8Array) + }; abs_fun(array).map(ColumnarValue::Array) } DataType::Int16 => { - let abs_fun = make_wrapping_abs_function!(Int16Array); + let abs_fun = if enable_ansi_mode { + make_try_abs_function!(Int16Array) + } else { + make_wrapping_abs_function!(Int16Array) + }; abs_fun(array).map(ColumnarValue::Array) } DataType::Int32 => { - let abs_fun = make_wrapping_abs_function!(Int32Array); + let abs_fun = if enable_ansi_mode { + make_try_abs_function!(Int32Array) + } else { + make_wrapping_abs_function!(Int32Array) + }; abs_fun(array).map(ColumnarValue::Array) } DataType::Int64 => { - let abs_fun = make_wrapping_abs_function!(Int64Array); + let abs_fun = if enable_ansi_mode { + make_try_abs_function!(Int64Array) + } else { + make_wrapping_abs_function!(Int64Array) + }; abs_fun(array).map(ColumnarValue::Array) } DataType::Float32 => { @@ -132,11 +174,19 @@ pub fn spark_abs(args: &[ColumnarValue]) -> Result { - let abs_fun = make_wrapping_abs_function!(Decimal128Array); + let abs_fun = if enable_ansi_mode { + make_try_abs_function!(Decimal128Array) + } else { + make_wrapping_abs_function!(Decimal128Array) + }; abs_fun(array).map(ColumnarValue::Array) } DataType::Decimal256(_, _) => { - let abs_fun = make_wrapping_abs_function!(Decimal256Array); + let abs_fun = if enable_ansi_mode { + make_try_abs_function!(Decimal256Array) + } else { + make_wrapping_abs_function!(Decimal256Array) + }; abs_fun(array).map(ColumnarValue::Array) } dt => internal_err!("Not supported datatype for Spark ABS: {dt}"), @@ -148,10 +198,10 @@ pub fn spark_abs(args: &[ColumnarValue]) -> Result Ok(args[0].clone()), sv if sv.is_null() => Ok(args[0].clone()), - ScalarValue::Int8(Some(v)) => scalar_compute_op!(v, Int8), - ScalarValue::Int16(Some(v)) => scalar_compute_op!(v, Int16), - ScalarValue::Int32(Some(v)) => scalar_compute_op!(v, Int32), - ScalarValue::Int64(Some(v)) => scalar_compute_op!(v, Int64), + ScalarValue::Int8(Some(v)) => scalar_compute_op!(enable_ansi_mode, v, Int8), + ScalarValue::Int16(Some(v)) => scalar_compute_op!(enable_ansi_mode, v, Int16), + ScalarValue::Int32(Some(v)) => scalar_compute_op!(enable_ansi_mode, v, Int32), + ScalarValue::Int64(Some(v)) => scalar_compute_op!(enable_ansi_mode, v, Int64), ScalarValue::Float32(Some(v)) => { Ok(ColumnarValue::Scalar(ScalarValue::Float32(Some(v.abs())))) } @@ -159,10 +209,10 @@ pub fn spark_abs(args: &[ColumnarValue]) -> Result { - scalar_compute_op!(v, *precision, *scale, Decimal128) + scalar_compute_op!(enable_ansi_mode, v, *precision, *scale, Decimal128) } ScalarValue::Decimal256(Some(v), precision, scale) => { - scalar_compute_op!(v, *precision, *scale, Decimal256) + scalar_compute_op!(enable_ansi_mode, v, *precision, *scale, Decimal256) } dt => internal_err!("Not supported datatype for Spark ABS: {dt}"), }, @@ -177,7 +227,7 @@ mod tests { macro_rules! eval_legacy_mode { ($TYPE:ident, $VAL:expr) => {{ let args = ColumnarValue::Scalar(ScalarValue::$TYPE(Some($VAL))); - match spark_abs(&[args]) { + match spark_abs(&[args], false) { Ok(ColumnarValue::Scalar(ScalarValue::$TYPE(Some(result)))) => { assert_eq!(result, $VAL); } @@ -186,7 +236,7 @@ mod tests { }}; ($TYPE:ident, $VAL:expr, $RESULT:expr) => {{ let args = ColumnarValue::Scalar(ScalarValue::$TYPE(Some($VAL))); - match spark_abs(&[args]) { + match spark_abs(&[args], false) { Ok(ColumnarValue::Scalar(ScalarValue::$TYPE(Some(result)))) => { assert_eq!(result, $RESULT); } @@ -196,7 +246,7 @@ mod tests { ($TYPE:ident, $VAL:expr, $PRECISION:expr, $SCALE:expr) => {{ let args = ColumnarValue::Scalar(ScalarValue::$TYPE(Some($VAL), $PRECISION, $SCALE)); - match spark_abs(&[args]) { + match spark_abs(&[args], false) { Ok(ColumnarValue::Scalar(ScalarValue::$TYPE( Some(result), precision, @@ -212,7 +262,7 @@ mod tests { ($TYPE:ident, $VAL:expr, $PRECISION:expr, $SCALE:expr, $RESULT:expr) => {{ let args = ColumnarValue::Scalar(ScalarValue::$TYPE(Some($VAL), $PRECISION, $SCALE)); - match spark_abs(&[args]) { + match spark_abs(&[args], false) { Ok(ColumnarValue::Scalar(ScalarValue::$TYPE( Some(result), precision, @@ -262,12 +312,102 @@ mod tests { eval_legacy_mode!(Float64, -0.0f64, 0.0f64); } + macro_rules! eval_ansi_mode { + ($TYPE:ident, $VAL:expr) => {{ + let args = ColumnarValue::Scalar(ScalarValue::$TYPE(Some($VAL))); + match spark_abs(&[args], true) { + Err(e) => { + assert!( + e.to_string().contains("overflow on abs"), + "Error message did not match. Actual message: {e}" + ); + } + _ => unreachable!(), + } + }}; + ($TYPE:ident, $VAL:expr, $RESULT:expr) => {{ + let args = ColumnarValue::Scalar(ScalarValue::$TYPE(Some($VAL))); + match spark_abs(&[args], true) { + Ok(ColumnarValue::Scalar(ScalarValue::$TYPE(Some(result)))) => { + assert_eq!(result, $RESULT); + } + _ => unreachable!(), + } + }}; + ($TYPE:ident, $VAL:expr, $PRECISION:expr, $SCALE:expr) => {{ + let args = + ColumnarValue::Scalar(ScalarValue::$TYPE(Some($VAL), $PRECISION, $SCALE)); + match spark_abs(&[args], true) { + Err(e) => { + assert!( + e.to_string().contains("overflow on abs"), + "Error message did not match. Actual message: {e}" + ); + } + _ => unreachable!(), + } + }}; + ($TYPE:ident, $VAL:expr, $PRECISION:expr, $SCALE:expr, $RESULT:expr) => {{ + let args = + ColumnarValue::Scalar(ScalarValue::$TYPE(Some($VAL), $PRECISION, $SCALE)); + match spark_abs(&[args], true) { + Ok(ColumnarValue::Scalar(ScalarValue::$TYPE( + Some(result), + precision, + scale, + ))) => { + assert_eq!(result, $RESULT); + assert_eq!(precision, $PRECISION); + assert_eq!(scale, $SCALE); + } + _ => unreachable!(), + } + }}; + } + + #[test] + fn test_abs_scalar_ansi_mode() { + eval_ansi_mode!(Int8, i8::MIN); + eval_ansi_mode!(Int16, i16::MIN); + eval_ansi_mode!(Int32, i32::MIN); + eval_ansi_mode!(Int64, i64::MIN); + eval_ansi_mode!(Decimal128, i128::MIN, 18, 10); + eval_ansi_mode!(Decimal256, i256::MIN, 10, 2); + + eval_ansi_mode!(UInt8, u8::MIN, u8::MIN); + eval_ansi_mode!(UInt16, u16::MIN, u16::MIN); + eval_ansi_mode!(UInt32, u32::MIN, u32::MIN); + eval_ansi_mode!(UInt64, u64::MIN, u64::MIN); + eval_ansi_mode!(Float32, f32::MIN, f32::MAX); + eval_ansi_mode!(Float64, f64::MIN, f64::MAX); + + // NumericType, not MIN + eval_ansi_mode!(Int8, -1i8, 1i8); + eval_ansi_mode!(Int16, -1i16, 1i16); + eval_ansi_mode!(Int32, -1i32, 1i32); + eval_ansi_mode!(Int64, -1i64, 1i64); + eval_ansi_mode!(Decimal128, -1i128, 18, 10, 1i128); + eval_ansi_mode!(Decimal128, i128::MIN + 1, 38, 3, i128::MAX); + eval_ansi_mode!(Decimal256, i256::from(-1i8), 10, 2, i256::from(1i8)); + eval_ansi_mode!(Decimal256, i256::MIN + i256::from(1), 76, 7, i256::MAX); + + // Float32, Float64 + eval_ansi_mode!(Float32, f32::NEG_INFINITY, f32::INFINITY); + eval_ansi_mode!(Float32, f32::INFINITY, f32::INFINITY); + eval_ansi_mode!(Float32, 0.0f32, 0.0f32); + eval_ansi_mode!(Float32, -0.0f32, 0.0f32); + eval_ansi_mode!(Float64, f64::NEG_INFINITY, f64::INFINITY); + eval_ansi_mode!(Float64, f64::INFINITY, f64::INFINITY); + eval_ansi_mode!(Float64, 0.0f64, 0.0f64); + eval_ansi_mode!(Float64, -0.0f64, 0.0f64); + } + macro_rules! eval_array_legacy_mode { ($INPUT:expr, $OUTPUT:expr, $FUNC:ident) => {{ let input = $INPUT; let args = ColumnarValue::Array(Arc::new(input)); let expected = $OUTPUT; - match spark_abs(&[args]) { + match spark_abs(&[args], false) { Ok(ColumnarValue::Array(result)) => { let actual = datafusion_common::cast::$FUNC(&result).unwrap(); assert_eq!(actual, &expected); @@ -356,23 +496,182 @@ mod tests { ); eval_array_legacy_mode!( - Decimal128Array::from(vec![Some(i128::MIN), None]) + Decimal128Array::from(vec![Some(i128::MIN), Some(i128::MIN + 1), None]) .with_precision_and_scale(38, 37) .unwrap(), - Decimal128Array::from(vec![Some(i128::MIN), None]) + Decimal128Array::from(vec![Some(i128::MIN), Some(i128::MAX), None]) .with_precision_and_scale(38, 37) .unwrap(), as_decimal128_array ); eval_array_legacy_mode!( - Decimal256Array::from(vec![Some(i256::MIN), None]) - .with_precision_and_scale(5, 2) + Decimal256Array::from(vec![ + Some(i256::MIN), + Some(i256::MINUS_ONE), + Some(i256::MIN + i256::from(1)), + None + ]) + .with_precision_and_scale(5, 2) + .unwrap(), + Decimal256Array::from(vec![ + Some(i256::MIN), + Some(i256::ONE), + Some(i256::MAX), + None + ]) + .with_precision_and_scale(5, 2) + .unwrap(), + as_decimal256_array + ); + } + + macro_rules! eval_array_ansi_mode { + ($INPUT:expr) => {{ + let input = $INPUT; + let args = ColumnarValue::Array(Arc::new(input)); + match spark_abs(&[args], true) { + Err(e) => { + assert!( + e.to_string().contains("overflow on abs"), + "Error message did not match. Actual message: {e}" + ); + } + _ => unreachable!(), + } + }}; + ($INPUT:expr, $OUTPUT:expr, $FUNC:ident) => {{ + let input = $INPUT; + let args = ColumnarValue::Array(Arc::new(input)); + let expected = $OUTPUT; + match spark_abs(&[args], true) { + Ok(ColumnarValue::Array(result)) => { + let actual = datafusion_common::cast::$FUNC(&result).unwrap(); + assert_eq!(actual, &expected); + } + _ => unreachable!(), + } + }}; + } + #[test] + fn test_abs_array_ansi_mode() { + eval_array_ansi_mode!( + UInt64Array::from(vec![Some(u64::MIN), Some(u64::MAX), None]), + UInt64Array::from(vec![Some(u64::MIN), Some(u64::MAX), None]), + as_uint64_array + ); + + eval_array_ansi_mode!(Int8Array::from(vec![ + Some(-1), + Some(i8::MIN), + Some(i8::MAX), + None + ])); + eval_array_ansi_mode!(Int16Array::from(vec![ + Some(-1), + Some(i16::MIN), + Some(i16::MAX), + None + ])); + eval_array_ansi_mode!(Int32Array::from(vec![ + Some(-1), + Some(i32::MIN), + Some(i32::MAX), + None + ])); + eval_array_ansi_mode!(Int64Array::from(vec![ + Some(-1), + Some(i64::MIN), + Some(i64::MAX), + None + ])); + eval_array_ansi_mode!( + Float32Array::from(vec![ + Some(-1f32), + Some(f32::MIN), + Some(f32::MAX), + None, + Some(f32::NAN), + Some(f32::INFINITY), + Some(f32::NEG_INFINITY), + Some(0.0), + Some(-0.0), + ]), + Float32Array::from(vec![ + Some(1f32), + Some(f32::MAX), + Some(f32::MAX), + None, + Some(f32::NAN), + Some(f32::INFINITY), + Some(f32::INFINITY), + Some(0.0), + Some(0.0), + ]), + as_float32_array + ); + + eval_array_ansi_mode!( + Float64Array::from(vec![ + Some(-1f64), + Some(f64::MIN), + Some(f64::MAX), + None, + Some(f64::NAN), + Some(f64::INFINITY), + Some(f64::NEG_INFINITY), + Some(0.0), + Some(-0.0), + ]), + Float64Array::from(vec![ + Some(1f64), + Some(f64::MAX), + Some(f64::MAX), + None, + Some(f64::NAN), + Some(f64::INFINITY), + Some(f64::INFINITY), + Some(0.0), + Some(0.0), + ]), + as_float64_array + ); + + // decimal: no arithmetic overflow + eval_array_ansi_mode!( + Decimal128Array::from(vec![Some(-1), Some(-2), Some(i128::MIN + 1)]) + .with_precision_and_scale(38, 37) .unwrap(), - Decimal256Array::from(vec![Some(i256::MIN), None]) - .with_precision_and_scale(5, 2) + Decimal128Array::from(vec![Some(1), Some(2), Some(i128::MAX)]) + .with_precision_and_scale(38, 37) .unwrap(), + as_decimal128_array + ); + + eval_array_ansi_mode!( + Decimal256Array::from(vec![ + Some(i256::MINUS_ONE), + Some(i256::from(-2)), + Some(i256::MIN + i256::from(1)) + ]) + .with_precision_and_scale(18, 7) + .unwrap(), + Decimal256Array::from(vec![ + Some(i256::ONE), + Some(i256::from(2)), + Some(i256::MAX) + ]) + .with_precision_and_scale(18, 7) + .unwrap(), as_decimal256_array ); + + // decimal: arithmetic overflow + eval_array_ansi_mode!(Decimal128Array::from(vec![Some(i128::MIN), None]) + .with_precision_and_scale(38, 37) + .unwrap()); + eval_array_ansi_mode!(Decimal256Array::from(vec![Some(i256::MIN), None]) + .with_precision_and_scale(5, 2) + .unwrap()); } } diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index b499401e5589..0e34dabbdf2b 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -78,7 +78,8 @@ impl TestContext { pub async fn try_new_for_test_file(relative_path: &Path) -> Option { let config = SessionConfig::new() // hardcode target partitions so plans are deterministic - .with_target_partitions(4); + .with_target_partitions(4) + .with_enable_ansi_mode(true); let runtime = Arc::new(RuntimeEnv::default()); let mut state = SessionStateBuilder::new() .with_config(config) diff --git a/datafusion/sqllogictest/test_files/spark/math/abs.slt b/datafusion/sqllogictest/test_files/spark/math/abs.slt index 19ca902ea3de..b1275350a6b4 100644 --- a/datafusion/sqllogictest/test_files/spark/math/abs.slt +++ b/datafusion/sqllogictest/test_files/spark/math/abs.slt @@ -33,10 +33,17 @@ SELECT abs(-127::TINYINT), abs(-32767::SMALLINT), abs(-2147483647::INT), abs(-92 # See https://github.com/apache/datafusion/issues/18794 for operator precedence # abs: signed int minimal values -query IIII -select abs((-128)::TINYINT), abs((-32768)::SMALLINT), abs((-2147483648)::INT), abs((-9223372036854775808)::BIGINT) ----- --128 -32768 -2147483648 -9223372036854775808 +query error DataFusion error: Arrow error: Compute error: Int8 overflow on abs\(\-128\) +select abs((-128)::TINYINT) + +query error DataFusion error: Arrow error: Compute error: Int16 overflow on abs\(\-32768\) +select abs((-32768)::SMALLINT) + +query error DataFusion error: Arrow error: Compute error: Int32 overflow on abs\(\-2147483648\) +select abs((-2147483648)::INT) + +query error DataFusion error: Arrow error: Compute error: Int64 overflow on abs\(\-9223372036854775808\) +select abs((-9223372036854775808)::BIGINT) # abs: floats, NULL, NaN, -0, infinity, -infinity query RRRRRRRRRRRR