Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions rust/cubesql/cubesql/src/compile/rewrite/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,13 @@ impl LogicalPlanAnalysis {
) -> Option<Vec<(String, String)>> {
let filter_operators = |id| egraph.index(id).data.filter_operators.clone();
match enode {
LogicalPlanLanguage::CubeScanFilters(params) => {
let mut map = Vec::new();
for id in params.iter() {
map.extend(filter_operators(*id)?.into_iter());
}
Some(map)
}
LogicalPlanLanguage::FilterOp(params) => filter_operators(params[0]),
LogicalPlanLanguage::FilterOpFilters(params) => {
let mut map = Vec::new();
Expand All @@ -763,6 +770,9 @@ impl LogicalPlanAnalysis {
.to_string();
Some(vec![(member, "equals".to_string())])
}
LogicalPlanLanguage::ChangeUserMember(_) => {
Some(vec![("__user".to_string(), "equals".to_string())])
}
_ => None,
}
}
Expand Down
9 changes: 7 additions & 2 deletions rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1625,11 +1625,16 @@ impl MemberRules {
if *ungrouped {
continue;
}
let Some(empty_filters) = &egraph.index(subst[filters_var]).data.is_empty_list
let Some(filter_operators) =
&egraph.index(subst[filters_var]).data.filter_operators
else {
return false;
};
if !empty_filters {
let only_allowed_filters = filter_operators.iter().all(|(member, _op)| {
// TODO this should allow even more, like dimensions and segments
member == "__user"
});
if !only_allowed_filters {
return false;
}
if referenced_aggr_expr.len() == 0 {
Expand Down
63 changes: 63 additions & 0 deletions rust/cubesql/cubesql/src/compile/test/test_user_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,66 @@ GROUP BY 1
assert!(sql_query.sql.contains(r#""changeUser": "gopher""#));
assert_eq!(load_calls[0].meta.change_user(), Some("gopher".to_string()));
}

/// Repeated aggregation should be flattened even in presence of __user filter
#[tokio::test]
async fn flatten_aggregation_into_user_change() {
init_testing_logger();

let query_plan = convert_select_to_query_plan(
// language=PostgreSQL
r#"
SELECT
dim_str0
FROM
(
SELECT
dim_str0
FROM
(
SELECT
dim_str0,
AVG(avgPrice)
FROM
MultiTypeCube
WHERE
__user = 'gopher'
GROUP BY
1
) t
GROUP BY
dim_str0
) AS t
GROUP BY
dim_str0
ORDER BY
dim_str0 ASC
LIMIT
1
"#
.to_string(),
DatabaseProtocol::PostgreSQL,
)
.await;

// This query should rewrite completely as CubeScan
let logical_plan = query_plan.as_logical_plan();
let cube_scan = logical_plan.expect_root_cube_scan();

assert_eq!(cube_scan.options.change_user, Some("gopher".to_string()));

assert_eq!(
cube_scan.request,
V1LoadRequestQuery {
measures: Some(vec![]),
segments: Some(vec![]),
dimensions: Some(vec!["MultiTypeCube.dim_str0".to_string(),]),
order: Some(vec![vec![
"MultiTypeCube.dim_str0".to_string(),
"asc".to_string(),
],],),
limit: Some(1),
..Default::default()
}
)
}
14 changes: 14 additions & 0 deletions rust/cubesql/cubesql/src/compile/test/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
};

pub trait LogicalPlanTestUtils {
fn try_expect_root_cube_scan(&self) -> Option<&CubeScanNode>;

fn expect_root_cube_scan(&self) -> &CubeScanNode {
self.try_expect_root_cube_scan()
.expect("Root node is not CubeScan")
}

fn find_cube_scan(&self) -> CubeScanNode;

fn find_cube_scan_wrapped_sql(&self) -> CubeScanWrappedSqlNode;
Expand All @@ -21,6 +28,13 @@
}

impl LogicalPlanTestUtils for LogicalPlan {
fn try_expect_root_cube_scan(&self) -> Option<&CubeScanNode> {
let LogicalPlan::Extension(ext) = self else {
return None;

Check warning on line 33 in rust/cubesql/cubesql/src/compile/test/utils.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/test/utils.rs#L33

Added line #L33 was not covered by tests
};
ext.node.as_any().downcast_ref::<CubeScanNode>()
}

fn find_cube_scan(&self) -> CubeScanNode {
let cube_scans = find_cube_scans_deep_search(Arc::new(self.clone()), true);
if cube_scans.len() != 1 {
Expand Down
Loading