From 2f7a1051178b2f19c5f66a0649fa66b715672d5f Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Mon, 31 Mar 2025 21:37:04 +0200 Subject: [PATCH] fix(cubesql): Allow more filters in CubeScan before aggregation pushdown For now, only __user filters are enabled; others will be done later --- .../cubesql/src/compile/rewrite/analysis.rs | 10 +++ .../src/compile/rewrite/rules/members.rs | 9 ++- .../src/compile/test/test_user_change.rs | 63 +++++++++++++++++++ .../cubesql/cubesql/src/compile/test/utils.rs | 14 +++++ 4 files changed, 94 insertions(+), 2 deletions(-) diff --git a/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs b/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs index 574a8e205a5b0..a96b03403e1e0 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/analysis.rs @@ -737,6 +737,13 @@ impl LogicalPlanAnalysis { ) -> Option<Vec<(String, String)>> { let filter_operators = |id| egraph.index(id).data.filter_operators.clone(); match enode { + LogicalPlanLanguage::CubeScanFilters(params) => { + let mut map = Vec::new(); + for id in params.iter() { + map.extend(filter_operators(*id)?.into_iter()); + } + Some(map) + } LogicalPlanLanguage::FilterOp(params) => filter_operators(params[0]), LogicalPlanLanguage::FilterOpFilters(params) => { let mut map = Vec::new(); @@ -763,6 +770,9 @@ impl LogicalPlanAnalysis { .to_string(); Some(vec![(member, "equals".to_string())]) } + LogicalPlanLanguage::ChangeUserMember(_) => { + Some(vec![("__user".to_string(), "equals".to_string())]) + } _ => None, } } diff --git a/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs b/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs index 90b9ebdd73b3b..c128ae11dc8aa 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs @@ -1625,11 +1625,16 @@ impl MemberRules { if *ungrouped { continue; } - let Some(empty_filters) = 
&egraph.index(subst[filters_var]).data.is_empty_list + let Some(filter_operators) = + &egraph.index(subst[filters_var]).data.filter_operators else { return false; }; - if !empty_filters { + let only_allowed_filters = filter_operators.iter().all(|(member, _op)| { + // TODO this should allow even more, like dimensions and segments + member == "__user" + }); + if !only_allowed_filters { return false; } if referenced_aggr_expr.len() == 0 { diff --git a/rust/cubesql/cubesql/src/compile/test/test_user_change.rs b/rust/cubesql/cubesql/src/compile/test/test_user_change.rs index a5680eeab2c3d..d5746f6a01336 100644 --- a/rust/cubesql/cubesql/src/compile/test/test_user_change.rs +++ b/rust/cubesql/cubesql/src/compile/test/test_user_change.rs @@ -275,3 +275,66 @@ GROUP BY 1 assert!(sql_query.sql.contains(r#""changeUser": "gopher""#)); assert_eq!(load_calls[0].meta.change_user(), Some("gopher".to_string())); } + +/// Repeated aggregation should be flattened even in presence of __user filter +#[tokio::test] +async fn flatten_aggregation_into_user_change() { + init_testing_logger(); + + let query_plan = convert_select_to_query_plan( + // language=PostgreSQL + r#" +SELECT + dim_str0 +FROM + ( + SELECT + dim_str0 + FROM + ( + SELECT + dim_str0, + AVG(avgPrice) + FROM + MultiTypeCube + WHERE + __user = 'gopher' + GROUP BY + 1 + ) t + GROUP BY + dim_str0 + ) AS t +GROUP BY + dim_str0 +ORDER BY + dim_str0 ASC +LIMIT + 1 + "# + .to_string(), + DatabaseProtocol::PostgreSQL, + ) + .await; + + // This query should rewrite completely as CubeScan + let logical_plan = query_plan.as_logical_plan(); + let cube_scan = logical_plan.expect_root_cube_scan(); + + assert_eq!(cube_scan.options.change_user, Some("gopher".to_string())); + + assert_eq!( + cube_scan.request, + V1LoadRequestQuery { + measures: Some(vec![]), + segments: Some(vec![]), + dimensions: Some(vec!["MultiTypeCube.dim_str0".to_string(),]), + order: Some(vec![vec![ + "MultiTypeCube.dim_str0".to_string(), + "asc".to_string(), + 
],],), + limit: Some(1), + ..Default::default() + } + ) +} diff --git a/rust/cubesql/cubesql/src/compile/test/utils.rs b/rust/cubesql/cubesql/src/compile/test/utils.rs index e22772a655b61..08cf23a39e944 100644 --- a/rust/cubesql/cubesql/src/compile/test/utils.rs +++ b/rust/cubesql/cubesql/src/compile/test/utils.rs @@ -11,6 +11,13 @@ use crate::{ }; pub trait LogicalPlanTestUtils { + fn try_expect_root_cube_scan(&self) -> Option<&CubeScanNode>; + + fn expect_root_cube_scan(&self) -> &CubeScanNode { + self.try_expect_root_cube_scan() + .expect("Root node is not CubeScan") + } + fn find_cube_scan(&self) -> CubeScanNode; fn find_cube_scan_wrapped_sql(&self) -> CubeScanWrappedSqlNode; @@ -21,6 +28,13 @@ pub trait LogicalPlanTestUtils { } impl LogicalPlanTestUtils for LogicalPlan { + fn try_expect_root_cube_scan(&self) -> Option<&CubeScanNode> { + let LogicalPlan::Extension(ext) = self else { + return None; + }; + ext.node.as_any().downcast_ref::<CubeScanNode>() + } + fn find_cube_scan(&self) -> CubeScanNode { let cube_scans = find_cube_scans_deep_search(Arc::new(self.clone()), true); if cube_scans.len() != 1 {