Skip to content

Commit

Permalink
feat(cubesql): Support to_char UDF (#4600)
Browse files Browse the repository at this point in the history
  • Loading branch information
gandronchik committed May 30, 2022
1 parent 005eaa9 commit 48077a9
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 15 deletions.
111 changes: 98 additions & 13 deletions rust/cubesql/cubesql/src/compile/engine/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use datafusion::{
new_null_array, Array, ArrayBuilder, ArrayRef, BooleanArray, BooleanBuilder,
Float64Array, GenericStringArray, Int32Array, Int64Array, Int64Builder,
IntervalDayTimeArray, IntervalDayTimeBuilder, ListArray, ListBuilder, PrimitiveArray,
PrimitiveBuilder, StringArray, StringBuilder, StructBuilder, TimestampNanosecondArray,
UInt32Array, UInt32Builder,
PrimitiveBuilder, StringArray, StringBuilder, StructBuilder, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt32Array,
UInt32Builder,
},
compute::{cast, concat},
datatypes::{
Expand Down Expand Up @@ -953,6 +954,21 @@ pub fn create_date_add_udf() -> ScalarUDF {
)
}

/// Rewrites MySQL-style (`%i`, `%s`, `.%f`) and PostgreSQL-style (`YYYY`, `MM`, `DD`,
/// `HH24`, `MI`, `SS`, `.US`, `.MS`) datetime tokens in `format` into `%`-prefixed
/// strftime-style specifiers.
///
/// Any text that does not match one of the known tokens is passed through untouched.
fn postgres_datetime_format_to_iso(format: String) -> String {
    // Each rule is applied over the output of the previous one, so the order of this
    // table is part of the behavior and must not be rearranged.
    const SUBSTITUTIONS: &[(&str, &str)] = &[
        ("%i", "%M"),
        ("%s", "%S"),
        (".%f", "%.f"),
        ("YYYY", "%Y"),
        ("DD", "%d"),
        ("HH24", "%H"),
        ("MI", "%M"),
        ("SS", "%S"),
        (".US", "%.f"),
        ("MM", "%m"),
        (".MS", "%.3f"),
    ];

    SUBSTITUTIONS
        .iter()
        .fold(format, |pattern, &(token, specifier)| {
            pattern.replace(token, specifier)
        })
}

pub fn create_str_to_date_udf() -> ScalarUDF {
let fun: Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync> =
Arc::new(move |args: &[ColumnarValue]| {
Expand All @@ -978,17 +994,7 @@ pub fn create_str_to_date_udf() -> ScalarUDF {
}
};

let format = format
.replace("%i", "%M")
.replace("%s", "%S")
.replace(".%f", "%.f")
.replace("YYYY", "%Y")
.replace("DD", "%d")
.replace("HH24", "%H")
.replace("MI", "%M")
.replace("SS", "%S")
.replace(".US", "%.f")
.replace("MM", "%m");
let format = postgres_datetime_format_to_iso(format.clone());

let res = NaiveDateTime::parse_from_str(timestamp, &format).map_err(|e| {
DataFusionError::Execution(format!(
Expand Down Expand Up @@ -1031,6 +1037,85 @@ pub fn create_current_timestamp_udf() -> ScalarUDF {
)
}

/// Tries to view `$ARR` as a `$ARR_TYPE` and turn every slot into a `Duration` built with
/// `Duration::$FN_NAME`.
///
/// Evaluates to `Some(Vec<Duration>)` with one entry per array slot when the downcast
/// succeeds, and to `None` when `$ARR` is not a `$ARR_TYPE`.
///
/// NOTE: every index is read with `value(i)`; the array's null mask is not consulted here.
macro_rules! parse_timestamp_arr {
    ($ARR:expr, $ARR_TYPE: ident, $FN_NAME: ident) => {{
        $ARR.as_any().downcast_ref::<$ARR_TYPE>().map(|typed| {
            (0..typed.len())
                .map(|idx| Duration::$FN_NAME(typed.value(idx)))
                .collect::<Vec<_>>()
        })
    }};
}

pub fn create_to_char_udf() -> ScalarUDF {
let fun: Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync> =
make_scalar_function(move |args: &[ArrayRef]| {
let arr = &args[0];
let (durations, timezone) = match arr.data_type() {
DataType::Timestamp(TimeUnit::Nanosecond, str) => (
parse_timestamp_arr!(arr, TimestampNanosecondArray, nanoseconds),
str.clone().unwrap_or_default(),
),
DataType::Timestamp(TimeUnit::Millisecond, str) => (
parse_timestamp_arr!(arr, TimestampMillisecondArray, milliseconds),
str.clone().unwrap_or_default(),
),
DataType::Timestamp(TimeUnit::Microsecond, str) => (
parse_timestamp_arr!(arr, TimestampMicrosecondArray, microseconds),
str.clone().unwrap_or_default(),
),
DataType::Timestamp(TimeUnit::Second, str) => (
parse_timestamp_arr!(arr, TimestampSecondArray, seconds),
str.clone().unwrap_or_default(),
),
_ => (None, "".to_string()),
};

if durations.is_none() {
return Err(DataFusionError::Execution(
"unsupported datetime format for to_char".to_string(),
));
}

let durations = durations.unwrap();
let formats = downcast_string_arg!(&args[1], "format_str", i32);

let mut builder = StringBuilder::new(durations.len());

for (i, duration) in durations.iter().enumerate() {
let format = formats.value(i);
let replaced_format =
postgres_datetime_format_to_iso(format.to_string()).replace("TZ", &timezone);

let secs = duration.num_seconds();
let nanosecs = duration.num_nanoseconds().unwrap_or(0) - secs * 1_000_000_000;
let timestamp = NaiveDateTime::from_timestamp(secs, nanosecs as u32);

builder
.append_value(timestamp.format(&replaced_format).to_string())
.unwrap();
}

Ok(Arc::new(builder.finish()) as ArrayRef)
});

let return_type: ReturnTypeFunction = Arc::new(move |_| Ok(Arc::new(DataType::Utf8)));

ScalarUDF::new(
"to_char",
&Signature::any(2, Volatility::Immutable),
&return_type,
&fun,
)
}

pub fn create_current_schemas_udf() -> ScalarUDF {
let current_schemas = make_scalar_function(move |args: &[ArrayRef]| {
assert!(args.len() == 1);
Expand Down
47 changes: 45 additions & 2 deletions rust/cubesql/cubesql/src/compile/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ use self::{
create_pg_numeric_scale_udf, create_pg_table_is_visible_udf, create_pg_truetypid_udf,
create_pg_truetypmod_udf, create_pg_type_is_visible_udf, create_quarter_udf,
create_second_udf, create_str_to_date_udf, create_time_format_udf, create_timediff_udf,
create_ucase_udf, create_unnest_udtf, create_user_udf, create_version_udf,
create_year_udf,
create_to_char_udf, create_ucase_udf, create_unnest_udtf, create_user_udf,
create_version_udf, create_year_udf,
},
},
parser::parse_sql_to_statement,
Expand Down Expand Up @@ -2386,6 +2386,7 @@ WHERE `TABLE_SCHEMA` = '{}'",
ctx.register_udf(create_pg_get_constraintdef_udf());
ctx.register_udf(create_pg_truetypid_udf());
ctx.register_udf(create_pg_truetypmod_udf());
ctx.register_udf(create_to_char_udf());

// udaf
ctx.register_udaf(create_measure_udaf());
Expand Down Expand Up @@ -7764,6 +7765,48 @@ ORDER BY \"COUNT(count)\" DESC"
Ok(())
}

/// Snapshot tests for the `to_char` UDF over timestamps produced by `Str_to_date`,
/// executed through the PostgreSQL protocol.
#[tokio::test]
async fn test_to_char_udf() -> Result<(), CubeError> {
// Case 1: a single-row subquery with a fractional-seconds timestamp.
insta::assert_snapshot!(
"to_char_1",
execute_query(
"SELECT to_char(x, 'YYYY-MM-DD HH24:MI:SS.MS TZ') FROM (SELECT Str_to_date('2021-08-31 11:05:10.400000', '%Y-%m-%d %H:%i:%s.%f') x) e".to_string(),
DatabaseProtocol::PostgreSQL
)
.await?
);

// Case 2: two rows combined with UNION ALL; the second literal has no seconds or
// fractional part.
insta::assert_snapshot!(
"to_char_2",
execute_query(
"
SELECT to_char(x, 'YYYY-MM-DD HH24:MI:SS.MS TZ')
FROM (
SELECT Str_to_date('2021-08-31 11:05:10.400000', '%Y-%m-%d %H:%i:%s.%f') x
UNION ALL
SELECT str_to_date('2021-08-31 11:05', '%Y-%m-%d %H:%i') x
) e
"
.to_string(),
DatabaseProtocol::PostgreSQL
)
.await?
);

Ok(())
}

/// Runs `to_char` over `current_timestamp` through the PostgreSQL protocol.
///
/// Only checks that the query executes without error: the returned text is discarded
/// and never compared against a snapshot.
#[tokio::test]
async fn test_metabase_to_char_query() -> Result<(), CubeError> {
execute_query(
"select to_char(current_timestamp, 'YYYY-MM-DD HH24:MI:SS.MS TZ')".to_string(),
DatabaseProtocol::PostgreSQL,
)
.await?;

Ok(())
}

#[tokio::test]
async fn test_subquery_with_same_name_excel() -> Result<(), CubeError> {
insta::assert_snapshot!(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
source: cubesql/src/compile/mod.rs
expression: "execute_query(\"select to_char(current_timestamp, 'YYYY-MM-DD HH24:MI:SS.MS TZ')\".to_string(),\n DatabaseProtocol::PostgreSQL).await?"
---
+-------------------------------------------------------------+
| to_char(utctimestamp(),Utf8("YYYY-MM-DD HH24:MI:SS.MS TZ")) |
+-------------------------------------------------------------+
| 2022-05-26 11:18:59.122 |
+-------------------------------------------------------------+
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
source: cubesql/src/compile/mod.rs
expression: "execute_query(\"SELECT to_char(x, 'YYYY-MM-DD HH24:MI:SS.MS TZ') FROM (SELECT Str_to_date('2021-08-31 11:05:10.400000', '%Y-%m-%d %H:%i:%s.%f') x) e\".to_string(),\n DatabaseProtocol::PostgreSQL).await?"
---
+--------------------------------------------------+
| to_char(e.x,Utf8("YYYY-MM-DD HH24:MI:SS.MS TZ")) |
+--------------------------------------------------+
| 2021-08-31 11:05:10.400 |
+--------------------------------------------------+
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
source: cubesql/src/compile/mod.rs
expression: "execute_query(\"\n SELECT to_char(x, 'YYYY-MM-DD HH24:MI:SS.MS TZ') \n FROM (\n SELECT Str_to_date('2021-08-31 11:05:10.400000', '%Y-%m-%d %H:%i:%s.%f') x \n UNION ALL \n SELECT str_to_date('2021-08-31 11:05', '%Y-%m-%d %H:%i') x\n ) e\n \".to_string(),\n DatabaseProtocol::PostgreSQL).await?"
---
+--------------------------------------------------+
| to_char(e.x,Utf8("YYYY-MM-DD HH24:MI:SS.MS TZ")) |
+--------------------------------------------------+
| 2021-08-31 11:05:10.400 |
| 2021-08-31 11:05:00.000 |
+--------------------------------------------------+

0 comments on commit 48077a9

Please sign in to comment.