diff --git a/rust/Cargo.lock b/rust/Cargo.lock
index b42616fab7a9..4795eb6b7c84 100644
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -1287,7 +1287,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "5.1.0"
-source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=8df4132b83d896a0d3db5c82a4eaaa3eaa285d15#8df4132b83d896a0d3db5c82a4eaaa3eaa285d15"
+source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=868f3c4de13d13cda84cee33475b9782b94fa60c#868f3c4de13d13cda84cee33475b9782b94fa60c"
 dependencies = [
  "ahash 0.7.4",
  "arrow 6.0.0",
diff --git a/rust/cubesql/Cargo.toml b/rust/cubesql/Cargo.toml
index 3cb386a8337a..9aef494b1ab5 100644
--- a/rust/cubesql/Cargo.toml
+++ b/rust/cubesql/Cargo.toml
@@ -9,7 +9,7 @@ documentation = "https://cube.dev/docs"
 homepage = "https://cube.dev"
 
 [dependencies]
-datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "8df4132b83d896a0d3db5c82a4eaaa3eaa285d15", default-features = false, features = ["unicode_expressions"] }
+datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "868f3c4de13d13cda84cee33475b9782b94fa60c", default-features = false, features = ["unicode_expressions"] }
 anyhow = "1.0"
 thiserror = "1.0"
 cubeclient = { path = "../cubeclient" }
diff --git a/rust/cubesql/src/compile/engine/df/intervals.rs b/rust/cubesql/src/compile/engine/df/intervals.rs
new file mode 100644
index 000000000000..9e6cb7e337be
--- /dev/null
+++ b/rust/cubesql/src/compile/engine/df/intervals.rs
@@ -0,0 +1,63 @@
+/// Renders row `$row` of an `IntervalYearMonth` array as a display string.
+///
+/// `$array` must provide `is_null(row)` and `value(row)`. The value is treated as
+/// a total number of months and split into whole years plus remaining months.
+/// Null rows are rendered as the literal string `NULL`.
+#[macro_export]
+macro_rules! make_string_interval_year_month {
+    ($array: ident, $row: ident) => {{
+        if $array.is_null($row) {
+            "NULL".to_string()
+        } else {
+            let interval = $array.value($row) as f64;
+            // `floor` keeps the month remainder non-negative for negative intervals.
+            let years = (interval / 12_f64).floor();
+            let month = interval - (years * 12_f64);
+
+            format!(
+                "{} years {} mons 0 days 0 hours 0 mins 0.00 secs",
+                years, month,
+            )
+        }
+    }};
+}
+
+/// Renders row `$row` of an `IntervalDayTime` array as a display string.
+///
+/// `$array` must provide `is_null(row)` and `value(row)`. The 64-bit value is
+/// split into a day count (upper 32 bits) and a millisecond count (lower 32
+/// bits); the milliseconds are broken down into hours, minutes and seconds.
+/// Null rows are rendered as the literal string `NULL`.
+#[macro_export]
+macro_rules! make_string_interval_day_time {
+    ($array: ident, $row: ident) => {{
+        if $array.is_null($row) {
+            "NULL".to_string()
+        } else {
+            let value: u64 = $array.value($row) as u64;
+
+            let days_parts: i32 = ((value & 0xFFFFFFFF00000000) >> 32) as i32;
+            let milliseconds_part: i32 = (value & 0xFFFFFFFF) as i32;
+
+            let secs = milliseconds_part / 1000;
+            let mins = secs / 60;
+            let hours = mins / 60;
+
+            let secs = secs - (mins * 60);
+            let mins = mins - (hours * 60);
+
+            // The fraction is in milliseconds, so it must be zero-padded to three
+            // digits: 5 ms is `0.005`, not `0.05`.
+            // NOTE(review): negative millisecond parts still render with a sign on
+            // every component; confirm whether such values can reach this code.
+            format!(
+                "0 years 0 mons {} days {} hours {} mins {}.{:03} secs",
+                days_parts,
+                hours,
+                mins,
+                secs,
+                (milliseconds_part % 1000),
+            )
+        }
+    }};
+}
diff --git a/rust/cubesql/src/compile/engine/df/mod.rs b/rust/cubesql/src/compile/engine/df/mod.rs
index a19a97001182..3097523c73a2 100644
--- a/rust/cubesql/src/compile/engine/df/mod.rs
+++ b/rust/cubesql/src/compile/engine/df/mod.rs
@@ -1 +1,2 @@
 pub mod coerce;
