Skip to content

Commit

Permalink
feat(cubestore): Introduce support for DATE_ADD (#3085)
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed Jul 12, 2021
1 parent 57c2d38 commit 071d7b4
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 2 deletions.
56 changes: 56 additions & 0 deletions rust/cubestore-sql-tests/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
t("rolling_window_join", rolling_window_join),
t("decimal_index", decimal_index),
t("float_index", float_index),
t("date_add", date_add),
t("now", now),
t("dump", dump),
];
Expand Down Expand Up @@ -2660,6 +2661,61 @@ async fn float_index(service: Box<dyn SqlClient>) {
}
}

async fn date_add(service: Box<dyn SqlClient>) {
let r = service
.exec_query(
"SELECT
DATE_ADD(CAST('2021-01-01T00:00:00Z' as TIMESTAMP), INTERVAL '1 second'),\
DATE_ADD(CAST('2021-01-01T00:00:00Z' as TIMESTAMP), INTERVAL '1 minute'),\
DATE_ADD(CAST('2021-01-01T00:00:00Z' as TIMESTAMP), INTERVAL '1 hour'),\
DATE_ADD(CAST('2021-01-01T00:00:00Z' as TIMESTAMP), INTERVAL '1 day'),\
DATE_ADD(CAST('2021-01-01T00:00:00Z' as TIMESTAMP), INTERVAL '1 day 1 hour 1 minute 1 second'),\
DATE_ADD(CAST('2021-01-01T00:00:00Z' as TIMESTAMP), INTERVAL '1 month'),\
DATE_ADD(CAST('2021-01-01T00:00:00Z' as TIMESTAMP), INTERVAL '1 year'),\
DATE_ADD(CAST('2021-01-01T00:00:00Z' as TIMESTAMP), INTERVAL '13 month'),\
DATE_ADD(CAST('2021-01-01T23:59:00Z' as TIMESTAMP), INTERVAL '1 minute'),\
DATE_ADD(CAST('2021-12-01T00:00:00Z' as TIMESTAMP), INTERVAL '1 month'),\
DATE_ADD(CAST('2021-12-31T00:00:00Z' as TIMESTAMP), INTERVAL '1 day'),\
DATE_ADD(CAST('2020-02-29T00:00:00Z' as TIMESTAMP), INTERVAL '1 day'),\
DATE_ADD(CAST('2020-02-28T00:00:00Z' as TIMESTAMP), INTERVAL '1 day'),\
DATE_ADD(CAST('2021-02-28T00:00:00Z' as TIMESTAMP), INTERVAL '1 day'),\
DATE_ADD(CAST('2020-02-29T00:00:00Z' as TIMESTAMP), INTERVAL '1 year'),\
DATE_ADD(CAST('2021-01-30T00:00:00Z' as TIMESTAMP), INTERVAL '1 month'),\
DATE_ADD(CAST('2020-01-29T00:00:00Z' as TIMESTAMP), INTERVAL '1 month'),\
DATE_ADD(CAST('2021-01-29T00:00:00Z' as TIMESTAMP), INTERVAL '1 month')\
",
)
.await
.unwrap();

assert_eq!(
to_rows(&r),
vec![vec![
// Simple tests for IntervalDayTime
TableValue::Timestamp(timestamp_from_string("2021-01-01T00:00:01Z").unwrap()),
TableValue::Timestamp(timestamp_from_string("2021-01-01T00:01:00Z").unwrap()),
TableValue::Timestamp(timestamp_from_string("2021-01-01T01:00:00Z").unwrap()),
TableValue::Timestamp(timestamp_from_string("2021-01-02T00:00:00Z").unwrap()),
TableValue::Timestamp(timestamp_from_string("2021-01-02T01:01:01Z").unwrap()),
// Simple tests for IntervalYearMonth
TableValue::Timestamp(timestamp_from_string("2021-02-01T00:00:00Z").unwrap()),
TableValue::Timestamp(timestamp_from_string("2022-01-01T00:00:00Z").unwrap()),
TableValue::Timestamp(timestamp_from_string("2022-02-01T00:00:00Z").unwrap()),
// Calculation logic
TableValue::Timestamp(timestamp_from_string("2021-01-02T00:00:00Z").unwrap()),
TableValue::Timestamp(timestamp_from_string("2022-01-01T00:00:00Z").unwrap()),
TableValue::Timestamp(timestamp_from_string("2022-01-01T00:00:00Z").unwrap()),
TableValue::Timestamp(timestamp_from_string("2020-03-01T00:00:00Z").unwrap()),
TableValue::Timestamp(timestamp_from_string("2020-02-29T00:00:00Z").unwrap()),
TableValue::Timestamp(timestamp_from_string("2021-03-01T00:00:00Z").unwrap()),
TableValue::Timestamp(timestamp_from_string("2021-02-28T00:00:00Z").unwrap()),
TableValue::Timestamp(timestamp_from_string("2021-02-28T00:00:00Z").unwrap()),
TableValue::Timestamp(timestamp_from_string("2020-02-29T00:00:00Z").unwrap()),
TableValue::Timestamp(timestamp_from_string("2021-02-28T00:00:00Z").unwrap()),
],]
);
}

