feat(cubesql): Metabase interval date range filter support (#4763)
gandronchik committed Jun 24, 2022
1 parent 2738417 commit 221715a
Showing 4 changed files with 118 additions and 23 deletions.
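
Summary of the change: Metabase expresses relative date range filters (for example "previous 30 days") as predicates of the form column >= CAST((CAST(now() AS timestamp) + (INTERVAL '-30 day')) AS date). To support this, the commit widens the date_add UDF so it also accepts timezone-aware timestamps, lets the rewrite engine constant-fold Stable functions such as now() while deliberately leaving literal interval arithmetic unfolded, adds a dates.rs rule that rewrites CAST(now() AS ...) + interval into date_add(now(), interval), and covers the pattern with a test that expects an afterDate filter on KibanaSampleDataEcommerce.order_date.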
12 changes: 9 additions & 3 deletions rust/cubesql/cubesql/src/compile/engine/udf.rs
@@ -948,10 +948,16 @@ pub fn create_date_add_udf() -> ScalarUDF {
 
     ScalarUDF::new(
         "date_add",
-        &Signature::exact(
+        &Signature::one_of(
             vec![
-                DataType::Timestamp(TimeUnit::Nanosecond, None),
-                DataType::Interval(IntervalUnit::DayTime),
+                TypeSignature::Exact(vec![
+                    DataType::Timestamp(TimeUnit::Nanosecond, None),
+                    DataType::Interval(IntervalUnit::DayTime),
+                ]),
+                TypeSignature::Exact(vec![
+                    DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_string())),
+                    DataType::Interval(IntervalUnit::DayTime),
+                ]),
             ],
             Volatility::Immutable,
         ),
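
Note on the hunk above: in a PostgreSQL session now() yields a timestamp with time zone, which DataFusion represents as Timestamp(Nanosecond, Some("UTC")), so the old single exact signature did not match calls whose first argument comes from now(). Signature::one_of accepts a call if it matches any of the listed TypeSignatures. A minimal sketch of the widened signature, annotated for illustration (imports assumed to be the ones udf.rs already has):

    // Sketch: same signature as in the hunk above, with comments added.
    let signature = Signature::one_of(
        vec![
            // date_add(timestamp without time zone, interval)
            TypeSignature::Exact(vec![
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                DataType::Interval(IntervalUnit::DayTime),
            ]),
            // date_add(timestamp with time zone, interval), e.g. the result of now()
            TypeSignature::Exact(vec![
                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_string())),
                DataType::Interval(IntervalUnit::DayTime),
            ]),
        ],
        Volatility::Immutable,
    );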
58 changes: 54 additions & 4 deletions rust/cubesql/cubesql/src/compile/mod.rs
@@ -2739,6 +2739,7 @@ mod tests {
     };
     use datafusion::dataframe::DataFrame as DFDataFrame;
     use pretty_assertions::assert_eq;
+    use regex::Regex;
 
     use super::*;
     use crate::{
@@ -2959,7 +2960,6 @@
         parent.accept(&mut visitor).unwrap();
         visitor.0.expect("No CubeScanNode was found in plan")
     }
-
     trait LogicalPlanTestUtils {
         fn find_projection_schema(&self) -> DFSchemaRef;
 
@@ -3741,11 +3741,18 @@
             DatabaseProtocol::PostgreSQL,
         );
 
-        let logical_plan = query_plan.print(true).unwrap();
+        let logical_plan = &query_plan.print(true).unwrap();
+
+        let re = Regex::new(r"TimestampNanosecond\(\d+, None\)").unwrap();
+        let logical_plan = re
+            .replace_all(logical_plan, "TimestampNanosecond(0, None)")
+            .as_ref()
+            .to_string();
+
         assert_eq!(
             logical_plan,
-            "Projection: CAST(utctimestamp() AS Timestamp(Nanosecond, None)) AS COL\
-            \n  EmptyRelation"
+            "Projection: CAST(TimestampNanosecond(0, None) AS Timestamp(Nanosecond, None)) AS COL\
+            \n  EmptyRelation",
         );
     }
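
Why this test changed: with Stable functions now eligible for constant folding (see the analysis.rs hunks further down), NOW() is evaluated at planning time, so the printed plan contains a concrete TimestampNanosecond(<nanos>, None) literal instead of the former utctimestamp() call. That literal depends on the wall clock, so the test rewrites it to a fixed TimestampNanosecond(0, None) before comparing. The same normalization as a hypothetical standalone helper (the name normalize_timestamps is illustrative, not part of the commit):

    use regex::Regex;

    // Replace the run-dependent epoch value with 0 so plan snapshots can be
    // compared with assert_eq!.
    fn normalize_timestamps(plan: &str) -> String {
        let re = Regex::new(r"TimestampNanosecond\(\d+, None\)").unwrap();
        re.replace_all(plan, "TimestampNanosecond(0, None)").to_string()
    }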

@@ -9370,4 +9377,47 @@ ORDER BY \"COUNT(count)\" DESC"
 
         Ok(())
     }
+
+    #[test]
+    fn metabase_interval_date_range_filter() {
+        let logical_plan = convert_select_to_query_plan(
+            "
+            SELECT COUNT(*)
+            FROM KibanaSampleDataEcommerce
+            WHERE KibanaSampleDataEcommerce.order_date >= CAST((CAST(now() AS timestamp) + (INTERVAL '-30 day')) AS date);
+            ".to_string(),
+            DatabaseProtocol::PostgreSQL
+        ).as_logical_plan();
+
+        let filters = logical_plan
+            .find_cube_scan()
+            .request
+            .filters
+            .unwrap_or_default();
+        let filter_vals = if filters.len() > 0 {
+            filters[0].values.clone()
+        } else {
+            None
+        };
+
+        assert_eq!(
+            logical_plan.find_cube_scan().request,
+            V1LoadRequestQuery {
+                measures: Some(vec!["KibanaSampleDataEcommerce.count".to_string()]),
+                dimensions: Some(vec![]),
+                segments: Some(vec![]),
+                time_dimensions: None,
+                order: None,
+                limit: None,
+                offset: None,
+                filters: Some(vec![V1LoadRequestQueryFilterItem {
+                    member: Some("KibanaSampleDataEcommerce.order_date".to_string()),
+                    operator: Some("afterDate".to_string()),
+                    values: filter_vals,
+                    or: None,
+                    and: None,
+                },])
+            }
+        )
+    }
 }
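
About the new test: the expected filter value cannot be hard-coded, because it is the concrete date obtained by constant-evaluating now() + INTERVAL '-30 day' at planning time. The test therefore reads the produced value back out of the generated cube scan (filter_vals) and asserts only the stable parts of the request: the order_date member, the afterDate operator, and the overall V1LoadRequestQuery shape.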
56 changes: 40 additions & 16 deletions rust/cubesql/cubesql/src/compile/rewrite/analysis.rs
@@ -347,7 +347,9 @@ impl LogicalPlanAnalysis {
                 .ok()?;
 
             if let Expr::ScalarFunction { fun, .. } = &expr {
-                if fun.volatility() == Volatility::Immutable {
+                if fun.volatility() == Volatility::Immutable
+                    || fun.volatility() == Volatility::Stable
+                {
                     Self::eval_constant_expr(&egraph, &expr)
                 } else {
                     None
@@ -376,6 +378,21 @@
             )
             .ok()?;
 
+            match &expr {
+                Expr::BinaryExpr { left, right, .. } => match (&**left, &**right) {
+                    (Expr::Literal(ScalarValue::IntervalYearMonth(_)), Expr::Literal(_))
+                    | (Expr::Literal(ScalarValue::IntervalDayTime(_)), Expr::Literal(_))
+                    | (Expr::Literal(ScalarValue::IntervalMonthDayNano(_)), Expr::Literal(_))
+                    | (Expr::Literal(_), Expr::Literal(ScalarValue::IntervalYearMonth(_)))
+                    | (Expr::Literal(_), Expr::Literal(ScalarValue::IntervalDayTime(_)))
+                    | (Expr::Literal(_), Expr::Literal(ScalarValue::IntervalMonthDayNano(_))) => {
+                        return None
+                    }
+                    _ => (),
+                },
+                _ => (),
+            }
+
             Self::eval_constant_expr(&egraph, &expr)
         }
         _ => None,
@@ -412,16 +429,18 @@
     ) -> Option<ScalarValue> {
         let schema = DFSchema::empty();
         let arrow_schema = Arc::new(schema.to_owned().into());
-        let physical_expr = egraph
-            .analysis
-            .planner
-            .create_physical_expr(
-                &expr,
-                &schema,
-                &arrow_schema,
-                &egraph.analysis.cube_context.state,
-            )
-            .expect(&format!("Can't plan expression: {:?}", expr));
+        let physical_expr = match egraph.analysis.planner.create_physical_expr(
+            &expr,
+            &schema,
+            &arrow_schema,
+            &egraph.analysis.cube_context.state,
+        ) {
+            Ok(res) => res,
+            Err(e) => {
+                log::trace!("Can't plan expression: {:?}", e);
+                return None;
+            }
+        };
         let batch = RecordBatch::try_new(
             Arc::new(Schema::new(vec![Field::new(
                 "placeholder",
@@ -431,19 +450,24 @@
             vec![Arc::new(NullArray::new(1))],
         )
         .unwrap();
-        let value = physical_expr
-            .evaluate(&batch)
-            .expect(&format!("Can't evaluate expression: {:?}", expr));
+        let value = match physical_expr.evaluate(&batch) {
+            Ok(res) => res,
+            Err(e) => {
+                log::trace!("Can't evaluate expression: {:?}", e);
+                return None;
+            }
+        };
         Some(match value {
             ColumnarValue::Scalar(value) => value,
             ColumnarValue::Array(arr) => {
                 if arr.len() == 1 {
                     ScalarValue::try_from_array(&arr, 0).unwrap()
                 } else {
-                    panic!(
+                    log::trace!(
                         "Expected one row but got {} during constant eval",
                         arr.len()
-                    )
+                    );
+                    return None;
                 }
             }
         })
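
The analysis.rs changes adjust constant folding in three related ways: Stable functions such as now() are now folded like Immutable ones; binary expressions in which a literal interval is combined with another literal are deliberately left unfolded, so the dates.rs rule below can still see the now() + interval shape; and failures to plan or evaluate an expression now degrade to None with a trace log instead of panicking, since an expression that cannot be evaluated is simply not a constant. A sketch of the interval check factored into a standalone predicate, assuming the Expr and ScalarValue types already imported by analysis.rs (the helper name is illustrative, not part of the commit):

    // True when a binary expression combines two literals and at least one of
    // them is an interval; constant folding skips such expressions so later
    // rewrite rules can still match the original now() + interval pattern.
    fn is_literal_interval_arithmetic(expr: &Expr) -> bool {
        let is_interval = |e: &Expr| {
            matches!(
                e,
                Expr::Literal(ScalarValue::IntervalYearMonth(_))
                    | Expr::Literal(ScalarValue::IntervalDayTime(_))
                    | Expr::Literal(ScalarValue::IntervalMonthDayNano(_))
            )
        };
        let is_literal = |e: &Expr| matches!(e, Expr::Literal(_));
        match expr {
            Expr::BinaryExpr { left, right, .. } => {
                let (l, r) = (&**left, &**right);
                (is_interval(l) && is_literal(r)) || (is_literal(l) && is_interval(r))
            }
            _ => false,
        }
    }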
15 changes: 15 additions & 0 deletions rust/cubesql/cubesql/src/compile/rewrite/rules/dates.rs
@@ -259,6 +259,21 @@ impl RewriteRules for DateRules {
                 vec![literal_string("week"), column_expr("?column")],
             ),
             ),
+            rewrite(
+                "metabase-interval-date-range",
+                binary_expr(
+                    cast_expr(fun_expr("Now", Vec::<String>::new()), "?data_type"),
+                    "+",
+                    literal_expr("?interval"),
+                ),
+                udf_expr(
+                    "date_add",
+                    vec![
+                        fun_expr("Now", Vec::<String>::new()),
+                        literal_expr("?interval"),
+                    ],
+                ),
+            ),
         ]
     }
 }
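
The new rule matches a cast of Now() to any type (captured as ?data_type) added to a literal interval (?interval) and rewrites the whole expression to the date_add UDF, dropping the cast. Excluding literal interval arithmetic from constant folding (analysis.rs above) keeps this pattern visible to the rewrite engine, and the >= comparison against it ends up, as the new test shows, as an afterDate filter on order_date. Roughly, on the shape Metabase emits (illustration, not code from the commit):

    // before: CAST(Now() AS ?data_type) + ?interval
    // after:  date_add(Now(), ?interval)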
