Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cubesql): Projection aggregate split to support Tableau casts #4435

Merged
merged 1 commit on Apr 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 7 additions & 7 deletions packages/cubejs-backend-native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 54 additions & 0 deletions rust/cubesql/cubesql/src/compile/mod.rs
Expand Up @@ -3154,6 +3154,60 @@ mod tests {
);
}

#[test]
fn tableau_group_by_month() {
    init_logger();

    // Tableau emits DATE_TRUNC over a CAST of the time dimension; the plan
    // should still collapse into a single month-granularity time dimension.
    let query = "SELECT COUNT(\"KibanaSampleDataEcommerce\".\"count\") AS \"sum:bytesBilled:ok\",\n DATE_TRUNC( 'MONTH', CAST(\"KibanaSampleDataEcommerce\".\"order_date\" AS TIMESTAMP) ) AS \"tmn:timestamp:ok\"\nFROM \"public\".\"KibanaSampleDataEcommerce\" \"KibanaSampleDataEcommerce\"\nGROUP BY 2".to_string();

    let query_plan = convert_select_to_query_plan(query, DatabaseProtocol::PostgreSQL);

    let expected = V1LoadRequestQuery {
        measures: Some(vec!["KibanaSampleDataEcommerce.count".to_string()]),
        segments: Some(Vec::new()),
        dimensions: Some(Vec::new()),
        time_dimensions: Some(vec![V1LoadRequestQueryTimeDimension {
            dimension: "KibanaSampleDataEcommerce.order_date".to_string(),
            granularity: Some("month".to_string()),
            date_range: None,
        }]),
        order: None,
        limit: None,
        offset: None,
        filters: None,
    };

    let request = query_plan.as_logical_plan().find_cube_scan().request;
    assert_eq!(request, expected);
}

#[test]
fn tableau_group_by_month_and_dimension() {
    init_logger();

    // A CAST-to-TEXT around a plain dimension should be resolved to the
    // underlying dimension member in the generated load request.
    let query = "SELECT CAST(\"KibanaSampleDataEcommerce\".\"customer_gender\" AS TEXT) AS \"query\",\n SUM(\"KibanaSampleDataEcommerce\".\"count\") AS \"sum:bytesBilled:ok\"\nFROM \"public\".\"KibanaSampleDataEcommerce\" \"KibanaSampleDataEcommerce\"\nGROUP BY 1".to_string();

    let query_plan = convert_select_to_query_plan(query, DatabaseProtocol::PostgreSQL);

    let expected = V1LoadRequestQuery {
        measures: Some(vec!["KibanaSampleDataEcommerce.count".to_string()]),
        segments: Some(Vec::new()),
        dimensions: Some(vec![
            "KibanaSampleDataEcommerce.customer_gender".to_string(),
        ]),
        time_dimensions: None,
        order: None,
        limit: None,
        offset: None,
        filters: None,
    };

    let request = query_plan.as_logical_plan().find_cube_scan().request;
    assert_eq!(request, expected);
}

