diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseTimeDimension.ts b/packages/cubejs-schema-compiler/src/adapter/BaseTimeDimension.ts index 28e16a2c87a64..c5305250c0fa7 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseTimeDimension.ts +++ b/packages/cubejs-schema-compiler/src/adapter/BaseTimeDimension.ts @@ -70,10 +70,14 @@ export class BaseTimeDimension extends BaseFilter { return super.aliasName(); } - // @ts-ignore - public unescapedAliasName(granularity: string) { + public unescapedAliasName(granularity?: string) { const actualGranularity = granularity || this.granularityObj?.granularity || 'day'; + const fullName = `${this.dimension}.${actualGranularity}`; + if (this.query.options.memberToAlias && this.query.options.memberToAlias[fullName]) { + return this.query.options.memberToAlias[fullName]; + } + return `${this.query.aliasName(this.dimension)}_${actualGranularity}`; // TODO date here for rollups } diff --git a/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap b/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap index ae9fc2de07c05..f027d7904b85a 100644 --- a/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap +++ b/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap @@ -232,6 +232,84 @@ Array [ ] `; +exports[`SQL API Postgres (Data) select __user and literal grouped under wrapper: select __user and literal in wrapper 1`] = ` +Array [ + Object { + "my_created_at": 2024-01-01T00:00:00.000Z, + "my_literal": "1", + "my_status": "new", + "my_user": null, + }, + Object { + "my_created_at": 2024-01-01T00:00:00.000Z, + "my_literal": "1", + "my_status": "processed", + "my_user": null, + }, + Object { + "my_created_at": 2024-01-01T00:00:00.000Z, + "my_literal": "1", + "my_status": "shipped", + "my_user": null, + }, +] +`; + +exports[`SQL API Postgres (Data) select __user and literal grouped: select __user and literal 1`] = ` +Array [ + Object { + "Int64(2)": "2", + 
"__cubeJoinField": null, + "datetrunc(Utf8(\\"day\\"),Orders.createdAt)": 2024-01-01T00:00:00.000Z, + "id": 1, + "my_created_at": 2024-01-01T00:00:00.000Z, + "my_literal": "1", + "my_status": "new", + "my_user": null, + }, + Object { + "Int64(2)": "2", + "__cubeJoinField": null, + "datetrunc(Utf8(\\"day\\"),Orders.createdAt)": 2024-01-02T00:00:00.000Z, + "id": 2, + "my_created_at": 2024-01-01T00:00:00.000Z, + "my_literal": "1", + "my_status": "new", + "my_user": null, + }, + Object { + "Int64(2)": "2", + "__cubeJoinField": null, + "datetrunc(Utf8(\\"day\\"),Orders.createdAt)": 2024-01-03T00:00:00.000Z, + "id": 3, + "my_created_at": 2024-01-01T00:00:00.000Z, + "my_literal": "1", + "my_status": "processed", + "my_user": null, + }, + Object { + "Int64(2)": "2", + "__cubeJoinField": null, + "datetrunc(Utf8(\\"day\\"),Orders.createdAt)": 2024-01-04T00:00:00.000Z, + "id": 4, + "my_created_at": 2024-01-01T00:00:00.000Z, + "my_literal": "1", + "my_status": "processed", + "my_user": null, + }, + Object { + "Int64(2)": "2", + "__cubeJoinField": null, + "datetrunc(Utf8(\\"day\\"),Orders.createdAt)": 2024-01-05T00:00:00.000Z, + "id": 5, + "my_created_at": 2024-01-01T00:00:00.000Z, + "my_literal": "1", + "my_status": "shipped", + "my_user": null, + }, +] +`; + exports[`SQL API Postgres (Data) select null in subquery with streaming 1`] = ` Array [ Object { diff --git a/packages/cubejs-testing/test/smoke-cubesql.test.ts b/packages/cubejs-testing/test/smoke-cubesql.test.ts index 0833ed85b40c3..ab816da2ecf28 100644 --- a/packages/cubejs-testing/test/smoke-cubesql.test.ts +++ b/packages/cubejs-testing/test/smoke-cubesql.test.ts @@ -404,6 +404,78 @@ describe('SQL API', () => { expect(res.rows).toEqual([{ max: null }]); }); + test('select __user and literal grouped', async () => { + const query = ` + SELECT + status AS my_status, + date_trunc('month', createdAt) AS my_created_at, + __user AS my_user, + 1 AS my_literal, + -- Columns without aliases should also work + id, + 
date_trunc('day', createdAt), + __cubeJoinField, + 2 + FROM + Orders + GROUP BY 1,2,3,4,5,6,7,8 + ORDER BY 1,2,3,4,5,6,7,8 + `; + + const res = await connection.query(query); + expect(res.rows).toMatchSnapshot('select __user and literal'); + }); + + test('select __user and literal grouped under wrapper', async () => { + const query = ` + WITH +-- This subquery should be represented as CubeScan(ungrouped=false) inside CubeScanWrapper +cube_scan_subq AS ( + SELECT + status AS my_status, + date_trunc('month', createdAt) AS my_created_at, + __user AS my_user, + 1 AS my_literal, + -- Columns without aliases should also work + id, + date_trunc('day', createdAt), + __cubeJoinField, + 2 + FROM Orders + GROUP BY 1,2,3,4,5,6,7,8 +), +filter_subq AS ( + SELECT + status status_filter + FROM Orders + GROUP BY + status_filter +) + SELECT + -- Should use SELECT * here to reference columns without aliases. + -- But it's broken ATM in DF, initial plan contains \`Projection: ... #__subquery-0.logs_content_filter\` on top, but it should not be there + -- TODO fix it + my_created_at, + my_status, + my_user, + my_literal + FROM cube_scan_subq + WHERE + -- This subquery filter should trigger wrapping of whole query + my_status IN ( + SELECT + status_filter + FROM filter_subq + ) + GROUP BY 1,2,3,4 + ORDER BY 1,2,3,4 + ; + `; + + const res = await connection.query(query); + expect(res.rows).toMatchSnapshot('select __user and literal in wrapper'); + }); + test('where segment is false', async () => { const query = 'SELECT value AS val, * FROM "SegmentTest" WHERE segment_eq_1 IS FALSE ORDER BY value;'; diff --git a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs index 3cf95d0ba8a29..71e934dcffa26 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs @@ -34,7 +34,7 @@ use serde::{Deserialize, Serialize}; use std::{ any::Any, cmp::min, - collections::HashMap, 
+ collections::{HashMap, HashSet}, convert::TryInto, fmt, future::Future, @@ -252,7 +252,7 @@ impl CubeScanWrapperNode { } } -fn expr_name(e: &Expr, schema: &Arc) -> Result { +fn expr_name(e: &Expr, schema: &DFSchema) -> Result { match e { Expr::Column(col) => Ok(col.name.clone()), Expr::Sort { expr, .. } => expr_name(expr, schema), @@ -260,10 +260,194 @@ fn expr_name(e: &Expr, schema: &Arc) -> Result { } } +/// Holds column remapping for generated SQL +/// Can be used to remap expression in logical plans on top, +/// and to generate mapping between schema and Cube load query in wrapper +#[derive(Debug)] +pub struct ColumnRemapping { + column_remapping: HashMap, +} + +impl ColumnRemapping { + /// Generate member_fields for CubeScanExecutionPlan, which contains SQL with this remapping. + /// Cube will respond with aliases after remapping, which we must use to read response. + /// Schema in DF will stay the same as before remapping. + /// So result would have all aliases after remapping in order derived from `schema`. + pub fn member_fields(&self, schema: &DFSchema) -> Vec { + schema + .fields() + .iter() + .map(|f| { + MemberField::Member( + self.column_remapping + .get(&Column::from_name(f.name().to_string())) + .map(|x| x.name.to_string()) + .unwrap_or(f.name().to_string()), + ) + }) + .collect() + } + + /// Replace every column expression in `expr` according to this remapping. Column expressions + /// not present in `self` will stay the same. 
+ pub fn remap(&self, expr: &Expr) -> result::Result { + replace_col( + expr.clone(), + &self.column_remapping.iter().map(|(k, v)| (k, v)).collect(), + ) + .map_err(|_| CubeError::internal(format!("Can't rename columns for expr: {expr:?}",))) + } +} + +/// Builds new column mapping +/// One remapper for one context: all unqualified columns with same name are assumed the same column +struct Remapper { + from_alias: Option, + can_rename_columns: bool, + remapping: HashMap, + used_targets: HashSet, +} + +impl Remapper { + /// Constructs new Remapper + /// `from_alias` would be used as qualifier after remapping + /// When `can_rename_columns` is enabled, column names will be generated. + /// When it's disabled, column names must stay the same. + /// Column qualifiers can change in both cases. + pub fn new(from_alias: Option, can_rename_columns: bool) -> Self { + Remapper { + from_alias, + can_rename_columns, + + remapping: HashMap::new(), + used_targets: HashSet::new(), + } + } + + fn generate_new_alias(&self, start_from: String) -> String { + static NON_ID_REGEX: LazyLock = + LazyLock::new(|| Regex::new(r"[^a-zA-Z0-9_]").unwrap()); + + let alias = start_from; + let mut truncated_alias = NON_ID_REGEX + .replace_all(&alias, "_") + .trim_start_matches("_") + .to_lowercase(); + truncated_alias.truncate(16); + let mut alias = truncated_alias.clone(); + for i in 1..10000 { + if !self.used_targets.contains(&alias) { + break; + } + alias = format!("{}_{}", truncated_alias, i); + } + alias + } + + fn new_alias( + &self, + original_alias: &String, + start_from: Option, + ) -> result::Result { + let alias = if self.can_rename_columns { + self.generate_new_alias(start_from.unwrap_or_else(|| original_alias.clone())) + } else { + original_alias.clone() + }; + + if self.used_targets.contains(&alias) { + return Err(CubeError::internal(format!( + "Can't generate SQL for column expr: duplicate alias {alias}" + ))); + } + + Ok(alias) + } + + fn insert_new_alias(&mut self, 
original_column: &Column, new_alias: &String) { + let target_column = Column { + name: new_alias.clone(), + relation: self.from_alias.clone(), + }; + + self.used_targets.insert(new_alias.clone()); + self.remapping.insert( + Column::from_name(&original_column.name), + target_column.clone(), + ); + if let Some(from_alias) = &self.from_alias { + self.remapping.insert( + Column { + name: original_column.name.clone(), + relation: Some(from_alias.clone()), + }, + target_column.clone(), + ); + if let Some(original_relation) = &original_column.relation { + if original_relation != from_alias { + self.remapping + .insert(original_column.clone(), target_column); + } + } + } + } + + pub fn add_column(&mut self, column: &Column) -> result::Result { + if let Some(alias_column) = self.remapping.get(column) { + return Ok(alias_column.name.clone()); + } + + let new_alias = self.new_alias(&column.name, None)?; + self.insert_new_alias(column, &new_alias); + + Ok(new_alias) + } + + /// Generate new alias for expression + /// `original_expr` is the one we are generating alias for + /// `expr` can be same or modified, i.e. when previous column remapping is applied. + /// `expr` would be used to generate new alias when `can_rename_columns` is enabled. 
+ /// When `original_expr` is column it would remap both unqualified and qualified columns to new alias + pub fn add_expr( + &mut self, + schema: &DFSchema, + original_expr: &Expr, + expr: &Expr, + ) -> result::Result { + let original_alias = expr_name(original_expr, schema)?; + let original_alias_key = Column::from_name(&original_alias); + if let Some(alias_column) = self.remapping.get(&original_alias_key) { + return Ok(alias_column.name.clone()); + } + + let start_from = expr_name(&expr, &schema)?; + let alias = self.new_alias(&original_alias, Some(start_from))?; + + let original_column = if let Expr::Column(column) = &original_expr { + column + } else { + &Column::from_name(original_alias) + }; + self.insert_new_alias(original_column, &alias); + + Ok(alias) + } + + pub fn into_remapping(self) -> Option { + if self.remapping.len() > 0 { + Some(ColumnRemapping { + column_remapping: self.remapping, + }) + } else { + None + } + } +} + pub struct SqlGenerationResult { pub data_source: Option, pub from_alias: Option, - pub column_remapping: Option>, + pub column_remapping: Option, pub sql: SqlQuery, pub request: TransportLoadRequestQuery, } @@ -332,11 +516,7 @@ impl CubeScanWrapperNode { .await .and_then(|SqlGenerationResult { data_source, mut sql, request, column_remapping, .. 
}| -> result::Result<_, CubeError> { let member_fields = if let Some(column_remapping) = column_remapping { - schema - .fields() - .iter() - .map(|f| MemberField::Member(column_remapping.get(&Column::from_name(f.name().to_string())).map(|x| x.name.to_string()).unwrap_or(f.name().to_string()))) - .collect() + column_remapping.member_fields(schema) } else { schema . fields() @@ -465,40 +645,145 @@ impl CubeScanWrapperNode { node ))); } + let data_source = &data_sources[0]; let mut meta_with_user = load_request_meta.as_ref().clone(); meta_with_user.set_change_user(node.options.change_user.clone()); + + // Single CubeScan can represent join of multiple table scans + // Multiple table scans can have multiple different aliases + // It means that column expressions on top of this node can have multiple different qualifiers + // CubeScan can have only one alias, so we remap every column to use that alias + + // Columns in node.schema can have arbitrary names, assigned by DF + // Stuff like `datetrunc(Utf8("month"), col)` + // They can be very long, and contain unwanted characters + // So we rename them + + let from_alias = node + .schema + .fields() + .iter() + .next() + .and_then(|f| f.qualifier().cloned()); + let mut remapper = Remapper::new(from_alias.clone(), true); + let mut member_to_alias = HashMap::new(); + // Probably it should just use member expression for all MemberField::Literal + // But turning literals to dimensions could mess up with NULL in grouping key and joins on Cube.js side (like in fullKeyQuery) + // And turning literals to measures would require ugly wrapping with noop aggregation function + // TODO investigate Cube.js joins, try to implement dimension member expression + let mut has_literal_members = false; + let mut wrapper_exprs = vec![]; + + for (member, field) in + node.member_fields.iter().zip(node.schema.fields().iter()) + { + let alias = remapper.add_column(&field.qualified_column())?; + let expr = match member { + MemberField::Member(f) => { + 
member_to_alias.insert(f.to_string(), alias.clone()); + // `alias` is column name that would be generated by Cube.js, just reference that + Expr::Column(Column::from_name(alias.clone())) + } + MemberField::Literal(value) => { + has_literal_members = true; + // Don't care for `member_to_alias`, Cube.js does not handle literals + // Generate literal expression, and put alias into remapper to use higher up + Expr::Literal(value.clone()) + } + }; + wrapper_exprs.push((expr, alias)); + } + + // This is SQL for CubeScan from Cube.js + // It does have all the members with aliases from `member_to_alias` + // But it does not have any literal members let sql = transport .sql( node.span_id.clone(), node.request.clone(), node.auth_context, meta_with_user, - Some( - node.member_fields - .iter() - .zip(node.schema.fields().iter()) - .filter_map(|(m, field)| match m { - MemberField::Member(f) => { - Some((f.to_string(), field.name().to_string())) - } - _ => None, - }) - .collect(), - ), + Some(member_to_alias), None, ) .await?; - // TODO Add wrapper for reprojection and literal members handling + + // TODO is this check necessary? + let sql = if has_literal_members { + // Need to generate wrapper SELECT with literal columns + // Generated columns need to have same aliases as targets in `remapper` + // Because that's what plans higher up would use in generated SQL + let generator = plan + .meta + .data_source_to_sql_generator + .get(data_source) + .ok_or_else(|| { + CubeError::internal(format!( + "Can't generate SQL for CubeScan: no SQL generator for data source {data_source:?}" + )) + })? + .clone(); + + let mut columns = vec![]; + let mut new_sql = sql.sql; + + for (expr, alias) in wrapper_exprs { + // Don't use `generate_column_expr` here + // 1. `generate_column_expr` has different idea of literal members + // When generating column expression that points to literal member it would render literal and generate alias + // Here it should just generate the literal + // 2. 
It would not allow to provide aliases for expressions, instead it usually generates them + let (expr, sql) = Self::generate_sql_for_expr( + plan.clone(), + new_sql, + generator.clone(), + expr, + None, + Arc::new(HashMap::new()), + ) + .await?; + columns.push(AliasedColumn { expr, alias }); + new_sql = sql; + } + + // Use SQL from Cube.js as FROM, and prepared expressions as projection + let resulting_sql = generator + .get_sql_templates() + .select( + new_sql.sql.to_string(), + columns, + vec![], + vec![], + vec![], + // TODO + from_alias.clone().unwrap_or("".to_string()), + None, + None, + vec![], + None, + None, + false, + ) + .map_err(|e| { + DataFusionError::Internal(format!( + "Can't generate SQL for CubeScan in wrapped select: {}", + e + )) + })?; + new_sql.replace_sql(resulting_sql); + + new_sql + } else { + sql.sql + }; + + let column_remapping = remapper.into_remapping(); + return Ok(SqlGenerationResult { - data_source: Some(data_sources[0].clone()), - from_alias: node - .schema - .fields() - .iter() - .next() - .and_then(|f| f.qualifier().cloned()), - sql: sql.sql, - column_remapping: None, + data_source: Some(data_source.clone()), + from_alias, + sql, + column_remapping, request: node.request.clone(), }); } else if let Some(WrappedSelectNode { @@ -600,6 +885,8 @@ impl CubeScanWrapperNode { .await? 
}; + let column_remapping = column_remapping.as_ref(); + let mut subqueries_sql = HashMap::new(); for subquery in subqueries.iter() { let SqlGenerationResult { @@ -625,8 +912,8 @@ impl CubeScanWrapperNode { subqueries_sql.insert(field.qualified_name(), sql_string); } let subqueries_sql = Arc::new(subqueries_sql); - let mut next_remapping = HashMap::new(); let alias = alias.or(from_alias.clone()); + let mut next_remapper = Remapper::new(alias.clone(), can_rename_columns); if let Some(data_source) = data_source { let generator = plan .meta @@ -645,9 +932,8 @@ impl CubeScanWrapperNode { projection_expr.clone(), sql, generator.clone(), - &column_remapping, - &mut next_remapping, - alias.clone(), + column_remapping, + &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), subqueries_sql.clone(), @@ -660,9 +946,8 @@ impl CubeScanWrapperNode { flat_group_expr.clone(), sql, generator.clone(), - &column_remapping, - &mut next_remapping, - alias.clone(), + column_remapping, + &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), subqueries_sql.clone(), @@ -675,9 +960,8 @@ impl CubeScanWrapperNode { aggr_expr.clone(), sql, generator.clone(), - &column_remapping, - &mut next_remapping, - alias.clone(), + column_remapping, + &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), subqueries_sql.clone(), @@ -690,9 +974,8 @@ impl CubeScanWrapperNode { filter_expr.clone(), sql, generator.clone(), - &column_remapping, - &mut next_remapping, - alias.clone(), + column_remapping, + &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), subqueries_sql.clone(), @@ -705,9 +988,8 @@ impl CubeScanWrapperNode { window_expr.clone(), sql, generator.clone(), - &column_remapping, - &mut next_remapping, - alias.clone(), + column_remapping, + &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), subqueries_sql.clone(), @@ -720,9 +1002,8 @@ impl CubeScanWrapperNode { order_expr.clone(), sql, generator.clone(), - 
&column_remapping, - &mut next_remapping, - alias.clone(), + column_remapping, + &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), subqueries_sql.clone(), @@ -870,11 +1151,7 @@ impl CubeScanWrapperNode { data_source: Some(data_source), from_alias: alias, sql: sql_response.sql, - column_remapping: if next_remapping.len() > 0 { - Some(next_remapping) - } else { - None - }, + column_remapping: next_remapper.into_remapping(), request: load_request.clone(), }) } else { @@ -915,11 +1192,7 @@ impl CubeScanWrapperNode { data_source: Some(data_source), from_alias: alias, sql, - column_remapping: if next_remapping.len() > 0 { - Some(next_remapping) - } else { - None - }, + column_remapping: next_remapper.into_remapping(), request, }) } @@ -960,29 +1233,16 @@ impl CubeScanWrapperNode { exprs: Vec, mut sql: SqlQuery, generator: Arc, - column_remapping: &Option>, - next_remapping: &mut HashMap, - from_alias: Option, + column_remapping: Option<&ColumnRemapping>, + next_remapper: &mut Remapper, can_rename_columns: bool, ungrouped_scan_node: Option>, subqueries: Arc>, ) -> result::Result<(Vec, SqlQuery), CubeError> { - static NON_ID_REGEX: LazyLock = - LazyLock::new(|| Regex::new(r"[^a-zA-Z0-9_]").unwrap()); - let mut aliased_columns = Vec::new(); for original_expr in exprs { - let expr = if let Some(column_remapping) = column_remapping.as_ref() { - let mut expr = replace_col( - original_expr.clone(), - &column_remapping.iter().map(|(k, v)| (k, v)).collect(), - ) - .map_err(|_| { - CubeError::internal(format!( - "Can't rename columns for expr: {:?}", - original_expr - )) - })?; + let expr = if let Some(column_remapping) = column_remapping { + let mut expr = column_remapping.remap(&original_expr)?; if !can_rename_columns { let original_alias = expr_name(&original_expr, &schema)?; if original_alias != expr_name(&expr, &schema)? 
{ @@ -1006,75 +1266,7 @@ impl CubeScanWrapperNode { Self::escape_interpolation_quotes(expr_sql, ungrouped_scan_node.is_some()); sql = new_sql_query; - let original_alias = expr_name(&original_expr, &schema)?; - let original_alias_key = Column::from_name(&original_alias); - if let Some(alias_column) = next_remapping.get(&original_alias_key) { - let alias = alias_column.name.clone(); - aliased_columns.push(AliasedColumn { - expr: expr_sql, - alias, - }); - continue; - } - - let alias = if can_rename_columns { - let alias = expr_name(&expr, &schema)?; - let mut truncated_alias = NON_ID_REGEX - .replace_all(&alias, "_") - .trim_start_matches("_") - .to_lowercase(); - truncated_alias.truncate(16); - let mut alias = truncated_alias.clone(); - for i in 1..10000 { - if !next_remapping - .iter() - .any(|(_, v)| v == &Column::from_name(&alias)) - { - break; - } - alias = format!("{}_{}", truncated_alias, i); - } - alias - } else { - original_alias.clone() - }; - if !next_remapping.contains_key(&Column::from_name(&alias)) { - next_remapping.insert(original_alias_key, Column::from_name(&alias)); - if let Some(from_alias) = &from_alias { - next_remapping.insert( - Column { - name: original_alias.clone(), - relation: Some(from_alias.clone()), - }, - Column { - name: alias.clone(), - relation: Some(from_alias.clone()), - }, - ); - if let Expr::Column(column) = &original_expr { - if let Some(original_relation) = &column.relation { - if original_relation != from_alias { - next_remapping.insert( - Column { - name: original_alias.clone(), - relation: Some(original_relation.clone()), - }, - Column { - name: alias.clone(), - relation: Some(from_alias.clone()), - }, - ); - } - } - } - } - } else { - return Err(CubeError::internal(format!( - "Can't generate SQL for column expr: duplicate alias {}", - alias - ))); - } - + let alias = next_remapper.add_expr(&schema, &original_expr, &expr)?; aliased_columns.push(AliasedColumn { expr: expr_sql, alias, diff --git 
a/rust/cubesql/cubesql/src/compile/mod.rs b/rust/cubesql/cubesql/src/compile/mod.rs index 2da928a683930..ed5d604436a6e 100644 --- a/rust/cubesql/cubesql/src/compile/mod.rs +++ b/rust/cubesql/cubesql/src/compile/mod.rs @@ -2274,7 +2274,7 @@ from logical_plan.find_cube_scan().request, V1LoadRequestQuery { measures: Some(vec![]), - dimensions: Some(vec![]), + dimensions: Some(vec!["KibanaSampleDataEcommerce.order_date".to_string()]), segments: Some(vec![]), order: Some(vec![]), ungrouped: Some(true), @@ -7362,7 +7362,10 @@ ORDER BY "source"."str0" ASC query_plan.as_logical_plan().find_cube_scan().request, V1LoadRequestQuery { measures: Some(vec![]), - dimensions: Some(vec![]), + dimensions: Some(vec![ + "WideCube.dim1".to_string(), + "WideCube.dim2".to_string(), + ]), segments: Some(vec![]), order: Some(vec![]), ungrouped: Some(true), diff --git a/rust/cubesql/cubesql/src/compile/rewrite/cost.rs b/rust/cubesql/cubesql/src/compile/rewrite/cost.rs index 83f23e6dedef1..6ae446860960b 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/cost.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/cost.rs @@ -78,6 +78,11 @@ impl BestCubePlan { _ => 0, }; + let zero_members_wrapper = match enode { + LogicalPlanLanguage::WrappedSelect(_) => 1, + _ => 0, + }; + let cube_members = match enode { LogicalPlanLanguage::Measure(_) => 1, LogicalPlanLanguage::Dimension(_) => 1, @@ -196,6 +201,7 @@ impl BestCubePlan { non_pushed_down_window, non_pushed_down_grouping_sets, non_pushed_down_limit_sort, + zero_members_wrapper, cube_members, errors: this_errors, time_dimensions_used_as_dimensions, @@ -247,6 +253,11 @@ pub struct CubePlanCost { filters: i64, structure_points: i64, filter_members: i64, + // This is separate from both non_detected_cube_scans and cube_members + // Because it's ok to use all members inside wrapper (so non_detected_cube_scans would be zero) + // And we want to select representation with less members + // But only when members are present! 
+ zero_members_wrapper: i64, cube_members: i64, errors: i64, time_dimensions_used_as_dimensions: i64, @@ -350,6 +361,11 @@ impl CubePlanCost { non_pushed_down_limit_sort: self.non_pushed_down_limit_sort + other.non_pushed_down_limit_sort, member_errors: self.member_errors + other.member_errors, + zero_members_wrapper: (if other.cube_members == 0 { + self.zero_members_wrapper + } else { + 0 + }) + other.zero_members_wrapper, cube_members: self.cube_members + other.cube_members, errors: self.errors + other.errors, structure_points: self.structure_points + other.structure_points, @@ -403,6 +419,8 @@ impl CubePlanCost { SortState::Current if top_down => self.non_pushed_down_limit_sort, _ => 0, }, + // Don't track state here: we want representation that have fewer wrappers with zero members _in total_ + zero_members_wrapper: self.zero_members_wrapper, cube_members: self.cube_members, errors: self.errors, structure_points: self.structure_points, diff --git a/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs b/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs index 6fcdaa5e38452..b89241fab2c01 100644 --- a/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs @@ -1,11 +1,13 @@ -use cubeclient::models::V1LoadRequestQuery; -use datafusion::physical_plan::displayable; +use cubeclient::models::{V1LoadRequestQuery, V1LoadRequestQueryTimeDimension}; +use datafusion::{physical_plan::displayable, scalar::ScalarValue}; use pretty_assertions::assert_eq; +use regex::Regex; use serde_json::json; use std::sync::Arc; use crate::{ compile::{ + engine::df::scan::MemberField, rewrite::rewriter::Rewriter, test::{ convert_select_to_query_plan, convert_select_to_query_plan_customized, @@ -807,7 +809,7 @@ async fn test_case_wrapper_alias_with_order() { .wrapped_sql .unwrap() .sql - .contains("ORDER BY \"case_when_a_cust\"")); + .contains("ORDER BY \"a\".\"case_when_a_cust\"")); let physical_plan = 
query_plan.as_physical_plan().await.unwrap(); println!( @@ -935,7 +937,7 @@ async fn test_case_wrapper_ungrouped_sorted_aliased() { .unwrap() .sql // TODO test without depend on column name - .contains("ORDER BY \"case_when")); + .contains("ORDER BY \"a\".\"case_when")); } #[tokio::test] @@ -1147,6 +1149,138 @@ async fn test_case_wrapper_escaping() { .contains("\\\\\\\\\\\\`")); } +/// Test aliases for grouped CubeScan in wrapper +/// qualifiers from join should get remapped to single from alias +/// long generated aliases from Datafusion should get shortened +#[tokio::test] +async fn test_join_wrapper_cubescan_aliasing() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let query_plan = convert_select_to_query_plan( + // language=PostgreSQL + r#" +WITH +-- This subquery should be represented as CubeScan(ungrouped=false) inside CubeScanWrapper +cube_scan_subq AS ( + SELECT + logs_alias.content logs_content, + DATE_TRUNC('month', kibana_alias.last_mod) last_mod_month, + kibana_alias.__user AS cube_user, + 1 AS literal, + -- Columns without aliases should also work + DATE_TRUNC('month', kibana_alias.order_date), + kibana_alias.__cubeJoinField, + 2, + CASE + WHEN sum(kibana_alias."sumPrice") IS NOT NULL + THEN sum(kibana_alias."sumPrice") + ELSE 0 + END sum_price + FROM KibanaSampleDataEcommerce kibana_alias + JOIN Logs logs_alias + ON kibana_alias.__cubeJoinField = logs_alias.__cubeJoinField + GROUP BY 1,2,3,4,5,6,7 +), +filter_subq AS ( + SELECT + Logs.content logs_content_filter + FROM Logs + GROUP BY + logs_content_filter +) +SELECT + -- Should use SELECT * here to reference columns without aliases. + -- But it's broken ATM in DF, initial plan contains `Projection: ... 
#__subquery-0.logs_content_filter` on top, but it should not be there + -- TODO fix it + logs_content, + cube_user, + literal +FROM cube_scan_subq +WHERE + -- This subquery filter should trigger wrapping of whole query + logs_content IN ( + SELECT + logs_content_filter + FROM filter_subq + ) +; +"# + .to_string(), + DatabaseProtocol::PostgreSQL, + ) + .await; + + let physical_plan = query_plan.as_physical_plan().await.unwrap(); + println!( + "Physical plan: {}", + displayable(physical_plan.as_ref()).indent() + ); + + let logical_plan = query_plan.as_logical_plan(); + let sql = logical_plan + .find_cube_scan_wrapper() + .wrapped_sql + .unwrap() + .sql; + + assert_eq!( + logical_plan.find_cube_scan().request, + V1LoadRequestQuery { + measures: Some(vec!["KibanaSampleDataEcommerce.sumPrice".to_string(),]), + dimensions: Some(vec!["Logs.content".to_string(),]), + time_dimensions: Some(vec![ + V1LoadRequestQueryTimeDimension { + dimension: "KibanaSampleDataEcommerce.last_mod".to_string(), + granularity: Some("month".to_string()), + date_range: None, + }, + V1LoadRequestQueryTimeDimension { + dimension: "KibanaSampleDataEcommerce.order_date".to_string(), + granularity: Some("month".to_string()), + date_range: None, + }, + ]), + segments: Some(vec![]), + order: Some(vec![]), + ..Default::default() + } + ); + + assert_eq!( + logical_plan.find_cube_scan().member_fields, + vec![ + MemberField::Member("Logs.content".to_string()), + MemberField::Member("KibanaSampleDataEcommerce.last_mod.month".to_string()), + MemberField::Literal(ScalarValue::Utf8(None)), + MemberField::Literal(ScalarValue::Int64(Some(1))), + MemberField::Member("KibanaSampleDataEcommerce.order_date.month".to_string()), + MemberField::Literal(ScalarValue::Utf8(None)), + MemberField::Literal(ScalarValue::Int64(Some(2))), + MemberField::Member("KibanaSampleDataEcommerce.sumPrice".to_string()), + ], + ); + + // Check that all aliases from different tables have same qualifier, and that names are simple and short 
+ // logs_content => logs_alias.content + // last_mod_month => DATE_TRUNC('month', kibana_alias.last_mod), + // sum_price => CASE WHEN sum(kibana_alias."sumPrice") ... END + let content_re = Regex::new(r#""logs_alias"."[a-zA-Z0-9_]{1,16}" "logs_content""#).unwrap(); + assert!(content_re.is_match(&sql)); + let last_mod_month_re = + Regex::new(r#""logs_alias"."[a-zA-Z0-9_]{1,16}" "last_mod_month""#).unwrap(); + assert!(last_mod_month_re.is_match(&sql)); + let sum_price_re = Regex::new(r#"CASE WHEN "logs_alias"."[a-zA-Z0-9_]{1,16}" IS NOT NULL THEN "logs_alias"."[a-zA-Z0-9_]{1,16}" ELSE 0 END "sum_price""#) + .unwrap(); + assert!(sum_price_re.is_match(&sql)); + let cube_user_re = Regex::new(r#""logs_alias"."[a-zA-Z0-9_]{1,16}" "cube_user""#).unwrap(); + assert!(cube_user_re.is_match(&sql)); + let literal_re = Regex::new(r#""logs_alias"."[a-zA-Z0-9_]{1,16}" "literal""#).unwrap(); + assert!(literal_re.is_match(&sql)); +} + /// Test that WrappedSelect(... limit=Some(0) ...) will render it correctly #[tokio::test] async fn test_wrapper_limit_zero() { @@ -1261,6 +1395,7 @@ async fn test_wrapper_filter_flatten() { } /// Regular aggregation over CubeScan(limit=n, ungrouped=true) is NOT pushed to CubeScan +/// and inner ungrouped CubeScan should have both proper members and limit #[tokio::test] async fn wrapper_agg_over_limit() { if !Rewriter::sql_push_down_enabled() { @@ -1299,7 +1434,9 @@ async fn wrapper_agg_over_limit() { logical_plan.find_cube_scan().request, V1LoadRequestQuery { measures: Some(vec![]), - dimensions: Some(vec![]), + dimensions: Some(vec![ + "KibanaSampleDataEcommerce.customer_gender".to_string(), + ]), segments: Some(vec![]), order: Some(vec![]), limit: Some(5), @@ -1324,6 +1461,7 @@ async fn wrapper_agg_over_limit() { } /// Aggregation(dimension) over CubeScan(limit=n, ungrouped=true) is NOT pushed to CubeScan +/// and inner ungrouped CubeScan should have both proper members and limit #[tokio::test] async fn wrapper_agg_dimension_over_limit() { if 
!Rewriter::sql_push_down_enabled() { @@ -1360,7 +1498,9 @@ async fn wrapper_agg_dimension_over_limit() { logical_plan.find_cube_scan().request, V1LoadRequestQuery { measures: Some(vec![]), - dimensions: Some(vec![]), + dimensions: Some(vec![ + "KibanaSampleDataEcommerce.customer_gender".to_string(), + ]), segments: Some(vec![]), order: Some(vec![]), limit: Some(5),