Skip to content

Commit c87b525

Browse files
committed
feat: Scheduled refresh for pre-aggregations
1 parent 452086d commit c87b525

File tree

14 files changed

+238
-39
lines changed

14 files changed

+238
-39
lines changed

packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,18 @@ class PreAggregationLoadCache {
109109
return this.versionEntries;
110110
}
111111

112-
async keyQueryResult(keyQuery) {
112+
async keyQueryResult(keyQuery, waitForRenew) {
113113
if (!this.queryResults[this.queryCache.queryRedisKey(keyQuery)]) {
114114
this.queryResults[this.queryCache.queryRedisKey(keyQuery)] = await this.queryCache.cacheQueryResult(
115115
Array.isArray(keyQuery) ? keyQuery[0] : keyQuery,
116116
Array.isArray(keyQuery) ? keyQuery[1] : [],
117117
keyQuery,
118118
60 * 60,
119-
{ renewalThreshold: 5 * 60, renewalKey: keyQuery }
119+
{
120+
renewalThreshold: this.queryCache.options.refreshKeyRenewalThreshold || 2 * 60,
121+
renewalKey: keyQuery,
122+
waitForRenew
123+
}
120124
);
121125
}
122126
return this.queryResults[this.queryCache.queryRedisKey(keyQuery)];
@@ -265,15 +269,15 @@ class PreAggregationLoader {
265269
preAggregation: this.preAggregation,
266270
requestId: this.requestId
267271
});
268-
await this.executeInQueue(invalidationKeys, 10, newVersionEntry);
272+
await this.executeInQueue(invalidationKeys, this.priority(10), newVersionEntry);
269273
return mostRecentTargetTableName();
270274
} else if (versionEntry.content_version !== newVersionEntry.content_version) {
271275
if (this.waitForRenew) {
272276
this.logger('Waiting for pre-aggregation renew', {
273277
preAggregation: this.preAggregation,
274278
requestId: this.requestId
275279
});
276-
await this.executeInQueue(invalidationKeys, 0, newVersionEntry);
280+
await this.executeInQueue(invalidationKeys, this.priority(0), newVersionEntry);
277281
return mostRecentTargetTableName();
278282
} else {
279283
if (
@@ -291,16 +295,20 @@ class PreAggregationLoader {
291295
preAggregation: this.preAggregation,
292296
requestId: this.requestId
293297
});
294-
await this.executeInQueue(invalidationKeys, 10, newVersionEntry);
298+
await this.executeInQueue(invalidationKeys, this.priority(10), newVersionEntry);
295299
return mostRecentTargetTableName();
296300
}
297301
return this.targetTableName(versionEntry);
298302
}
299303

304+
priority(defaultValue) {
305+
return this.preAggregation.priority != null ? this.preAggregation.priority : defaultValue;
306+
}
307+
300308
getInvalidationKeyValues() {
301309
return Promise.all(
302310
(this.preAggregation.invalidateKeyQueries || [])
303-
.map(keyQuery => this.loadCache.keyQueryResult(keyQuery))
311+
.map(keyQuery => this.loadCache.keyQueryResult(keyQuery, this.waitForRenew))
304312
);
305313
}
306314

@@ -309,7 +317,7 @@ class PreAggregationLoader {
309317
preAggregation: this.preAggregation,
310318
requestId: this.requestId
311319
});
312-
this.executeInQueue(invalidationKeys, 0, newVersionEntry)
320+
this.executeInQueue(invalidationKeys, this.priority(0), newVersionEntry)
313321
.then(() => {
314322
delete this.preAggregations.refreshErrors[newVersionEntry.table_name];
315323
})

