Skip to content

Commit

Permalink
feat(cubestore): add date_sub function
Browse files Browse the repository at this point in the history
  • Loading branch information
ilya-biryukov committed Jul 28, 2021
1 parent 9d935d6 commit 3bf2520
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 43 deletions.
38 changes: 35 additions & 3 deletions rust/cubestore-sql-tests/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2672,47 +2672,79 @@ async fn float_index(service: Box<dyn SqlClient>) {
}

async fn date_add(service: Box<dyn SqlClient>) {
let check_adds_to = |t, i, expected| {
let check_fun = |name, t, i, expected| {
let expected = timestamp_from_string(expected).unwrap();
let service = &service;
async move {
let actual = service
.exec_query(&format!(
"SELECT DATE_ADD(CAST('{}' as TIMESTAMP), INTERVAL '{}')",
t, i
"SELECT {}(CAST('{}' as TIMESTAMP), INTERVAL '{}')",
name, t, i
))
.await
.unwrap();
assert_eq!(to_rows(&actual), rows(&[expected]));
}
};
let check_adds_to = |t, i, expected| check_fun("DATE_ADD", t, i, expected);
let check_subs_to = |t, i, expected| check_fun("DATE_SUB", t, i, expected);

check_adds_to("2021-01-01T00:00:00Z", "1 second", "2021-01-01T00:00:01Z").await;
check_adds_to("2021-01-01T00:00:00Z", "1 minute", "2021-01-01T00:01:00Z").await;
check_adds_to("2021-01-01T00:00:00Z", "1 hour", "2021-01-01T01:00:00Z").await;
check_adds_to("2021-01-01T00:00:00Z", "1 day", "2021-01-02T00:00:00Z").await;

check_adds_to(
"2021-01-01T00:00:00Z",
"1 day 1 hour 1 minute 1 second",
"2021-01-02T01:01:01Z",
)
.await;
check_subs_to(
"2021-01-02T01:01:01Z",
"1 day 1 hour 1 minute 1 second",
"2021-01-01T00:00:00Z",
)
.await;

check_adds_to("2021-01-01T00:00:00Z", "1 month", "2021-02-01T00:00:00Z").await;

check_adds_to("2021-01-01T00:00:00Z", "1 year", "2022-01-01T00:00:00Z").await;
check_subs_to("2022-01-01T00:00:00Z", "1 year", "2021-01-01T00:00:00Z").await;

check_adds_to("2021-01-01T00:00:00Z", "13 month", "2022-02-01T00:00:00Z").await;
check_subs_to("2022-02-01T00:00:00Z", "13 month", "2021-01-01T00:00:00Z").await;

check_adds_to("2021-01-01T23:59:00Z", "1 minute", "2021-01-02T00:00:00Z").await;
check_subs_to("2021-01-02T00:00:00Z", "1 minute", "2021-01-01T23:59:00Z").await;

check_adds_to("2021-12-01T00:00:00Z", "1 month", "2022-01-01T00:00:00Z").await;
check_subs_to("2022-01-01T00:00:00Z", "1 month", "2021-12-01T00:00:00Z").await;

check_adds_to("2021-12-31T00:00:00Z", "1 day", "2022-01-01T00:00:00Z").await;
check_subs_to("2022-01-01T00:00:00Z", "1 day", "2021-12-31T00:00:00Z").await;

// Feb 29 on leap and non-leap years.
check_adds_to("2020-02-29T00:00:00Z", "1 day", "2020-03-01T00:00:00Z").await;
check_subs_to("2020-03-01T00:00:00Z", "1 day", "2020-02-29T00:00:00Z").await;

check_adds_to("2020-02-28T00:00:00Z", "1 day", "2020-02-29T00:00:00Z").await;
check_subs_to("2020-02-29T00:00:00Z", "1 day", "2020-02-28T00:00:00Z").await;

check_adds_to("2021-02-28T00:00:00Z", "1 day", "2021-03-01T00:00:00Z").await;
check_subs_to("2021-03-01T00:00:00Z", "1 day", "2021-02-28T00:00:00Z").await;

check_adds_to("2020-02-29T00:00:00Z", "1 year", "2021-02-28T00:00:00Z").await;
check_subs_to("2020-02-29T00:00:00Z", "1 year", "2019-02-28T00:00:00Z").await;

check_adds_to("2020-01-30T00:00:00Z", "1 month", "2020-02-29T00:00:00Z").await;
check_subs_to("2020-03-30T00:00:00Z", "1 month", "2020-02-29T00:00:00Z").await;

check_adds_to("2020-01-29T00:00:00Z", "1 month", "2020-02-29T00:00:00Z").await;
check_subs_to("2020-03-29T00:00:00Z", "1 month", "2020-02-29T00:00:00Z").await;

check_adds_to("2021-01-29T00:00:00Z", "1 month", "2021-02-28T00:00:00Z").await;
check_subs_to("2021-03-29T00:00:00Z", "1 month", "2021-02-28T00:00:00Z").await;

// Invalid types passed to date_add.
service
Expand Down
61 changes: 37 additions & 24 deletions rust/cubestore/src/queryplanner/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,63 @@ use chrono::{DateTime, Datelike, Duration, NaiveDate, Utc};
use datafusion::error::DataFusionError;
use datafusion::scalar::ScalarValue;

pub fn date_add(t: DateTime<Utc>, i: ScalarValue) -> Result<DateTime<Utc>, DataFusionError> {
pub fn date_addsub(
t: DateTime<Utc>,
i: ScalarValue,
is_add: bool,
) -> Result<DateTime<Utc>, DataFusionError> {
match i {
ScalarValue::IntervalYearMonth(Some(v)) => {
if v < 0 {
return Err(DataFusionError::Plan(
"Second argument of `DATE_ADD` must be a positive".to_string(),
));
}
let v = match is_add {
true => v,
false => -v,
};

let mut year = t.year();
let mut month = t.month();
// Note month is numbered 0..11 in this function.
let mut month = t.month() as i32 - 1;

year += v / 12;
month += (v % 12) as u32;
if 12 < month {
year += 1;
month -= 12;
month += v % 12;

if month < 0 {
year -= 1;
month += 12;
}
assert!(month <= 12);
debug_assert!(0 <= month);
year += month / 12;
month = month % 12;

match change_ym(t, year, month) {
match change_ym(t, year, 1 + month as u32) {
Some(t) => return Ok(t),
None => {
return Err(DataFusionError::Execution(format!(
"Failed to set date to ({}-{})",
year, month
year,
1 + month
)))
}
};
}
ScalarValue::IntervalDayTime(Some(v)) => {
if v < 0 {
return Err(DataFusionError::Plan(
"Second argument of `DATE_ADD` must be positive".to_string(),
));
}
let days: i64 = v >> 32;
let millis: i64 = v & 0xFFFFFFFF;
let v = match is_add {
true => v,
false => -v,
};

let days: i64 = v.signum() * (v.abs() >> 32);
let millis: i64 = v.signum() * ((v.abs() << 32) >> 32);
return Ok(t + Duration::days(days) + Duration::milliseconds(millis));
}
_ => {
return Err(DataFusionError::Plan(
"Second argument of `DATE_ADD` must be a non-null interval".to_string(),
));
let name = match is_add {
true => "DATE_ADD",
false => "DATE_SUB",
};
return Err(DataFusionError::Plan(format!(
"Second argument of `{}` must be a non-null interval",
name
)));
}
}
}
Expand Down
1 change: 1 addition & 0 deletions rust/cubestore/src/queryplanner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ impl ContextProvider for MetaStoreSchemaProvider {
"now" | "NOW" => CubeScalarUDFKind::Now,
"unix_timestamp" | "UNIX_TIMESTAMP" => CubeScalarUDFKind::UnixTimestamp,
"date_add" | "DATE_ADD" => CubeScalarUDFKind::DateAdd,
"date_sub" | "DATE_SUB" => CubeScalarUDFKind::DateSub,
_ => return None,
};
return Some(Arc::new(scalar_udf_by_kind(kind).descriptor()));
Expand Down
55 changes: 40 additions & 15 deletions rust/cubestore/src/queryplanner/udfs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::queryplanner::coalesce::{coalesce, SUPPORTED_COALESCE_TYPES};
use crate::queryplanner::datetime::date_add;
use crate::queryplanner::datetime::date_addsub;
use crate::queryplanner::hll::Hll;
use crate::CubeError;
use arrow::array::{Array, BinaryArray, UInt64Builder};
Expand All @@ -23,6 +23,7 @@ pub enum CubeScalarUDFKind {
Now,
UnixTimestamp,
DateAdd,
DateSub,
}

pub trait CubeScalarUDF {
Expand All @@ -37,7 +38,8 @@ pub fn scalar_udf_by_kind(k: CubeScalarUDFKind) -> Box<dyn CubeScalarUDF> {
CubeScalarUDFKind::Coalesce => Box::new(Coalesce {}),
CubeScalarUDFKind::Now => Box::new(Now {}),
CubeScalarUDFKind::UnixTimestamp => Box::new(UnixTimestamp {}),
CubeScalarUDFKind::DateAdd => Box::new(DateAdd {}),
CubeScalarUDFKind::DateAdd => Box::new(DateAddSub { is_add: true }),
CubeScalarUDFKind::DateSub => Box::new(DateAddSub { is_add: false }),
}
}

Expand All @@ -58,6 +60,9 @@ pub fn scalar_kind_by_name(n: &str) -> Option<CubeScalarUDFKind> {
if n == "DATE_ADD" {
return Some(CubeScalarUDFKind::DateAdd);
}
if n == "DATE_SUB" {
return Some(CubeScalarUDFKind::DateSub);
}
return None;
}

Expand Down Expand Up @@ -187,8 +192,11 @@ impl CubeScalarUDF for UnixTimestamp {
}
}

struct DateAdd {}
impl DateAdd {
/// Scalar UDF implementation shared by `DATE_ADD` and `DATE_SUB`.
struct DateAddSub {
/// `true` when this instance serves `DATE_ADD`, `false` when it serves `DATE_SUB`.
is_add: bool,
}

impl DateAddSub {
fn signature() -> Signature {
Signature::OneOf(vec![
Signature::Exact(vec![
Expand All @@ -202,45 +210,62 @@ impl DateAdd {
])
}
}
impl CubeScalarUDF for DateAdd {

impl DateAddSub {
    /// Returns the upper-case SQL name of the function this instance
    /// implements: `"DATE_ADD"` when `is_add` is set, `"DATE_SUB"` otherwise.
    ///
    /// The result is `'static` so it can be captured by closures that
    /// outlive `self`.
    fn name_static(&self) -> &'static str {
        if self.is_add {
            "DATE_ADD"
        } else {
            "DATE_SUB"
        }
    }
}

impl CubeScalarUDF for DateAddSub {
fn kind(&self) -> CubeScalarUDFKind {
CubeScalarUDFKind::DateAdd
match self.is_add {
true => CubeScalarUDFKind::DateAdd,
false => CubeScalarUDFKind::DateSub,
}
}

fn name(&self) -> &str {
"DATE_ADD"
self.name_static()
}

fn descriptor(&self) -> ScalarUDF {
let name = self.name_static();
let is_add = self.is_add;
return ScalarUDF {
name: self.name().to_string(),
signature: Self::signature(),
return_type: Arc::new(|_| {
Ok(Arc::new(DataType::Timestamp(TimeUnit::Nanosecond, None)))
}),
fun: Arc::new(|inputs| {
fun: Arc::new(move |inputs| {
assert_eq!(inputs.len(), 2);
// TODO: support arrays as inputs.
let t = match &inputs[0] {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(t))) => {
Utc.timestamp_nanos(*t)
}
_ => {
return Err(DataFusionError::Execution(
"First argument of `DATE_ADD` must be a non-null timestamp".to_string(),
))
return Err(DataFusionError::Execution(format!(
"First argument of `{}` must be a non-null timestamp",
name
)))
}
};
let i = match &inputs[1] {
ColumnarValue::Scalar(i) => i,
_ => {
return Err(DataFusionError::Execution(
"Second argument of `DATE_ADD` must be a non-null interval".to_string(),
))
return Err(DataFusionError::Execution(format!(
"Second argument of `{}` must be a non-null interval",
name
)))
}
};
Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
Some(date_add(t, i.clone())?.timestamp_nanos()),
Some(date_addsub(t, i.clone(), is_add)?.timestamp_nanos()),
)))
}),
};
Expand Down
13 changes: 12 additions & 1 deletion rust/cubestore/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::CubeError;
use chrono::{SecondsFormat, TimeZone, Utc};
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt;
use std::fmt::{Debug, Formatter};

pub mod data;
pub(crate) mod parquet;
Expand All @@ -21,7 +23,7 @@ pub enum TableValue {
Boolean(bool),
}

#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Debug, Hash)]
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct TimestampValue {
unix_nano: i64,
}
Expand All @@ -36,6 +38,15 @@ impl TimestampValue {
}
}

impl Debug for TimestampValue {
    /// Formats the value as a `TimestampValue` struct with two fields: the raw
    /// `unix_nano` integer and `str`, the result of `self.to_string()`, so the
    /// debug output shows both the stored number and its rendered form.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let rendered = self.to_string();
        let mut out = f.debug_struct("TimestampValue");
        out.field("unix_nano", &self.unix_nano);
        out.field("str", &rendered);
        out.finish()
    }
}

impl ToString for TimestampValue {
fn to_string(&self) -> String {
Utc.timestamp_nanos(self.unix_nano)
Expand Down

0 comments on commit 3bf2520

Please sign in to comment.