Skip to content

Commit

Permalink
feat(cubesql): Introduce timediff fn (stub)
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed Dec 9, 2021
1 parent cd88d26 commit 29dfb97
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 8 deletions.
2 changes: 1 addition & 1 deletion rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/cubesql/Cargo.toml
Expand Up @@ -9,7 +9,7 @@ documentation = "https://cube.dev/docs"
homepage = "https://cube.dev"

[dependencies]
datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "8df4132b83d896a0d3db5c82a4eaaa3eaa285d15", default-features = false, features = ["unicode_expressions"] }
datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "868f3c4de13d13cda84cee33475b9782b94fa60c", default-features = false, features = ["unicode_expressions"] }
anyhow = "1.0"
thiserror = "1.0"
cubeclient = { path = "../cubeclient" }
Expand Down
51 changes: 51 additions & 0 deletions rust/cubesql/src/compile/engine/df/intervals.rs
@@ -0,0 +1,51 @@
#[macro_export]
macro_rules! make_string_interval_year_month {
($array: ident, $row: ident) => {{
let s = if $array.is_null($row) {
"NULL".to_string()
} else {
let interval = $array.value($row) as f64;
let years = (interval / 12_f64).floor();
let month = interval - (years * 12_f64);

format!(
"{} years {} mons 0 days 0 hours 0 mins 0.00 secs",
years, month,
)
};

s
}};
}

#[macro_export]
macro_rules! make_string_interval_day_time {
($array: ident, $row: ident) => {{
let s = if $array.is_null($row) {
"NULL".to_string()
} else {
let value: u64 = $array.value($row) as u64;

let days_parts: i32 = ((value & 0xFFFFFFFF00000000) >> 32) as i32;
let milliseconds_part: i32 = (value & 0xFFFFFFFF) as i32;

let secs = milliseconds_part / 1000;
let mins = secs / 60;
let hours = mins / 60;

let secs = secs - (mins * 60);
let mins = mins - (hours * 60);

format!(
"0 years 0 mons {} days {} hours {} mins {}.{:02} secs",
days_parts,
hours,
mins,
secs,
(milliseconds_part % 1000),
)
};

s
}};
}
1 change: 1 addition & 0 deletions rust/cubesql/src/compile/engine/df/mod.rs
@@ -1 +1,2 @@
pub mod coerce;
pub mod intervals;
69 changes: 67 additions & 2 deletions rust/cubesql/src/compile/engine/udf.rs
@@ -1,14 +1,19 @@
use std::any::type_name;
use std::sync::Arc;


use datafusion::{
arrow::{
array::{
ArrayRef, BooleanArray, BooleanBuilder, GenericStringArray, Int32Builder,
PrimitiveArray, StringBuilder, UInt32Builder,
IntervalDayTimeBuilder, PrimitiveArray, StringBuilder,
UInt32Builder,
},
compute::cast,
datatypes::{DataType, Int64Type},
datatypes::{
DataType, Int64Type, IntervalUnit, TimeUnit,
TimestampNanosecondType,
},
},
error::DataFusionError,
logical_plan::create_udf,
Expand Down Expand Up @@ -399,3 +404,63 @@ pub fn create_convert_tz_udf() -> ScalarUDF {
&fun,
)
}

pub fn create_timediff_udf() -> ScalarUDF {
let fun = make_scalar_function(move |args: &[ArrayRef]| {
assert!(args.len() == 2);

let left_dt = &args[0];
let right_dt = &args[1];

let left_date = match left_dt.data_type() {
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
let arr = downcast_primitive_arg!(left_dt, "left_dt", TimestampNanosecondType);
let ts = arr.value(0);

// NaiveDateTime::from_timestamp(ts, 0)
ts
}
_ => {
return Err(DataFusionError::Execution(format!(
"left_dt argument must be a Timestamp, actual: {}",
left_dt.data_type()
)));
}
};

let right_date = match right_dt.data_type() {
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
let arr = downcast_primitive_arg!(right_dt, "right_dt", TimestampNanosecondType);
arr.value(0)
}
_ => {
return Err(DataFusionError::Execution(format!(
"right_dt argument must be a Timestamp, actual: {}",
right_dt.data_type()
)));
}
};

let diff = right_date - left_date;
if diff != 0 {
return Err(DataFusionError::NotImplemented(format!(
"timediff is not implemented, it's stub"
)));
}

let mut interal_arr = IntervalDayTimeBuilder::new(1);
interal_arr.append_value(diff)?;

Ok(Arc::new(interal_arr.finish()) as ArrayRef)
});

let return_type: ReturnTypeFunction =
Arc::new(move |_| Ok(Arc::new(DataType::Interval(IntervalUnit::DayTime))));

