Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 63 additions & 26 deletions datafusion/functions/src/datetime/date_trunc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ use arrow::array::types::{
ArrowTimestampType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
};
use arrow::array::{Array, ArrayRef, Int64Array, PrimitiveArray};
use arrow::array::{Array, ArrayRef, AsArray, PrimitiveArray};
use arrow::datatypes::DataType::{self, Null, Timestamp, Utf8, Utf8View};
use arrow::datatypes::Int64Type;
use arrow::datatypes::TimeUnit::{self, Microsecond, Millisecond, Nanosecond, Second};
use datafusion_common::cast::as_primitive_array;
use datafusion_common::{
Expand Down Expand Up @@ -455,36 +456,57 @@ fn general_date_trunc_array_fine_granularity<T: ArrowTimestampType>(
array: &PrimitiveArray<T>,
granularity: &str,
) -> Result<ArrayRef> {
let unit = match (tu, granularity) {
(Second, "minute") => Some(Int64Array::new_scalar(60)),
(Second, "hour") => Some(Int64Array::new_scalar(3600)),
(Second, "day") => Some(Int64Array::new_scalar(86400)),

(Millisecond, "second") => Some(Int64Array::new_scalar(1_000)),
(Millisecond, "minute") => Some(Int64Array::new_scalar(60_000)),
(Millisecond, "hour") => Some(Int64Array::new_scalar(3_600_000)),
(Millisecond, "day") => Some(Int64Array::new_scalar(86_400_000)),

(Microsecond, "millisecond") => Some(Int64Array::new_scalar(1_000)),
(Microsecond, "second") => Some(Int64Array::new_scalar(1_000_000)),
(Microsecond, "minute") => Some(Int64Array::new_scalar(60_000_000)),
(Microsecond, "hour") => Some(Int64Array::new_scalar(3_600_000_000)),
(Microsecond, "day") => Some(Int64Array::new_scalar(86_400_000_000)),

(Nanosecond, "microsecond") => Some(Int64Array::new_scalar(1_000)),
(Nanosecond, "millisecond") => Some(Int64Array::new_scalar(1_000_000)),
(Nanosecond, "second") => Some(Int64Array::new_scalar(1_000_000_000)),
(Nanosecond, "minute") => Some(Int64Array::new_scalar(60_000_000_000)),
(Nanosecond, "hour") => Some(Int64Array::new_scalar(3_600_000_000_000)),
(Nanosecond, "day") => Some(Int64Array::new_scalar(86_400_000_000_000)),
let unit: Option<i64> = match (tu, granularity) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The key idea here is to make this code faster by reusing the allocation and operating in place rather than allocating new arrays

(Second, "minute") => Some(60),
(Second, "hour") => Some(3600),
(Second, "day") => Some(86400),

(Millisecond, "second") => Some(1_000),
(Millisecond, "minute") => Some(60_000),
(Millisecond, "hour") => Some(3_600_000),
(Millisecond, "day") => Some(86_400_000),

(Microsecond, "millisecond") => Some(1_000),
(Microsecond, "second") => Some(1_000_000),
(Microsecond, "minute") => Some(60_000_000),
(Microsecond, "hour") => Some(3_600_000_000),
(Microsecond, "day") => Some(86_400_000_000),

(Nanosecond, "microsecond") => Some(1_000),
(Nanosecond, "millisecond") => Some(1_000_000),
(Nanosecond, "second") => Some(1_000_000_000),
(Nanosecond, "minute") => Some(60_000_000_000),
(Nanosecond, "hour") => Some(3_600_000_000_000),
(Nanosecond, "day") => Some(86_400_000_000_000),
_ => None,
};

if let Some(unit) = unit {
let original_type = array.data_type();
let array = arrow::compute::cast(array, &DataType::Int64)?;
let array = arrow::compute::kernels::numeric::div(&array, &unit)?;
let array = arrow::compute::kernels::numeric::mul(&array, &unit)?;
let input = arrow::compute::cast(array, &DataType::Int64)?;
// Optimize performance by doing operations in place if possible
let array = input.as_primitive::<Int64Type>().clone();
drop(input); // ensure the input reference is dropped (so we can reuse the memory if possible)
let array = try_unary_mut_or_clone(array, |i| {
i.checked_div(unit)
.ok_or_else(|| exec_datafusion_err!("division overflow"))
})?;
let array = try_unary_mut_or_clone(array, |i| {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically speaking, only the first `try_unary_mut_or_clone` is needed.
On the second transformation, we're guaranteed to hold the only pointer into the array, so taking the `or_clone` path would be an error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is true, though I don't know how to represent this in code.

Maybe I could make a second function try_unary_mut_or_error that throws a runtime error 🤔

i.checked_mul(unit)
.ok_or_else(|| exec_datafusion_err!("multiplication overflow"))
})?;
let array = try_unary_mut_or_clone(array, |i| {
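// For timestamps before 1970-01-01T00:00:00Z (negative values),
// integer division truncates toward zero, so the truncated value
// can be later than the original value. Correct any such cases by
// subtracting `unit`.
// NOTE(review): this closure only sees the already-truncated value,
// so the `i > 0` test below also shifts inputs in `0..unit` (which
// truncate to exactly 0) and negative inputs that were already exact
// multiples of `unit` -- confirm this is intended.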
if i > 0 {
Ok(i)
} else {
i.checked_sub(unit)
.ok_or_else(|| exec_datafusion_err!("subtraction overflow"))
}
})?;
let array = arrow::compute::cast(&array, original_type)?;
Ok(array)
} else {
Expand All @@ -493,6 +515,21 @@ fn general_date_trunc_array_fine_granularity<T: ArrowTimestampType>(
}
}

/// Applies the unary operation in place if possible, or clones the array if not
fn try_unary_mut_or_clone<F>(
array: PrimitiveArray<Int64Type>,
op: F,
) -> Result<PrimitiveArray<Int64Type>>
where
F: Fn(i64) -> Result<i64>,
Comment on lines +519 to +524
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not really `date_trunc`-specific. Can this be made more flexible with more generous use of generics?
Perhaps it could even live in arrow-rs; it makes `try_unary_mut` significantly more approachable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I agree -- the try_unary_mut is quite awkward to use. I will see if I can port some of these changes upstream / see what they look like

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{
match array.try_unary_mut(&op) {
Ok(result) => result,
// on error, make a new array
Err(array) => array.try_unary(op),
}
}

// truncates a single value with the given timeunit to the specified granularity
fn general_date_trunc(
tu: TimeUnit,
Expand Down
7 changes: 7 additions & 0 deletions datafusion/sqllogictest/test_files/timestamps.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1687,6 +1687,13 @@ SELECT DATE_TRUNC('second', '2022-08-03 14:38:50Z');
----
2022-08-03T14:38:50

# DATE_TRUNC handling of times before the unix epoch (issue 18334)
query PPP
SELECT d, DATE_TRUNC('hour', d), DATE_TRUNC('hour', TIMESTAMP '1900-06-15 07:09:00')
FROM (VALUES (TIMESTAMP '1900-06-15 07:09:00')) AS t(d);
----
1900-06-15T07:09:00 1900-06-15T07:00:00 1900-06-15T07:00:00

# Test that interval can add a timestamp
query P
SELECT timestamp '2013-07-01 12:00:00' + INTERVAL '8' DAY;
Expand Down