async fn now(service: Box<dyn SqlClient>) {
let r = service.exec_query("SELECT now()").await.unwrap();
assert_eq!(r.get_rows().len(), 1);
Expand Down
1 change: 1 addition & 0 deletions rust/cubestore/src/queryplanner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ impl ContextProvider for MetaStoreSchemaProvider {
"coalesce" | "COALESCE" => CubeScalarUDFKind::Coalesce,
"now" | "NOW" => CubeScalarUDFKind::Now,
"unix_timestamp" | "UNIX_TIMESTAMP" => CubeScalarUDFKind::UnixTimestamp,
"date_add" | "DATE_ADD" => CubeScalarUDFKind::DateAdd,
_ => return None,
};
return Some(Arc::new(scalar_udf_by_kind(kind).descriptor()));
Expand Down
156 changes: 154 additions & 2 deletions rust/cubestore/src/queryplanner/udfs.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::queryplanner::coalesce::{coalesce, SUPPORTED_COALESCE_TYPES};
use crate::queryplanner::hll::Hll;
use crate::CubeError;
use arrow::array::{Array, BinaryArray, UInt64Builder};
use arrow::datatypes::{DataType, TimeUnit};
use arrow::array::{Array, BinaryArray, TimestampNanosecondArray, UInt64Builder};
use arrow::datatypes::{DataType, TimeUnit, TimestampNanosecondType};
use chrono::{DateTime, Datelike, Duration, NaiveDate, TimeZone, Utc};
use datafusion::error::DataFusionError;
use datafusion::physical_plan::functions::Signature;
use datafusion::physical_plan::udaf::AggregateUDF;
Expand All @@ -20,6 +21,7 @@ pub enum CubeScalarUDFKind {
Coalesce,
Now,
UnixTimestamp,
DateAdd,
}

pub trait CubeScalarUDF {
Expand All @@ -34,6 +36,7 @@ pub fn scalar_udf_by_kind(k: CubeScalarUDFKind) -> Box<dyn CubeScalarUDF> {
CubeScalarUDFKind::Coalesce => Box::new(Coalesce {}),
CubeScalarUDFKind::Now => Box::new(Now {}),
CubeScalarUDFKind::UnixTimestamp => Box::new(UnixTimestamp {}),
CubeScalarUDFKind::DateAdd => Box::new(DateAdd {}),
}
}

Expand All @@ -51,6 +54,9 @@ pub fn scalar_kind_by_name(n: &str) -> Option<CubeScalarUDFKind> {
if n == "UNIX_TIMESTAMP" {
return Some(CubeScalarUDFKind::UnixTimestamp);
}
if n == "DATE_ADD" {
return Some(CubeScalarUDFKind::DateAdd);
}
return None;
}

Expand Down Expand Up @@ -180,6 +186,152 @@ impl CubeScalarUDF for UnixTimestamp {
}
}

fn datetime_safety_unwrap(opt: Option<DateTime<Utc>>) -> Result<DateTime<Utc>, DataFusionError> {
if opt.is_some() {
return Ok(opt.unwrap());
}

return Err(DataFusionError::Internal(
"Unable to calculate operation between DateTime and Interval".to_string(),
));
}