ScalarUDF::new(
"timediff",
&Signature::any(2, Volatility::Immutable),
&return_type,
&fun,
)
}
24 changes: 22 additions & 2 deletions rust/cubesql/src/compile/mod.rs
Expand Up @@ -32,8 +32,8 @@ use self::engine::context::SystemVar;
use self::engine::provider::CubeContext;
use self::engine::udf::{
create_connection_id_udf, create_convert_tz_udf, create_current_user_udf, create_db_udf,
create_if_udf, create_instr_udf, create_isnull_udf, create_least_udf, create_user_udf,
create_version_udf,
create_if_udf, create_instr_udf, create_isnull_udf, create_least_udf, create_timediff_udf,
create_user_udf, create_version_udf,
};
use self::parser::parse_sql_to_statement;

Expand Down Expand Up @@ -1450,6 +1450,7 @@ impl QueryPlanner {
ctx.register_udf(create_if_udf());
ctx.register_udf(create_least_udf());
ctx.register_udf(create_convert_tz_udf());
ctx.register_udf(create_timediff_udf());

let state = ctx.state.lock().unwrap().clone();
let cube_ctx = CubeContext::new(&state, &self.context.cubes);
Expand Down Expand Up @@ -3225,6 +3226,25 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_timediff() -> Result<(), CubeError> {
assert_eq!(
execute_df_query(
"select \
timediff('1994-11-26T13:25:00.000Z'::timestamp, '1994-11-26T13:25:00.000Z'::timestamp) as r1
".to_string()
)
.await?,
"+------------------------------------------------+\n\
| r1 |\n\
+------------------------------------------------+\n\
| 0 years 0 mons 0 days 0 hours 0 mins 0.00 secs |\n\
+------------------------------------------------+"
);

Ok(())
}

#[tokio::test]
async fn test_metabase() -> Result<(), CubeError> {
assert_eq!(
Expand Down
25 changes: 23 additions & 2 deletions rust/cubesql/src/mysql/dataframe.rs
Expand Up @@ -3,9 +3,10 @@ use std::fmt::{self, Debug, Formatter};
use chrono::{SecondsFormat, TimeZone, Utc};
use comfy_table::{Cell, Table};
use datafusion::arrow::array::{
Array, Float64Array, Int32Array, Int64Array, StringArray, TimestampMicrosecondArray,
UInt32Array,
Array, Float64Array, Int32Array, Int64Array, IntervalDayTimeArray, IntervalYearMonthArray,
StringArray, TimestampMicrosecondArray, UInt32Array,
};
use datafusion::arrow::datatypes::IntervalUnit;
use datafusion::arrow::{
array::{BooleanArray, TimestampNanosecondArray, UInt64Array},
datatypes::{DataType, TimeUnit},
Expand All @@ -15,6 +16,7 @@ use log::{error, warn};
use msql_srv::{ColumnFlags, ColumnType};

use crate::{compile::builder::CompiledQueryFieldMeta, CubeError};
use crate::{make_string_interval_day_time, make_string_interval_year_month};

#[derive(Clone, Debug)]
pub struct Column {
Expand Down Expand Up @@ -309,6 +311,7 @@ pub fn arrow_to_column_type(arrow_type: DataType) -> Result<ColumnType, CubeErro
DataType::Binary => Ok(ColumnType::MYSQL_TYPE_BLOB),
DataType::Utf8 | DataType::LargeUtf8 => Ok(ColumnType::MYSQL_TYPE_STRING),
DataType::Timestamp(_, _) => Ok(ColumnType::MYSQL_TYPE_STRING),
DataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_STRING),
DataType::Float16 | DataType::Float64 => Ok(ColumnType::MYSQL_TYPE_DOUBLE),
DataType::Boolean => Ok(ColumnType::MYSQL_TYPE_TINY),
DataType::Int8
Expand Down Expand Up @@ -402,6 +405,24 @@ pub fn batch_to_dataframe(batches: &Vec<RecordBatch>) -> Result<DataFrame, CubeE
});
}
}
DataType::Interval(IntervalUnit::DayTime) => {
let a = array
.as_any()
.downcast_ref::<IntervalDayTimeArray>()
.unwrap();
for i in 0..num_rows {
rows[i].push(TableValue::String(make_string_interval_day_time!(a, i)));
}
}
DataType::Interval(IntervalUnit::YearMonth) => {
let a = array
.as_any()
.downcast_ref::<IntervalYearMonthArray>()
.unwrap();
for i in 0..num_rows {
rows[i].push(TableValue::String(make_string_interval_year_month!(a, i)));
}
}
DataType::Boolean => {
let a = array.as_any().downcast_ref::<BooleanArray>().unwrap();
for i in 0..num_rows {
Expand Down

0 comments on commit 29dfb97

Please sign in to comment.