packages/cubejs-query-orchestrator/orchestrator/QueryOrchestrator.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,18 @@ class QueryOrchestrator {
3535
async fetchQuery(queryBody) {
3636
return this.preAggregations.loadAllPreAggregationsIfNeeded(queryBody)
3737
.then(async preAggregationsTablesToTempTables => {
38+
const usedPreAggregations = R.fromPairs(preAggregationsTablesToTempTables);
39+
if (!queryBody.query) {
40+
return {
41+
usedPreAggregations
42+
};
43+
}
3844
const result = await this.queryCache.cachedQueryResult(
3945
queryBody, preAggregationsTablesToTempTables
4046
);
4147
return {
4248
...result,
43-
usedPreAggregations: R.fromPairs(preAggregationsTablesToTempTables)
49+
usedPreAggregations
4450
};
4551
});
4652
}

packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,15 @@ class QueryQueue {
3737
if (priority == null) {
3838
priority = 0;
3939
}
40-
if (!(priority >= 0 && priority <= 100)) {
41-
throw new Error('Priority should be between 0 and 100');
40+
if (!(priority >= -10000 && priority <= 10000)) {
41+
throw new Error('Priority should be between -10000 and 10000');
4242
}
4343
let result = await redisClient.getResult(queryKey);
4444
if (result) {
4545
return this.parseResult(result);
4646
}
4747
const time = new Date().getTime();
48-
const keyScore = time + (100 - priority) * 1E14;
48+
const keyScore = time + (10000 - priority) * 1E14;
4949

5050
// eslint-disable-next-line no-unused-vars
5151
const [added, b, c, queueSize] = await redisClient.addToQueue(

packages/cubejs-query-orchestrator/test/QueryQueueTest.js

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,19 @@ const QueryQueueTest = (name, options) => {
8888
should(await queue.getQueryStage('12')).be.eql(undefined);
8989
});
9090

91+
it('negative priority', async () => {
92+
delayCount = 0;
93+
const results = [];
94+
await Promise.all([
95+
queue.executeInQueue('delay', '31', { delay: 100, result: '4' }, -10).then(r => results.push(r)),
96+
queue.executeInQueue('delay', '32', { delay: 100, result: '3' }, -9).then(r => results.push(r)),
97+
queue.executeInQueue('delay', '33', { delay: 100, result: '2' }, -8).then(r => results.push(r)),
98+
queue.executeInQueue('delay', '34', { delay: 100, result: '1' }, -7).then(r => results.push(r))
99+
]);
100+
101+
should(results).be.eql(['10', '21', '32', '43']);
102+
});
103+
91104
it('orphaned', async () => {
92105
for (let i = 1; i <= 4; i++) {
93106
await queue.executeInQueue('delay', `11` + i, { delay: 50, result: `${i}` }, 0);

packages/cubejs-schema-compiler/adapter/BaseQuery.js

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,13 +100,17 @@ class BaseQuery {
100100
}
101101

102102
get dataSource() {
103-
const dataSources = R.uniq(this.allCubeNames.map(c => this.cubeEvaluator.cubeFromPath(c).dataSource || 'default'));
103+
const dataSources = R.uniq(this.allCubeNames.map(c => this.cubeDataSource(c)));
104104
if (dataSources.length > 1) {
105105
throw new UserError(`Joins across data sources aren't supported in community edition. Found data sources: ${dataSources.join(', ')}`);
106106
}
107107
return dataSources[0];
108108
}
109109

110+
cubeDataSource(cube) {
111+
return this.cubeEvaluator.cubeFromPath(cube).dataSource || 'default';
112+
}
113+
110114
get aliasNameToMember() {
111115
return R.fromPairs(
112116
this.measures.map(m => [m.unescapedAliasName(), m.measure]).concat(
@@ -1335,13 +1339,9 @@ class BaseQuery {
13351339
if (timeDimensions.length) {
13361340
const dimension = timeDimensions.filter(f => f.toLowerCase().indexOf('update') !== -1)[0] || timeDimensions[0];
13371341
const foundMainTimeDimension = this.newTimeDimension({ dimension });
1338-
const cubeNamesForTimeDimension = this.collectFrom(
1339-
[foundMainTimeDimension],
1340-
this.collectCubeNamesFor.bind(this)
1341-
);
1342-
if (cubeNamesForTimeDimension.length === 1 && cubeNamesForTimeDimension[0] === cube) {
1343-
const dimensionSql = this.dimensionSql(foundMainTimeDimension);
1344-
return `select max(${dimensionSql}) from ${this.cubeSql(cube)} ${this.asSyntaxTable} ${this.cubeAlias(cube)}`;
1342+
const aggSelect = this.aggSelectForDimension(cube, foundMainTimeDimension, 'max');
1343+
if (aggSelect) {
1344+
return aggSelect;
13451345
}
13461346
}
13471347
return `select count(*) from ${this.cubeSql(cube)} ${this.asSyntaxTable} ${this.cubeAlias(cube)}`;
@@ -1352,6 +1352,18 @@ class BaseQuery {
13521352
};
13531353
}
13541354

1355+
aggSelectForDimension(cube, dimension, aggFunction) {
1356+
const cubeNamesForTimeDimension = this.collectFrom(
1357+
[dimension],
1358+
this.collectCubeNamesFor.bind(this)
1359+
);
1360+
if (cubeNamesForTimeDimension.length === 1 && cubeNamesForTimeDimension[0] === cube) {
1361+
const dimensionSql = this.dimensionSql(dimension);
1362+
return `select ${aggFunction}(${dimensionSql}) from ${this.cubeSql(cube)} ${this.asSyntaxTable} ${this.cubeAlias(cube)}`;
1363+
}
1364+
return null;
1365+
}
1366+
13551367
cubeCardinalityQueries() { // TODO collect sub queries
13561368
return R.fromPairs(this.collectCubeNames()
13571369
.map(cube => [
@@ -1420,6 +1432,16 @@ class BaseQuery {
14201432
)];
14211433
}
14221434

1435+
preAggregationStartEndQueries(cube, preAggregation) {
1436+
const references = this.cubeEvaluator.evaluatePreAggregationReferences(cube, preAggregation);
1437+
const timeDimension = this.newTimeDimension(references.timeDimensions[0]);
1438+
1439+
return this.evaluateSymbolSqlWithContext(() => [
1440+
this.paramAllocator.buildSqlAndParams(this.aggSelectForDimension(cube, timeDimension, 'min')),
1441+
this.paramAllocator.buildSqlAndParams(this.aggSelectForDimension(cube, timeDimension, 'max'))
1442+
], { preAggregationQuery: true });
1443+
}
1444+
14231445
parametrizedContextSymbols() {
14241446
return Object.assign({
14251447
filterParams: this.filtersProxy(),

packages/cubejs-schema-compiler/adapter/PreAggregations.js

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -422,23 +422,7 @@ class PreAggregations {
422422
}
423423

424424
evaluateAllReferences(cube, aggregation) {
425-
const timeDimensions = aggregation.timeDimensionReference ? [{
426-
dimension: this.evaluateReferences(cube, aggregation.timeDimensionReference),
427-
granularity: this.castGranularity(aggregation.granularity)
428-
}] : [];
429-
return {
430-
dimensions:
431-
(aggregation.dimensionReferences && this.evaluateReferences(cube, aggregation.dimensionReferences) || []).concat(
432-
aggregation.segmentReferences && this.evaluateReferences(cube, aggregation.segmentReferences) || []
433-
),
434-
measures:
435-
aggregation.measureReferences && this.evaluateReferences(cube, aggregation.measureReferences) || [],
436-
timeDimensions
437-
};
438-
}
439-
440-
evaluateReferences(cube, referencesFn) {
441-
return this.query.cubeEvaluator.evaluateReferences(cube, referencesFn);
425+
return this.query.cubeEvaluator.evaluatePreAggregationReferences(cube, aggregation);
442426
}
443427

444428
rollupPreAggregation(preAggregationForQuery) {

packages/cubejs-schema-compiler/compiler/CubeEvaluator.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,22 @@ class CubeEvaluator extends CubeSymbols {
4646
return this.cubeFromPath(path).preAggregations || {};
4747
}
4848

49+
scheduledPreAggregations() {
50+
return Object.keys(this.evaluatedCubes).map(cube => {
51+
const preAggregations = this.preAggregationsForCube(cube);
52+
return Object.keys(preAggregations)
53+
.filter(name => preAggregations[name].scheduledRefresh)
54+
.map(preAggregationName => ({
55+
preAggregationName,
56+
preAggregation: preAggregations[preAggregationName],
57+
cube,
58+
references: preAggregations[preAggregationName].type === 'rollup' ?
59+
this.evaluatePreAggregationReferences(cube, preAggregations[preAggregationName]) :
60+
null
61+
}));
62+
}).reduce((a, b) => a.concat(b), []);
63+
}
64+
4965
isMeasure(measurePath) {
5066
return this.isInstanceOfType('measures', measurePath);
5167
}
@@ -147,6 +163,23 @@ class CubeEvaluator extends CubeSymbols {
147163
const references = arrayOrSingle.map(p => p.toString());
148164
return options.originalSorting ? references : R.sortBy(R.identity, references);
149165
}
166+
167+
evaluatePreAggregationReferences(cube, aggregation) {
168+
const timeDimensions = aggregation.timeDimensionReference ? [{
169+
dimension: this.evaluateReferences(cube, aggregation.timeDimensionReference),
170+
granularity: aggregation.granularity
171+
}] : [];
172+
return {
173+
dimensions:
174+
(aggregation.dimensionReferences && this.evaluateReferences(cube, aggregation.dimensionReferences) || [])
175+
.concat(
176+
aggregation.segmentReferences && this.evaluateReferences(cube, aggregation.segmentReferences) || []
177+
),
178+
measures:
179+
aggregation.measureReferences && this.evaluateReferences(cube, aggregation.measureReferences) || [],
180+
timeDimensions
181+
};
182+
}
150183
}
151184

152185
module.exports = CubeEvaluator;

packages/cubejs-schema-compiler/compiler/CubeValidator.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ const BasePreAggregation = {
6565
}),
6666
useOriginalSqlPreAggregations: Joi.boolean(),
6767
external: Joi.boolean(),
68-
partitionGranularity: Joi.any().valid('day', 'week', 'month', 'year')
68+
partitionGranularity: Joi.any().valid('day', 'week', 'month', 'year'),
69+
scheduledRefresh: Joi.boolean()
6970
};
7071

7172
const cubeSchema = Joi.object().keys({

packages/cubejs-server-core/core/CompilerApi.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ class CompilerApi {
7070
}));
7171
}
7272

73+
async scheduledPreAggregations() {
74+
const { cubeEvaluator } = await this.getCompilers();
75+
return cubeEvaluator.scheduledPreAggregations();
76+
}
77+
7378
createQuery(compilers, dbType, query) {
7479
return QueryBuilder.query(
7580
compilers,

packages/cubejs-server-core/core/OrchestratorApi.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class OrchestratorApi {
1414
}
1515

1616
async executeQuery(query) {
17-
const queryForLog = query.query.replace(/\s+/g, ' ');
17+
const queryForLog = query.query && query.query.replace(/\s+/g, ' ');
1818
const startQueryTime = (new Date()).getTime();
1919

2020
try {

0 commit comments

Comments
 (0)