+pub mod intervals;
diff --git a/rust/cubesql/src/compile/engine/udf.rs b/rust/cubesql/src/compile/engine/udf.rs
index 55b8bc1811fa..0e160b38117e 100644
--- a/rust/cubesql/src/compile/engine/udf.rs
+++ b/rust/cubesql/src/compile/engine/udf.rs
@@ -1,14 +1,19 @@
 use std::any::type_name;
 use std::sync::Arc;
 
+
 use datafusion::{
     arrow::{
         array::{
             ArrayRef, BooleanArray, BooleanBuilder, GenericStringArray, Int32Builder,
-            PrimitiveArray, StringBuilder, UInt32Builder,
+            IntervalDayTimeBuilder, PrimitiveArray, StringBuilder,
+            UInt32Builder,
         },
         compute::cast,
-        datatypes::{DataType, Int64Type},
+        datatypes::{
+            DataType, Int64Type, IntervalUnit, TimeUnit,
+            TimestampNanosecondType,
+        },
     },
     error::DataFusionError,
     logical_plan::create_udf,
@@ -399,3 +404,70 @@ pub fn create_convert_tz_udf() -> ScalarUDF {
         &fun,
     )
 }
+
+/// Builds the `timediff(left_dt, right_dt)` scalar UDF.
+///
+/// This is a stub: only the first row of each argument is read, and the only
+/// supported result is a zero interval. Any non-zero difference yields
+/// `DataFusionError::NotImplemented`, and an argument that is not a nanosecond
+/// timestamp yields `DataFusionError::Execution`.
+pub fn create_timediff_udf() -> ScalarUDF {
+    let fun = make_scalar_function(move |args: &[ArrayRef]| {
+        assert!(args.len() == 2);
+
+        let left_dt = &args[0];
+        let right_dt = &args[1];
+
+        let left_date = match left_dt.data_type() {
+            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+                let arr = downcast_primitive_arg!(left_dt, "left_dt", TimestampNanosecondType);
+                arr.value(0)
+            }
+            _ => {
+                return Err(DataFusionError::Execution(format!(
+                    "left_dt argument must be a Timestamp, actual: {}",
+                    left_dt.data_type()
+                )));
+            }
+        };
+
+        let right_date = match right_dt.data_type() {
+            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+                let arr = downcast_primitive_arg!(right_dt, "right_dt", TimestampNanosecondType);
+                arr.value(0)
+            }
+            _ => {
+                return Err(DataFusionError::Execution(format!(
+                    "right_dt argument must be a Timestamp, actual: {}",
+                    right_dt.data_type()
+                )));
+            }
+        };
+
+        // MySQL defines TIMEDIFF(expr1, expr2) as expr1 - expr2, so subtract the
+        // second argument from the first.
+        let diff = left_date - right_date;
+        // `diff` is in nanoseconds, which does not match the day/millisecond layout
+        // of an IntervalDayTime value; only zero is representable without conversion.
+        if diff != 0 {
+            return Err(DataFusionError::NotImplemented(
+                "timediff is not implemented, it's stub".to_string(),
+            ));
+        }
+
+        let mut interval_arr = IntervalDayTimeBuilder::new(1);
+        interval_arr.append_value(diff)?;
+
+        Ok(Arc::new(interval_arr.finish()) as ArrayRef)
+    });
+
+    let return_type: ReturnTypeFunction =
+        Arc::new(move |_| Ok(Arc::new(DataType::Interval(IntervalUnit::DayTime))));
+
+    ScalarUDF::new(
+        "timediff",
+        &Signature::any(2, Volatility::Immutable),
+        &return_type,
+        &fun,
+    )
+}
diff --git a/rust/cubesql/src/compile/mod.rs b/rust/cubesql/src/compile/mod.rs
index a88da5773416..6121aa011283 100644
--- a/rust/cubesql/src/compile/mod.rs
+++ b/rust/cubesql/src/compile/mod.rs
@@ -32,8 +32,8 @@ use self::engine::context::SystemVar;
 use self::engine::provider::CubeContext;
 use self::engine::udf::{
     create_connection_id_udf, create_convert_tz_udf, create_current_user_udf, create_db_udf,
-    create_if_udf, create_instr_udf, create_isnull_udf, create_least_udf, create_user_udf,
-    create_version_udf,
+    create_if_udf, create_instr_udf, create_isnull_udf, create_least_udf, create_timediff_udf,
+    create_user_udf, create_version_udf,
 };
 use self::parser::parse_sql_to_statement;
 
