From 3bfe8121be684fb08e22e27efb59877734276dba Mon Sep 17 00:00:00 2001 From: Alex Qyoun-ae <4062971+MazterQyou@users.noreply.github.com> Date: Tue, 31 Mar 2026 13:58:56 +0400 Subject: [PATCH] feat(cubesql): Support PatchMeasure for view measures (#10571) --- .../src/adapter/BaseMeasure.ts | 30 ++- .../src/adapter/BaseQuery.js | 33 ++- .../src/compiler/CubeEvaluator.ts | 1 + .../postgresql/schema/Orders.js | 22 ++ .../__snapshots__/smoke-cubesql.test.ts.snap | 18 +- .../cubejs-testing/test/smoke-cubesql.test.ts | 32 +++ .../rewrite/rules/wrapper/aggregate.rs | 219 ++++++++++++------ rust/cubesql/cubesql/src/transport/ext.rs | 4 +- 8 files changed, 272 insertions(+), 87 deletions(-) diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseMeasure.ts b/packages/cubejs-schema-compiler/src/adapter/BaseMeasure.ts index 2c6eaa6624066..6fff3530ee121 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseMeasure.ts +++ b/packages/cubejs-schema-compiler/src/adapter/BaseMeasure.ts @@ -18,10 +18,11 @@ export class BaseMeasure { protected preparePatchedMeasure(sourceMeasure: string, newMeasureType: string | null, addFilters: Array<{sql: Function}>): MeasureDefinition { const source = this.query.cubeEvaluator.measureByPath(sourceMeasure); + const aggType = source.aggType ?? source.type; - let resultMeasureType = source.type; + let resultMeasureType = aggType; if (newMeasureType !== null) { - switch (source.type) { + switch (aggType) { case 'sum': case 'avg': case 'min': @@ -32,29 +33,35 @@ export class BaseMeasure { case 'min': case 'max': case 'count_distinct': + case 'countDistinct': case 'count_distinct_approx': + case 'countDistinctApprox': // Can change from avg/... to count_distinct // Latter does not care what input value is // ok, do nothing break; default: throw new UserError( - `Unsupported measure type replacement for ${sourceMeasure}: ${source.type} => ${newMeasureType}` + `Unsupported measure type replacement for ${sourceMeasure}: ${aggType} => ${newMeasureType}` ); } break; case 'count_distinct': + case 'countDistinct': case 'count_distinct_approx': + case 'countDistinctApprox': switch (newMeasureType) { case 'count_distinct': + case 'countDistinct': case 'count_distinct_approx': + case 'countDistinctApprox': // ok, do nothing break; default: // Can not change from count_distinct to avg/... // Latter do care what input value is, and original measure can be defined on strings throw new UserError( - `Unsupported measure type replacement for ${sourceMeasure}: ${source.type} => ${newMeasureType}` + `Unsupported measure type replacement for ${sourceMeasure}: ${aggType} => ${newMeasureType}` ); } break; @@ -64,7 +71,7 @@ export class BaseMeasure { // Can not change from count // There's no SQL at all throw new UserError( - `Unsupported measure type replacement for ${sourceMeasure}: ${source.type} => ${newMeasureType}` + `Unsupported measure type replacement for ${sourceMeasure}: ${aggType} => ${newMeasureType}` ); } @@ -81,14 +88,16 @@ export class BaseMeasure { case 'max': case 'count': case 'count_distinct': + case 'countDistinct': case 'count_distinct_approx': + case 'countDistinctApprox': // ok, do nothing break; default: // Can not add filters to string, time, boolean, number // Aggregation is already included in SQL, it's hard to patch that throw new UserError( - `Unsupported additional filters for measure ${sourceMeasure} type ${source.type}` + `Unsupported additional filters for measure ${sourceMeasure} type ${aggType}` ); } @@ -97,9 +106,16 @@ export class BaseMeasure { const patchedFrom = this.query.cubeEvaluator.parsePath('measures', sourceMeasure); + // For view measures, `type` is `number` (aggregation is embedded in SQL) + // while `aggType` carries the real aggregation kind. We must preserve that + // distinction to avoid double-wrapping (e.g. SUM(SUM(...))). + const typeFields = source.aggType != null + ? { type: source.type, aggType: resultMeasureType } + : { type: resultMeasureType }; + return { ...source, - type: resultMeasureType, + ...typeFields, filters: resultFilters, patchedFrom: { cubeName: patchedFrom[0], diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js index f1137a4d607f0..20381e39bbbf6 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js @@ -3319,13 +3319,28 @@ export class BaseQuery { const primaryKeys = this.cubeEvaluator.primaryKeys[cubeName]; const orderBySql = (symbol.orderBy || []).map(o => ({ sql: this.evaluateSql(cubeName, o.sql), dir: o.dir })); let sql; + let patchedSymbol = symbol; if (symbol.type !== 'rank') { - sql = symbol.sql && this.evaluateSql(cubeName, symbol.sql) || + const evaluateSql = () => symbol.sql && this.evaluateSql(cubeName, symbol.sql) || primaryKeys.length && ( primaryKeys.length > 1 ? this.concatStringsSql(primaryKeys.map((pk) => this.castToString(this.primaryKeySql(pk, cubeName)))) : this.primaryKeySql(primaryKeys[0], cubeName) ) || '*'; + // For patched view measures (aggType is set), the view's sql resolves to + // already-aggregated SQL (e.g. SUM(col)). Filters must be applied inside + // that aggregation, not outside. We pre-evaluate the filter SQL at the + // view level, push it down via context, and skip filters at this level. + const isPatchedViewMeasure = symbol.aggType && symbol.patchedFrom && symbol.filters?.length; + if (isPatchedViewMeasure) { + const pushDownFilterSql = this.evaluateFiltersArray(symbol.filters, cubeName); + sql = this.evaluateSymbolSqlWithContext(evaluateSql, { + patchMeasurePushDownFilterSql: pushDownFilterSql, + }); + patchedSymbol = { ...symbol, filters: [] }; + } else { + sql = evaluateSql(); + } } const result = this.renderSqlMeasure( name, @@ -3335,7 +3350,7 @@ export class BaseQuery { sql, isMemberExpr, ), - symbol, + patchedSymbol, cubeName ), symbol, @@ -3836,11 +3851,21 @@ export class BaseQuery { } applyMeasureFilters(evaluateSql, symbol, cubeName) { - if (!symbol.filters || !symbol.filters.length) { + const pushDownFilterSql = this.safeEvaluateSymbolContext().patchMeasurePushDownFilterSql; + const hasOwnFilters = symbol.filters && symbol.filters.length; + + if (!hasOwnFilters && !pushDownFilterSql) { return evaluateSql; } - const where = this.evaluateMeasureFilters(symbol, cubeName); + const parts = []; + if (hasOwnFilters) { + parts.push(this.evaluateMeasureFilters(symbol, cubeName)); + } + if (pushDownFilterSql) { + parts.push(pushDownFilterSql); + } + const where = parts.join(' AND '); return `CASE WHEN ${where} THEN ${evaluateSql === '*' ? '1' : evaluateSql} END`; } diff --git a/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts b/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts index 73ad1dfa240ec..2866c6f5d29a0 100644 --- a/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts +++ b/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts @@ -60,6 +60,7 @@ export type TimeShiftDefinitionReference = { export type MeasureDefinition = { type: string; + aggType?: string, sql(): string; ownedByCube: boolean; rollingWindow?: any diff --git a/packages/cubejs-testing/birdbox-fixtures/postgresql/schema/Orders.js b/packages/cubejs-testing/birdbox-fixtures/postgresql/schema/Orders.js index 8f77a81fd3c97..bf612a1b5ea59 100644 --- a/packages/cubejs-testing/birdbox-fixtures/postgresql/schema/Orders.js +++ b/packages/cubejs-testing/birdbox-fixtures/postgresql/schema/Orders.js @@ -24,6 +24,10 @@ cube(`Orders`, { type: `count_distinct`, sql: `CASE WHEN ${Orders.status} = 'shipped' THEN ${CUBE}.id END` }, + approxOrderCount: { + type: `count_distinct_approx`, + sql: `CASE WHEN ${Orders.status} = 'shipped' THEN ${CUBE}.id END` + }, netCollectionCompleted: { type: `sum`, sql: `CASE WHEN ${Orders.status} = 'shipped' THEN ${CUBE}.amount END` @@ -50,6 +54,24 @@ cube(`Orders`, { format: `currency`, currency: `usd`, }, + avgAmount: { + sql: `amount`, + type: `avg`, + format: `currency`, + currency: `usd`, + }, + minAmount: { + sql: `amount`, + type: `min`, + format: `currency`, + currency: `usd`, + }, + maxAmount: { + sql: `amount`, + type: `max`, + format: `currency`, + currency: `usd`, + }, toRemove: { type: `count`, }, diff --git a/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap b/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap index f3cfa3b8f5d26..886271f355aaf 100644 --- a/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap +++ b/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap @@ -149,8 +149,8 @@ Object { exports[`SQL API Cube SQL over HTTP sql4sql regular query with missing column 1`] = ` Object { "body": Object { - "error": "Error: SQLCompilationError: Internal: Initial planning error: Error during planning: Invalid identifier '#foobar' for schema fields:[Orders.count, Orders.orderCount, Orders.netCollectionCompleted, Orders.arpu, Orders.refundRate, Orders.refundOrdersCount, Orders.overallOrders, Orders.totalAmount, Orders.toRemove, Orders.numberTotal, Orders.amountRank, Orders.amountReducedByStatus, Orders.statusPercentageOfTotal, Orders.amountRankView, Orders.amountRankDateMax, Orders.amountRankDate, Orders.countAndTotalAmount, Orders.createdAtMax, Orders.createdAtMaxProxy, Orders.id, Orders.status, Orders.createdAt, Orders.updatedAt, Orders.__user, Orders.__cubeJoinField], metadata:{}", - "stack": "Error: SQLCompilationError: Internal: Initial planning error: Error during planning: Invalid identifier '#foobar' for schema fields:[Orders.count, Orders.orderCount, Orders.netCollectionCompleted, Orders.arpu, Orders.refundRate, Orders.refundOrdersCount, Orders.overallOrders, Orders.totalAmount, Orders.toRemove, Orders.numberTotal, Orders.amountRank, Orders.amountReducedByStatus, Orders.statusPercentageOfTotal, Orders.amountRankView, Orders.amountRankDateMax, Orders.amountRankDate, Orders.countAndTotalAmount, Orders.createdAtMax, Orders.createdAtMaxProxy, Orders.id, Orders.status, Orders.createdAt, Orders.updatedAt, Orders.__user, Orders.__cubeJoinField], metadata:{}", + "error": "Error: SQLCompilationError: Internal: Initial planning error: Error during planning: Invalid identifier '#foobar' for schema fields:[Orders.count, Orders.orderCount, Orders.approxOrderCount, Orders.netCollectionCompleted, Orders.arpu, Orders.refundRate, Orders.refundOrdersCount, Orders.overallOrders, Orders.totalAmount, Orders.avgAmount, Orders.minAmount, Orders.maxAmount, Orders.toRemove, Orders.numberTotal, Orders.amountRank, Orders.amountReducedByStatus, Orders.statusPercentageOfTotal, Orders.amountRankView, Orders.amountRankDateMax, Orders.amountRankDate, Orders.countAndTotalAmount, Orders.createdAtMax, Orders.createdAtMaxProxy, Orders.id, Orders.status, Orders.createdAt, Orders.updatedAt, Orders.__user, Orders.__cubeJoinField], metadata:{}", + "stack": "Error: SQLCompilationError: Internal: Initial planning error: Error during planning: Invalid identifier '#foobar' for schema fields:[Orders.count, Orders.orderCount, Orders.approxOrderCount, Orders.netCollectionCompleted, Orders.arpu, Orders.refundRate, Orders.refundOrdersCount, Orders.overallOrders, Orders.totalAmount, Orders.avgAmount, Orders.minAmount, Orders.maxAmount, Orders.toRemove, Orders.numberTotal, Orders.amountRank, Orders.amountReducedByStatus, Orders.statusPercentageOfTotal, Orders.amountRankView, Orders.amountRankDateMax, Orders.amountRankDate, Orders.countAndTotalAmount, Orders.createdAtMax, Orders.createdAtMaxProxy, Orders.id, Orders.status, Orders.createdAt, Orders.updatedAt, Orders.__user, Orders.__cubeJoinField], metadata:{}", }, "headers": Headers { Symbol(map): Object { @@ -161,7 +161,7 @@ Object { "keep-alive", ], "content-length": Array [ - "1431", + "1589", ], "content-type": Array [ "application/json; charset=utf-8", @@ -557,6 +557,18 @@ Array [ ] `; +exports[`SQL API Postgres (Data) measure in view with ad-hoc filter: measure-in-view-with-ad-hoc-filters 1`] = ` +Array [ + Object { + "new_amount": 800, + "new_avg_amount": 400, + "new_count_distinct": "1", + "new_max_amount": 500, + "new_min_amount": 300, + }, +] +`; + exports[`SQL API Postgres (Data) measure with ad-hoc filter and original measure: measure-with-ad-hoc-filters-and-original-measure 1`] = ` Array [ Object { diff --git a/packages/cubejs-testing/test/smoke-cubesql.test.ts b/packages/cubejs-testing/test/smoke-cubesql.test.ts index 10142323b1caf..04e614cbc8101 100644 --- a/packages/cubejs-testing/test/smoke-cubesql.test.ts +++ b/packages/cubejs-testing/test/smoke-cubesql.test.ts @@ -980,6 +980,38 @@ filter_subq AS ( expect(res.rows).toMatchSnapshot('measure-with-ad-hoc-filters-and-original-measure'); }); + test('measure in view with ad-hoc filter', async () => { + const query = ` + SELECT + SUM(CASE + WHEN status = 'processed' THEN totalAmount + END) AS new_amount, + AVG(CASE + WHEN status = 'processed' THEN avgAmount + END) AS new_avg_amount, + MIN(CASE + WHEN status = 'processed' THEN minAmount + END) AS new_min_amount, + MAX(CASE + WHEN status = 'processed' THEN maxAmount + END) AS new_max_amount, + COUNT(DISTINCT CASE + WHEN status = 'shipped' THEN orderCount + END) AS new_count_distinct + + /* Works but testing Postgres does not include "hll_hash_any" function + APPROX_DISTINCT(CASE + WHEN status = 'shipped' THEN approxOrderCount + END) AS new_approx_distinct + */ + FROM + OrdersView + `; + + const res = await connection.query(query); + expect(res.rows).toMatchSnapshot('measure-in-view-with-ad-hoc-filters'); + }); + /// Query references `updatedAt` in three places: in outer projection, in grouping key and in window /// Incoming query is consistent: all three references same column /// This tests that generated SQL for pushdown remains consistent: diff --git a/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/aggregate.rs b/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/aggregate.rs index bdf41b3985b1d..4c1337cb3654f 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/aggregate.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/aggregate.rs @@ -5,7 +5,7 @@ use crate::{ agg_fun_expr, agg_fun_expr_within_group_empty_tail, aggregate, alias_expr, analysis::ConstantFolding, binary_expr, case_expr, column_expr, cube_scan_wrapper, grouping_set_expr, - literal_null, original_expr_name, rewrite, + literal_bool, literal_null, original_expr_name, rewrite, rewriter::{CubeEGraph, CubeRewrite}, rules::{members::MemberRules, wrapper::WrapperRules}, subquery, transforming_chain_rewrite, transforming_rewrite, udaf_expr, wrapped_select, @@ -395,9 +395,71 @@ impl WrapperRules { ), self.transform_filtered_measure( "?aggr_expr", - "?literal", + Some("?literal"), "?measure_column", "?fun", + "?distinct", + "?cube_members", + "?replace_agg_type", + "?out_measure_alias", + ), + ), + transforming_chain_rewrite( + "wrapper-push-down-aggregation-over-filtered-measure-simplified", + wrapper_pushdown_replacer("?aggr_expr", "?context"), + vec![ + ( + "?aggr_expr", + agg_fun_expr( + "?fun", + vec![case_expr( + None, + vec![("?case_expr".to_string(), column_expr("?measure_column"))], + // TODO make `ELSE NULL` optional and/or add generic rewrite to normalize it + None, + )], + "?distinct", + agg_fun_expr_within_group_empty_tail(), + ), + ), + ( + "?context", + wrapper_replacer_context( + "?alias_to_cube", + "WrapperReplacerContextPushToCube:true", + "?in_projection", + "?cube_members", + "?grouped_subqueries", + "?ungrouped_scan", + "?input_data_source", + ), + ), + ], + alias_expr( + udaf_expr( + PATCH_MEASURE_UDAF_NAME, + vec![ + column_expr("?measure_column"), + "?replace_agg_type".to_string(), + wrapper_pushdown_replacer( + // = is a proper way to filter here: + // CASE NULL WHEN ... will return null + // So NULL in ?case_expr is equivalent to hitting ELSE branch + // TODO add "is not null" to cond? just to make is always boolean + binary_expr("?case_expr", "=", literal_bool(true)), + "?context", + ), + ], + "AggregateUDFExprDistinct:false", + ), + "?out_measure_alias", + ), + self.transform_filtered_measure( + "?aggr_expr", + None, + "?measure_column", + "?fun", + "?distinct", "?cube_members", "?replace_agg_type", "?out_measure_alias", @@ -1217,17 +1279,19 @@ impl WrapperRules { fn transform_filtered_measure( &self, aggr_expr_var: &'static str, - literal_var: &'static str, + literal_var: Option<&'static str>, column_var: &'static str, fun_name_var: &'static str, + distinct_var: &'static str, cube_members_var: &'static str, replace_agg_type_var: &'static str, out_measure_alias_var: &'static str, ) -> impl Fn(&mut CubeEGraph, &mut Subst) -> bool { let aggr_expr_var = var!(aggr_expr_var); - let literal_var = var!(literal_var); + let literal_var = literal_var.map(|v| var!(v)); let column_var = var!(column_var); let fun_name_var = var!(fun_name_var); + let distinct_var = var!(distinct_var); let cube_members_var = var!(cube_members_var); let replace_agg_type_var = var!(replace_agg_type_var); let out_measure_alias_var = var!(out_measure_alias_var); @@ -1236,14 +1300,16 @@ impl WrapperRules { let disable_strict_agg_type_match = self.config_obj.disable_strict_agg_type_match(); move |egraph, subst| { - match &egraph[subst[literal_var]].data.constant { - Some(ConstantFolding::Scalar(_)) => { - // Do nothing - } - _ => { - return false; + if let Some(literal_var) = literal_var { + match &egraph[subst[literal_var]].data.constant { + Some(ConstantFolding::Scalar(_)) => { + // Do nothing + } + _ => { + return false; + } } - } + }; let Some(alias) = original_expr_name(egraph, subst[aggr_expr_var]) else { return false; @@ -1253,80 +1319,89 @@ impl WrapperRules { .cloned() .collect::>() { - let call_agg_type = MemberRules::get_agg_type(Some(&fun), false); + for distinct in + var_iter!(egraph[subst[distinct_var]], AggregateFunctionExprDistinct) + .cloned() + .collect::>() + { + let call_agg_type = MemberRules::get_agg_type(Some(&fun), distinct); - let column_iter = var_iter!(egraph[subst[column_var]], ColumnExprColumn) - .cloned() - .collect::>(); + let column_iter = var_iter!(egraph[subst[column_var]], ColumnExprColumn) + .cloned() + .collect::>(); + + let Some(member_names_to_expr) = &mut egraph + .index_mut(subst[cube_members_var]) + .data + .member_name_to_expr + else { + continue; + }; - if let Some(member_names_to_expr) = &mut egraph - .index_mut(subst[cube_members_var]) - .data - .member_name_to_expr - { for column in column_iter { - if let Some((&(Some(ref member), _, _), _)) = + let Some((&(Some(ref member), _, _), _)) = LogicalPlanData::do_find_member_by_alias( member_names_to_expr, &column.name, ) - { - if let Some(measure) = meta.find_measure_with_name(member) { - if !measure.allow_add_filter(call_agg_type.as_deref()) { - continue; - } + else { + continue; + }; - let Some(call_agg_type) = &call_agg_type else { - // call_agg_type is None, rewrite as is - Self::insert_patch_measure( - egraph, - subst, - column, - None, - alias, - None, - Some(replace_agg_type_var), - out_measure_alias_var, - ); + let Some(measure) = meta.find_measure_with_name(member) else { + continue; + }; - return true; - }; + if !measure.allow_add_filter(call_agg_type.as_deref()) { + continue; + } - if measure - .is_same_agg_type(call_agg_type, disable_strict_agg_type_match) - { - Self::insert_patch_measure( - egraph, - subst, - column, - None, - alias, - None, - Some(replace_agg_type_var), - out_measure_alias_var, - ); + let Some(call_agg_type) = &call_agg_type else { + // call_agg_type is None, rewrite as is + Self::insert_patch_measure( + egraph, + subst, + column, + None, + alias, + None, + Some(replace_agg_type_var), + out_measure_alias_var, + ); - return true; - } + return true; + }; - if measure.allow_replace_agg_type( - call_agg_type, - disable_strict_agg_type_match, - ) { - Self::insert_patch_measure( - egraph, - subst, - column, - Some(call_agg_type.clone()), - alias, - None, - Some(replace_agg_type_var), - out_measure_alias_var, - ); + if measure.is_same_agg_type(call_agg_type, disable_strict_agg_type_match) { + Self::insert_patch_measure( + egraph, + subst, + column, + None, + alias, + None, + Some(replace_agg_type_var), + out_measure_alias_var, + ); - return true; - } - } + return true; + } + + if measure + .allow_replace_agg_type(call_agg_type, disable_strict_agg_type_match) + { + Self::insert_patch_measure( + egraph, + subst, + column, + Some(call_agg_type.clone()), + alias, + None, + Some(replace_agg_type_var), + out_measure_alias_var, + ); + + return true; } } } diff --git a/rust/cubesql/cubesql/src/transport/ext.rs b/rust/cubesql/cubesql/src/transport/ext.rs index 2f9a9609064e2..7502d6564dcc4 100644 --- a/rust/cubesql/cubesql/src/transport/ext.rs +++ b/rust/cubesql/cubesql/src/transport/ext.rs @@ -93,7 +93,9 @@ impl V1CubeMetaMeasureExt for CubeMetaMeasure { | "max" | "count" | "count_distinct" - | "count_distinct_approx" => true, + | "countDistinct" + | "count_distinct_approx" + | "countDistinctApprox" => true, _ => false, } }