fn last_day_of_month(year: i32, month: u32) -> u32 {
NaiveDate::from_ymd_opt(year, month + 1, 1)
.unwrap_or(NaiveDate::from_ymd(year + 1, 1, 1))
.pred()
.day()
}

struct DateAdd {}
impl DateAdd {
fn signature() -> Signature {
Signature::Any(2)
}
}
impl CubeScalarUDF for DateAdd {
fn kind(&self) -> CubeScalarUDFKind {
CubeScalarUDFKind::DateAdd
}

fn name(&self) -> &str {
"DATE_ADD"
}

fn descriptor(&self) -> ScalarUDF {
return ScalarUDF {
name: self.name().to_string(),
signature: Self::signature(),
return_type: Arc::new(|inputs| {
assert!(inputs.len() == 2);

// Right now, we support only TimeUnit::Nanosecond without TZ
Ok(Arc::new(DataType::Timestamp(TimeUnit::Nanosecond, None)))
}),
fun: Arc::new(|inputs| {
if inputs.len() != 2 {
return Err(DataFusionError::Plan(
"Expected two arguments in DATE_ADD".to_string(),
));
}

let mut result_date = match &inputs[0] {
ColumnarValue::Scalar(scalar) => match (scalar.get_datatype(), scalar) {
// Right now, we support only TimeUnit::Nanosecond without TZ
(
DataType::Timestamp(TimeUnit::Nanosecond, None),
ScalarValue::TimestampNanosecond(Some(v)),
) => Utc.timestamp_nanos(*v),
_ => {
return Err(DataFusionError::Plan(
"First argument of `DATE_PART` must be non-null scalar TimestampNanosecond without timezone"
.to_string(),
));
}
},
_ => {
return Err(DataFusionError::Plan(
"First argument of `DATE_PART` must be non-null scalar TimestampNanosecond without timezone"
.to_string(),
));
}
};

match &inputs[1] {
ColumnarValue::Scalar(scalar) => match scalar {
ScalarValue::IntervalYearMonth(Some(v)) => {
if *v < 0 {
return Err(DataFusionError::Plan(
"Second argument of `DATE_PART` must be a positive Interval"
.to_string(),
));
}

let years_to_add = *v / 12;
let months_to_add = (*v % 12) as u32;

let mut year = result_date.year() + years_to_add;
let mut month = result_date.month();
let mut day = result_date.day();

if month + months_to_add > 12 {
year += 1;
month = (month + months_to_add) - 12;
} else {
month += months_to_add;
}

assert!(month <= 12);

let days_in_month = last_day_of_month(year, month);

if day > days_in_month {
day = days_in_month;
}

result_date = datetime_safety_unwrap(result_date.with_day(1))?;

// @todo Optimize? Chrono is using string -> parsing and applying it back to obj
result_date = datetime_safety_unwrap(result_date.with_month(month))?;
result_date = datetime_safety_unwrap(result_date.with_year(year))?;
result_date = datetime_safety_unwrap(result_date.with_day(day))?;
}
ScalarValue::IntervalDayTime(Some(v)) => {
if *v < 0 {
return Err(DataFusionError::Plan(
"Second argument of `DATE_PART` must be a positive Interval"
.to_string(),
));
}

let days_parts: i64 = (((*v as u64) & 0xFFFFFFFF00000000) >> 32) as i64;
let milliseconds_part: i64 = ((*v as u64) & 0xFFFFFFFF) as i64;

result_date = result_date + Duration::days(days_parts);
result_date = result_date + Duration::milliseconds(milliseconds_part);
}
_ => {
return Err(DataFusionError::Plan(
"Second argument of `DATE_PART` must be non-null scalar of Interval type"
.to_string(),
));
}
},
_ => return Err(DataFusionError::Plan(
"Second argument of `DATE_PART` must be non-null scalar of Interval type"
.to_string(),
)),
}

let result =
TimestampNanosecondArray::from_vec(vec![result_date.timestamp_nanos()], None);

return Ok(ColumnarValue::Array(Arc::new(result)));
}),
};
}
}

struct HllCardinality {}
impl CubeScalarUDF for HllCardinality {
fn kind(&self) -> CubeScalarUDFKind {
Expand Down

0 comments on commit 071d7b4

Please sign in to comment.