Skip to content

Commit d443585

Browse files
committed
feat: Pre-aggregation indexes support
1 parent 0667b8f commit d443585

File tree

8 files changed

+174
-25
lines changed

8 files changed

+174
-25
lines changed

packages/cubejs-mysql-driver/driver/MySqlDriver.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ const { promisify } = require('util');
44
const BaseDriver = require('@cubejs-backend/query-orchestrator/driver/BaseDriver');
55

66
const GenericTypeToMySql = {
7-
'string': 'varchar(255)'
7+
string: 'varchar(255)',
8+
text: 'varchar(255)'
89
};
910

1011
class MySqlDriver extends BaseDriver {

packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ class PreAggregationLoader {
180180
const notLoadedKey = (this.preAggregation.invalidateKeyQueries || [])
181181
.find(keyQuery => !this.loadCache.hasKeyQueryResult(keyQuery));
182182
if (notLoadedKey && !this.waitForRenew) {
183-
const structureVersion = version(this.preAggregation.loadSql);
183+
const structureVersion = this.structureVersion();
184184

185185
const getVersionsStarted = new Date();
186186
const versionEntries = await this.loadCache.getVersionEntries(this.preAggregation);
@@ -217,8 +217,8 @@ class PreAggregationLoader {
217217

218218
async loadPreAggregationWithKeys() {
219219
const invalidationKeys = await this.getInvalidationKeyValues();
220-
const contentVersion = version([this.preAggregation.loadSql, invalidationKeys]);
221-
const structureVersion = version(this.preAggregation.loadSql);
220+
const contentVersion = this.contentVersion(invalidationKeys);
221+
const structureVersion = this.structureVersion();
222222

223223
const versionEntries = await this.loadCache.getVersionEntries(this.preAggregation);
224224

@@ -305,6 +305,22 @@ class PreAggregationLoader {
305305
return this.targetTableName(versionEntry);
306306
}
307307

308+
contentVersion(invalidationKeys) {
309+
return version(
310+
this.preAggregation.indexesSql && this.preAggregation.indexesSql.length ?
311+
[this.preAggregation.loadSql, this.preAggregation.indexesSql, invalidationKeys] :
312+
[this.preAggregation.loadSql, invalidationKeys]
313+
);
314+
}
315+
316+
structureVersion() {
317+
return version(
318+
this.preAggregation.indexesSql && this.preAggregation.indexesSql.length ?
319+
[this.preAggregation.loadSql, this.preAggregation.indexesSql] :
320+
this.preAggregation.loadSql
321+
);
322+
}
323+
308324
priority(defaultValue) {
309325
return this.preAggregation.priority != null ? this.preAggregation.priority : defaultValue;
310326
}
@@ -388,6 +404,8 @@ class PreAggregationLoader {
388404
await queryPromise;
389405
if (this.preAggregation.external) {
390406
await this.loadExternalPreAggregation(client, newVersionEntry);
407+
} else {
408+
await this.createIndexes(client, newVersionEntry);
391409
}
392410
await this.loadCache.reset(this.preAggregation);
393411
await this.dropOrphanedTables(client, this.targetTableName(newVersionEntry));
@@ -422,10 +440,40 @@ class PreAggregationLoader {
422440
requestId: this.requestId
423441
});
424442
await externalDriver.uploadTable(table, columns, tableData);
443+
await this.createIndexes(externalDriver, newVersionEntry);
425444
await this.loadCache.reset(this.preAggregation);
426445
await this.dropOrphanedTables(externalDriver, table);
427446
}
428447

448+
async createIndexes(driver, newVersionEntry) {
449+
if (!this.preAggregation.indexesSql || !this.preAggregation.indexesSql.length) {
450+
return;
451+
}
452+
for (let i = 0; i < this.preAggregation.indexesSql.length; i++) {
453+
const { sql, indexName } = this.preAggregation.indexesSql[i];
454+
const [query, params] = sql;
455+
const indexVersionEntry = {
456+
...newVersionEntry,
457+
table_name: indexName
458+
};
459+
this.logger('Creating pre-aggregation index', {
460+
preAggregation: this.preAggregation,
461+
requestId: this.requestId,
462+
sql
463+
});
464+
await driver.query(
465+
QueryCache.replacePreAggregationTableNames(
466+
query,
467+
this.preAggregationsTablesToTempTables.concat([
468+
[this.preAggregation.tableName, { targetTableName: this.targetTableName(newVersionEntry) }],
469+
[indexName, { targetTableName: this.targetTableName(indexVersionEntry) }]
470+
])
471+
),
472+
params
473+
);
474+
}
475+
}
476+
429477
async dropOrphanedTables(client, justCreatedTable) {
430478
await this.preAggregations.addTableUsed(justCreatedTable);
431479
const actualTables = await client.getTablesQuery(this.preAggregation.preAggregationsSchema);

packages/cubejs-query-orchestrator/test/QueryOrchestratorTest.js

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1-
/* globals describe, it, should */
1+
/* globals describe, it, should, before */
22
const QueryOrchestrator = require('../orchestrator/QueryOrchestrator');
33

44
class MockDriver {
55
constructor() {
66
this.tables = [];
7+
this.executedQueries = [];
78
}
89

910
async query(query) {
11+
this.executedQueries.push(query);
1012
return [query];
1113
}
1214

@@ -25,7 +27,12 @@ class MockDriver {
2527
}
2628

2729
describe('QueryOrchestrator', () => {
28-
const mockDriver = new MockDriver();
30+
let mockDriver = null;
31+
32+
before(() => {
33+
mockDriver = new MockDriver();
34+
});
35+
2936
const queryOrchestrator = new QueryOrchestrator(
3037
'TEST', async () => mockDriver, (msg, params) => console.log(msg, params)
3138
);
@@ -50,4 +57,30 @@ describe('QueryOrchestrator', () => {
5057
console.log(result.data[0]);
5158
should(result.data[0]).match(/orders_number_and_count20191101_kjypcoio_5yftl5il/);
5259
});
60+
61+
it('indexes', async () => {
62+
const query = {
63+
query: "SELECT \"orders__created_at_week\" \"orders__created_at_week\", sum(\"orders__count\") \"orders__count\" FROM (SELECT * FROM stb_pre_aggregations.orders_number_and_count20191101) as partition_union WHERE (\"orders__created_at_week\" >= ($1::timestamptz::timestamptz AT TIME ZONE 'UTC') AND \"orders__created_at_week\" <= ($2::timestamptz::timestamptz AT TIME ZONE 'UTC')) GROUP BY 1 ORDER BY 1 ASC LIMIT 10000",
64+
values: ["2019-11-01T00:00:00Z", "2019-11-30T23:59:59Z"],
65+
cacheKeyQueries: {
66+
renewalThreshold: 21600,
67+
queries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]]
68+
},
69+
preAggregations: [{
70+
preAggregationsSchema: "stb_pre_aggregations",
71+
tableName: "stb_pre_aggregations.orders_number_and_count20191101",
72+
loadSql: ["CREATE TABLE stb_pre_aggregations.orders_number_and_count20191101 AS SELECT\n date_trunc('week', (\"orders\".created_at::timestamptz AT TIME ZONE 'UTC')) \"orders__created_at_week\", count(\"orders\".id) \"orders__count\", sum(\"orders\".number) \"orders__number\"\n FROM\n public.orders AS \"orders\"\n WHERE (\"orders\".created_at >= $1::timestamptz AND \"orders\".created_at <= $2::timestamptz) GROUP BY 1", ["2019-11-01T00:00:00Z", "2019-11-30T23:59:59Z"]],
73+
invalidateKeyQueries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]],
74+
indexesSql: [{
75+
sql: ['CREATE INDEX orders_number_and_count_week20191101 ON stb_pre_aggregations.orders_number_and_count20191101 ("orders__created_at_week")', []],
76+
indexName: 'orders_number_and_count_week20191101'
77+
}],
78+
}],
79+
renewQuery: true
80+
};
81+
const result = await queryOrchestrator.fetchQuery(query);
82+
console.log(result.data[0]);
83+
should(result.data[0]).match(/orders_number_and_count20191101_l3kvjcmu_khbemovd/);
84+
should(mockDriver.executedQueries).matchAny(/CREATE INDEX orders_number_and_count_week20191101_l3kvjcmu_khbemovd/);
85+
});
5386
});

packages/cubejs-schema-compiler/adapter/BaseQuery.js

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -256,11 +256,7 @@ class BaseQuery {
256256
buildSqlAndParams() {
257257
if (!this.options.preAggregationQuery && this.externalQueryClass) {
258258
if (this.externalPreAggregationQuery()) { // TODO performance
259-
const ExternalQuery = this.externalQueryClass;
260-
return new ExternalQuery(this.compilers, {
261-
...this.options,
262-
externalQueryClass: null
263-
}).buildSqlAndParams();
259+
return this.externalQuery().buildSqlAndParams();
264260
}
265261
}
266262
return this.compilers.compiler.withQuery(
@@ -269,6 +265,14 @@ class BaseQuery {
269265
);
270266
}
271267

268+
externalQuery() {
269+
const ExternalQuery = this.externalQueryClass;
270+
return new ExternalQuery(this.compilers, {
271+
...this.options,
272+
externalQueryClass: null
273+
});
274+
}
275+
272276
runningTotalDateJoinCondition() {
273277
return this.timeDimensions.map(d =>
274278
[d, (dateFrom, dateTo, dateField, dimensionDateFrom, dimensionDateTo) =>
@@ -1418,8 +1422,8 @@ class BaseQuery {
14181422
return `EXTRACT(EPOCH FROM ${this.nowTimestampSql()})`;
14191423
}
14201424

1421-
preAggregationTableName(cube, preAggregationName) {
1422-
return `${this.preAggregationSchema() && `${this.preAggregationSchema()}.`}${this.aliasName(`${cube}_${preAggregationName}`)}`;
1425+
preAggregationTableName(cube, preAggregationName, skipSchema) {
1426+
return `${skipSchema ? '' : this.preAggregationSchema() && `${this.preAggregationSchema()}.`}${this.aliasName(`${cube}_${preAggregationName}`)}`;
14231427
}
14241428

14251429
preAggregationSchema() {
@@ -1431,6 +1435,37 @@ class BaseQuery {
14311435
return [`CREATE TABLE ${tableName} ${this.asSyntaxTable} ${sqlAndParams[0]}`, sqlAndParams[1]];
14321436
}
14331437

1438+
indexSql(cube, preAggregation, index, indexName, tableName) {
1439+
if (preAggregation.external && this.externalQueryClass) {
1440+
return this.externalQuery().indexSql(cube, preAggregation, index, indexName, tableName);
1441+
}
1442+
if (index.columns) {
1443+
const columns = this.cubeEvaluator.evaluateReferences(cube, index.columns, { originalSorting: true });
1444+
const escapedColumns = columns.map(column => {
1445+
const path = column.split('.');
1446+
if (path[0] &&
1447+
this.cubeEvaluator.cubeExists(path[0]) &&
1448+
(
1449+
this.cubeEvaluator.isMeasure(path) ||
1450+
this.cubeEvaluator.isDimension(path) ||
1451+
this.cubeEvaluator.isSegment(path)
1452+
)
1453+
) {
1454+
return this.aliasName(column);
1455+
} else {
1456+
return column;
1457+
}
1458+
}).map(c => this.escapeColumnName(c));
1459+
return this.paramAllocator.buildSqlAndParams(this.createIndexSql(indexName, tableName, escapedColumns));
1460+
} else {
1461+
throw new Error(`Index SQL support is not implemented`);
1462+
}
1463+
}
1464+
1465+
createIndexSql(indexName, tableName, escapedColumns) {
1466+
return `CREATE INDEX ${indexName} ON ${tableName} (${escapedColumns.join(', ')})`;
1467+
}
1468+
14341469
preAggregationSql(cube, preAggregation) {
14351470
if (preAggregation.type === 'autoRollup') {
14361471
return this.preAggregations.autoRollupPreAggregationQuery(cube, preAggregation).buildSqlAndParams();

packages/cubejs-schema-compiler/adapter/PreAggregations.js

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,17 +89,34 @@ class PreAggregations {
8989
loadSql: this.query.preAggregationLoadSql(cube, preAggregation, tableName),
9090
invalidateKeyQueries: refreshKeyQueries.queries,
9191
refreshKeyRenewalThresholds: refreshKeyQueries.refreshKeyRenewalThresholds,
92-
external: preAggregation.external
92+
external: preAggregation.external,
93+
indexesSql: Object.keys(preAggregation.indexes || {}).map(
94+
index => {
95+
const indexName = this.preAggregationTableName(cube, `${preAggregationName}_${index}`, preAggregation, true);
96+
return {
97+
indexName,
98+
sql: this.query.indexSql(
99+
cube,
100+
preAggregation,
101+
preAggregation.indexes[index],
102+
indexName,
103+
tableName
104+
)
105+
};
106+
}
107+
)
93108
};
94109
}
95110

96-
preAggregationTableName(cube, preAggregationName, preAggregation) {
111+
preAggregationTableName(cube, preAggregationName, preAggregation, skipSchema) {
97112
return this.query.preAggregationTableName(
98113
cube, preAggregationName + (
99-
preAggregation.partitionTimeDimensions ?
100-
preAggregation.partitionTimeDimensions[0].dateRange[0].replace('T00:00:00.000', '').replace(/-/g, '') :
101-
''
102-
));
114+
preAggregation.partitionTimeDimensions ?
115+
preAggregation.partitionTimeDimensions[0].dateRange[0].replace('T00:00:00.000', '').replace(/-/g, '') :
116+
''
117+
),
118+
skipSchema
119+
);
103120
}
104121

105122
findPreAggregationToUseForCube(cube) {

packages/cubejs-schema-compiler/compiler/CubePropContextTranspiler.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ class CubePropContextTranspiler {
3434

3535
sqlAndReferencesFieldVisitor(cubeName) {
3636
return this.knownIdentifiersInjectVisitor(
37-
/^(sql|measureReferences|dimensionReferences|segmentReferences|timeDimensionReference|drillMembers|drillMemberReferences|contextMembers)$/,
38-
name => this.cubeSymbols.resolveSymbol(cubeName, name) || this.cubeSymbols.isCurrentCube(name)
37+
/^(sql|measureReferences|dimensionReferences|segmentReferences|timeDimensionReference|drillMembers|drillMemberReferences|contextMembers|columns)$/,
38+
name => this.cubeSymbols.resolveSymbol(cubeName, name) || this.cubeSymbols.isCurrentCube(name)
3939
);
4040
}
4141

packages/cubejs-schema-compiler/compiler/CubeValidator.js

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,15 @@ const BasePreAggregation = {
7474
useOriginalSqlPreAggregations: Joi.boolean(),
7575
external: Joi.boolean(),
7676
partitionGranularity: Joi.any().valid('day', 'week', 'month', 'year'),
77-
scheduledRefresh: Joi.boolean()
77+
scheduledRefresh: Joi.boolean(),
78+
indexes: Joi.object().pattern(identifierRegex, Joi.alternatives().try(
79+
Joi.object().keys({
80+
sql: Joi.func().required()
81+
}),
82+
Joi.object().keys({
83+
columns: Joi.func().required()
84+
})
85+
)),
7886
};
7987

8088
const cubeSchema = Joi.object().keys({

packages/cubejs-schema-compiler/test/PreAggregationsTest.js

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ describe('PreAggregations', function test() {
8787
type: 'originalSql',
8888
refreshKey: {
8989
sql: 'select NOW()'
90+
},
91+
indexes: {
92+
source: {
93+
columns: ['source', 'created_at']
94+
}
9095
}
9196
},
9297
googleRollup: {
@@ -225,10 +230,12 @@ describe('PreAggregations', function test() {
225230
}
226231

227232
function tempTablePreAggregations(preAggregationsDescriptions) {
228-
return R.unnest(preAggregationsDescriptions.map(desc =>
229-
desc.invalidateKeyQueries.concat([
233+
return R.unnest(preAggregationsDescriptions.map(
234+
desc => desc.invalidateKeyQueries.concat([
230235
[desc.loadSql[0].replace('CREATE TABLE', 'CREATE TEMP TABLE'), desc.loadSql[1]]
231-
])
236+
]).concat(
237+
(desc.indexesSql || []).map(({ sql }) => sql)
238+
)
232239
));
233240
}
234241

0 commit comments

Comments
 (0)