Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion datafusion/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ workspace = true
[features]
crypto_expressions = ["md-5", "sha2", "blake2", "blake3"]
# enable datetime functions
datetime_expressions = []
datetime_expressions = ["chrono-tz"]
# Enable encoding by default so the doctests work. In general don't automatically enable all packages.
default = [
"datetime_expressions",
Expand Down Expand Up @@ -71,6 +71,7 @@ base64 = { version = "0.22", optional = true }
blake2 = { version = "^0.10.2", optional = true }
blake3 = { version = "1.8", optional = true }
chrono = { workspace = true }
chrono-tz = { version = "0.10.4", optional = true }
datafusion-common = { workspace = true }
datafusion-doc = { workspace = true }
datafusion-execution = { workspace = true }
Expand Down
19 changes: 13 additions & 6 deletions datafusion/functions/benches/to_timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,17 @@ fn criterion_benchmark(c: &mut Criterion) {
let arg_field = Field::new("a", DataType::Utf8, false).into();
let arg_fields = vec![arg_field];
let config_options = Arc::new(ConfigOptions::default());
let to_timestamp_udf = to_timestamp(config_options.as_ref());

c.bench_function("to_timestamp_no_formats_utf8", |b| {
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
let arr_data = data();
let batch_len = arr_data.len();
let string_array = ColumnarValue::Array(Arc::new(arr_data) as ArrayRef);

b.iter(|| {
black_box(
to_timestamp()
to_timestamp_udf
.invoke_with_args(ScalarFunctionArgs {
args: vec![string_array.clone()],
arg_fields: arg_fields.clone(),
Expand All @@ -137,13 +139,14 @@ fn criterion_benchmark(c: &mut Criterion) {
});

c.bench_function("to_timestamp_no_formats_largeutf8", |b| {
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
let data = cast(&data(), &DataType::LargeUtf8).unwrap();
let batch_len = data.len();
let string_array = ColumnarValue::Array(Arc::new(data) as ArrayRef);

b.iter(|| {
black_box(
to_timestamp()
to_timestamp_udf
.invoke_with_args(ScalarFunctionArgs {
args: vec![string_array.clone()],
arg_fields: arg_fields.clone(),
Expand All @@ -157,13 +160,14 @@ fn criterion_benchmark(c: &mut Criterion) {
});

c.bench_function("to_timestamp_no_formats_utf8view", |b| {
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
let data = cast(&data(), &DataType::Utf8View).unwrap();
let batch_len = data.len();
let string_array = ColumnarValue::Array(Arc::new(data) as ArrayRef);

b.iter(|| {
black_box(
to_timestamp()
to_timestamp_udf
.invoke_with_args(ScalarFunctionArgs {
args: vec![string_array.clone()],
arg_fields: arg_fields.clone(),
Expand All @@ -177,6 +181,7 @@ fn criterion_benchmark(c: &mut Criterion) {
});

c.bench_function("to_timestamp_with_formats_utf8", |b| {
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
let (inputs, format1, format2, format3) = data_with_formats();
let batch_len = inputs.len();

Expand All @@ -196,7 +201,7 @@ fn criterion_benchmark(c: &mut Criterion) {

b.iter(|| {
black_box(
to_timestamp()
to_timestamp_udf
.invoke_with_args(ScalarFunctionArgs {
args: args.clone(),
arg_fields: arg_fields.clone(),
Expand All @@ -210,6 +215,7 @@ fn criterion_benchmark(c: &mut Criterion) {
});

c.bench_function("to_timestamp_with_formats_largeutf8", |b| {
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
let (inputs, format1, format2, format3) = data_with_formats();
let batch_len = inputs.len();

Expand Down Expand Up @@ -237,7 +243,7 @@ fn criterion_benchmark(c: &mut Criterion) {

b.iter(|| {
black_box(
to_timestamp()
to_timestamp_udf
.invoke_with_args(ScalarFunctionArgs {
args: args.clone(),
arg_fields: arg_fields.clone(),
Expand All @@ -251,6 +257,7 @@ fn criterion_benchmark(c: &mut Criterion) {
});

c.bench_function("to_timestamp_with_formats_utf8view", |b| {
let to_timestamp_udf = Arc::clone(&to_timestamp_udf);
let (inputs, format1, format2, format3) = data_with_formats();

let batch_len = inputs.len();
Expand Down Expand Up @@ -279,7 +286,7 @@ fn criterion_benchmark(c: &mut Criterion) {

b.iter(|| {
black_box(
to_timestamp()
to_timestamp_udf
.invoke_with_args(ScalarFunctionArgs {
args: args.clone(),
arg_fields: arg_fields.clone(),
Expand Down
164 changes: 142 additions & 22 deletions datafusion/functions/src/datetime/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,54 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;
use std::marker::PhantomData;
use std::sync::{Arc, LazyLock};

use arrow::array::timezone::Tz;
use arrow::array::{
Array, ArrowPrimitiveType, AsArray, GenericStringArray, PrimitiveArray,
StringArrayType, StringViewArray,
};
use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
use arrow::datatypes::DataType;
use arrow::compute::kernels::cast_utils::{
string_to_datetime, string_to_timestamp_nanos,
};
use arrow::datatypes::{DataType, TimeUnit};
use arrow_buffer::ArrowNativeType;
use chrono::format::{parse, Parsed, StrftimeItems};
use chrono::LocalResult::Single;
use chrono::{DateTime, TimeZone, Utc};

use datafusion_common::cast::as_generic_string_array;
use datafusion_common::{
exec_datafusion_err, exec_err, unwrap_or_internal_err, DataFusionError, Result,
ScalarType, ScalarValue,
exec_datafusion_err, exec_err, internal_datafusion_err, unwrap_or_internal_err,
DataFusionError, Result, ScalarValue,
};
use datafusion_expr::ColumnarValue;
use num_traits::{PrimInt, ToPrimitive};

/// Error message if nanosecond conversion request beyond supported interval
const ERR_NANOSECONDS_NOT_SUPPORTED: &str = "The dates that can be represented as nanoseconds have to be between 1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804";

static UTC: LazyLock<Tz> = LazyLock::new(|| "UTC".parse().expect("UTC is always valid"));

#[expect(unused)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it is crate-private why not remove it ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly I wasn't sure if it still could be used by a custom UDF that used the same package name. I was just being super cautious since I hate breaking other people's code needlessly.

/// Calls string_to_timestamp_nanos and converts the error type
pub(crate) fn string_to_timestamp_nanos_shim(s: &str) -> Result<i64> {
string_to_timestamp_nanos(s).map_err(|e| e.into())
}

pub(crate) fn string_to_timestamp_nanos_with_timezone(
timezone: &Option<Tz>,
s: &str,
) -> Result<i64> {
let tz = timezone.unwrap_or(*UTC);
let dt = string_to_datetime(&tz, s)?;
let parsed = dt
.timestamp_nanos_opt()
.ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))?;

Ok(parsed)
}

/// Checks that all the arguments from the second are of type [Utf8], [LargeUtf8] or [Utf8View]
///
/// [Utf8]: DataType::Utf8
Expand Down Expand Up @@ -69,13 +90,12 @@ pub(crate) fn validate_data_types(args: &[ColumnarValue], name: &str) -> Result<
/// Accepts a string and parses it using the [`chrono::format::strftime`] specifiers
/// relative to the provided `timezone`
///
/// [IANA timezones] are only supported if the `arrow-array/chrono-tz` feature is enabled
///
/// * `2023-01-01 040506 America/Los_Angeles`
///
/// If a timestamp is ambiguous, for example as a result of daylight-savings time, an error
/// will be returned
///
/// Note that parsing [IANA timezones] is not supported yet in chrono - <https://github.com/chronotope/chrono/issues/38>
/// and this implementation only supports named timezones at the end of the string preceded by a space.
///
/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
/// [IANA timezones]: https://www.iana.org/time-zones
pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
Expand All @@ -89,11 +109,55 @@ pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
)
};

let mut datetime_str = s;
let mut format = format;

// Manually handle the most common case of a named timezone at the end of the timestamp.
// Note that %+ handles 'Z' at the end of the string without a space. This code doesn't
// handle named timezones with no preceding space since that would require writing a
// custom parser (or switching to Jiff)
let tz: Option<chrono_tz::Tz> = if format.ends_with(" %Z") {
// grab the string after the last space as the named timezone
if let Some((dt_str, timezone_name)) = datetime_str.rsplit_once(' ') {
datetime_str = dt_str;

// attempt to parse the timezone name
let result: Result<chrono_tz::Tz, chrono_tz::ParseError> =
timezone_name.parse();
let Ok(tz) = result else {
return Err(err(&result.unwrap_err().to_string()));
};

// successfully parsed the timezone name, remove the ' %Z' from the format
format = &format[..format.len() - 3];

Some(tz)
} else {
None
}
} else if format.contains("%Z") {
return Err(err(
"'%Z' is only supported at the end of the format string preceded by a space",
));
} else {
None
};

let mut parsed = Parsed::new();
parse(&mut parsed, s, StrftimeItems::new(format)).map_err(|e| err(&e.to_string()))?;
parse(&mut parsed, datetime_str, StrftimeItems::new(format))
.map_err(|e| err(&e.to_string()))?;

// attempt to parse the string assuming it has a timezone
let dt = parsed.to_datetime();
let dt = match tz {
Some(tz) => {
// A timezone was manually parsed out, convert it to a fixed offset
match parsed.to_datetime_with_timezone(&tz) {
Ok(dt) => Ok(dt.fixed_offset()),
Err(e) => Err(e),
}
}
// default to parse the string assuming it has a timezone
None => parsed.to_datetime(),
};

if let Err(e) = &dt {
// no timezone or other failure, try without a timezone
Expand Down Expand Up @@ -141,6 +205,7 @@ pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
///
/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
#[inline]
#[expect(unused)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it is crate-private why not remove it ?

pub(crate) fn string_to_timestamp_nanos_formatted(
s: &str,
format: &str,
Expand All @@ -152,6 +217,19 @@ pub(crate) fn string_to_timestamp_nanos_formatted(
.ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))
}

pub(crate) fn string_to_timestamp_nanos_formatted_with_timezone(
timezone: &Option<Tz>,
s: &str,
format: &str,
) -> Result<i64, DataFusionError> {
let dt = string_to_datetime_formatted(&timezone.unwrap_or(*UTC), s, format)?;
let parsed = dt
.timestamp_nanos_opt()
.ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))?;

Ok(parsed)
}

/// Accepts a string with a `chrono` format and converts it to a
/// millisecond precision timestamp.
///
Expand All @@ -176,14 +254,50 @@ pub(crate) fn string_to_timestamp_millis_formatted(s: &str, format: &str) -> Res
.timestamp_millis())
}

pub(crate) fn handle<O, F, S>(
pub(crate) struct ScalarDataType<T: PrimInt> {
data_type: DataType,
_marker: PhantomData<T>,
}

impl<T: PrimInt> ScalarDataType<T> {
pub(crate) fn new(dt: DataType) -> Self {
Self {
data_type: dt,
_marker: PhantomData,
}
}

fn scalar(&self, r: Option<i64>) -> Result<ScalarValue> {
match &self.data_type {
DataType::Date32 => Ok(ScalarValue::Date32(r.and_then(|v| v.to_i32()))),
DataType::Timestamp(u, tz) => match u {
TimeUnit::Second => Ok(ScalarValue::TimestampSecond(r, tz.clone())),
TimeUnit::Millisecond => {
Ok(ScalarValue::TimestampMillisecond(r, tz.clone()))
}
TimeUnit::Microsecond => {
Ok(ScalarValue::TimestampMicrosecond(r, tz.clone()))
}
TimeUnit::Nanosecond => {
Ok(ScalarValue::TimestampNanosecond(r, tz.clone()))
}
},
t => Err(internal_datafusion_err!(
"Unsupported data type for ScalarDataType<T>: {t:?}"
)),
}
}
}

pub(crate) fn handle<O, F, T>(
args: &[ColumnarValue],
op: F,
name: &str,
sdt: &ScalarDataType<T>,
) -> Result<ColumnarValue>
where
O: ArrowPrimitiveType,
S: ScalarType<O::Native>,
T: PrimInt,
F: Fn(&str) -> Result<O::Native>,
{
match &args[0] {
Expand All @@ -210,8 +324,13 @@ where
},
ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
Some(a) => {
let result = a.as_ref().map(|x| op(x)).transpose()?;
Ok(ColumnarValue::Scalar(S::scalar(result)))
let result = a
.as_ref()
.map(|x| op(x))
.transpose()?
.and_then(|v| v.to_i64());
let s = sdt.scalar(result)?;
Ok(ColumnarValue::Scalar(s))
}
_ => exec_err!("Unsupported data type {scalar:?} for function {name}"),
},
Expand All @@ -221,17 +340,18 @@ where
// Given a function that maps a `&str`, `&str` to an arrow native type,
// returns a `ColumnarValue` where the function is applied to either a `ArrayRef` or `ScalarValue`
// depending on the `args`'s variant.
pub(crate) fn handle_multiple<O, F, S, M>(
pub(crate) fn handle_multiple<O, F, M, T>(
args: &[ColumnarValue],
op: F,
op2: M,
name: &str,
sdt: &ScalarDataType<T>,
) -> Result<ColumnarValue>
where
O: ArrowPrimitiveType,
S: ScalarType<O::Native>,
F: Fn(&str, &str) -> Result<O::Native>,
M: Fn(O::Native) -> O::Native,
T: PrimInt,
{
match &args[0] {
ColumnarValue::Array(a) => match a.data_type() {
Expand Down Expand Up @@ -286,9 +406,9 @@ where
if let Some(s) = x {
match op(a, s.as_str()) {
Ok(r) => {
ret = Some(Ok(ColumnarValue::Scalar(S::scalar(Some(
op2(r),
)))));
let result = op2(r).to_i64();
let s = sdt.scalar(result)?;
ret = Some(Ok(ColumnarValue::Scalar(s)));
break;
}
Err(e) => ret = Some(Err(e)),
Expand Down
Loading