#[test]
fn test_select_aggregations() {
let variants = vec![
Expand Down
63 changes: 53 additions & 10 deletions rust/cubesql/cubesql/src/compile/rewrite/analysis.rs
Expand Up @@ -8,12 +8,13 @@ use crate::compile::rewrite::DimensionName;
use crate::compile::rewrite::LiteralExprValue;
use crate::compile::rewrite::LogicalPlanLanguage;
use crate::compile::rewrite::MeasureName;
use crate::compile::rewrite::ScalarFunctionExprFun;
use crate::compile::rewrite::TimeDimensionName;
use crate::var_iter;
use crate::CubeError;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::logical_plan::{DFSchema, Expr};
use datafusion::physical_plan::functions::Volatility;
use datafusion::physical_plan::functions::{BuiltinScalarFunction, Volatility};
use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
use datafusion::physical_plan::{ColumnarValue, PhysicalPlanner};
use datafusion::scalar::ScalarValue;
Expand All @@ -31,6 +32,7 @@ pub struct LogicalPlanData {
pub referenced_expr: Option<Vec<Expr>>,
pub constant: Option<ScalarValue>,
pub constant_in_list: Option<Vec<ScalarValue>>,
pub can_split: Option<SplitType>,
}

#[derive(Clone)]
Expand All @@ -56,6 +58,13 @@ impl<'a> Index<Id> for SingleNodeIndex<'a> {
}
}

/// Kind of split an expression supports, as computed by `make_can_split`.
/// `Ord` is derived so that the "strongest" split type of a group of child
/// expressions can be picked with `max`.
#[derive(Clone, Debug, Ord, Eq, PartialOrd, PartialEq)]
pub enum SplitType {
    // TODO there can be also aggregation split where additional aggregation applied on top of split aggregation
    // Aggregation,
    /// The expression can be split by introducing an additional projection.
    Projection,
}

impl LogicalPlanAnalysis {
pub fn new(cube_context: Arc<CubeContext>, planner: Arc<DefaultPhysicalPlanner>) -> Self {
Self {
Expand All @@ -77,16 +86,13 @@ impl LogicalPlanAnalysis {
})
};
let original_expr = if is_expr_node(enode) {
// TODO .unwrap
Some(
node_to_expr(
enode,
&egraph.analysis.cube_context,
&id_to_expr,
&SingleNodeIndex { egraph },
)
.unwrap(),
node_to_expr(
enode,
&egraph.analysis.cube_context,
&id_to_expr,
&SingleNodeIndex { egraph },
)
.ok()
} else {
None
};
Expand Down Expand Up @@ -346,6 +352,40 @@ impl LogicalPlanAnalysis {
}
}

/// Computes whether the e-node's expression can be split (and how), based on
/// the split types already derived for its child e-classes. Returns `None`
/// when the expression is not splittable.
fn make_can_split(
    egraph: &EGraph<LogicalPlanLanguage, Self>,
    enode: &LogicalPlanLanguage,
) -> Option<SplitType> {
    // Split type previously computed for a child e-class, if any.
    let can_split = |id| egraph.index(id).data.can_split.clone();
    let node_by_id = &SingleNodeIndex { egraph };
    match enode {
        LogicalPlanLanguage::ScalarFunctionExpr(params) => {
            let fun = crate::match_data_node!(node_by_id, params[0], ScalarFunctionExprFun);

            match fun {
                // DATE_TRUNC is the only scalar function handled by the
                // projection split so far.
                BuiltinScalarFunction::DateTrunc => Some(SplitType::Projection),
                _ => None,
            }
        }
        // Plain column references and aggregate functions can always be
        // pushed through a projection split.
        LogicalPlanLanguage::ColumnExpr(_) | LogicalPlanLanguage::AggregateFunctionExpr(_) => {
            Some(SplitType::Projection)
        }
        // A cast is splittable exactly when the casted expression is.
        LogicalPlanLanguage::CastExpr(params) => can_split(params[0]),
        LogicalPlanLanguage::AggregateGroupExpr(params)
        | LogicalPlanLanguage::AggregateAggrExpr(params) => {
            // All children must be splittable; the strongest (max) child
            // split type wins. An empty list defaults to Projection.
            let mut strongest: Option<SplitType> = None;
            for param in params.iter() {
                let child = can_split(*param)?;
                strongest = Some(match strongest {
                    Some(current) => current.max(child),
                    None => child,
                });
            }
            Some(strongest.unwrap_or(SplitType::Projection))
        }
        _ => None,
    }
}

fn eval_constant_expr(
egraph: &EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>,
expr: &Expr,
Expand Down Expand Up @@ -431,6 +471,7 @@ impl Analysis<LogicalPlanLanguage> for LogicalPlanAnalysis {
referenced_expr: Self::make_referenced_expr(egraph, enode),
constant: Self::make_constant(egraph, enode),
constant_in_list: Self::make_constant_in_list(egraph, enode),
can_split: Self::make_can_split(egraph, enode),
}
}

Expand All @@ -441,12 +482,14 @@ impl Analysis<LogicalPlanLanguage> for LogicalPlanAnalysis {
let (column_name_to_alias, b) = self.merge_option_field(a, b, |d| &mut d.expr_to_alias);
let (referenced_columns, b) = self.merge_option_field(a, b, |d| &mut d.referenced_expr);
let (constant_in_list, b) = self.merge_option_field(a, b, |d| &mut d.constant_in_list);
let (can_split, b) = self.merge_option_field(a, b, |d| &mut d.can_split);
let (column_name, _) = self.merge_option_field(a, b, |d| &mut d.column_name);
original_expr
| member_name_to_expr
| column_name_to_alias
| referenced_columns
| constant_in_list
| can_split
| column_name
}

Expand Down
1 change: 1 addition & 0 deletions rust/cubesql/cubesql/src/compile/rewrite/converter.rs
Expand Up @@ -499,6 +499,7 @@ macro_rules! match_params {
};
}

#[macro_export]
macro_rules! match_data_node {
($node_by_id:expr, $id_expr:expr, $field_variant:ident) => {
match $node_by_id.index($id_expr.clone()) {
Expand Down
11 changes: 10 additions & 1 deletion rust/cubesql/cubesql/src/compile/rewrite/language.rs
Expand Up @@ -120,7 +120,16 @@ macro_rules! variant_field_struct {

impl FromStr for [<$variant $var_field:camel>] {
type Err = CubeError;
fn from_str(_s: &str) -> Result<Self, Self::Err> {
fn from_str(s: &str) -> Result<Self, Self::Err> {
let prefix = format!("{}:", std::stringify!([<$variant $var_field:camel>]));
if s.starts_with(&prefix) {
let replaced = s.replace(&prefix, "");
if &replaced == "None" {
return Ok([<$variant $var_field:camel>](None));
} else {
return Ok([<$variant $var_field:camel>](Some(s.to_string())));
}
}
Err(CubeError::internal("Conversion from string is not supported".to_string()))
}
}
Expand Down
18 changes: 18 additions & 0 deletions rust/cubesql/cubesql/src/compile/rewrite/mod.rs
Expand Up @@ -267,6 +267,12 @@ crate::plan_to_language! {
aliases: Vec<(String, String)>,
table_name: Option<String>,
},
InnerProjectionSplitReplacer {
members: Vec<LogicalPlan>,
},
OuterProjectionSplitReplacer {
members: Vec<LogicalPlan>,
},
}
}

Expand Down Expand Up @@ -519,6 +525,10 @@ fn column_expr(column: impl Display) -> String {
format!("(ColumnExpr {})", column)
}

/// Renders a `CastExpr` pattern for the rewrite DSL: `(CastExpr <expr> <type>)`.
fn cast_expr(expr: impl Display, data_type: impl Display) -> String {
    let rendered_expr = expr.to_string();
    let rendered_type = data_type.to_string();
    format!("(CastExpr {} {})", rendered_expr, rendered_type)
}

/// Renders an `AliasExpr` pattern for the rewrite DSL: `(AliasExpr <column> <alias>)`.
fn alias_expr(column: impl Display, alias: impl Display) -> String {
    let parts = [column.to_string(), alias.to_string()];
    format!("(AliasExpr {})", parts.join(" "))
}
Expand Down Expand Up @@ -570,6 +580,14 @@ fn filter_replacer(members: impl Display, cube: impl Display) -> String {
format!("(FilterReplacer {} {})", members, cube)
}

/// Renders an `InnerProjectionSplitReplacer` pattern wrapping the given members.
fn inner_projection_split_replacer(members: impl Display) -> String {
    let mut pattern = String::from("(InnerProjectionSplitReplacer ");
    pattern.push_str(&members.to_string());
    pattern.push(')');
    pattern
}

/// Renders an `OuterProjectionSplitReplacer` pattern wrapping the given members.
fn outer_projection_split_replacer(members: impl Display) -> String {
    let mut pattern = String::from("(OuterProjectionSplitReplacer ");
    pattern.push_str(&members.to_string());
    pattern.push(')');
    pattern
}

/// Renders a `CubeScanMembers` cons-cell pattern: `(CubeScanMembers <left> <right>)`.
fn cube_scan_members(left: impl Display, right: impl Display) -> String {
    [
        "(CubeScanMembers ".to_string(),
        left.to_string(),
        " ".to_string(),
        right.to_string(),
        ")".to_string(),
    ]
    .concat()
}
Expand Down
3 changes: 3 additions & 0 deletions rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs
Expand Up @@ -6,6 +6,7 @@ use crate::compile::rewrite::rules::dates::DateRules;
use crate::compile::rewrite::rules::filters::FilterRules;
use crate::compile::rewrite::rules::members::MemberRules;
use crate::compile::rewrite::rules::order::OrderRules;
use crate::compile::rewrite::rules::split::SplitRules;
use crate::compile::rewrite::LogicalPlanLanguage;
use crate::sql::AuthContext;
use crate::CubeError;
Expand Down Expand Up @@ -52,6 +53,7 @@ impl Rewriter {
let extractor = Extractor::new(&runner.egraph, BestCubePlan);
let (_, best) = extractor.find_best(root);
let new_root = Id::from(best.as_ref().len() - 1);
//log::debug!("Egraph: {:#?}", runner.egraph);
log::debug!("Best: {:?}", best);
self.graph = runner.egraph.clone();
let converter =
Expand All @@ -65,6 +67,7 @@ impl Rewriter {
Box::new(FilterRules::new(self.cube_context.clone())),
Box::new(DateRules::new(self.cube_context.clone())),
Box::new(OrderRules::new(self.cube_context.clone())),
Box::new(SplitRules::new(self.cube_context.clone())),
];
let mut rewrites = Vec::new();
for r in rules {
Expand Down
17 changes: 16 additions & 1 deletion rust/cubesql/cubesql/src/compile/rewrite/rules/dates.rs
@@ -1,8 +1,8 @@
use crate::compile::engine::provider::CubeContext;
use crate::compile::rewrite::analysis::LogicalPlanAnalysis;
use crate::compile::rewrite::rewriter::RewriteRules;
use crate::compile::rewrite::LogicalPlanLanguage;
use crate::compile::rewrite::{binary_expr, column_expr, literal_expr, rewrite};
use crate::compile::rewrite::{cast_expr, LogicalPlanLanguage};
use crate::compile::rewrite::{fun_expr, literal_string, to_day_interval_expr, udf_expr};
use egg::Rewrite;
use std::sync::Arc;
Expand Down Expand Up @@ -214,6 +214,21 @@ impl RewriteRules for DateRules {
vec![literal_string("day"), column_expr("?column")],
),
),
rewrite(
"cast-in-date-trunc",
fun_expr(
"DateTrunc",
// TODO check data_type?
vec![
"?granularity".to_string(),
cast_expr(column_expr("?column"), "?data_type"),
],
),
fun_expr(
"DateTrunc",
vec!["?granularity".to_string(), column_expr("?column")],
),
),
]
}
}
Expand Down