diff --git a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs index d4eac2eb5fb86..bcc7b3d0c3808 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs @@ -849,6 +849,7 @@ impl CubeScanWrapperNode { expr, None, Arc::new(HashMap::new()), + None, ) .await?; columns.push(AliasedColumn { expr, alias }); @@ -1263,7 +1264,9 @@ impl CubeScanWrapperNode { subqueries_sql.clone(), ) .await?; - let join_condition = join_condition[0].expr.clone(); + + let join_condition_members = &join_condition[0].1; + let join_condition = join_condition[0].0.expr.clone(); sql = new_sql; let join_sql_expression = { @@ -1274,6 +1277,7 @@ impl CubeScanWrapperNode { expr: join_condition, alias: "__join__alias__unused".to_string(), }, + join_condition_members, &ungrouped_scan_node.used_cubes, )?; serde_json::json!(res).to_string() @@ -1305,28 +1309,39 @@ impl CubeScanWrapperNode { let load_request = &ungrouped_scan_node.request; + let (dimensions_only_projection, projection_with_measures) = + projection.iter().partition::, _>( + |(_column, used_members)| { + used_members.iter().all(|member| { + plan.meta.find_dimension_with_name(member).is_some() + }) + }, + ); + let load_request = V1LoadRequestQuery { measures: Some( aggregate .iter() - .map(|m| { + .map(|(m, used_members)| { Self::ungrouped_member_def( m, + used_members, &ungrouped_scan_node.used_cubes, ) }) - .chain( - // TODO understand type of projections - projection.iter().map(|m| { + .chain(projection_with_measures.iter().map( + |(m, used_members)| { Self::ungrouped_member_def( m, + used_members, &ungrouped_scan_node.used_cubes, ) - }), - ) - .chain(window.iter().map(|m| { + }, + )) + .chain(window.iter().map(|(m, used_members)| { Self::ungrouped_member_def( m, + used_members, &ungrouped_scan_node.used_cubes, ) })) @@ -1336,21 +1351,32 @@ impl CubeScanWrapperNode { group_by .iter() .zip(group_descs.iter()) - .map(|(m, t)| { + .map(|((m, used_members), t)| { Self::dimension_member_def( m, + used_members, &ungrouped_scan_node.used_cubes, t, ) }) + .chain(dimensions_only_projection.iter().map( + |(m, used_members)| { + Self::ungrouped_member_def( + m, + used_members, + &ungrouped_scan_node.used_cubes, + ) + }, + )) .collect::>()?, ), segments: Some( filter .iter() - .map(|m| { + .map(|(m, used_members)| { Self::ungrouped_member_def( m, + used_members, &ungrouped_scan_node.used_cubes, ) }) @@ -1398,7 +1424,7 @@ impl CubeScanWrapperNode { )) })?; Ok(vec![ - aliased_column.alias.clone(), + aliased_column.0.alias.clone(), if *asc { "asc".to_string() } else { "desc".to_string() }, ]) } @@ -1469,24 +1495,24 @@ impl CubeScanWrapperNode { .get_sql_templates() .select( sql.sql.to_string(), - projection, - group_by, + projection.into_iter().map(|(m, _)| m).collect(), + group_by.into_iter().map(|(m, _)| m).collect(), group_descs, - aggregate, + aggregate.into_iter().map(|(m, _)| m).collect(), // TODO from_alias.unwrap_or("".to_string()), if !filter.is_empty() { Some( filter .iter() - .map(|f| f.expr.to_string()) + .map(|(f, _)| f.expr.to_string()) .join(" AND "), ) } else { None }, None, - order, + order.into_iter().map(|(m, _)| m).collect(), limit, offset, distinct, @@ -1548,7 +1574,7 @@ impl CubeScanWrapperNode { can_rename_columns: bool, push_to_cube_context: Option<&PushToCubeContext<'_>>, subqueries: Arc>, - ) -> result::Result<(Vec, SqlQuery), CubeError> { + ) -> result::Result<(Vec<(AliasedColumn, HashSet)>, SqlQuery), CubeError> { let mut aliased_columns = Vec::new(); for original_expr in exprs { let expr = if let Some(column_remapping) = column_remapping { @@ -1563,6 +1589,8 @@ impl CubeScanWrapperNode { } else { original_expr.clone() }; + + let mut used_members = HashSet::new(); let (expr_sql, new_sql_query) = Self::generate_sql_for_expr( plan.clone(), sql, @@ -1570,6 +1598,7 @@ impl CubeScanWrapperNode { expr.clone(), push_to_cube_context, subqueries.clone(), + Some(&mut used_members), ) .await?; let expr_sql = @@ -1577,48 +1606,66 @@ impl CubeScanWrapperNode { sql = new_sql_query; let alias = next_remapper.add_expr(&schema, &original_expr, &expr)?; - aliased_columns.push(AliasedColumn { - expr: expr_sql, - alias, - }); + aliased_columns.push(( + AliasedColumn { + expr: expr_sql, + alias, + }, + used_members, + )); } Ok((aliased_columns, sql)) } - fn make_member_def( + fn make_member_def<'m>( column: &AliasedColumn, - used_cubes: &Vec, + used_members: impl IntoIterator, + ungrouped_scan_cubes: &Vec, ) -> Result { + let used_cubes = used_members + .into_iter() + .flat_map(|member| member.split_once('.')) + .map(|(cube, _rest)| cube) + .unique() + .map(|cube| cube.to_string()) + .collect::>(); + let cube_name = used_cubes + .first() + .or_else(|| ungrouped_scan_cubes.first()) + .ok_or_else(|| { + DataFusionError::Internal(format!( + "Can't generate SQL for column without cubes: {:?}", + column + )) + })? + .clone(); + let res = UngrouppedMemberDef { - cube_name: used_cubes - .iter() - .next() - .ok_or_else(|| { - DataFusionError::Internal(format!( - "Can't generate SQL for column without cubes: {:?}", - column - )) - })? - .to_string(), + cube_name, alias: column.alias.clone(), - cube_params: used_cubes.clone(), + cube_params: used_cubes, expr: column.expr.clone(), grouping_set: None, }; Ok(res) } - fn ungrouped_member_def(column: &AliasedColumn, used_cubes: &Vec) -> Result { - let res = Self::make_member_def(column, used_cubes)?; + fn ungrouped_member_def<'m>( + column: &AliasedColumn, + used_members: impl IntoIterator, + ungrouped_scan_cubes: &Vec, + ) -> Result { + let res = Self::make_member_def(column, used_members, ungrouped_scan_cubes)?; Ok(serde_json::json!(res).to_string()) } - fn dimension_member_def( + fn dimension_member_def<'m>( column: &AliasedColumn, - used_cubes: &Vec, + used_members: impl IntoIterator, + ungrouped_scan_cubes: &Vec, grouping_type: &Option, ) -> Result { - let mut res = Self::make_member_def(column, used_cubes)?; + let mut res = Self::make_member_def(column, used_members, ungrouped_scan_cubes)?; res.grouping_set = grouping_type.clone(); Ok(serde_json::json!(res).to_string()) } @@ -1674,6 +1721,7 @@ impl CubeScanWrapperNode { expr: Expr, push_to_cube_context: Option<&'ctx PushToCubeContext>, subqueries: Arc>, + mut used_members: Option<&'ctx mut HashSet>, ) -> Pin> + Send + 'ctx>> { Box::pin(async move { match expr { @@ -1685,6 +1733,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; Ok((expr, sql_query)) @@ -1723,6 +1772,7 @@ impl CubeScanWrapperNode { expr, None, subqueries.clone(), + used_members, ) .await; } @@ -1757,6 +1807,9 @@ impl CubeScanWrapperNode { })?; match member { MemberField::Member(member) => { + if let Some(used_members) = used_members { + used_members.insert(member.clone()); + } Ok((format!("${{{}}}", member), sql_query)) } MemberField::Literal(value) => { @@ -1767,6 +1820,7 @@ impl CubeScanWrapperNode { Expr::Literal(value.clone()), push_to_cube_context, subqueries.clone(), + used_members, ) .await } @@ -1818,6 +1872,7 @@ impl CubeScanWrapperNode { *left, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; let (right, sql_query) = Self::generate_sql_for_expr( @@ -1827,6 +1882,7 @@ impl CubeScanWrapperNode { *right, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; let resulting_sql = sql_generator @@ -1849,6 +1905,7 @@ impl CubeScanWrapperNode { *like.expr, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; let (pattern, sql_query) = Self::generate_sql_for_expr( @@ -1858,6 +1915,7 @@ impl CubeScanWrapperNode { *like.pattern, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; let (escape_char, sql_query) = match like.escape_char { @@ -1869,6 +1927,7 @@ impl CubeScanWrapperNode { Expr::Literal(ScalarValue::Utf8(Some(escape_char.to_string()))), push_to_cube_context, subqueries.clone(), + used_members, ) .await?; (Some(escape_char), sql_query) @@ -1894,6 +1953,7 @@ impl CubeScanWrapperNode { *ilike.expr, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; let (pattern, sql_query) = Self::generate_sql_for_expr( @@ -1903,6 +1963,7 @@ impl CubeScanWrapperNode { *ilike.pattern, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; let (escape_char, sql_query) = match ilike.escape_char { @@ -1914,6 +1975,7 @@ impl CubeScanWrapperNode { Expr::Literal(ScalarValue::Utf8(Some(escape_char.to_string()))), push_to_cube_context, subqueries.clone(), + used_members, ) .await?; (Some(escape_char), sql_query) @@ -1940,6 +2002,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; let resulting_sql = @@ -1962,6 +2025,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; let resulting_sql = sql_generator @@ -1983,6 +2047,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; let resulting_sql = sql_generator @@ -2004,6 +2069,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; let resulting_sql = sql_generator @@ -2032,6 +2098,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = sql_query_next; @@ -2048,6 +2115,7 @@ impl CubeScanWrapperNode { *when, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; let (then, sql_query_next) = Self::generate_sql_for_expr( @@ -2057,6 +2125,7 @@ impl CubeScanWrapperNode { *then, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = sql_query_next; @@ -2070,6 +2139,7 @@ impl CubeScanWrapperNode { *else_expr, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; sql_query = sql_query_next; @@ -2093,6 +2163,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; let data_type = Self::generate_sql_type(sql_generator.clone(), data_type)?; @@ -2113,6 +2184,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; let resulting_sql = sql_generator @@ -2446,6 +2518,7 @@ impl CubeScanWrapperNode { arg, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2483,6 +2556,7 @@ impl CubeScanWrapperNode { args[1].clone(), push_to_cube_context, subqueries.clone(), + used_members, ) .await?; return Ok(( @@ -2526,6 +2600,7 @@ impl CubeScanWrapperNode { arg, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2566,6 +2641,7 @@ impl CubeScanWrapperNode { arg, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2595,6 +2671,7 @@ impl CubeScanWrapperNode { expr, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2623,6 +2700,7 @@ impl CubeScanWrapperNode { expr, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2664,6 +2742,7 @@ impl CubeScanWrapperNode { arg, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2678,6 +2757,7 @@ impl CubeScanWrapperNode { arg, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2692,6 +2772,7 @@ impl CubeScanWrapperNode { arg, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2728,6 +2809,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2740,6 +2822,7 @@ impl CubeScanWrapperNode { expr, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2771,6 +2854,7 @@ impl CubeScanWrapperNode { *expr, push_to_cube_context, subqueries.clone(), + used_members.as_deref_mut(), ) .await?; sql_query = query; @@ -2781,6 +2865,7 @@ impl CubeScanWrapperNode { *subquery, push_to_cube_context, subqueries.clone(), + used_members, ) .await?; sql_query = query; diff --git a/rust/cubesql/cubesql/src/compile/mod.rs b/rust/cubesql/cubesql/src/compile/mod.rs index b0ec639e65420..5593db5c60144 100644 --- a/rust/cubesql/cubesql/src/compile/mod.rs +++ b/rust/cubesql/cubesql/src/compile/mod.rs @@ -7285,7 +7285,7 @@ ORDER BY json!({ "cube_name": "WideCube", "alias": "pivot_grouping", - "cube_params": ["WideCube"], + "cube_params": [], "expr": "0", "grouping_set": null, }) @@ -11809,16 +11809,16 @@ ORDER BY "source"."str0" ASC json!({ "cube_name": "KibanaSampleDataEcommerce", "alias": "ta_1_order_date_", - "cube_params": ["KibanaSampleDataEcommerce", "Logs"], + "cube_params": ["KibanaSampleDataEcommerce"], "expr": "((${KibanaSampleDataEcommerce.order_date} = DATE('1994-05-01')) OR (${KibanaSampleDataEcommerce.order_date} = DATE('1996-05-03')))", "grouping_set": null, }).to_string(), ]), segments: Some(vec![ json!({ - "cube_name": "KibanaSampleDataEcommerce", + "cube_name": "Logs", "alias": "lower_ta_2_conte", - "cube_params": ["KibanaSampleDataEcommerce", "Logs"], + "cube_params": ["Logs"], "expr": "(LOWER(${Logs.content}) = $0$)", "grouping_set": null, }).to_string(), diff --git a/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs b/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs index df6a743e69216..0973f7d374261 100644 --- a/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs @@ -1556,8 +1556,8 @@ async fn wrapper_cast_limit_explicit_members() { .as_logical_plan() .find_cube_scan_wrapped_sql() .request; - assert_eq!(request.measures.unwrap().len(), 1); - assert_eq!(request.dimensions.unwrap().len(), 0); + assert_eq!(request.measures.unwrap().len(), 0); + assert_eq!(request.dimensions.unwrap().len(), 1); } #[tokio::test]