Skip to content

Commit

Permalink
fix(cubesql): Support Thoughtspot DATEADD queries
Browse files Browse the repository at this point in the history
  • Loading branch information
MazterQyou committed Dec 22, 2022
1 parent 5f63c02 commit 58b5669
Show file tree
Hide file tree
Showing 38 changed files with 1,955 additions and 215 deletions.
119 changes: 114 additions & 5 deletions rust/cubesql/cubesql/src/compile/engine/udf.rs
@@ -1,6 +1,6 @@
use std::{any::type_name, collections::HashMap, convert::TryFrom, sync::Arc, thread};

use chrono::{Datelike, Duration, NaiveDate, NaiveDateTime};
use chrono::{Datelike, Duration, NaiveDate, NaiveDateTime, NaiveTime};
use datafusion::{
arrow::{
array::{
Expand Down Expand Up @@ -711,9 +711,50 @@ pub fn create_datediff_udf() -> ScalarUDF {
let fun = make_scalar_function(move |args: &[ArrayRef]| {
assert!(args.len() == 3);

return Err(DataFusionError::NotImplemented(format!(
"datediff is not implemented, it's stub"
)));
let datepart_array = downcast_string_arg!(args[0], "datepart", i32);
match (&args[1].data_type(), &args[2].data_type()) {
(
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
) => (),
_ => {
return Err(DataFusionError::Execution(format!(
"date arguments must be of type TimestampNanosecond, actual: {}, {}",
&args[1].data_type(),
&args[2].data_type(),
)));
}
}
let left_date_array =
downcast_primitive_arg!(args[1], "left_date", TimestampNanosecondType);
let right_date_array =
downcast_primitive_arg!(args[2], "right_date", TimestampNanosecondType);

let result = izip!(datepart_array, left_date_array, right_date_array)
.map(|args| {
match args {
(Some(datepart), Some(left_date), Some(right_date)) => {
match datepart.to_lowercase().as_str() {
// TODO: support more dateparts as needed
"day" | "days" | "d" => {
let nanoseconds_in_day = 86_400_000_000_000_i64;
let left_days = left_date / nanoseconds_in_day;
let right_days = right_date / nanoseconds_in_day;
let day_difference = right_days - left_days;
Ok(Some(day_difference))
}
_ => Err(DataFusionError::Execution(format!(
"unsupported DATEDIFF datepart: {}",
datepart,
))),
}
}
_ => Ok(None),
}
})
.collect::<Result<PrimitiveArray<Int64Type>>>()?;

Ok(Arc::new(result) as ArrayRef)
});

let return_type: ReturnTypeFunction = Arc::new(move |_| Ok(Arc::new(DataType::Int64)));
Expand All @@ -736,7 +777,8 @@ pub fn create_dateadd_udf() -> ScalarUDF {
)));
});

let return_type: ReturnTypeFunction = Arc::new(move |_| Ok(Arc::new(DataType::Int64)));
let return_type: ReturnTypeFunction =
Arc::new(move |_| Ok(Arc::new(DataType::Timestamp(TimeUnit::Nanosecond, None))));

ScalarUDF::new(
"dateadd",
Expand Down Expand Up @@ -2804,3 +2846,70 @@ pub fn create_position_udf() -> ScalarUDF {
&fun,
)
}

pub fn create_date_to_timestamp_udf() -> ScalarUDF {
let fun = make_scalar_function(move |args: &[ArrayRef]| {
assert!(args.len() == 1);

match args[0].data_type() {
DataType::Date32 => {
let date_arr = downcast_primitive_arg!(args[0], "date", Date32Type);

let result = date_arr
.iter()
.map(|date| {
date.map(|date| {
let nanoseconds_in_day = 86_400_000_000_000_i64;
let timestamp = date as i64 * nanoseconds_in_day;
timestamp
})
})
.collect::<PrimitiveArray<TimestampNanosecondType>>();

Ok(Arc::new(result) as ArrayRef)
}
DataType::Utf8 => {
let date_arr = downcast_string_arg!(args[0], "date", i32);

let result = date_arr
.iter()
.map(|date| match date {
Some(date) => {
let date = NaiveDate::parse_from_str(date, "%Y-%m-%d")
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
let time = NaiveTime::from_hms_opt(0, 0, 0).ok_or(
DataFusionError::Execution(
"Cannot initalize default zero NaiveTime".to_string(),
),
)?;
Ok(Some(NaiveDateTime::new(date, time).timestamp_nanos()))
}
None => Ok(None),
})
.collect::<Result<PrimitiveArray<TimestampNanosecondType>>>()?;

Ok(Arc::new(result) as ArrayRef)
}
_ => Err(DataFusionError::Execution(format!(
"DATE_TO_TIMESTAMP doesn't support this type: {}",
args[0].data_type(),
))),
}
});

let return_type: ReturnTypeFunction =
Arc::new(move |_| Ok(Arc::new(DataType::Timestamp(TimeUnit::Nanosecond, None))));

ScalarUDF::new(
"date_to_timestamp",
&Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Date32]),
TypeSignature::Exact(vec![DataType::Utf8]),
],
Volatility::Immutable,
),
&return_type,
&fun,
)
}

0 comments on commit 58b5669

Please sign in to comment.