From c1e0cb7a14fa0bd18a4e74b2ab8e0fd76164ef98 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Fri, 7 Mar 2025 21:09:14 +0200 Subject: [PATCH 1/4] feat(cubesql): Track used cube members in generate_sql_for_expr --- .../cubesql/src/compile/engine/df/wrapper.rs | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs index d4eac2eb5fb86..fb9f577b51d89 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs @@ -849,6 +849,7 @@ impl CubeScanWrapperNode { expr, None, Arc::new(HashMap::new()), + None, ) .await?; columns.push(AliasedColumn { expr, alias }); @@ -1563,6 +1564,7 @@ impl CubeScanWrapperNode { } else { original_expr.clone() }; + let (expr_sql, new_sql_query) = Self::generate_sql_for_expr( plan.clone(), sql, @@ -1570,6 +1572,7 @@ impl CubeScanWrapperNode { expr.clone(), push_to_cube_context, subqueries.clone(), + None, ) .await?; let expr_sql = @@ -1674,6 +1677,7 @@ impl CubeScanWrapperNode { expr: Expr, push_to_cube_context: Option<&'ctx PushToCubeContext>, subqueries: Arc>, + mut used_members: Option<&'ctx mut HashSet>, ) -> Pin> + Send + 'ctx>> { Box::pin(async move { match expr { @@ -1685,6 +1689,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; Ok((expr, sql_query)) @@ -1723,6 +1728,7 @@ impl CubeScanWrapperNode { expr, None, subqueries.clone(), + used_members, ) .await; } @@ -1757,6 +1763,9 @@ impl CubeScanWrapperNode { })?; match member { MemberField::Member(member) => { + if let Some(used_members) = used_members { + used_members.insert(member.clone()); + } Ok((format!("${{{}}}", member), sql_query)) } MemberField::Literal(value) => { @@ -1767,6 +1776,7 @@ impl CubeScanWrapperNode { Expr::Literal(value.clone()), push_to_cube_context, subqueries.clone(), + used_members, ) .await } @@ -1818,6 +1828,7 @@ impl CubeScanWrapperNode { *left, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; let (right, sql_query) = Self::generate_sql_for_expr( @@ -1827,6 +1838,7 @@ impl CubeScanWrapperNode { *right, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; let resulting_sql = sql_generator @@ -1849,6 +1861,7 @@ impl CubeScanWrapperNode { *like.expr, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; let (pattern, sql_query) = Self::generate_sql_for_expr( @@ -1858,6 +1871,7 @@ impl CubeScanWrapperNode { *like.pattern, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; let (escape_char, sql_query) = match like.escape_char { @@ -1869,6 +1883,7 @@ impl CubeScanWrapperNode { Expr::Literal(ScalarValue::Utf8(Some(escape_char.to_string()))), push_to_cube_context, subqueries.clone(), + used_members, ) .await?; (Some(escape_char), sql_query) @@ -1894,6 +1909,7 @@ impl CubeScanWrapperNode { *ilike.expr, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; let (pattern, sql_query) = Self::generate_sql_for_expr( @@ -1903,6 +1919,7 @@ impl CubeScanWrapperNode { *ilike.pattern, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; let (escape_char, sql_query) = match ilike.escape_char { @@ -1914,6 +1931,7 @@ impl CubeScanWrapperNode { Expr::Literal(ScalarValue::Utf8(Some(escape_char.to_string()))), push_to_cube_context, subqueries.clone(), + used_members, ) .await?; (Some(escape_char), sql_query) @@ -1940,6 +1958,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; let resulting_sql = @@ -1962,6 +1981,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; let resulting_sql = sql_generator @@ -1983,6 +2003,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; let resulting_sql = sql_generator @@ -2004,6 +2025,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; let resulting_sql = sql_generator @@ -2032,6 +2054,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = sql_query_next; @@ -2048,6 +2071,7 @@ impl CubeScanWrapperNode { *when, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; let (then, sql_query_next) = Self::generate_sql_for_expr( @@ -2057,6 +2081,7 @@ impl CubeScanWrapperNode { *then, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = sql_query_next; @@ -2070,6 +2095,7 @@ impl CubeScanWrapperNode { *else_expr, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; sql_query = sql_query_next; @@ -2093,6 +2119,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; let data_type = Self::generate_sql_type(sql_generator.clone(), data_type)?; @@ -2113,6 +2140,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; let resulting_sql = sql_generator @@ -2446,6 +2474,7 @@ impl CubeScanWrapperNode { arg, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2483,6 +2512,7 @@ impl CubeScanWrapperNode { args[1].clone(), push_to_cube_context, subqueries.clone(), + used_members, ) .await?; return Ok(( @@ -2526,6 +2556,7 @@ impl CubeScanWrapperNode { arg, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2566,6 +2597,7 @@ impl CubeScanWrapperNode { arg, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2595,6 +2627,7 @@ impl CubeScanWrapperNode { expr, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2623,6 +2656,7 @@ impl CubeScanWrapperNode { expr, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2664,6 +2698,7 @@ impl CubeScanWrapperNode { arg, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2678,6 +2713,7 @@ impl CubeScanWrapperNode { arg, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2692,6 +2728,7 @@ impl CubeScanWrapperNode { arg, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2728,6 +2765,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2740,6 +2778,7 @@ impl CubeScanWrapperNode { expr, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2771,6 +2810,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2781,6 +2821,7 @@ impl CubeScanWrapperNode { *subquery, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; sql_query = query; From 2edcd0d25ce513e14ea07227c971853b5a04a5bc Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Tue, 15 Oct 2024 01:29:50 +0300 Subject: [PATCH 2/4] feat(cubesql): Lift used members through generate_column_expr --- .../cubesql/src/compile/engine/df/wrapper.rs | 40 ++++++++++--------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs index fb9f577b51d89..25a21c8c31103 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs @@ -1264,7 +1264,7 @@ impl CubeScanWrapperNode { subqueries_sql.clone(), ) .await?; - let join_condition = join_condition[0].expr.clone(); + let join_condition = join_condition[0].0.expr.clone(); sql = new_sql; let join_sql_expression = { @@ -1310,7 +1310,7 @@ impl CubeScanWrapperNode { measures: Some( aggregate .iter() - .map(|m| { + .map(|(m, _used_members)| { Self::ungrouped_member_def( m, &ungrouped_scan_node.used_cubes, @@ -1318,14 +1318,14 @@ impl CubeScanWrapperNode { }) .chain( // TODO understand type of projections - projection.iter().map(|m| { + projection.iter().map(|(m, _used_members)| { Self::ungrouped_member_def( m, &ungrouped_scan_node.used_cubes, ) }), ) - .chain(window.iter().map(|m| { + .chain(window.iter().map(|(m, _used_members)| { Self::ungrouped_member_def( m, &ungrouped_scan_node.used_cubes, @@ -1337,7 +1337,7 @@ impl CubeScanWrapperNode { group_by .iter() .zip(group_descs.iter()) - .map(|(m, t)| { + .map(|((m, _used_members), t)| { Self::dimension_member_def( m, &ungrouped_scan_node.used_cubes, @@ -1349,7 +1349,7 @@ impl CubeScanWrapperNode { segments: Some( filter .iter() - .map(|m| { + .map(|(m, _used_members)| { Self::ungrouped_member_def( m, &ungrouped_scan_node.used_cubes, @@ -1399,7 +1399,7 @@ impl CubeScanWrapperNode { )) })?; Ok(vec![ - aliased_column.alias.clone(), + aliased_column.0.alias.clone(), if *asc { "asc".to_string() } else { "desc".to_string() }, ]) } @@ -1470,24 +1470,24 @@ impl CubeScanWrapperNode { .get_sql_templates() .select( sql.sql.to_string(), - projection, - group_by, + projection.into_iter().map(|(m, _)| m).collect(), + group_by.into_iter().map(|(m, _)| m).collect(), group_descs, - aggregate, + aggregate.into_iter().map(|(m, _)| m).collect(), // TODO from_alias.unwrap_or("".to_string()), if !filter.is_empty() { Some( filter .iter() - .map(|f| f.expr.to_string()) + .map(|(f, _)| f.expr.to_string()) .join(" AND "), ) } else { None }, None, - order, + order.into_iter().map(|(m, _)| m).collect(), limit, offset, distinct, @@ -1549,7 +1549,7 @@ impl CubeScanWrapperNode { can_rename_columns: bool, push_to_cube_context: Option<&PushToCubeContext<'_>>, subqueries: Arc>, - ) -> result::Result<(Vec, SqlQuery), CubeError> { + ) -> result::Result<(Vec<(AliasedColumn, HashSet)>, SqlQuery), CubeError> { let mut aliased_columns = Vec::new(); for original_expr in exprs { let expr = if let Some(column_remapping) = column_remapping { @@ -1565,6 +1565,7 @@ impl CubeScanWrapperNode { original_expr.clone() }; + let mut used_members = HashSet::new(); let (expr_sql, new_sql_query) = Self::generate_sql_for_expr( plan.clone(), sql, @@ -1572,7 +1573,7 @@ impl CubeScanWrapperNode { expr.clone(), push_to_cube_context, subqueries.clone(), - None, + Some(&mut used_members), ) .await?; let expr_sql = @@ -1580,10 +1581,13 @@ impl CubeScanWrapperNode { sql = new_sql_query; let alias = next_remapper.add_expr(&schema, &original_expr, &expr)?; - aliased_columns.push(AliasedColumn { - expr: expr_sql, - alias, - }); + aliased_columns.push(( + AliasedColumn { + expr: expr_sql, + alias, + }, + used_members, + )); } Ok((aliased_columns, sql)) } From 71646b3182ed52aa0464d06379a25555abc8d689 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Sat, 8 Mar 2025 17:20:24 +0200 Subject: [PATCH 3/4] feat(cubesql): Use only actually used cubes in member expressions --- .../cubesql/src/compile/engine/df/wrapper.rs | 69 ++++++++++++------- rust/cubesql/cubesql/src/compile/mod.rs | 8 +-- 2 files changed, 50 insertions(+), 27 deletions(-) diff --git a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs index 25a21c8c31103..608e7154b42b2 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs @@ -1264,6 +1264,8 @@ impl CubeScanWrapperNode { subqueries_sql.clone(), ) .await?; + + let join_condition_members = &join_condition[0].1; let join_condition = join_condition[0].0.expr.clone(); sql = new_sql; @@ -1275,6 +1277,7 @@ impl CubeScanWrapperNode { expr: join_condition, alias: "__join__alias__unused".to_string(), }, + join_condition_members, &ungrouped_scan_node.used_cubes, )?; serde_json::json!(res).to_string() @@ -1310,24 +1313,27 @@ impl CubeScanWrapperNode { measures: Some( aggregate .iter() - .map(|(m, _used_members)| { + .map(|(m, used_members)| { Self::ungrouped_member_def( m, + used_members, &ungrouped_scan_node.used_cubes, ) }) .chain( // TODO understand type of projections - projection.iter().map(|(m, _used_members)| { + projection.iter().map(|(m, used_members)| { Self::ungrouped_member_def( m, + used_members, &ungrouped_scan_node.used_cubes, ) }), ) - .chain(window.iter().map(|(m, _used_members)| { + .chain(window.iter().map(|(m, used_members)| { Self::ungrouped_member_def( m, + used_members, &ungrouped_scan_node.used_cubes, ) })) @@ -1337,9 +1343,10 @@ impl CubeScanWrapperNode { group_by .iter() .zip(group_descs.iter()) - .map(|((m, _used_members), t)| { + .map(|((m, used_members), t)| { Self::dimension_member_def( m, + used_members, &ungrouped_scan_node.used_cubes, t, ) @@ -1349,9 +1356,10 @@ impl CubeScanWrapperNode { segments: Some( filter .iter() - .map(|(m, _used_members)| { + .map(|(m, used_members)| { Self::ungrouped_member_def( m, + used_members, &ungrouped_scan_node.used_cubes, ) }) @@ -1592,40 +1600,55 @@ impl CubeScanWrapperNode { Ok((aliased_columns, sql)) } - fn make_member_def( + fn make_member_def<'m>( column: &AliasedColumn, - used_cubes: &Vec, + used_members: impl IntoIterator, + ungrouped_scan_cubes: &Vec, ) -> Result { + let used_cubes = used_members + .into_iter() + .flat_map(|member| member.split_once('.')) + .map(|(cube, _rest)| cube) + .unique() + .map(|cube| cube.to_string()) + .collect::>(); + let cube_name = used_cubes + .first() + .or_else(|| ungrouped_scan_cubes.first()) + .ok_or_else(|| { + DataFusionError::Internal(format!( + "Can't generate SQL for column without cubes: {:?}", + column + )) + })? + .clone(); + let res = UngrouppedMemberDef { - cube_name: used_cubes - .iter() - .next() - .ok_or_else(|| { - DataFusionError::Internal(format!( - "Can't generate SQL for column without cubes: {:?}", - column - )) - })? - .to_string(), + cube_name, alias: column.alias.clone(), - cube_params: used_cubes.clone(), + cube_params: used_cubes, expr: column.expr.clone(), grouping_set: None, }; Ok(res) } - fn ungrouped_member_def(column: &AliasedColumn, used_cubes: &Vec) -> Result { - let res = Self::make_member_def(column, used_cubes)?; + fn ungrouped_member_def<'m>( + column: &AliasedColumn, + used_members: impl IntoIterator, + ungrouped_scan_cubes: &Vec, + ) -> Result { + let res = Self::make_member_def(column, used_members, ungrouped_scan_cubes)?; Ok(serde_json::json!(res).to_string()) } - fn dimension_member_def( + fn dimension_member_def<'m>( column: &AliasedColumn, - used_cubes: &Vec, + used_members: impl IntoIterator, + ungrouped_scan_cubes: &Vec, grouping_type: &Option, ) -> Result { - let mut res = Self::make_member_def(column, used_cubes)?; + let mut res = Self::make_member_def(column, used_members, ungrouped_scan_cubes)?; res.grouping_set = grouping_type.clone(); Ok(serde_json::json!(res).to_string()) } diff --git a/rust/cubesql/cubesql/src/compile/mod.rs b/rust/cubesql/cubesql/src/compile/mod.rs index b0ec639e65420..5593db5c60144 100644 --- a/rust/cubesql/cubesql/src/compile/mod.rs +++ b/rust/cubesql/cubesql/src/compile/mod.rs @@ -7285,7 +7285,7 @@ ORDER BY json!({ "cube_name": "WideCube", "alias": "pivot_grouping", - "cube_params": ["WideCube"], + "cube_params": [], "expr": "0", "grouping_set": null, }) @@ -11809,16 +11809,16 @@ ORDER BY "source"."str0" ASC json!({ "cube_name": "KibanaSampleDataEcommerce", "alias": "ta_1_order_date_", - "cube_params": ["KibanaSampleDataEcommerce", "Logs"], + "cube_params": ["KibanaSampleDataEcommerce"], "expr": "((${KibanaSampleDataEcommerce.order_date} = DATE('1994-05-01')) OR (${KibanaSampleDataEcommerce.order_date} = DATE('1996-05-03')))", "grouping_set": null, }).to_string(), ]), segments: Some(vec![ json!({ - "cube_name": "KibanaSampleDataEcommerce", + "cube_name": "Logs", "alias": "lower_ta_2_conte", - "cube_params": ["KibanaSampleDataEcommerce", "Logs"], + "cube_params": ["Logs"], "expr": "(LOWER(${Logs.content}) = $0$)", "grouping_set": null, }).to_string(), From 7090a0daa58c9054fcdc61e677ee1628146aed97 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Fri, 7 Mar 2025 22:55:10 +0200 Subject: [PATCH 4/4] feat(cubesql): Move dimensions-only projections to dimensions for push-to-Cube wrapper --- .../cubesql/src/compile/engine/df/wrapper.rs | 27 +++++++++++++++---- .../cubesql/src/compile/test/test_wrapper.rs | 4 +-- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs index 608e7154b42b2..bcc7b3d0c3808 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs @@ -1309,6 +1309,15 @@ impl CubeScanWrapperNode { let load_request = &ungrouped_scan_node.request; + let (dimensions_only_projection, projection_with_measures) = + projection.iter().partition::, _>( + |(_column, used_members)| { + used_members.iter().all(|member| { + plan.meta.find_dimension_with_name(member).is_some() + }) + }, + ); + let load_request = V1LoadRequestQuery { measures: Some( aggregate @@ -1320,16 +1329,15 @@ impl CubeScanWrapperNode { &ungrouped_scan_node.used_cubes, ) }) - .chain( - // TODO understand type of projections - projection.iter().map(|(m, used_members)| { + .chain(projection_with_measures.iter().map( + |(m, used_members)| { Self::ungrouped_member_def( m, used_members, &ungrouped_scan_node.used_cubes, ) - }), - ) + }, + )) .chain(window.iter().map(|(m, used_members)| { Self::ungrouped_member_def( m, @@ -1351,6 +1359,15 @@ impl CubeScanWrapperNode { t, ) }) + .chain(dimensions_only_projection.iter().map( + |(m, used_members)| { + Self::ungrouped_member_def( + m, + used_members, + &ungrouped_scan_node.used_cubes, + ) + }, + )) .collect::>()?, ), segments: Some( diff --git a/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs b/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs index df6a743e69216..0973f7d374261 100644 --- a/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs @@ -1556,8 +1556,8 @@ async fn wrapper_cast_limit_explicit_members() { .as_logical_plan() .find_cube_scan_wrapped_sql() .request; - assert_eq!(request.measures.unwrap().len(), 1); - assert_eq!(request.dimensions.unwrap().len(), 0); + assert_eq!(request.measures.unwrap().len(), 0); + assert_eq!(request.dimensions.unwrap().len(), 1); } #[tokio::test]