Skip to content

Commit cc41f97

Browse files
authored
feat: Rollup mode (#843) Thanks to @jcw-!
* Add option for rollup only mode * Add option for externalRefresh mode * Fix lint warnings and errors in query-orchestrator * Add additional config validation * Start a unit test file for PreAggregations * Update docs * Empty commit to trigger a CI rerun
1 parent 79d7bfd commit cc41f97

File tree

8 files changed

+195
-15
lines changed

8 files changed

+195
-15
lines changed

docs/Cube.js-Backend/@cubejs-backend-server-core.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ CubejsServerCore.create({
351351
});
352352
```
353353

354-
Best practice is to run `scheduledRefreshTimer` in a separate worker Cube.js instance.
354+
Best practice is to run `scheduledRefreshTimer` in a separate worker Cube.js instance.
355355
For serverless deployments [REST API](rest-api#api-reference-v-1-run-scheduled-refresh) should be used instead of timer.
356356

357357
### extendContext
@@ -397,10 +397,14 @@ _Please note that this is advanced configuration._
397397
| Option | Description | Default Value |
398398
| ------ | ----------- | ------------- |
399399
| redisPrefix | Prefix to be set an all Redis keys | `STANDALONE` |
400+
| rollupOnlyMode | When enabled, an error will be thrown if a query can't be served from a pre-aggregation (rollup) | `false`
400401
| queryCacheOptions | Query cache options for DB queries | `{}`
401402
| queryCacheOptions.refreshKeyRenewalThreshold | Time in seconds to cache the result of [refreshKey](cube#parameters-refresh-key) check | `defined by DB dialect`
402403
| queryCacheOptions.backgroundRenew | Controls whether to wait in foreground for refreshed query data if `refreshKey` value has been changed. Refresh key queries or pre-aggregations are never awaited in foreground and always processed in background unless cache is empty. If `true` it immediately returns values from cache if available without [refreshKey](cube#parameters-refresh-key) check to renew in foreground. Default value before 0.15.0 was `true` | `false`
404+
| queryCacheOptions.queueOptions | Query queue options for DB queries | `{}`
403405
| preAggregationsOptions | Query cache options for pre-aggregations | `{}`
406+
| preAggregationsOptions.queueOptions | Query queue options for pre-aggregations | `{}`
407+
| preAggregationsOptions.externalRefresh | When running a separate instance of Cube.js to refresh pre-aggregations in the background, this option can be set on the API instance to prevent it from trying to check for rollup data being current - it won't try to create or refresh them when this option is `true` | `false`
404408

405409
To set options for `queryCache` and `preAggregations`, set an object with key queueOptions. `queryCacheOptions` are used while querying database tables, while `preAggregationsOptions` settings are used to query pre-aggregated tables.
406410

packages/cubejs-query-orchestrator/orchestrator/BaseQueueDriver.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ class BaseQueueDriver {
44
redisHash(queryKey) {
55
return typeof queryKey === 'string' && queryKey.length < 256 ?
66
queryKey :
7-
crypto.createHash('md5').update(JSON.stringify(queryKey)).digest("hex");
7+
crypto.createHash('md5').update(JSON.stringify(queryKey)).digest('hex');
88
}
99
}
1010

packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class PreAggregationLoadCache {
7878

7979
async fetchTables(preAggregation) {
8080
if (preAggregation.external && !this.externalDriverFactory) {
81-
throw new Error(`externalDriverFactory should be set in order to use external pre-aggregations`);
81+
throw new Error('externalDriverFactory should be set in order to use external pre-aggregations');
8282
}
8383
const client = preAggregation.external ?
8484
await this.externalDriverFactory() :
@@ -180,6 +180,10 @@ class PreAggregationLoader {
180180
this.externalDriverFactory = preAggregations.externalDriverFactory;
181181
this.requestId = options.requestId;
182182
this.structureVersionPersistTime = preAggregations.structureVersionPersistTime;
183+
this.externalRefresh = options.externalRefresh;
184+
if (this.externalRefresh && this.waitForRenew) {
185+
throw new Error("Invalid configuration - when externalRefresh is true, it will not perform a renew, therefore you cannot wait for it using waitForRenew.");
186+
}
183187
}
184188

185189
async loadPreAggregation() {
@@ -199,7 +203,19 @@ class PreAggregationLoader {
199203
const versionEntryByStructureVersion = versionEntries.find(
200204
v => v.table_name === this.preAggregation.tableName && v.structure_version === structureVersion
201205
);
206+
if (this.externalRefresh) {
207+
if (!versionEntryByStructureVersion) {
208+
throw new Error('One or more pre-aggregation tables could not be found to satisfy that query');
209+
}
210+
211+
// the rollups are being maintained independently of this instance of cube.js,
212+
// immediately return the latest data it already has
213+
return this.targetTableName(versionEntryByStructureVersion);
214+
}
215+
202216
if (versionEntryByStructureVersion) {
217+
// this triggers an asyncronous/background load of the pre-aggregation but immediately
218+
// returns the latest data it already has
203219
this.loadPreAggregationWithKeys().catch(e => {
204220
if (!(e instanceof ContinueWaitError)) {
205221
this.logger('Error loading pre-aggregation', {
@@ -211,9 +227,12 @@ class PreAggregationLoader {
211227
});
212228
return this.targetTableName(versionEntryByStructureVersion);
213229
} else {
230+
// no rollup has been built yet - build it syncronously as part of responding to this request
214231
return this.loadPreAggregationWithKeys();
215232
}
216233
} else {
234+
// either we have no data cached for this rollup or waitForRenew is true, either way,
235+
// syncronously renew what data is needed so that the most current data will be returned for the current request
217236
return {
218237
targetTableName: await this.loadPreAggregationWithKeys(),
219238
refreshKeyValues: await this.getInvalidationKeyValues()
@@ -477,7 +496,7 @@ class PreAggregationLoader {
477496
const [sql, params] =
478497
Array.isArray(this.preAggregation.sql) ? this.preAggregation.sql : [this.preAggregation.sql, []];
479498
if (!client.downloadQueryResults) {
480-
throw new Error(`Can't load external pre-aggregation: source driver doesn't support downloadQueryResults()`);
499+
throw new Error('Can\'t load external pre-aggregation: source driver doesn\'t support downloadQueryResults()');
481500
}
482501

483502
this.logExecutingSql(invalidationKeys, sql, params, this.targetTableName(newVersionEntry), newVersionEntry);
@@ -496,7 +515,7 @@ class PreAggregationLoader {
496515

497516
async downloadTempExternalPreAggregation(client, newVersionEntry, saveCancelFn) {
498517
if (!client.downloadTable) {
499-
throw new Error(`Can't load external pre-aggregation: source driver doesn't support downloadTable()`);
518+
throw new Error('Can\'t load external pre-aggregation: source driver doesn\'t support downloadTable()');
500519
}
501520
const table = this.targetTableName(newVersionEntry);
502521
this.logger('Downloading external pre-aggregation', {
@@ -512,7 +531,7 @@ class PreAggregationLoader {
512531
const table = this.targetTableName(newVersionEntry);
513532
const externalDriver = await this.externalDriverFactory();
514533
if (!externalDriver.uploadTable) {
515-
throw new Error(`Can't load external pre-aggregation: destination driver doesn't support uploadTable()`);
534+
throw new Error('Can\'t load external pre-aggregation: destination driver doesn\'t support uploadTable()');
516535
}
517536
this.logger('Uploading external pre-aggregation', {
518537
preAggregation: this.preAggregation,
@@ -599,6 +618,7 @@ class PreAggregations {
599618
this.externalDriverFactory = options.externalDriverFactory;
600619
this.structureVersionPersistTime = options.structureVersionPersistTime || 60 * 60 * 24 * 30;
601620
this.usedTablePersistTime = options.usedTablePersistTime || 600;
621+
this.externalRefresh = options.externalRefresh;
602622
}
603623

604624
tablesUsedRedisKey(tableName) {
@@ -629,7 +649,7 @@ class PreAggregations {
629649
p,
630650
preAggregationsTablesToTempTables,
631651
loadCache,
632-
{ waitForRenew: queryBody.renewQuery, requestId: queryBody.requestId }
652+
{ waitForRenew: queryBody.renewQuery, requestId: queryBody.requestId, externalRefresh: this.externalRefresh }
633653
);
634654
const preAggregationPromise = () => loader.loadPreAggregation().then(async targetTableName => {
635655
const usedPreAggregation = typeof targetTableName === 'string' ? { targetTableName } : targetTableName;
@@ -655,7 +675,7 @@ class PreAggregations {
655675
preAggregation,
656676
preAggregationsTablesToTempTables,
657677
new PreAggregationLoadCache(this.redisPrefix, this.driverFactory, this.queryCache, this, { requestId }),
658-
{ requestId }
678+
{ requestId, externalRefresh: this.externalRefresh }
659679
);
660680
return loader.refresh(newVersionEntry, invalidationKeys)(client);
661681
}, {

packages/cubejs-query-orchestrator/orchestrator/QueryCache.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,9 @@ class QueryCache {
111111
}
112112

113113
static replaceAll(replaceThis, withThis, inThis) {
114-
withThis = withThis.replace(/\$/g, "$$$$");
114+
withThis = withThis.replace(/\$/g, '$$$$');
115115
return inThis.replace(
116-
new RegExp(replaceThis.replace(/([/,!\\^${}[\]().*+?|<>\-&])/g, "\\$&"), "g"),
116+
new RegExp(replaceThis.replace(/([/,!\\^${}[\]().*+?|<>\-&])/g, '\\$&'), 'g'),
117117
withThis
118118
);
119119
}
@@ -377,7 +377,7 @@ class QueryCache {
377377
}
378378

379379
queryRedisKey(cacheKey) {
380-
return `SQL_QUERY_RESULT_${this.redisPrefix}_${crypto.createHash('md5').update(JSON.stringify(cacheKey)).digest("hex")}`;
380+
return `SQL_QUERY_RESULT_${this.redisPrefix}_${crypto.createHash('md5').update(JSON.stringify(cacheKey)).digest('hex')}`;
381381
}
382382
}
383383

packages/cubejs-query-orchestrator/orchestrator/QueryOrchestrator.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class QueryOrchestrator {
1414
process.env.NODE_ENV === 'production' || process.env.REDIS_URL ? 'redis' : 'memory'
1515
);
1616
if (cacheAndQueueDriver !== 'redis' && cacheAndQueueDriver !== 'memory') {
17-
throw new Error(`Only 'redis' or 'memory' are supported for cacheAndQueueDriver option`);
17+
throw new Error('Only \'redis\' or \'memory\' are supported for cacheAndQueueDriver option');
1818
}
1919
const redisPool = cacheAndQueueDriver === 'redis' ? new RedisPool() : undefined;
2020

@@ -35,12 +35,16 @@ class QueryOrchestrator {
3535
...options.preAggregationsOptions
3636
}
3737
);
38+
this.rollupOnlyMode = options.rollupOnlyMode;
3839
}
3940

4041
async fetchQuery(queryBody) {
4142
return this.preAggregations.loadAllPreAggregationsIfNeeded(queryBody)
4243
.then(async preAggregationsTablesToTempTables => {
4344
const usedPreAggregations = R.fromPairs(preAggregationsTablesToTempTables);
45+
if (this.rollupOnlyMode && Object.keys(usedPreAggregations).length === 0) {
46+
throw new Error('No pre-aggregation exists for that query');
47+
}
4448
if (!queryBody.query) {
4549
return {
4650
usedPreAggregations

packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ class QueryQueue {
247247
try {
248248
return redisClient.optimisticQueryUpdate(queryKey, { cancelHandler }, processingId);
249249
} catch (e) {
250-
this.logger(`Error while query update`, {
250+
this.logger('Error while query update', {
251251
queryKey: query.queryKey,
252252
error: e.stack || e,
253253
queuePrefix: this.redisQueuePrefix,
@@ -301,7 +301,7 @@ class QueryQueue {
301301
if (!(await redisClient.setResultAndRemoveQuery(queryKey, executionResult, processingId))) {
302302
this.logger('Orphaned execution result', {
303303
processingId,
304-
warn: `Result for query was not set due to processing lock wasn't acquired`,
304+
warn: 'Result for query was not set due to processing lock wasn\'t acquired',
305305
queryKey: query.queryKey,
306306
queuePrefix: this.redisQueuePrefix,
307307
requestId: query.requestId
@@ -342,7 +342,7 @@ class QueryQueue {
342342
}
343343
await this.cancelHandlers[queryHandler](query);
344344
} catch (e) {
345-
this.logger(`Error while cancel`, {
345+
this.logger('Error while cancel', {
346346
queryKey: query.queryKey,
347347
error: e.stack || e,
348348
queuePrefix: this.redisQueuePrefix,
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/* globals describe, beforeAll, afterAll, beforeEach, test, expect */
2+
const crypto = require('crypto');
3+
4+
class MockDriver {
5+
constructor() {
6+
this.tables = [];
7+
this.executedQueries = [];
8+
this.cancelledQueries = [];
9+
}
10+
11+
query(query) {
12+
this.executedQueries.push(query);
13+
let promise = Promise.resolve([query]);
14+
if (query.match(`orders_too_big`)) {
15+
promise = promise.then((res) => new Promise(resolve => setTimeout(() => resolve(res), 3000)));
16+
}
17+
promise.cancel = async () => {
18+
this.cancelledQueries.push(query);
19+
};
20+
return promise;
21+
}
22+
23+
async getTablesQuery(schema) {
24+
return this.tables.map(t => ({ table_name: t.replace(`${schema}.`, '') }));
25+
}
26+
27+
async createSchemaIfNotExists(schema) {
28+
this.schema = schema;
29+
return null;
30+
}
31+
32+
loadPreAggregationIntoTable(preAggregationTableName, loadSql) {
33+
this.tables.push(preAggregationTableName.substring(0, 100));
34+
return this.query(loadSql);
35+
}
36+
37+
async dropTable(tableName) {
38+
this.tables = this.tables.filter(t => t !== tableName);
39+
return this.query(`DROP TABLE ${tableName}`);
40+
}
41+
}
42+
43+
describe('PreAggregations', () => {
44+
let mockDriver = null;
45+
let queryCache = null;
46+
const basicQuery = {
47+
query: "SELECT \"orders__created_at_week\" \"orders__created_at_week\", sum(\"orders__count\") \"orders__count\" FROM (SELECT * FROM stb_pre_aggregations.orders_number_and_count20191101) as partition_union WHERE (\"orders__created_at_week\" >= ($1::timestamptz::timestamptz AT TIME ZONE 'UTC') AND \"orders__created_at_week\" <= ($2::timestamptz::timestamptz AT TIME ZONE 'UTC')) GROUP BY 1 ORDER BY 1 ASC LIMIT 10000",
48+
values: ["2019-11-01T00:00:00Z", "2019-11-30T23:59:59Z"],
49+
cacheKeyQueries: {
50+
renewalThreshold: 21600,
51+
queries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]]
52+
},
53+
preAggregations: [{
54+
preAggregationsSchema: "stb_pre_aggregations",
55+
tableName: "stb_pre_aggregations.orders_number_and_count20191101",
56+
loadSql: ["CREATE TABLE stb_pre_aggregations.orders_number_and_count20191101 AS SELECT\n date_trunc('week', (\"orders\".created_at::timestamptz AT TIME ZONE 'UTC')) \"orders__created_at_week\", count(\"orders\".id) \"orders__count\", sum(\"orders\".number) \"orders__number\"\n FROM\n public.orders AS \"orders\"\n WHERE (\"orders\".created_at >= $1::timestamptz AND \"orders\".created_at <= $2::timestamptz) GROUP BY 1", ["2019-11-01T00:00:00Z", "2019-11-30T23:59:59Z"]],
57+
invalidateKeyQueries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]]
58+
}],
59+
requestId: 'basic'
60+
};
61+
const basicQueryWithRenew = {
62+
query: "SELECT \"orders__created_at_week\" \"orders__created_at_week\", sum(\"orders__count\") \"orders__count\" FROM (SELECT * FROM stb_pre_aggregations.orders_number_and_count20191101) as partition_union WHERE (\"orders__created_at_week\" >= ($1::timestamptz::timestamptz AT TIME ZONE 'UTC') AND \"orders__created_at_week\" <= ($2::timestamptz::timestamptz AT TIME ZONE 'UTC')) GROUP BY 1 ORDER BY 1 ASC LIMIT 10000",
63+
values: ["2019-11-01T00:00:00Z", "2019-11-30T23:59:59Z"],
64+
cacheKeyQueries: {
65+
renewalThreshold: 21600,
66+
queries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]]
67+
},
68+
preAggregations: [{
69+
preAggregationsSchema: "stb_pre_aggregations",
70+
tableName: "stb_pre_aggregations.orders_number_and_count20191101",
71+
loadSql: ["CREATE TABLE stb_pre_aggregations.orders_number_and_count20191101 AS SELECT\n date_trunc('week', (\"orders\".created_at::timestamptz AT TIME ZONE 'UTC')) \"orders__created_at_week\", count(\"orders\".id) \"orders__count\", sum(\"orders\".number) \"orders__number\"\n FROM\n public.orders AS \"orders\"\n WHERE (\"orders\".created_at >= $1::timestamptz AND \"orders\".created_at <= $2::timestamptz) GROUP BY 1", ["2019-11-01T00:00:00Z", "2019-11-30T23:59:59Z"]],
72+
invalidateKeyQueries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]]
73+
}],
74+
renewQuery: true,
75+
requestId: 'basic'
76+
};
77+
78+
beforeEach(() => {
79+
mockDriver = new MockDriver();
80+
81+
jest.resetModules();
82+
const QueryCache = require('../orchestrator/QueryCache');
83+
queryCache = new QueryCache(
84+
"TEST",
85+
async () => mockDriver,
86+
(msg, params) => {},
87+
{
88+
queueOptions: {
89+
executionTimeout: 1
90+
},
91+
},
92+
);
93+
});
94+
95+
describe('loadAllPreAggregationsIfNeeded', () => {
96+
let preAggregations = null;
97+
98+
beforeEach(async () => {
99+
const PreAggregations = require('../orchestrator/PreAggregations');
100+
preAggregations = new PreAggregations(
101+
"TEST",
102+
async () => mockDriver,
103+
(msg, params) => {},
104+
queryCache,
105+
{
106+
queueOptions: {
107+
executionTimeout: 1
108+
},
109+
},
110+
);
111+
});
112+
113+
test('syncronously create rollup from scratch', async () => {
114+
const result = await preAggregations.loadAllPreAggregationsIfNeeded(basicQueryWithRenew);
115+
expect(result[0][1].targetTableName).toMatch(/stb_pre_aggregations.orders_number_and_count20191101_kjypcoio_5yftl5il/);
116+
});
117+
});
118+
119+
describe(`loadAllPreAggregationsIfNeeded with externalRefresh true`, () => {
120+
let preAggregations = null;
121+
122+
beforeEach(async () => {
123+
const PreAggregations = require('../orchestrator/PreAggregations');
124+
preAggregations = new PreAggregations(
125+
"TEST",
126+
async () => mockDriver,
127+
(msg, params) => {},
128+
queryCache,
129+
{
130+
queueOptions: {
131+
executionTimeout: 1
132+
},
133+
externalRefresh: true,
134+
},
135+
);
136+
});
137+
138+
test('fail if waitForRenew is also specified', async () => {
139+
await expect(async () => {
140+
await preAggregations.loadAllPreAggregationsIfNeeded(basicQueryWithRenew);
141+
}).rejects.toThrowError(/Invalid configuration/);
142+
});
143+
144+
test('fail if rollup doesn\'t already exist', async () => {
145+
await expect(async () => {
146+
await preAggregations.loadAllPreAggregationsIfNeeded(basicQuery);
147+
}).rejects.toThrowError(/One or more pre-aggregation tables could not be found to satisfy that query/);
148+
});
149+
});
150+
});

packages/cubejs-server-core/core/index.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ declare module "@cubejs-backend/server-core" {
4343
redisPrefix?: string;
4444
queryCacheOptions?: QueryCacheOptions;
4545
preAggregationsOptions?: PreAggregationsOptions;
46+
rollupOnlyMode?: boolean;
4647
}
4748

4849
export interface QueryCacheOptions {
@@ -53,6 +54,7 @@ declare module "@cubejs-backend/server-core" {
5354

5455
export interface PreAggregationsOptions {
5556
queueOptions?: QueueOptions;
57+
externalRefresh?: boolean;
5658
}
5759

5860
export interface QueueOptions {

0 commit comments

Comments
 (0)