From 91d0d78bb892268d3966dc69e3cf3d9c58b69371 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20L=C3=B3pez?=
Date: Mon, 22 Sep 2025 23:37:38 +0200
Subject: [PATCH 01/13] feat(spark): implement Spark make_dt_interval function

---
 .../src/function/datetime/make_dt_interval.rs | 482 ++++++++++++++++++
 datafusion/spark/src/function/datetime/mod.rs |  10 +-
 2 files changed, 491 insertions(+), 1 deletion(-)
 create mode 100644 datafusion/spark/src/function/datetime/make_dt_interval.rs

diff --git a/datafusion/spark/src/function/datetime/make_dt_interval.rs b/datafusion/spark/src/function/datetime/make_dt_interval.rs
new file mode 100644
index 000000000000..42379df10d6a
--- /dev/null
+++ b/datafusion/spark/src/function/datetime/make_dt_interval.rs
@@ -0,0 +1,482 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayRef, AsArray, DurationMicrosecondBuilder, PrimitiveArray};
+use arrow::datatypes::TimeUnit::Microsecond;
+use arrow::datatypes::{DataType, Float64Type, Int32Type};
+use datafusion_common::{exec_err, plan_datafusion_err, DataFusionError, Result, ScalarValue};
+use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility};
+use datafusion_functions::utils::make_scalar_function;
+
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkMakeDtInterval {
+    signature: Signature,
+}
+
+impl Default for SparkMakeDtInterval {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkMakeDtInterval {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::user_defined(Volatility::Immutable),
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkMakeDtInterval {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "make_dt_interval"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Duration(Microsecond))
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        if args.args.is_empty() {
+            return Ok(ColumnarValue::Scalar(ScalarValue::DurationMicrosecond(
+                Some(0),
+            )));
+        }
+        make_scalar_function(make_dt_interval_kernel,vec![])(&args.args)
+    }
+
+    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
+        if arg_types.len() > 4 {
+          return exec_err!(
+              "make_dt_interval expects between 0 and 4 arguments, got {}",
+              arg_types.len()
+          );
+        }
+
+        Ok((0..arg_types.len())
+            .map(|i| {
+                if i == 3 {
+                    DataType::Float64
+                } else {
+                    DataType::Int32
+                }
+            })
+            .collect())
+    }
+}
+
+fn make_dt_interval_kernel(args: &[ArrayRef]) -> Result<ArrayRef> {
+
+    let n_rows = args[0].len();
+    let days =args[0]
+        .as_primitive_opt::<Int32Type>()
+        .ok_or_else(|| plan_datafusion_err!("make_dt_interval arg[0] must be Int32"))?;
+    let hours: Option<&PrimitiveArray<Int32Type>> = args
+        .get(1)
+        .map(|a| {
+            a.as_primitive_opt::<Int32Type>()
+                .ok_or_else(|| plan_datafusion_err!("make_dt_interval arg[1] must be Int32"))
+        })
+        .transpose()?;
+    let mins: Option<&PrimitiveArray<Int32Type>> = args
+        .get(2)
+        .map(|a| {
+            a.as_primitive_opt::<Int32Type>()
+                .ok_or_else(|| plan_datafusion_err!("make_dt_interval arg[2] must be Int32"))
+        })
+        .transpose()?;
+    let secs: Option<&PrimitiveArray<Float64Type>> = args
+        .get(3)
+        .map(|a| {
+            a.as_primitive_opt::<Float64Type>()
+                .ok_or_else(|| plan_datafusion_err!("make_dt_interval arg[3] must be Float64"))
+        })
+        .transpose()?;
+    let mut builder = DurationMicrosecondBuilder::with_capacity(n_rows);
+
+    for i in 0..n_rows {
+        // if one column is NULL → result NULL
+        let any_null_present = days.is_null(i)
+            || hours.as_ref().is_some_and(|a| a.is_null(i))
+            || mins.as_ref().is_some_and(|a| a.is_null(i))
+            || secs
+                .as_ref()
+                .is_some_and(|a| a.is_null(i) || a.value(i).is_infinite() || a.value(i).is_nan());
+
+        if any_null_present {
+            builder.append_null();
+            continue;
+        }
+
+        // default values 0 or 0.0
+        let d = days.value(i);
+        let h = hours.as_ref().map_or(0, |a| a.value(i));
+        let mi = mins.as_ref().map_or(0, |a| a.value(i));
+        let s = secs.as_ref().map_or(0.0, |a| a.value(i));
+
+        match make_interval_dt_nano(d, h, mi, s)? {
+            Some(v) => builder.append_value(v),
+            None => {
+                builder.append_null();
+                continue;
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+pub fn make_interval_dt_nano(day: i32, hour: i32, min: i32, sec: f64) -> Result<Option<i64>> {
+    const HOURS_PER_DAY: i32 = 24;
+    const MINS_PER_HOUR: i32 = 60;
+    const SECS_PER_MINUTE: i64 = 60;
+    const MICROS_PER_SEC: i64 = 1_000_000;
+
+    let total_hours: i32 = match day
+        .checked_mul(HOURS_PER_DAY)
+        .and_then(|v| v.checked_add(hour))
+    {
+        Some(v) => v,
+        None => {
+            return Err(DataFusionError::Execution(
+                "make_dt_interval: long overflow".into(),
+            ))
+        }
+    };
+
+    let total_mins: i32 = match total_hours
+        .checked_mul(MINS_PER_HOUR)
+        .and_then(|v| v.checked_add(min))
+    {
+        Some(v) => v,
+        None => {
+            return Err(DataFusionError::Execution(
+                "make_dt_interval: long overflow".into(),
+            ))
+        }
+    };
+
+    let mut sec_whole: i64 = sec.trunc() as i64;
+    let sec_frac: f64 = sec - (sec_whole as f64);
+    let mut frac_us: i64 = (sec_frac * (MICROS_PER_SEC as f64)).round() as i64;
+
+    if frac_us.abs() >= MICROS_PER_SEC {
+        if frac_us > 0 {
+            frac_us -= MICROS_PER_SEC;
+            sec_whole = match sec_whole.checked_add(1) {
+                Some(v) => v,
+                None => {
+                    return Err(DataFusionError::Execution(
+                        "make_dt_interval: long overflow".into(),
+                    ))
+                }
+            };
+        } else {
+            frac_us += MICROS_PER_SEC;
+            sec_whole = match sec_whole.checked_sub(1) {
+                Some(v) => v,
+                None => {
+                    return Err(DataFusionError::Execution(
+                        "make_dt_interval: long overflow".into(),
+                    ))
+                }
+            };
+        }
+    }
+
+    let total_secs: i64 = match (total_mins as i64)
+        .checked_mul(SECS_PER_MINUTE)
+        .and_then(|v| v.checked_add(sec_whole))
+    {
+        Some(v) => v,
+        None => {
+            return Err(DataFusionError::Execution(
+                "make_dt_interval: long overflow".into(),
+            ))
+        }
+    };
+
+    let total_us = total_secs
+        .checked_mul(MICROS_PER_SEC)
+        .and_then(|v| v.checked_add(frac_us));
+
+    Ok(total_us)
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use arrow::array::{DurationMicrosecondArray, Float64Array, Int32Array};
+    use arrow::datatypes::DataType::Duration;
+    use arrow::datatypes::Field;
+    use arrow::datatypes::TimeUnit::Microsecond;
+    use datafusion_common::{DataFusionError, Result};
+    use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
+
+    use super::*;
+
+    fn run_make_dt_interval(arrs: Vec<ArrayRef>) -> Result<ArrayRef> {
+        make_dt_interval_kernel(&arrs)
+    }
+
+    #[test]
+    fn nulls_propagate_per_row() -> Result<()> {
+        let days = Arc::new(Int32Array::from(vec![
+            None,
+            Some(2),
+            Some(3),
+            Some(4),
+            Some(5),
+            Some(6),
+            Some(7),
+        ])) as ArrayRef;
+
+        let hours = Arc::new(Int32Array::from(vec![
+            Some(1),
+            None,
+            Some(3),
+            Some(4),
+            Some(5),
+            Some(6),
+            Some(7),
+        ])) as ArrayRef;
+
+        let mins = Arc::new(Int32Array::from(vec![
+            Some(1),
+            Some(2),
+            None,
+            Some(4),
+            Some(5),
+            Some(6),
+            Some(7),
+        ])) as ArrayRef;
+
+        let secs = Arc::new(Float64Array::from(vec![
+            Some(1.0),
+            Some(2.0),
+            Some(3.0),
+            None,
+            Some(f64::NAN),
+            Some(f64::INFINITY),
+            Some(f64::NEG_INFINITY),
+        ])) as ArrayRef;
+
+        let out = run_make_dt_interval(vec![days, hours, mins, secs])?;
+        let out = out
+            .as_any()
+            .downcast_ref::<DurationMicrosecondArray>()
+            .ok_or_else(|| DataFusionError::Internal("expected DurationMicrosecondArray".into()))?;
+
+        for i in 0..out.len() {
+            assert!(out.is_null(i), "row {i} should be NULL");
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn overflow_should_error() -> Result<()> {
+        let days = Arc::new(Int32Array::from(vec![Some(i32::MAX)])) as ArrayRef;
+        let hours = Arc::new(Int32Array::from(vec![Some(0)])) as ArrayRef;
+        let mins = Arc::new(Int32Array::from(vec![Some(0)])) as ArrayRef;
+        let secs = Arc::new(Float64Array::from(vec![Some(0.0)])) as ArrayRef;
+
+        let res = run_make_dt_interval(vec![days, hours, mins, secs]);
+
+        assert!(
+            matches!(res, Err(DataFusionError::Execution(_))),
+            "expected Execution error due to overflow, got: {res:?}"
+        );
+
+        Ok(())
+    }
+
+    fn invoke_make_dt_interval_with_args(
+        args: Vec<ColumnarValue>,
+        number_rows: usize,
+    ) -> Result<ColumnarValue> {
+        let arg_fields = args
+            .iter()
+            .map(|arg| Field::new("a", arg.data_type(), true).into())
+            .collect::<Vec<_>>();
+        let args = ScalarFunctionArgs {
+            args,
+            arg_fields,
+            number_rows,
+            return_field: Field::new("f", Duration(Microsecond), true).into(),
+            config_options: Arc::new(Default::default()),
+        };
+        SparkMakeDtInterval::new().invoke_with_args(args)
+    }
+
+    #[test]
+    fn zero_args_returns_zero_duration() -> Result<()> {
+        let number_rows: usize = 3;
+
+        let res: ColumnarValue = invoke_make_dt_interval_with_args(vec![], number_rows)?;
+        let arr = res.into_array(number_rows)?;
+        let arr = arr
+            .as_any()
+            .downcast_ref::<DurationMicrosecondArray>()
+            .ok_or_else(|| DataFusionError::Internal("expected DurationMicrosecondArray".into()))?;
+
+        assert_eq!(arr.len(), number_rows);
+        for i in 0..number_rows {
+            assert!(!arr.is_null(i));
+            assert_eq!(arr.value(i), 0_i64);
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn one_day_minus_24_hours_equals_zero() -> Result<()> {
+        let arr_days = Arc::new(Int32Array::from(vec![Some(1), Some(-1)])) as ArrayRef;
+        let arr_hours = Arc::new(Int32Array::from(vec![Some(-24), Some(24)])) as ArrayRef;
+        let arr_mins = Arc::new(Int32Array::from(vec![Some(0), Some(0)])) as ArrayRef;
+        let arr_secs = Arc::new(Float64Array::from(vec![Some(0.0), Some(0.0)])) as ArrayRef;
+
+        let out = run_make_dt_interval(vec![arr_days, arr_hours, arr_mins, arr_secs])?;
+        let out = out
+            .as_any()
+            .downcast_ref::<DurationMicrosecondArray>()
+            .ok_or_else(|| DataFusionError::Internal("expected DurationMicrosecondArray".into()))?;
+
+        assert_eq!(out.len(), 2);
+        assert_eq!(out.null_count(), 0);
+        assert_eq!(out.value(0), 0_i64);
+        assert_eq!(out.value(1), 0_i64);
+        Ok(())
+    }
+
+    #[test]
+    fn one_hour_minus_60_mins_equals_zero() -> Result<()> {
+        let arr_days = Arc::new(Int32Array::from(vec![Some(0), Some(0)])) as ArrayRef;
+        let arr_hours = Arc::new(Int32Array::from(vec![Some(-1), Some(1)])) as ArrayRef;
+        let arr_mins = Arc::new(Int32Array::from(vec![Some(60), Some(-60)])) as ArrayRef;
+        let arr_secs = Arc::new(Float64Array::from(vec![Some(0.0), Some(0.0)])) as ArrayRef;
+
+        let out = run_make_dt_interval(vec![arr_days, arr_hours, arr_mins, arr_secs])?;
+        let out = out
+            .as_any()
+            .downcast_ref::<DurationMicrosecondArray>()
+            .ok_or_else(|| DataFusionError::Internal("expected DurationMicrosecondArray".into()))?;
+
+        assert_eq!(out.len(), 2);
+        assert_eq!(out.null_count(), 0);
+        assert_eq!(out.value(0), 0_i64);
+        assert_eq!(out.value(1), 0_i64);
+        Ok(())
+    }
+
+    #[test]
+    fn one_mins_minus_60_secs_equals_zero() -> Result<()> {
+        let arr_days = Arc::new(Int32Array::from(vec![Some(0), Some(0)])) as ArrayRef;
+        let arr_hours = Arc::new(Int32Array::from(vec![Some(0), Some(0)])) as ArrayRef;
+        let arr_mins = Arc::new(Int32Array::from(vec![Some(-1), Some(1)])) as ArrayRef;
+        let arr_secs = Arc::new(Float64Array::from(vec![Some(60.0), Some(-60.0)])) as ArrayRef;
+
+        let out = run_make_dt_interval(vec![arr_days, arr_hours, arr_mins, arr_secs])?;
+        let out = out
+            .as_any()
+            .downcast_ref::<DurationMicrosecondArray>()
+            .ok_or_else(|| DataFusionError::Internal("expected DurationMicrosecondArray".into()))?;
+
+        assert_eq!(out.len(), 2);
+        assert_eq!(out.null_count(), 0);
+        assert_eq!(out.value(0), 0_i64);
+        assert_eq!(out.value(1), 0_i64);
+        Ok(())
+    }
+
+    #[test]
+    fn frac_carries_up_to_next_second_positive() -> Result<()> {
+        // 0.9999995s → 1_000_000 µs (carry into the next second)
+        let days = Arc::new(Int32Array::from(vec![Some(0), Some(0)])) as ArrayRef;
+        let hours = Arc::new(Int32Array::from(vec![Some(0), Some(0)])) as ArrayRef;
+        let mins = Arc::new(Int32Array::from(vec![Some(0), Some(0)])) as ArrayRef;
+        let secs = Arc::new(Float64Array::from(vec![
+            Some(0.999_999_5),
+            Some(0.999_999_4),
+        ])) as ArrayRef;
+
+        let out = run_make_dt_interval(vec![days, hours, mins, secs])?;
+        let out = out
+            .as_any()
+            .downcast_ref::<DurationMicrosecondArray>()
+            .ok_or_else(|| DataFusionError::Internal("expected DurationMicrosecondArray".into()))?;
+
+        assert_eq!(out.len(), 2);
+        assert_eq!(out.value(0), 1_000_000);
+        assert_eq!(out.value(1), 999_999);
+        Ok(())
+    }
+
+    #[test]
+    fn frac_carries_down_to_prev_second_negative() -> Result<()> {
+        // -0.9999995s → -1_000_000 µs (carry into the previous second)
+        let days = Arc::new(Int32Array::from(vec![Some(0), Some(0)])) as ArrayRef;
+        let hours = Arc::new(Int32Array::from(vec![Some(0), Some(0)])) as ArrayRef;
+        let mins = Arc::new(Int32Array::from(vec![Some(0), Some(0)])) as ArrayRef;
+        let secs = Arc::new(Float64Array::from(vec![
+            Some(-0.999_999_5),
+            Some(-0.999_999_4),
+        ])) as ArrayRef;
+
+        let out = run_make_dt_interval(vec![days, hours, mins, secs])?;
+        let out = out
+            .as_any()
+            .downcast_ref::<DurationMicrosecondArray>()
+            .ok_or_else(|| DataFusionError::Internal("expected DurationMicrosecondArray".into()))?;
+
+        assert_eq!(out.len(), 2);
+        assert_eq!(out.value(0), -1_000_000);
+        assert_eq!(out.value(1), -999_999);
+        Ok(())
+    }
+
+    #[test]
+    fn no_more_than_4_params() -> Result<()> {
+        let udf = SparkMakeDtInterval::new();
+
+        let arg_types = vec![
+            DataType::Int32,
+            DataType::Int32,
+            DataType::Int32,
+            DataType::Float64,
+            DataType::Int32,
+        ];
+
+        let res = udf.coerce_types(&arg_types);
+
+        assert!(
+            matches!(res, Err(DataFusionError::Execution(_))),
+            "make_dt_interval expects between 0 and 4 arguments, got 5"
+        );
+
+        Ok(())
+    }
+}
diff --git a/datafusion/spark/src/function/datetime/mod.rs b/datafusion/spark/src/function/datetime/mod.rs
index c4dee81a2cd2..30931431f0c7 100644
--- a/datafusion/spark/src/function/datetime/mod.rs
+++ b/datafusion/spark/src/function/datetime/mod.rs
@@ -20,6 +20,7 @@ pub mod date_sub;
 pub mod last_day;
 pub mod make_interval;
 pub mod next_day;
+pub mod make_dt_interval;
 
 use datafusion_expr::ScalarUDF;
 use datafusion_functions::make_udf_function;
@@ -28,6 +29,7 @@ use std::sync::Arc;
 make_udf_function!(date_add::SparkDateAdd, date_add);
 make_udf_function!(date_sub::SparkDateSub, date_sub);
 make_udf_function!(last_day::SparkLastDay, last_day);
+make_udf_function!(make_dt_interval::SparkMakeDtInterval, make_dt_interval);
 make_udf_function!(make_interval::SparkMakeInterval, make_interval);
 make_udf_function!(next_day::SparkNextDay, next_day);
 
@@ -49,10 +51,15 @@ pub mod expr_fn {
         "Returns the last day of the month which the date belongs to.",
         arg1
     ));
+    export_functions!((
+        make_dt_interval,
+        "Make dt interval from weeks, days, hours, mins and secs.",
+        arg1 arg2 arg3 arg4 arg5
+    ));
     export_functions!((
         make_interval,
         "Make interval from years, months, weeks, days, hours, mins and secs.",
-        arg1 arg2
+        arg1 arg2 arg3 arg4 arg5 arg6 arg7
     ));
     // TODO: add once ANSI support is added:
     // "When both of the input parameters are not NULL and day_of_week is an invalid input, the function throws SparkIllegalArgumentException if spark.sql.ansi.enabled is set to true, otherwise NULL."
@@ -68,6 +75,7 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
         date_add(),
         date_sub(),
         last_day(),
+        make_dt_interval(),
         make_interval(),
         next_day(),
     ]
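Note on the seconds handling in PATCH 01: make_interval_dt_nano splits the f64 seconds argument into whole seconds plus a fraction, rounds the fraction to whole microseconds, and carries a full second back into the whole part when the rounding reaches 1_000_000 µs. A self-contained sketch of just that carry step (illustrative only — seconds_to_micros is a made-up name, and the real function additionally does overflow checking):

    fn seconds_to_micros(sec: f64) -> i64 {
        const MICROS_PER_SEC: i64 = 1_000_000;
        // Split into whole seconds and a fraction rounded to microseconds.
        let mut whole = sec.trunc() as i64;
        let mut frac_us = ((sec - whole as f64) * MICROS_PER_SEC as f64).round() as i64;
        // Rounding can produce a full second; carry it so |frac_us| < 1_000_000.
        if frac_us.abs() >= MICROS_PER_SEC {
            if frac_us > 0 {
                frac_us -= MICROS_PER_SEC;
                whole += 1;
            } else {
                frac_us += MICROS_PER_SEC;
                whole -= 1;
            }
        }
        whole * MICROS_PER_SEC + frac_us
    }

    fn main() {
        assert_eq!(seconds_to_micros(1.001001), 1_001_001);
        assert_eq!(seconds_to_micros(0.9999996), 1_000_000);   // carries into the next second
        assert_eq!(seconds_to_micros(-0.9999996), -1_000_000); // carries into the previous second
    }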
From 2da5ef68b48ca4b0ae848f8c0f0c97e1b497253d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20L=C3=B3pez?=
Date: Mon, 22 Sep 2025 23:43:00 +0200
Subject: [PATCH 02/13] fmt

---
 .../src/function/datetime/make_dt_interval.rs | 90 ++++++++++++-------
 datafusion/spark/src/function/datetime/mod.rs |  2 +-
 2 files changed, 61 insertions(+), 31 deletions(-)

diff --git a/datafusion/spark/src/function/datetime/make_dt_interval.rs b/datafusion/spark/src/function/datetime/make_dt_interval.rs
index 42379df10d6a..3fab2819ba6f 100644
--- a/datafusion/spark/src/function/datetime/make_dt_interval.rs
+++ b/datafusion/spark/src/function/datetime/make_dt_interval.rs
@@ -18,11 +18,17 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use arrow::array::{Array, ArrayRef, AsArray, DurationMicrosecondBuilder, PrimitiveArray};
+use arrow::array::{
+    Array, ArrayRef, AsArray, DurationMicrosecondBuilder, PrimitiveArray,
+};
 use arrow::datatypes::TimeUnit::Microsecond;
 use arrow::datatypes::{DataType, Float64Type, Int32Type};
-use datafusion_common::{exec_err, plan_datafusion_err, DataFusionError, Result, ScalarValue};
-use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility};
+use datafusion_common::{
+    exec_err, plan_datafusion_err, DataFusionError, Result, ScalarValue,
+};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+};
 use datafusion_functions::utils::make_scalar_function;
 
 #[derive(Debug, PartialEq, Eq, Hash)]
@@ -67,15 +73,15 @@ impl ScalarUDFImpl for SparkMakeDtInterval {
                 Some(0),
             )));
         }
-        make_scalar_function(make_dt_interval_kernel,vec![])(&args.args)
+        make_scalar_function(make_dt_interval_kernel, vec![])(&args.args)
     }
 
     fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
         if arg_types.len() > 4 {
-          return exec_err!(
-              "make_dt_interval expects between 0 and 4 arguments, got {}",
-              arg_types.len()
-          );
+            return exec_err!(
+                "make_dt_interval expects between 0 and 4 arguments, got {}",
+                arg_types.len()
+            );
         }
 
         Ok((0..arg_types.len())
@@ -91,30 +97,32 @@ impl ScalarUDFImpl for SparkMakeDtInterval {
 }
 
 fn make_dt_interval_kernel(args: &[ArrayRef]) -> Result<ArrayRef> {
-
     let n_rows = args[0].len();
-    let days =args[0]
+    let days = args[0]
         .as_primitive_opt::<Int32Type>()
         .ok_or_else(|| plan_datafusion_err!("make_dt_interval arg[0] must be Int32"))?;
     let hours: Option<&PrimitiveArray<Int32Type>> = args
         .get(1)
         .map(|a| {
-            a.as_primitive_opt::<Int32Type>()
-                .ok_or_else(|| plan_datafusion_err!("make_dt_interval arg[1] must be Int32"))
+            a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
+                plan_datafusion_err!("make_dt_interval arg[1] must be Int32")
+            })
         })
         .transpose()?;
     let mins: Option<&PrimitiveArray<Int32Type>> = args
         .get(2)
         .map(|a| {
-            a.as_primitive_opt::<Int32Type>()
-                .ok_or_else(|| plan_datafusion_err!("make_dt_interval arg[2] must be Int32"))
+            a.as_primitive_opt::<Int32Type>().ok_or_else(|| {
+                plan_datafusion_err!("make_dt_interval arg[2] must be Int32")
+            })
         })
         .transpose()?;
     let secs: Option<&PrimitiveArray<Float64Type>> = args
         .get(3)
         .map(|a| {
-            a.as_primitive_opt::<Float64Type>()
-                .ok_or_else(|| plan_datafusion_err!("make_dt_interval arg[3] must be Float64"))
+            a.as_primitive_opt::<Float64Type>().ok_or_else(|| {
+                plan_datafusion_err!("make_dt_interval arg[3] must be Float64")
+            })
         })
         .transpose()?;
     let mut builder = DurationMicrosecondBuilder::with_capacity(n_rows);
@@ -124,9 +132,9 @@ fn make_dt_interval_kernel(args: &[ArrayRef]) -> Result<ArrayRef> {
         let any_null_present = days.is_null(i)
             || hours.as_ref().is_some_and(|a| a.is_null(i))
             || mins.as_ref().is_some_and(|a| a.is_null(i))
-            || secs
-                .as_ref()
-                .is_some_and(|a| a.is_null(i) || a.value(i).is_infinite() || a.value(i).is_nan());
+            || secs.as_ref().is_some_and(|a| {
+                a.is_null(i) || a.value(i).is_infinite() || a.value(i).is_nan()
+            });
 
         if any_null_present {
             builder.append_null();
@@ -158,7 +166,12 @@ fn make_dt_interval_kernel(args: &[ArrayRef]) -> Result<ArrayRef> {
     Ok(Arc::new(builder.finish()))
 }
-pub fn make_interval_dt_nano(day: i32, hour: i32, min: i32, sec: f64) -> Result<Option<i64>> {
+pub fn make_interval_dt_nano(
+    day: i32,
+    hour: i32,
+    min: i32,
+    sec: f64,
+) -> Result<Option<i64>> {
     const HOURS_PER_DAY: i32 = 24;
     const MINS_PER_HOUR: i32 = 60;
     const SECS_PER_MINUTE: i64 = 60;
     const MICROS_PER_SEC: i64 = 1_000_000;
@@ -290,7 +303,9 @@ mod tests {
         let out = out
             .as_any()
             .downcast_ref::<DurationMicrosecondArray>()
-            .ok_or_else(|| DataFusionError::Internal("expected DurationMicrosecondArray".into()))?;
+            .ok_or_else(|| {
+                DataFusionError::Internal("expected DurationMicrosecondArray".into())
+            })?;
 
         for i in 0..out.len() {
             assert!(out.is_null(i), "row {i} should be NULL");
@@ -342,7 +357,9 @@ mod tests {
         let arr = arr
             .as_any()
             .downcast_ref::<DurationMicrosecondArray>()
-            .ok_or_else(|| DataFusionError::Internal("expected DurationMicrosecondArray".into()))?;
+            .ok_or_else(|| {
+                DataFusionError::Internal("expected DurationMicrosecondArray".into())
+            })?;
 
         assert_eq!(arr.len(), number_rows);
         for i in 0..number_rows {
@@ -357,13 +374,16 @@ mod tests {
         let arr_days = Arc::new(Int32Array::from(vec![Some(1), Some(-1)])) as ArrayRef;
         let arr_hours = Arc::new(Int32Array::from(vec![Some(-24), Some(24)])) as ArrayRef;
         let arr_mins = Arc::new(Int32Array::from(vec![Some(0), Some(0)])) as ArrayRef;
-        let arr_secs = Arc::new(Float64Array::from(vec![Some(0.0), Some(0.0)])) as ArrayRef;
+        let arr_secs =
+            Arc::new(Float64Array::from(vec![Some(0.0), Some(0.0)])) as ArrayRef;
 
         let out = run_make_dt_interval(vec![arr_days, arr_hours, arr_mins, arr_secs])?;
         let out = out
             .as_any()
             .downcast_ref::<DurationMicrosecondArray>()
-            .ok_or_else(|| DataFusionError::Internal("expected DurationMicrosecondArray".into()))?;
+            .ok_or_else(|| {
+                DataFusionError::Internal("expected DurationMicrosecondArray".into())
+            })?;
 
         assert_eq!(out.len(), 2);
         assert_eq!(out.null_count(), 0);
@@ -377,13 +397,16 @@ mod tests {
         let arr_days = Arc::new(Int32Array::from(vec![Some(0), Some(0)])) as ArrayRef;
         let arr_hours = Arc::new(Int32Array::from(vec![Some(-1), Some(1)])) as ArrayRef;
         let arr_mins = Arc::new(Int32Array::from(vec![Some(60), Some(-60)])) as ArrayRef;
-        let arr_secs = Arc::new(Float64Array::from(vec![Some(0.0), Some(0.0)])) as ArrayRef;
+        let arr_secs =
+            Arc::new(Float64Array::from(vec![Some(0.0), Some(0.0)])) as ArrayRef;
 
         let out = run_make_dt_interval(vec![arr_days, arr_hours, arr_mins, arr_secs])?;
         let out = out
             .as_any()
             .downcast_ref::<DurationMicrosecondArray>()
-            .ok_or_else(|| DataFusionError::Internal("expected DurationMicrosecondArray".into()))?;
+            .ok_or_else(|| {
+                DataFusionError::Internal("expected DurationMicrosecondArray".into())
+            })?;
 
         assert_eq!(out.len(), 2);
         assert_eq!(out.null_count(), 0);
@@ -397,13 +420,16 @@ mod tests {
         let arr_days = Arc::new(Int32Array::from(vec![Some(0), Some(0)])) as ArrayRef;
         let arr_hours = Arc::new(Int32Array::from(vec![Some(0), Some(0)])) as ArrayRef;
         let arr_mins = Arc::new(Int32Array::from(vec![Some(-1), Some(1)])) as ArrayRef;
-        let arr_secs = Arc::new(Float64Array::from(vec![Some(60.0), Some(-60.0)])) as ArrayRef;
+        let arr_secs =
+            Arc::new(Float64Array::from(vec![Some(60.0), Some(-60.0)])) as ArrayRef;
 
         let out = run_make_dt_interval(vec![arr_days, arr_hours, arr_mins, arr_secs])?;
         let out = out
             .as_any()
             .downcast_ref::<DurationMicrosecondArray>()
-            .ok_or_else(|| DataFusionError::Internal("expected DurationMicrosecondArray".into()))?;
+            .ok_or_else(|| {
+                DataFusionError::Internal("expected DurationMicrosecondArray".into())
+            })?;
 
         assert_eq!(out.len(), 2);
         assert_eq!(out.null_count(), 0);
@@ -427,7 +453,9 @@ mod tests {
         let out = out
             .as_any()
             .downcast_ref::<DurationMicrosecondArray>()
-            .ok_or_else(|| DataFusionError::Internal("expected DurationMicrosecondArray".into()))?;
+            .ok_or_else(|| {
+                DataFusionError::Internal("expected DurationMicrosecondArray".into())
+            })?;
 
         assert_eq!(out.len(), 2);
         assert_eq!(out.value(0), 1_000_000);
@@ -450,7 +478,9 @@ mod tests {
         let out = out
             .as_any()
             .downcast_ref::<DurationMicrosecondArray>()
-            .ok_or_else(|| DataFusionError::Internal("expected DurationMicrosecondArray".into()))?;
+            .ok_or_else(|| {
+                DataFusionError::Internal("expected DurationMicrosecondArray".into())
+            })?;
 
         assert_eq!(out.len(), 2);
         assert_eq!(out.value(0), -1_000_000);
diff --git a/datafusion/spark/src/function/datetime/mod.rs b/datafusion/spark/src/function/datetime/mod.rs
index 30931431f0c7..f4a21a25da12 100644
--- a/datafusion/spark/src/function/datetime/mod.rs
+++ b/datafusion/spark/src/function/datetime/mod.rs
@@ -18,9 +18,9 @@
 pub mod date_add;
 pub mod date_sub;
 pub mod last_day;
+pub mod make_dt_interval;
 pub mod make_interval;
 pub mod next_day;
-pub mod make_dt_interval;
 
 use datafusion_expr::ScalarUDF;
 use datafusion_functions::make_udf_function;

From fcd15ba4741309313fa21e192bb87911a7f2e646 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20L=C3=B3pez?=
Date: Mon, 22 Sep 2025 23:57:03 +0200
Subject: [PATCH 03/13] delete pub

---
 datafusion/spark/src/function/datetime/make_dt_interval.rs | 2 +-
 datafusion/spark/src/function/datetime/make_interval.rs    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/spark/src/function/datetime/make_dt_interval.rs b/datafusion/spark/src/function/datetime/make_dt_interval.rs
index 3fab2819ba6f..188c07513135 100644
--- a/datafusion/spark/src/function/datetime/make_dt_interval.rs
+++ b/datafusion/spark/src/function/datetime/make_dt_interval.rs
@@ -158,7 +158,7 @@ fn make_dt_interval_kernel(args: &[ArrayRef]) -> Result<ArrayRef> {
     Ok(Arc::new(builder.finish()))
 }
-pub fn make_interval_dt_nano(
+fn make_interval_dt_nano(
     day: i32,
     hour: i32,
     min: i32,
diff --git a/datafusion/spark/src/function/datetime/make_interval.rs b/datafusion/spark/src/function/datetime/make_interval.rs
--- a/datafusion/spark/src/function/datetime/make_interval.rs
+++ b/datafusion/spark/src/function/datetime/make_interval.rs
 Ok(Arc::new(builder.finish()))
 }
-pub fn make_interval_month_day_nano(
+fn make_interval_month_day_nano(
     year: i32,
     month: i32,
     week: i32,

From c5c2a1989bbf641915d6c7fc61f86649a59cd729 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20L=C3=B3pez?=
Date: Tue, 23 Sep 2025 09:02:17 +0200
Subject: [PATCH 04/13] test slt

---
 .../spark/datetime/make_dt_interval.slt       | 138 +++++++++++++++++-
 1 file changed, 132 insertions(+), 6 deletions(-)

diff --git a/datafusion/sqllogictest/test_files/spark/datetime/make_dt_interval.slt b/datafusion/sqllogictest/test_files/spark/datetime/make_dt_interval.slt
index e5c69cfbb8c9..b4c101087d94 100644
--- 
a/datafusion/sqllogictest/test_files/spark/datetime/make_dt_interval.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/make_dt_interval.slt @@ -23,15 +23,141 @@ ## Original Query: SELECT make_dt_interval(1, 12, 30, 01.001001); ## PySpark 3.5.5 Result: {'make_dt_interval(1, 12, 30, 1.001001)': datetime.timedelta(days=1, seconds=45001, microseconds=1001), 'typeof(make_dt_interval(1, 12, 30, 1.001001))': 'interval day to second', 'typeof(1)': 'int', 'typeof(12)': 'int', 'typeof(30)': 'int', 'typeof(1.001001)': 'decimal(7,6)'} -#query -#SELECT make_dt_interval(1::int, 12::int, 30::int, 1.001001::decimal(7,6)); +query ? +SELECT make_dt_interval(1::int, 12::int, 30::int, 1.001001::decimal(7,6)); +---- +1 days 12 hours 30 mins 1.001001 secs ## Original Query: SELECT make_dt_interval(100, null, 3); ## PySpark 3.5.5 Result: {'make_dt_interval(100, NULL, 3, 0.000000)': None, 'typeof(make_dt_interval(100, NULL, 3, 0.000000))': 'interval day to second', 'typeof(100)': 'int', 'typeof(NULL)': 'void', 'typeof(3)': 'int'} -#query -#SELECT make_dt_interval(100::int, NULL::void, 3::int); +query error DataFusion error: This feature is not implemented: Unsupported SQL type void +SELECT make_dt_interval(100::int, NULL::void, 3::int); ## Original Query: SELECT make_dt_interval(2); ## PySpark 3.5.5 Result: {'make_dt_interval(2, 0, 0, 0.000000)': datetime.timedelta(days=2), 'typeof(make_dt_interval(2, 0, 0, 0.000000))': 'interval day to second', 'typeof(2)': 'int'} -#query -#SELECT make_dt_interval(2::int); +query ? +SELECT make_dt_interval(2::int); +---- +2 days 0 hours 0 mins 0.000000 secs + +# null +query ? +SELECT (make_dt_interval(null, 0, 0, 0)) +---- +NULL + +query ? +SELECT (make_dt_interval(0, null, 0, 0)) +---- +NULL + +query ? +SELECT (make_dt_interval(0, 0, null, 0)) +---- +NULL + +query ? +SELECT (make_dt_interval(0, 0, 0, null)) +---- +NULL + +# missing params +query ? +SELECT (make_dt_interval()) AS make_dt_interval +---- +0 days 0 hours 0 mins 0.000000 secs + +query ? +SELECT (make_dt_interval(1)) AS make_dt_interval +---- +1 days 0 hours 0 mins 0.000000 secs + +query ? +SELECT (make_dt_interval(1, 1)) AS make_dt_interval +---- +1 days 1 hours 0 mins 0.000000 secs + +query ? +SELECT (make_dt_interval(1, 1, 1)) AS make_dt_interval +---- +1 days 1 hours 1 mins 0.000000 secs + +query ? +SELECT (make_dt_interval(1, 1, 1, 1)) AS make_dt_interval +---- +1 days 1 hours 1 mins 1.000000 secs + + +# all 0 values +query ? +SELECT (make_dt_interval(0, 0, 0, 0)) +---- +0 days 0 hours 0 mins 0.000000 secs + +query ? +SELECT (make_dt_interval(-1, 24, 0, 0)) df +---- +0 days 0 hours 0 mins 0.000000 secs + +query ? +SELECT (make_dt_interval(1, -24, 0, 0)) dt +---- +0 days 0 hours 0 mins 0.000000 secs + +query ? +SELECT (make_dt_interval(0, 0, 0, 0.1)) +---- +0 days 0 hours 0 mins 0.100000 secs + + +# doctest https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.make_dt_interval.html + +query IIIR? +SELECT day, + hour, + min, + sec, + MAKE_DT_INTERVAL(day) AS interval_val +FROM VALUES (1, 12, 30, 1.001001) AS t(day, hour, min, sec); +---- +1 12 30 1.001001 1 days 0 hours 0 mins 0.000000 secs + +query IIIR? +SELECT day, + hour, + min, + sec, + MAKE_DT_INTERVAL(day, hour) AS interval_val +FROM VALUES (1, 12, 30, 1.001001) AS t(day, hour, min, sec); +---- +1 12 30 1.001001 1 days 12 hours 0 mins 0.000000 secs + +query IIIR? 
+SELECT day,
+       hour,
+       min,
+       sec,
+       MAKE_DT_INTERVAL(day) AS interval_val
+FROM VALUES (1, 12, 30, 1.001001) AS t(day, hour, min, sec);
+----
+1 12 30 1.001001 1 days 0 hours 0 mins 0.000000 secs
+
+query IIIR?
+SELECT day,
+       hour,
+       min,
+       sec,
+       MAKE_DT_INTERVAL(day, hour) AS interval_val
+FROM VALUES (1, 12, 30, 1.001001) AS t(day, hour, min, sec);
+----
+1 12 30 1.001001 1 days 12 hours 0 mins 0.000000 secs
+
+query IIIR?
+SELECT day,
+       hour,
+       min,
+       sec,
+       MAKE_DT_INTERVAL(day, hour, min) AS interval_val
+FROM VALUES (1, 12, 30, 1.001001) AS t(day, hour, min, sec);
+----
+1 12 30 1.001001 1 days 12 hours 30 mins 0.000000 secs
+
+query IIIR?
+SELECT day,
+       hour,
+       min,
+       sec,
+       MAKE_DT_INTERVAL(day, hour, min, sec) AS interval_val
+FROM VALUES (1, 12, 30, 1.001001) AS t(day, hour, min, sec);
+----
+1 12 30 1.001001 1 days 12 hours 30 mins 1.001001 secs
+
+query ?
+SELECT MAKE_DT_INTERVAL(1, 12, 30, 1.001001)
+----
+1 days 12 hours 30 mins 1.001001 secs
+
+query ?
+SELECT MAKE_DT_INTERVAL(1, 12, 30, 1.001001);
+----
+1 days 12 hours 30 mins 1.001001 secs

From 594f1f8770cd05f54635098ad21c244a8e730f27 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20L=C3=B3pez?=
Date: Tue, 23 Sep 2025 09:06:45 +0200
Subject: [PATCH 05/13] fmt

---
 datafusion/spark/src/function/datetime/make_dt_interval.rs | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/datafusion/spark/src/function/datetime/make_dt_interval.rs b/datafusion/spark/src/function/datetime/make_dt_interval.rs
index 188c07513135..3ac1302ec109 100644
--- a/datafusion/spark/src/function/datetime/make_dt_interval.rs
+++ b/datafusion/spark/src/function/datetime/make_dt_interval.rs
@@ -158,12 +158,7 @@ fn make_dt_interval_kernel(args: &[ArrayRef]) -> Result<ArrayRef> {
     Ok(Arc::new(builder.finish()))
 }
-fn make_interval_dt_nano(
-    day: i32,
-    hour: i32,
-    min: i32,
-    sec: f64,
-) -> Result<Option<i64>> {
+fn make_interval_dt_nano(day: i32, hour: i32, min: i32, sec: f64) -> Result<Option<i64>> {
     const HOURS_PER_DAY: i32 = 24;
     const MINS_PER_HOUR: i32 = 60;
     const SECS_PER_MINUTE: i64 = 60;
     const MICROS_PER_SEC: i64 = 1_000_000;
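PATCH 06 below changes overflow handling: make_interval_dt_nano now returns Option<i64> instead of Result<Option<i64>>, so an integer overflow propagates as None and the kernel emits NULL for that row instead of failing the whole query. The shape of the change is checked arithmetic chained with `?` inside an Option-returning function — a minimal sketch of the pattern (illustrative only; total_minutes is a made-up name):

    fn total_minutes(day: i32, hour: i32, min: i32) -> Option<i32> {
        // Each checked_* step yields None on i32 overflow; `?` propagates it.
        day.checked_mul(24)?
            .checked_add(hour)?
            .checked_mul(60)?
            .checked_add(min)
    }

    fn main() {
        assert_eq!(total_minutes(1, 1, 1), Some(1501));
        assert_eq!(total_minutes(i32::MAX, 0, 0), None); // overflow → None → NULL row
    }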
long overflow".into(), - )) - } - }; + sec_whole = sec_whole.checked_add(1)?; } else { frac_us += MICROS_PER_SEC; - sec_whole = match sec_whole.checked_sub(1) { - Some(v) => v, - None => { - return Err(DataFusionError::Execution( - "make_dt_interval: long overflow".into(), - )) - } - }; + sec_whole = sec_whole.checked_sub(1)?; } } - let total_secs: i64 = match (total_mins as i64) + let total_secs: i64 = (total_mins as i64) .checked_mul(SECS_PER_MINUTE) - .and_then(|v| v.checked_add(sec_whole)) - { - Some(v) => v, - None => { - return Err(DataFusionError::Execution( - "make_dt_interval: long overflow".into(), - )) - } - }; + .and_then(|v| v.checked_add(sec_whole))?; let total_us = total_secs .checked_mul(MICROS_PER_SEC) - .and_then(|v| v.checked_add(frac_us)); + .and_then(|v| v.checked_add(frac_us))?; - Ok(total_us) + Some(total_us) } #[cfg(test)] @@ -309,18 +271,28 @@ mod tests { } #[test] - fn overflow_should_error() -> Result<()> { + fn error_months_overflow_should_be_null() -> Result<()> { + // months = year*12 + month → NULL + let days = Arc::new(Int32Array::from(vec![Some(i32::MAX)])) as ArrayRef; - let hours = Arc::new(Int32Array::from(vec![Some(0)])) as ArrayRef; - let mins = Arc::new(Int32Array::from(vec![Some(0)])) as ArrayRef; - let secs = Arc::new(Float64Array::from(vec![Some(0.0)])) as ArrayRef; - let res = run_make_dt_interval(vec![days, hours, mins, secs]); + let hours = Arc::new(Int32Array::from(vec![Some(1)])) as ArrayRef; - assert!( - matches!(res, Err(DataFusionError::Execution(_))), - "expected Execution error due to overflow, got: {res:?}" - ); + let mins = Arc::new(Int32Array::from(vec![Some(1)])) as ArrayRef; + + let secs = Arc::new(Float64Array::from(vec![Some(1.0)])) as ArrayRef; + + let out = run_make_dt_interval(vec![days, hours, mins, secs])?; + let out = out + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Internal("expected DurationMicrosecondArray".into()) + })?; + + for i in 0..out.len() { + assert!(out.is_null(i), "row {i} should be NULL"); + } Ok(()) } From 130fa4bd4e33be3572e01062ca990c4854583a99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20L=C3=B3pez?= Date: Wed, 24 Sep 2025 08:59:02 +0200 Subject: [PATCH 07/13] sugested changes --- datafusion/spark/src/function/datetime/make_dt_interval.rs | 2 +- datafusion/spark/src/function/datetime/mod.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/spark/src/function/datetime/make_dt_interval.rs b/datafusion/spark/src/function/datetime/make_dt_interval.rs index fac48d297511..04f12e53a1dd 100644 --- a/datafusion/spark/src/function/datetime/make_dt_interval.rs +++ b/datafusion/spark/src/function/datetime/make_dt_interval.rs @@ -133,7 +133,7 @@ fn make_dt_interval_kernel(args: &[ArrayRef]) -> Result Date: Wed, 24 Sep 2025 08:59:33 +0200 Subject: [PATCH 08/13] fmt --- datafusion/spark/src/function/datetime/make_dt_interval.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/spark/src/function/datetime/make_dt_interval.rs b/datafusion/spark/src/function/datetime/make_dt_interval.rs index 04f12e53a1dd..195c78b3e183 100644 --- a/datafusion/spark/src/function/datetime/make_dt_interval.rs +++ b/datafusion/spark/src/function/datetime/make_dt_interval.rs @@ -132,9 +132,9 @@ fn make_dt_interval_kernel(args: &[ArrayRef]) -> Result Date: Wed, 24 Sep 2025 09:08:54 +0200 Subject: [PATCH 09/13] only res in slt --- .../spark/datetime/make_dt_interval.slt | 41 ++++++------------- 1 file changed, 13 insertions(+), 28 deletions(-) diff --git 
a/datafusion/sqllogictest/test_files/spark/datetime/make_dt_interval.slt b/datafusion/sqllogictest/test_files/spark/datetime/make_dt_interval.slt index b4c101087d94..fadcdcbd83ac 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/make_dt_interval.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/make_dt_interval.slt @@ -111,46 +111,31 @@ SELECT (make_dt_interval(0, 0, 0, 0.1)) # doctest https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.make_dt_interval.html +# extract only the value make_dt_interval -query IIIR? -SELECT day, - hour, - min, - sec, - MAKE_DT_INTERVAL(day) AS interval_val +query ? +SELECT MAKE_DT_INTERVAL(day) AS interval_val FROM VALUES (1, 12, 30, 1.001001) AS t(day, hour, min, sec); ---- -1 12 30 1.001001 1 days 0 hours 0 mins 0.000000 secs +1 days 0 hours 0 mins 0.000000 secs -query IIIR? -SELECT day, - hour, - min, - sec, - MAKE_DT_INTERVAL(day, hour) AS interval_val +query ? +SELECT MAKE_DT_INTERVAL(day, hour) AS interval_val FROM VALUES (1, 12, 30, 1.001001) AS t(day, hour, min, sec); ---- -1 12 30 1.001001 1 days 12 hours 0 mins 0.000000 secs +1 days 12 hours 0 mins 0.000000 secs -query IIIR? -SELECT day, - hour, - min, - sec, - MAKE_DT_INTERVAL(day, hour, min) AS interval_val +query ? +SELECT MAKE_DT_INTERVAL(day, hour, min) AS interval_val FROM VALUES (1, 12, 30, 1.001001) AS t(day, hour, min, sec); ---- -1 12 30 1.001001 1 days 12 hours 30 mins 0.000000 secs +1 days 12 hours 30 mins 0.000000 secs -query IIIR? -SELECT day, - hour, - min, - sec, - MAKE_DT_INTERVAL(day, hour, min, sec) AS interval_val +query ? +SELECT MAKE_DT_INTERVAL(day, hour, min, sec) AS interval_val FROM VALUES (1, 12, 30, 1.001001) AS t(day, hour, min, sec); ---- -1 12 30 1.001001 1 days 12 hours 30 mins 1.001001 secs +1 days 12 hours 30 mins 1.001001 secs query ? SELECT MAKE_DT_INTERVAL(1, 12, 30, 1.001001) From a7e21894360c725cd7763d9d94839e5c4fd20adb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20L=C3=B3pez?= Date: Wed, 24 Sep 2025 09:10:02 +0200 Subject: [PATCH 10/13] null not void type --- .../test_files/spark/datetime/make_dt_interval.slt | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/spark/datetime/make_dt_interval.slt b/datafusion/sqllogictest/test_files/spark/datetime/make_dt_interval.slt index fadcdcbd83ac..dc6c33caa9b4 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/make_dt_interval.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/make_dt_interval.slt @@ -30,8 +30,10 @@ SELECT make_dt_interval(1::int, 12::int, 30::int, 1.001001::decimal(7,6)); ## Original Query: SELECT make_dt_interval(100, null, 3); ## PySpark 3.5.5 Result: {'make_dt_interval(100, NULL, 3, 0.000000)': None, 'typeof(make_dt_interval(100, NULL, 3, 0.000000))': 'interval day to second', 'typeof(100)': 'int', 'typeof(NULL)': 'void', 'typeof(3)': 'int'} -query error DataFusion error: This feature is not implemented: Unsupported SQL type void -SELECT make_dt_interval(100::int, NULL::void, 3::int); +query ? 
+SELECT make_dt_interval(100::int, NULL, 3::int);
+----
+NULL
 
 ## Original Query: SELECT make_dt_interval(2);
 ## PySpark 3.5.5 Result: {'make_dt_interval(2, 0, 0, 0.000000)': datetime.timedelta(days=2), 'typeof(make_dt_interval(2, 0, 0, 0.000000))': 'interval day to second', 'typeof(2)': 'int'}
 query ?
 SELECT make_dt_interval(2::int);

From 76c2297f27a0e565fb431a9a05a5c5660ac5761f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20L=C3=B3pez?=
Date: Fri, 26 Sep 2025 07:53:47 +0200
Subject: [PATCH 11/13] explain types

---
 datafusion/spark/src/function/datetime/make_dt_interval.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/datafusion/spark/src/function/datetime/make_dt_interval.rs b/datafusion/spark/src/function/datetime/make_dt_interval.rs
index 195c78b3e183..24458629aab6 100644
--- a/datafusion/spark/src/function/datetime/make_dt_interval.rs
+++ b/datafusion/spark/src/function/datetime/make_dt_interval.rs
@@ -63,6 +63,9 @@ impl ScalarUDFImpl for SparkMakeDtInterval {
         &self.signature
     }
 
+    /// https://github.com/lakehq/sail/blob/dc5368daa24d40a7758a299e1ba8fc985cb29108/docs/guide/dataframe/data-types/compatibility.md?plain=1#L260
+    /// Spark return Type -> Arrow return type
+    /// interval day to second -> DataType::Duration(TimeUnit::Microsecond)
     fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
         Ok(DataType::Duration(Microsecond))
     }

From 46b62f4b1662d24f9399f8eac4caa16e0fe9b0bf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20L=C3=B3pez?=
Date: Fri, 26 Sep 2025 08:31:37 +0200
Subject: [PATCH 12/13] explain types fix url

---
 datafusion/spark/src/function/datetime/make_dt_interval.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/spark/src/function/datetime/make_dt_interval.rs b/datafusion/spark/src/function/datetime/make_dt_interval.rs
index 24458629aab6..1e6fc2299a4c 100644
--- a/datafusion/spark/src/function/datetime/make_dt_interval.rs
+++ b/datafusion/spark/src/function/datetime/make_dt_interval.rs
@@ -63,7 +63,7 @@ impl ScalarUDFImpl for SparkMakeDtInterval {
         &self.signature
     }
 
-    /// https://github.com/lakehq/sail/blob/dc5368daa24d40a7758a299e1ba8fc985cb29108/docs/guide/dataframe/data-types/compatibility.md?plain=1#L260
+    /// <https://github.com/lakehq/sail/blob/dc5368daa24d40a7758a299e1ba8fc985cb29108/docs/guide/dataframe/data-types/compatibility.md?plain=1#L260>
     /// Spark return Type -> Arrow return type
     /// interval day to second -> DataType::Duration(TimeUnit::Microsecond)
     fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {

From 6cbaff7c6228add22c4fb48acd48abb8a098b7d8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20L=C3=B3pez?=
Date: Fri, 26 Sep 2025 15:27:41 +0200
Subject: [PATCH 13/13] better comment

---
 .../spark/src/function/datetime/make_dt_interval.rs | 9 ++++++---
 datafusion/spark/src/function/datetime/mod.rs       | 2 +-
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/datafusion/spark/src/function/datetime/make_dt_interval.rs b/datafusion/spark/src/function/datetime/make_dt_interval.rs
index 1e6fc2299a4c..c44ab69b8b30 100644
--- a/datafusion/spark/src/function/datetime/make_dt_interval.rs
+++ b/datafusion/spark/src/function/datetime/make_dt_interval.rs
@@ -63,9 +63,12 @@ impl ScalarUDFImpl for SparkMakeDtInterval {
         &self.signature
     }
 
-    /// <https://github.com/lakehq/sail/blob/dc5368daa24d40a7758a299e1ba8fc985cb29108/docs/guide/dataframe/data-types/compatibility.md?plain=1#L260>
-    /// Spark return Type -> Arrow return type
-    /// interval day to second -> DataType::Duration(TimeUnit::Microsecond)
+    /// Note the return type is `DataType::Duration(TimeUnit::Microsecond)` and not `DataType::Interval(DayTime)` as you might expect.
+    /// This is because `DataType::Interval(DayTime)` has precision only to the millisecond, whilst Spark's `DayTimeIntervalType` has
+    /// precision to the microsecond. We use `DataType::Duration(TimeUnit::Microsecond)` in order to not lose any precision. See the
+    /// [Sail compatibility doc] for reference.
+    ///
+    /// [Sail compatibility doc]: https://github.com/lakehq/sail/blob/dc5368daa24d40a7758a299e1ba8fc985cb29108/docs/guide/dataframe/data-types/compatibility.md?plain=1#L260
     fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
         Ok(DataType::Duration(Microsecond))
     }
diff --git a/datafusion/spark/src/function/datetime/mod.rs b/datafusion/spark/src/function/datetime/mod.rs
index 6dffeb89097e..a6adc9960766 100644
--- a/datafusion/spark/src/function/datetime/mod.rs
+++ b/datafusion/spark/src/function/datetime/mod.rs
@@ -53,7 +53,7 @@ pub mod expr_fn {
     ));
     export_functions!((
         make_dt_interval,
-        "Make a DayTime interval from given days, hours, mins and secs.",
+        "Make a day time interval from given days, hours, mins and secs (return type is actually a Duration(Microsecond))",
         days hours mins secs
     ));
     export_functions!((
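A worked example of the precision point in PATCH 13: make_dt_interval(1, 12, 30, 1.001001) carries 1.001001 s = 1_001_001 µs in its seconds component. Arrow's Interval(DayTime) stores whole milliseconds, so that value would have to be truncated to 1_001 ms, losing the trailing microsecond, whereas Duration(Microsecond) represents it exactly — a tiny illustration (not part of the patch):

    fn main() {
        let micros: i64 = 1_001_001;        // 1.001001 s as Duration(Microsecond)
        let millis = micros / 1_000;        // 1_001 ms, the best Interval(DayTime) could store
        assert_ne!(millis * 1_000, micros); // the trailing microsecond cannot be recovered
    }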