From ae28d4cabb9be05e51ba96a9ab0c959a1303895e Mon Sep 17 00:00:00 2001 From: Pavel Tiunov Date: Mon, 12 Feb 2024 16:44:47 -0800 Subject: [PATCH 1/3] feat(cubesql): Always Prefer SQL push down over aggregation in Datafusion --- rust/cubesql/cubesql/src/compile/mod.rs | 40 +++++++++++++++++++ .../cubesql/src/compile/rewrite/analysis.rs | 1 - .../cubesql/src/compile/rewrite/cost.rs | 40 ++++++++++++++++--- 3 files changed, 74 insertions(+), 7 deletions(-) diff --git a/rust/cubesql/cubesql/src/compile/mod.rs b/rust/cubesql/cubesql/src/compile/mod.rs index b5d1ed6dab9f9..2ec17a5cef621 100644 --- a/rust/cubesql/cubesql/src/compile/mod.rs +++ b/rust/cubesql/cubesql/src/compile/mod.rs @@ -4294,6 +4294,46 @@ limit ); } + #[tokio::test] + async fn powerbi_date_range_min_max() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_logger(); + + let query_plan = convert_select_to_query_plan( + r#"select + max("rows"."order_date") as "a0", + min("rows"."order_date") as "a1" +from + ( + select + "order_date" + from + "public"."KibanaSampleDataEcommerce" "$Table" + ) "rows" "# + .to_string(), + DatabaseProtocol::PostgreSQL, + ) + .await; + + let logical_plan = query_plan.as_logical_plan(); + assert_eq!( + logical_plan.find_cube_scan().request, + V1LoadRequestQuery { + measures: Some(vec![]), + dimensions: Some(vec![]), + segments: Some(vec![]), + time_dimensions: None, + order: None, + limit: None, + offset: None, + filters: None, + ungrouped: Some(true), + } + ); + } + #[tokio::test] async fn powerbi_inner_decimal_cast() { init_logger(); diff --git a/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs b/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs index aa9555613292b..8e1a61fa614f4 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs @@ -1298,7 +1298,6 @@ impl Analysis for LogicalPlanAnalysis { ScalarValue::Date32(_) | ScalarValue::Date64(_) | ScalarValue::Int64(_) - | ScalarValue::Int32(_) | ScalarValue::Float64(_) | ScalarValue::IntervalYearMonth(_) | ScalarValue::IntervalDayTime(_) diff --git a/rust/cubesql/cubesql/src/compile/rewrite/cost.rs b/rust/cubesql/cubesql/src/compile/rewrite/cost.rs index 59f87798a3592..7c93ba15bd268 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/cost.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/cost.rs @@ -1,8 +1,9 @@ use crate::{ compile::{ rewrite::{ - rules::utils::granularity_str_to_int_order, CubeScanWrapped, DimensionName, - LogicalPlanLanguage, MemberErrorPriority, ScalarUDFExprFun, TimeDimensionGranularity, + rules::utils::granularity_str_to_int_order, CubeScanUngrouped, CubeScanWrapped, + DimensionName, LogicalPlanLanguage, MemberErrorPriority, ScalarUDFExprFun, + TimeDimensionGranularity, }, MetaContext, }, @@ -44,6 +45,7 @@ pub struct CubePlanCost { member_errors: i64, // TODO if pre-aggregation can be used for window functions, then it'd be suboptimal non_pushed_down_window: i64, + ungrouped_aggregates: usize, wrapper_nodes: i64, ast_size_outside_wrapper: usize, cube_members: i64, @@ -54,6 +56,7 @@ pub struct CubePlanCost { ast_size_without_alias: usize, ast_size: usize, ast_size_inside_wrapper: usize, + ungrouped_nodes: usize, } #[derive(Debug, Clone, Eq, PartialEq)] @@ -101,9 +104,9 @@ impl CubePlanCostAndState { } } - pub fn finalize(&self) -> Self { + pub fn finalize(&self, enode: &LogicalPlanLanguage) -> Self { Self { - cost: self.cost.finalize(&self.state), + cost: self.cost.finalize(&self.state, enode), state: self.state.clone(), } } @@ -129,6 +132,7 @@ impl CubePlanCost { empty_wrappers: self.empty_wrappers + other.empty_wrappers, ast_size_outside_wrapper: self.ast_size_outside_wrapper + other.ast_size_outside_wrapper, + ungrouped_aggregates: self.ungrouped_aggregates + other.ungrouped_aggregates, wrapper_nodes: self.wrapper_nodes + other.wrapper_nodes, cube_scan_nodes: self.cube_scan_nodes + other.cube_scan_nodes, time_dimensions_used_as_dimensions: self.time_dimensions_used_as_dimensions @@ -139,10 +143,11 @@ impl CubePlanCost { ast_size_without_alias: self.ast_size_without_alias + other.ast_size_without_alias, ast_size: self.ast_size + other.ast_size, ast_size_inside_wrapper: self.ast_size_inside_wrapper + other.ast_size_inside_wrapper, + ungrouped_nodes: self.ungrouped_nodes + other.ungrouped_nodes, } } - pub fn finalize(&self, state: &CubePlanState) -> Self { + pub fn finalize(&self, state: &CubePlanState, enode: &LogicalPlanLanguage) -> Self { Self { replacers: self.replacers, table_scans: self.table_scans, @@ -176,11 +181,27 @@ impl CubePlanCost { } + self.empty_wrappers, time_dimensions_used_as_dimensions: self.time_dimensions_used_as_dimensions, max_time_dimensions_granularity: self.max_time_dimensions_granularity, + ungrouped_aggregates: match state { + CubePlanState::Wrapped => 0, + CubePlanState::Unwrapped(_) => { + if let LogicalPlanLanguage::Aggregate(_) = enode { + if self.ungrouped_nodes > 0 { + 1 + } else { + 0 + } + } else { + 0 + } + } + CubePlanState::Wrapper => 0, + } + self.ungrouped_aggregates, wrapper_nodes: self.wrapper_nodes, cube_scan_nodes: self.cube_scan_nodes, ast_size_without_alias: self.ast_size_without_alias, ast_size: self.ast_size, ast_size_inside_wrapper: self.ast_size_inside_wrapper, + ungrouped_nodes: self.ungrouped_nodes, } } } @@ -338,6 +359,11 @@ impl CostFunction for BestCubePlan { _ => 1, }; + let ungrouped_nodes = match enode { + LogicalPlanLanguage::CubeScanUngrouped(CubeScanUngrouped(true)) => 1, + _ => 0, + }; + let initial_cost = CubePlanCostAndState { cost: CubePlanCost { replacers: this_replacers, @@ -352,6 +378,7 @@ impl CostFunction for BestCubePlan { time_dimensions_used_as_dimensions, max_time_dimensions_granularity, structure_points, + ungrouped_aggregates: 0, wrapper_nodes, empty_wrappers: 0, ast_size_outside_wrapper: 0, @@ -359,6 +386,7 @@ impl CostFunction for BestCubePlan { cube_scan_nodes, ast_size_without_alias, ast_size: 1, + ungrouped_nodes, }, state: match enode { LogicalPlanLanguage::CubeScanWrapped(CubeScanWrapped(true)) => { @@ -375,7 +403,7 @@ impl CostFunction for BestCubePlan { let child = costs(*id); cost.add_child(&child) }) - .finalize(); + .finalize(enode); res } } From 93b2df4520b4dfe5c49902454e434928e4ccd4c1 Mon Sep 17 00:00:00 2001 From: Pavel Tiunov Date: Mon, 12 Feb 2024 17:09:47 -0800 Subject: [PATCH 2/3] Bring back ScalarValue::Int32(_) as it was already removed --- rust/cubesql/cubesql/src/compile/rewrite/analysis.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs b/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs index 8e1a61fa614f4..4a75eb03f0970 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs @@ -1297,6 +1297,7 @@ impl Analysis for LogicalPlanAnalysis { c, ScalarValue::Date32(_) | ScalarValue::Date64(_) + | ScalarValue::Int32(_) | ScalarValue::Int64(_) | ScalarValue::Float64(_) | ScalarValue::IntervalYearMonth(_) From 9a32d957276daf4a1aaf51fc0096db95e7081ecb Mon Sep 17 00:00:00 2001 From: Pavel Tiunov Date: Mon, 12 Feb 2024 17:20:35 -0800 Subject: [PATCH 3/3] Bring back ScalarValue::Int32(_) as it was already removed --- rust/cubesql/cubesql/src/compile/rewrite/analysis.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs b/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs index 4a75eb03f0970..aa9555613292b 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs @@ -1297,8 +1297,8 @@ impl Analysis for LogicalPlanAnalysis { c, ScalarValue::Date32(_) | ScalarValue::Date64(_) - | ScalarValue::Int32(_) | ScalarValue::Int64(_) + | ScalarValue::Int32(_) | ScalarValue::Float64(_) | ScalarValue::IntervalYearMonth(_) | ScalarValue::IntervalDayTime(_)