Skip to content

Commit 6a2b9dd

Browse files
committed
fix: Recursive pre-aggregation description generation: support propagateFiltersToSubQuery with partitioned originalSql
1 parent 4a37216 commit 6a2b9dd

File tree

3 files changed

+52
-45
lines changed

3 files changed

+52
-45
lines changed

packages/cubejs-schema-compiler/adapter/BaseQuery.js

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1533,6 +1533,11 @@ class BaseQuery {
15331533
};
15341534
}
15351535
}
1536+
1537+
return this.refreshKeysByCubes(this.allCubeNames, transformFn);
1538+
}
1539+
1540+
refreshKeysByCubes(cubes, transformFn) {
15361541
let refreshKeyAllSetManually = true;
15371542
const refreshKeyQueryByCube = cube => {
15381543
const cubeFromPath = this.cubeEvaluator.cubeFromPath(cube);
@@ -1562,15 +1567,14 @@ class BaseQuery {
15621567
{ preAggregationQuery: true }
15631568
);
15641569
};
1565-
const cubeNames = this.allCubeNames;
1566-
const queries = cubeNames
1570+
const queries = cubes
15671571
.map(cube => [cube, refreshKeyQueryByCube(cube)])
15681572
.map(([cube, sql]) => (transformFn ? transformFn(sql, cube) : sql))
15691573
.map(paramAnnotatedSql => this.paramAllocator.buildSqlAndParams(paramAnnotatedSql));
15701574
return {
15711575
queries,
15721576
renewalThreshold: this.renewalThreshold(refreshKeyAllSetManually),
1573-
refreshKeyRenewalThresholds: cubeNames.map(c => {
1577+
refreshKeyRenewalThresholds: cubes.map(c => {
15741578
const cubeFromPath = this.cubeEvaluator.cubeFromPath(c);
15751579
if (cubeFromPath.refreshKey && cubeFromPath.refreshKey.every) {
15761580
return this.refreshKeyRenewalThresholdForInterval(cubeFromPath.refreshKey.every);
@@ -1760,7 +1764,7 @@ class BaseQuery {
17601764
if (!preAggregation.partitionGranularity) {
17611765
throw new UserError(`Incremental refresh key can only be used for partitioned pre-aggregations`);
17621766
}
1763-
// TOOD Case when partitioned originalSql is resolved for query without time dimension.
1767+
// TODO Case when partitioned originalSql is resolved for query without time dimension.
17641768
// Consider fallback to not using such originalSql for consistency?
17651769
if (preAggregationQueryForSql.timeDimensions.length) {
17661770
refreshKey = this.incrementalRefreshKey(
@@ -1777,6 +1781,12 @@ class BaseQuery {
17771781
};
17781782
}
17791783
}
1784+
if (preAggregation.type === 'originalSql') {
1785+
return this.evaluateSymbolSqlWithContext(
1786+
() => this.refreshKeysByCubes([cube]),
1787+
{ preAggregationQuery: true }
1788+
);
1789+
}
17801790
if (
17811791
preAggregation.partitionGranularity &&
17821792
!preAggregationQueryForSql.allCubeNames.find(c => {

packages/cubejs-schema-compiler/adapter/PreAggregations.js

Lines changed: 31 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,42 +19,29 @@ class PreAggregations {
1919
}
2020

2121
preAggregationsDescriptionLocal() {
22-
const preAggregationForQuery = this.findPreAggregationForQuery();
23-
if (preAggregationForQuery) {
24-
if (preAggregationForQuery.preAggregation.useOriginalSqlPreAggregations) {
25-
const { preAggregations, result } =
26-
this.collectOriginalSqlPreAggregations(
27-
() => this.preAggregationDescriptionsFor(preAggregationForQuery.cube, preAggregationForQuery)
28-
);
29-
30-
const queryForEval = this.query.preAggregationQueryForSqlEvaluation(
31-
preAggregationForQuery.cube,
32-
preAggregationForQuery.preAggregation
33-
);
34-
35-
// TODO consider recursive pre-aggregation descriptions instead of duplication of sub query logic
36-
return R.unnest(preAggregations.map(
37-
p => this.preAggregationDescriptionsFor(p.cube, p).concat(
38-
R.unnest(
39-
queryForEval.subQueryDimensions.map(d => queryForEval.subQueryDescription(d).subQuery)
40-
.map(q => q.preAggregations.preAggregationDescriptionsFor(p.cube, p))
41-
)
42-
)
43-
)).concat(result);
22+
const isInPreAggregationQuery = this.query.options.preAggregationQuery;
23+
if (!isInPreAggregationQuery) {
24+
const preAggregationForQuery = this.findPreAggregationForQuery();
25+
if (preAggregationForQuery) {
26+
return this.preAggregationDescriptionsFor(preAggregationForQuery.cube, preAggregationForQuery);
4427
}
45-
return this.preAggregationDescriptionsFor(preAggregationForQuery.cube, preAggregationForQuery);
4628
}
47-
return R.pipe(
48-
R.map(cube => {
49-
const foundPreAggregation = this.findPreAggregationToUseForCube(cube);
50-
if (foundPreAggregation) {
51-
return this.preAggregationDescriptionsFor(cube, foundPreAggregation);
52-
}
53-
return null;
54-
}),
55-
R.filter(R.identity),
56-
R.unnest
57-
)(this.preAggregationCubes());
29+
if (
30+
!isInPreAggregationQuery ||
31+
isInPreAggregationQuery && this.query.options.useOriginalSqlPreAggregationsInPreAggregation) {
32+
return R.pipe(
33+
R.map(cube => {
34+
const foundPreAggregation = this.findPreAggregationToUseForCube(cube);
35+
if (foundPreAggregation) {
36+
return this.preAggregationDescriptionsFor(cube, foundPreAggregation);
37+
}
38+
return null;
39+
}),
40+
R.filter(R.identity),
41+
R.unnest
42+
)(this.preAggregationCubes());
43+
}
44+
return [];
5845
}
5946

6047
preAggregationCubes() {
@@ -65,13 +52,13 @@ class PreAggregations {
6552
preAggregationDescriptionsFor(cube, foundPreAggregation) {
6653
if (this.canPartitionsBeUsed(foundPreAggregation)) {
6754
const { dimension, partitionDimension } = this.partitionDimension(foundPreAggregation);
68-
return partitionDimension.timeSeries().map(
69-
range => this.preAggregationDescriptionFor(
55+
return R.unnest(partitionDimension.timeSeries().map(
56+
range => this.preAggregationDescriptionsForRecursive(
7057
cube, this.addPartitionRangeTo(foundPreAggregation, dimension, range)
7158
)
72-
);
59+
));
7360
}
74-
return [this.preAggregationDescriptionFor(cube, foundPreAggregation)];
61+
return this.preAggregationDescriptionsForRecursive(cube, foundPreAggregation);
7562
}
7663

7764
canPartitionsBeUsed(foundPreAggregation) {
@@ -103,6 +90,12 @@ class PreAggregations {
10390
return { dimension, partitionDimension };
10491
}
10592

93+
preAggregationDescriptionsForRecursive(cube, foundPreAggregation) {
94+
const query = this.query.preAggregationQueryForSqlEvaluation(cube, foundPreAggregation.preAggregation);
95+
const descriptions = query !== this.query ? query.preAggregations.preAggregationsDescription() : [];
96+
return descriptions.concat(this.preAggregationDescriptionFor(cube, foundPreAggregation));
97+
}
98+
10699
preAggregationDescriptionFor(cube, foundPreAggregation) {
107100
const { preAggregationName, preAggregation } = foundPreAggregation;
108101
const tableName = this.preAggregationTableName(cube, preAggregationName, preAggregation);

packages/cubejs-schema-compiler/test/PreAggregationsTest.js

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ describe('PreAggregations', function test() {
7373
checkinsCount: {
7474
type: 'number',
7575
sql: \`\${visitor_checkins.count}\`,
76-
subQuery: true
76+
subQuery: true,
77+
propagateFiltersToSubQuery: true
7778
}
7879
},
7980
@@ -248,8 +249,11 @@ describe('PreAggregations', function test() {
248249
console.log(toReplace);
249250
preAggregation = Array.isArray(preAggregation) ? preAggregation : [preAggregation];
250251
return [
251-
preAggregation.reduce((replacedQuery, desc) =>
252-
replacedQuery.replace(new RegExp(desc.tableName, 'g'), desc.tableName + '_' + suffix), toReplace
252+
preAggregation.reduce(
253+
(replacedQuery, desc) => replacedQuery
254+
.replace(new RegExp(desc.tableName, 'g'), desc.tableName + '_' + suffix)
255+
.replace(/CREATE INDEX (?!i_)/, `CREATE INDEX i_${suffix}_`),
256+
toReplace
253257
),
254258
params
255259
];

0 commit comments

Comments
 (0)