@@ -1450,6 +1450,7 @@ impl QueryPlanner {
         ctx.register_udf(create_if_udf());
         ctx.register_udf(create_least_udf());
         ctx.register_udf(create_convert_tz_udf());
+        ctx.register_udf(create_timediff_udf());
 
         let state = ctx.state.lock().unwrap().clone();
         let cube_ctx = CubeContext::new(&state, &self.context.cubes);
@@ -3225,6 +3226,25 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_timediff() -> Result<(), CubeError> {
+        assert_eq!(
+            execute_df_query(
+                "select \
+                    timediff('1994-11-26T13:25:00.000Z'::timestamp, '1994-11-26T13:25:00.000Z'::timestamp) as r1
+                ".to_string()
+            )
+            .await?,
+            "+-------------------------------------------------+\n\
+            | r1                                              |\n\
+            +-------------------------------------------------+\n\
+            | 0 years 0 mons 0 days 0 hours 0 mins 0.000 secs |\n\
+            +-------------------------------------------------+"
+        );
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_metabase() -> Result<(), CubeError> {
         assert_eq!(
diff --git a/rust/cubesql/src/mysql/dataframe.rs b/rust/cubesql/src/mysql/dataframe.rs
index fa246aaaef51..244345842b67 100644
--- a/rust/cubesql/src/mysql/dataframe.rs
+++ b/rust/cubesql/src/mysql/dataframe.rs
@@ -3,9 +3,10 @@ use std::fmt::{self, Debug, Formatter};
 use chrono::{SecondsFormat, TimeZone, Utc};
 use comfy_table::{Cell, Table};
 use datafusion::arrow::array::{
-    Array, Float64Array, Int32Array, Int64Array, StringArray, TimestampMicrosecondArray,
-    UInt32Array,
+    Array, Float64Array, Int32Array, Int64Array, IntervalDayTimeArray, IntervalYearMonthArray,
+    StringArray, TimestampMicrosecondArray, UInt32Array,
 };
+use datafusion::arrow::datatypes::IntervalUnit;
 use datafusion::arrow::{
     array::{BooleanArray, TimestampNanosecondArray, UInt64Array},
     datatypes::{DataType, TimeUnit},
@@ -15,6 +16,7 @@ use log::{error, warn};
 use msql_srv::{ColumnFlags, ColumnType};
 
 use crate::{compile::builder::CompiledQueryFieldMeta, CubeError};
+use crate::{make_string_interval_day_time, make_string_interval_year_month};
 
 #[derive(Clone, Debug)]
 pub struct Column {
@@ -309,6 +311,7 @@ pub fn
arrow_to_column_type(arrow_type: DataType) -> Result Ok(ColumnType::MYSQL_TYPE_BLOB), DataType::Utf8 | DataType::LargeUtf8 => Ok(ColumnType::MYSQL_TYPE_STRING), DataType::Timestamp(_, _) => Ok(ColumnType::MYSQL_TYPE_STRING), + DataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_STRING), DataType::Float16 | DataType::Float64 => Ok(ColumnType::MYSQL_TYPE_DOUBLE), DataType::Boolean => Ok(ColumnType::MYSQL_TYPE_TINY), DataType::Int8 @@ -402,6 +405,24 @@ pub fn batch_to_dataframe(batches: &Vec) -> Result { + let a = array + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..num_rows { + rows[i].push(TableValue::String(make_string_interval_day_time!(a, i))); + } + } + DataType::Interval(IntervalUnit::YearMonth) => { + let a = array + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..num_rows { + rows[i].push(TableValue::String(make_string_interval_year_month!(a, i))); + } + } DataType::Boolean => { let a = array.as_any().downcast_ref::().unwrap(); for i in 0..num_rows {