Skip to content

Commit b17e841

Browse files
authored
feat: Support external rollups from readonly source (#395)
* Add downloadQueryResults & readOnly to base driver * Support including field type info in pg driver * Add support for download-only refresh strategy * Fix linting * Fix missing parameter * Describe both the create table and select query * Use correct sql * Update docs * Address review feedback
1 parent c46c721 commit b17e841

File tree

5 files changed

+132
-44
lines changed

5 files changed

+132
-44
lines changed

docs/Schema/pre-aggregations.md

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ cube(`Orders`, {
9797
sql: `amount`,
9898
type: `sum`
9999
},
100-
100+
101101
averageRevenue: {
102102
sql: `${revenue} / ${count}`,
103103
type: `number`
@@ -271,6 +271,28 @@ cube(`Orders`, {
271271
In order to make external pre-aggregations work you should set
272272
[externalDriverFactory](@cubejs-backend-server-core#external-driver-factory) and [externalDbType](@cubejs-backend-server-core#external-db-type) params while creating your server instance.
273273

274+
Note that by default, Cube.js materializes the pre-aggregration query results as new tables in the source database. For external pre-aggregations, these source tables are temporary - once downloaded and uploaded to the external database, they are cleaned-up.
275+
276+
However, it may not be possible to stage pre-aggregation query results in materialized tables in the source database like this - for example, if the driver doesn't support it, or if your source database is read-only. To fallback to a strategy where the pre-aggreation query results are downloaded without first being materialized, set the `readOnly` param of [driverFactory](@cubejs-backend-server-core#driver-factory) in your configuration:
277+
278+
```javascript
279+
const CubejsServer = require('@cubejs-backend/server');
280+
const PostgresDriver = require('@cubejs-backend/postgres-driver');
281+
282+
const options = {
283+
driverFactory: () => new PostgresDriver({
284+
readOnly: true
285+
}),
286+
externalDbType: 'postgres',
287+
externalDriverFactory: () => new PostgresDriver({
288+
host: 'my_host',
289+
database: 'my_db',
290+
user: 'my_user',
291+
password: 'my_pw'
292+
})
293+
};
294+
```
295+
274296
## refreshKey
275297

276298
Cube.js also takes care of keeping pre-aggregations up to date.
@@ -320,7 +342,7 @@ In case of partitioned rollups incremental `refreshKey` can be used as follows:
320342
```javascript
321343
cube(`Orders`, {
322344
sql: `select * from orders`,
323-
345+
324346
// ...
325347

326348
preAggregations: {
@@ -358,7 +380,7 @@ Example usage:
358380
```javascript
359381
cube(`Orders`, {
360382
sql: `select * from orders`,
361-
383+
362384
// ...
363385

364386
preAggregations: {
@@ -373,4 +395,3 @@ cube(`Orders`, {
373395
}
374396
});
375397
```
376-

packages/cubejs-postgres-driver/driver/PostgresDriver.js

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ const GenericTypeToPostgres = {
99
string: 'text'
1010
};
1111

12+
const DataTypeMapping = {};
13+
Object.entries(types.builtins).forEach(pair => {
14+
const [key, value] = pair;
15+
DataTypeMapping[value] = key;
16+
});
17+
1218
const timestampDataTypes = [1114, 1184];
1319

1420
const timestampTypeParser = val => moment.utc(val).format(moment.HTML5_FMT.DATETIME_LOCAL_MS);
@@ -44,7 +50,7 @@ class PostgresDriver extends BaseDriver {
4450
}
4551
}
4652

47-
async query(query, values) {
53+
async queryResponse(query, values) {
4854
const client = await this.pool.connect();
4955
try {
5056
await client.query(`SET TIME ZONE '${this.config.storeTimezone || 'UTC'}'`);
@@ -65,12 +71,31 @@ class PostgresDriver extends BaseDriver {
6571
},
6672
},
6773
});
68-
return res && res.rows;
74+
return res;
6975
} finally {
7076
await client.release();
7177
}
7278
}
7379

80+
async query(query, values) {
81+
return (await this.queryResponse(query, values)).rows;
82+
}
83+
84+
async downloadQueryResults(query, values) {
85+
const res = await this.queryResponse(query, values);
86+
return {
87+
rows: res.rows,
88+
types: res.fields.map(f => ({
89+
name: f.name,
90+
type: this.toGenericType(DataTypeMapping[f.dataTypeID].toLowerCase())
91+
})),
92+
};
93+
}
94+
95+
readOnly() {
96+
return !!this.config.readOnly;
97+
}
98+
7499
async uploadTable(table, columns, tableData) {
75100
if (!tableData.rows) {
76101
throw new Error(`${this.constructor} driver supports only rows upload`);

packages/cubejs-query-orchestrator/driver/BaseDriver.js

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,19 @@ class BaseDriver {
3838
}
3939

4040
testConnection() {
41-
throw 'Not implemented';
41+
throw new Error('Not implemented');
4242
}
4343

4444
query() {
45-
throw 'Not implemented';
45+
throw new Error('Not implemented');
46+
}
47+
48+
downloadQueryResults() {
49+
throw new Error('Not implemented');
50+
}
51+
52+
readOnly() {
53+
return false;
4654
}
4755

4856
tablesSchema() {
@@ -134,7 +142,7 @@ class BaseDriver {
134142
columns.table_name,
135143
columns.table_schema,
136144
columns.data_type
137-
FROM information_schema.columns
145+
FROM information_schema.columns
138146
WHERE table_name = ${this.param(0)} AND table_schema = ${this.param(1)}`,
139147
[name, schema]
140148
);

packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js

Lines changed: 67 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -387,42 +387,70 @@ class PreAggregationLoader {
387387

388388
refresh(newVersionEntry) {
389389
return (client) => {
390-
const [loadSql, params] =
391-
Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []];
392-
let queryPromise = null;
393-
const refreshImpl = async () => {
394-
if (this.preAggregation.external) { // TODO optimize
395-
await client.createSchemaIfNotExists(this.preAggregation.preAggregationsSchema);
396-
}
397-
queryPromise = client.loadPreAggregationIntoTable(
398-
this.targetTableName(newVersionEntry),
399-
QueryCache.replacePreAggregationTableNames(loadSql, this.preAggregationsTablesToTempTables)
400-
.replace(
401-
this.preAggregation.tableName,
402-
this.targetTableName(newVersionEntry)
403-
),
404-
params
405-
);
406-
await queryPromise;
407-
if (this.preAggregation.external) {
408-
await this.loadExternalPreAggregation(client, newVersionEntry);
409-
} else {
410-
await this.createIndexes(client, newVersionEntry);
411-
}
412-
await this.loadCache.reset(this.preAggregation);
413-
await this.dropOrphanedTables(client, this.targetTableName(newVersionEntry));
414-
if (!this.preAggregation.external) {
415-
await this.loadCache.reset(this.preAggregation);
416-
}
417-
};
418-
419-
const resultPromise = refreshImpl();
420-
resultPromise.cancel = () => queryPromise.cancel(); // TODO cancel for external upload
390+
let refreshStrategy = this.refreshImplStoreInSourceStrategy;
391+
if (this.preAggregation.external) {
392+
refreshStrategy = client.readOnly() ? this.refreshImplStreamExternalStrategy : this.refreshImplTempTableExternalStrategy;
393+
}
394+
const resultPromise = refreshStrategy.bind(this)(client, newVersionEntry);
395+
resultPromise.cancel = () => {} // TODO implement cancel (loading rollup into table and external upload)
421396
return resultPromise;
422397
};
423398
}
424399

425-
async loadExternalPreAggregation(client, newVersionEntry) {
400+
async refreshImplStoreInSourceStrategy(client, newVersionEntry) {
401+
const [loadSql, params] =
402+
Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []];
403+
await client.loadPreAggregationIntoTable(
404+
this.targetTableName(newVersionEntry),
405+
QueryCache.replacePreAggregationTableNames(loadSql, this.preAggregationsTablesToTempTables)
406+
.replace(
407+
this.preAggregation.tableName,
408+
this.targetTableName(newVersionEntry)
409+
),
410+
params
411+
);
412+
await this.createIndexes(client, newVersionEntry);
413+
await this.loadCache.reset(this.preAggregation);
414+
await this.dropOrphanedTables(client, this.targetTableName(newVersionEntry));
415+
await this.loadCache.reset(this.preAggregation);
416+
}
417+
418+
async refreshImplTempTableExternalStrategy(client, newVersionEntry) {
419+
const [loadSql, params] =
420+
Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []];
421+
await client.createSchemaIfNotExists(this.preAggregation.preAggregationsSchema);
422+
await client.loadPreAggregationIntoTable(
423+
this.targetTableName(newVersionEntry),
424+
QueryCache.replacePreAggregationTableNames(loadSql, this.preAggregationsTablesToTempTables)
425+
.replace(
426+
this.preAggregation.tableName,
427+
this.targetTableName(newVersionEntry)
428+
),
429+
params
430+
);
431+
const tableData = await this.downloadTempExternalPreAggregation(client, newVersionEntry);
432+
await this.uploadExternalPreAggregation(tableData, newVersionEntry);
433+
await this.loadCache.reset(this.preAggregation);
434+
await this.dropOrphanedTables(client, this.targetTableName(newVersionEntry));
435+
}
436+
437+
async refreshImplStreamExternalStrategy(client, newVersionEntry) {
438+
const [sql, params] =
439+
Array.isArray(this.preAggregation.sql) ? this.preAggregation.sql : [this.preAggregation.sql, []];
440+
if (!client.downloadQueryResults) {
441+
throw new Error(`Can't load external pre-aggregation: source driver doesn't support downloadQueryResults()`);
442+
}
443+
444+
this.logger('Downloading external pre-aggregation via query', {
445+
preAggregation: this.preAggregation,
446+
requestId: this.requestId
447+
});
448+
const tableData = await client.downloadQueryResults(sql, params);
449+
await this.uploadExternalPreAggregation(tableData, newVersionEntry);
450+
await this.loadCache.reset(this.preAggregation);
451+
}
452+
453+
async downloadTempExternalPreAggregation(client, newVersionEntry) {
426454
if (!client.downloadTable) {
427455
throw new Error(`Can't load external pre-aggregation: source driver doesn't support downloadTable()`);
428456
}
@@ -432,7 +460,12 @@ class PreAggregationLoader {
432460
requestId: this.requestId
433461
});
434462
const tableData = await client.downloadTable(table);
435-
const columns = await client.tableColumnTypes(table);
463+
tableData.types = await client.tableColumnTypes(table);
464+
return tableData;
465+
}
466+
467+
async uploadExternalPreAggregation(tableData, newVersionEntry) {
468+
const table = this.targetTableName(newVersionEntry);
436469
const externalDriver = await this.externalDriverFactory();
437470
if (!externalDriver.uploadTable) {
438471
throw new Error(`Can't load external pre-aggregation: destination driver doesn't support uploadTable()`);
@@ -441,7 +474,7 @@ class PreAggregationLoader {
441474
preAggregation: this.preAggregation,
442475
requestId: this.requestId
443476
});
444-
await externalDriver.uploadTable(table, columns, tableData);
477+
await externalDriver.uploadTable(table, tableData.types, tableData);
445478
await this.createIndexes(externalDriver, newVersionEntry);
446479
await this.loadCache.reset(this.preAggregation);
447480
await this.dropOrphanedTables(externalDriver, table);

packages/cubejs-schema-compiler/adapter/PreAggregations.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ class PreAggregations {
8787
preAggregationsSchema: this.query.preAggregationSchema(),
8888
tableName,
8989
loadSql: this.query.preAggregationLoadSql(cube, preAggregation, tableName),
90+
sql: this.query.preAggregationSql(cube, preAggregation),
9091
invalidateKeyQueries: refreshKeyQueries.queries,
9192
refreshKeyRenewalThresholds: refreshKeyQueries.refreshKeyRenewalThresholds,
9293
external: preAggregation.external,
@@ -462,7 +463,7 @@ class PreAggregations {
462463
preAggregationForQuery.preAggregation.measures :
463464
this.evaluateAllReferences(preAggregationForQuery.cube, preAggregationForQuery.preAggregation).measures
464465
);
465-
466+
466467
const rollupGranularity = this.castGranularity(preAggregationForQuery.preAggregation.granularity) || 'day';
467468

468469
return this.query.evaluateSymbolSqlWithContext(

0 commit comments

Comments
 (0)