Skip to content

Commit

Permalink
feat: Default refreshKey implementations for mutable and immutable pr…
Browse files Browse the repository at this point in the history
…e-aggregations.
  • Loading branch information
paveltiunov committed Jan 17, 2020
1 parent ca8dab3 commit bef0626
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 33 deletions.
103 changes: 73 additions & 30 deletions packages/cubejs-schema-compiler/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -1320,33 +1320,44 @@ class BaseQuery {
);
}

cacheKeyQueries() { // TODO collect sub queries
const preAggregationForQuery = this.preAggregations.findPreAggregationForQuery();
if (preAggregationForQuery) {
return {
renewalThreshold: this.renewalThreshold(!!preAggregationForQuery.refreshKey),
queries: this.preAggregationInvalidateKeyQueries(preAggregationForQuery.cube, preAggregationForQuery)
};
cacheKeyQueries(transformFn) { // TODO collect sub queries
if (!this.safeEvaluateSymbolContext().preAggregationQuery) {
const preAggregationForQuery = this.preAggregations.findPreAggregationForQuery();
if (preAggregationForQuery) {
return {
renewalThreshold: this.renewalThreshold(!!preAggregationForQuery.refreshKey),
queries: this.preAggregationInvalidateKeyQueries(preAggregationForQuery.cube, preAggregationForQuery)
};
}
}
let refreshKeyAllSetManually = true;
const queries = this.collectCubeNames()
.map(cube => {
const cubeFromPath = this.cubeEvaluator.cubeFromPath(cube);
if (cubeFromPath.refreshKey) {
return this.evaluateSql(cube, cubeFromPath.refreshKey.sql);
}
refreshKeyAllSetManually = false;
const timeDimensions = this.cubeEvaluator.timeDimensionPathsForCube(cube);
if (timeDimensions.length) {
const dimension = timeDimensions.filter(f => f.toLowerCase().indexOf('update') !== -1)[0] || timeDimensions[0];
const foundMainTimeDimension = this.newTimeDimension({ dimension });
const aggSelect = this.aggSelectForDimension(cube, foundMainTimeDimension, 'max');
if (aggSelect) {
return aggSelect;
}
const refreshKeyQueryByCube = cube => {
const cubeFromPath = this.cubeEvaluator.cubeFromPath(cube);
if (cubeFromPath.refreshKey && cubeFromPath.refreshKey.sql) {
return this.evaluateSql(cube, cubeFromPath.refreshKey.sql);
}
refreshKeyAllSetManually = false;
const timeDimensions =
!(cubeFromPath.refreshKey && cubeFromPath.refreshKey.immutable) ?
this.cubeEvaluator.timeDimensionPathsForCube(cube) :
[];
if (timeDimensions.length) {
const dimension = timeDimensions.filter(f => f.toLowerCase().indexOf('update') !== -1)[0] || timeDimensions[0];
const foundMainTimeDimension = this.newTimeDimension({ dimension });
const aggSelect = this.aggSelectForDimension(cube, foundMainTimeDimension, 'max');
if (aggSelect) {
return aggSelect;
}
return `select count(*) from ${this.cubeSql(cube)} ${this.asSyntaxTable} ${this.cubeAlias(cube)}`;
}).map(paramAnnotatedSql => this.paramAllocator.buildSqlAndParams(paramAnnotatedSql));
}
return this.evaluateSymbolSqlWithContext(
() => `select count(*) from ${this.cubeSql(cube)} ${this.asSyntaxTable} ${this.cubeAlias(cube)}`,
{ preAggregationQuery: true }
);
};
const queries = this.collectCubeNames()
.map(cube => [cube, refreshKeyQueryByCube(cube)])
.map(([cube, sql]) => (transformFn ? transformFn(sql, cube) : sql))
.map(paramAnnotatedSql => this.paramAllocator.buildSqlAndParams(paramAnnotatedSql));
return {
queries,
renewalThreshold: this.renewalThreshold(refreshKeyAllSetManually)
Expand Down Expand Up @@ -1423,14 +1434,46 @@ class BaseQuery {
}

preAggregationInvalidateKeyQueries(cube, preAggregation) {
const preAggregationQueryForSql = this.preAggregationQueryForSqlEvaluation(cube, preAggregation);
if (preAggregation.refreshKey) {
return [this.paramAllocator.buildSqlAndParams(
this.preAggregationQueryForSqlEvaluation(cube, preAggregation).evaluateSql(cube, preAggregation.refreshKey.sql)
)];
if (preAggregation.refreshKey.sql) {
return [this.paramAllocator.buildSqlAndParams(
preAggregationQueryForSql.evaluateSql(cube, preAggregation.refreshKey.sql)
)];
}
}
if (preAggregation.partitionGranularity) {
const cubeFromPath = this.cubeEvaluator.cubeFromPath(cube);
return preAggregationQueryForSql.evaluateSymbolSqlWithContext(
() => preAggregationQueryForSql.cacheKeyQueries(
(originalRefreshKey, refreshKeyCube) => {
if (cubeFromPath.refreshKey && cubeFromPath.refreshKey.immutable) {
return preAggregationQueryForSql.evaluateSql(
cube,
(FILTER_PARAMS) => `SELECT ${preAggregationQueryForSql.caseWhenStatement([{
sql: FILTER_PARAMS[
preAggregationQueryForSql.timeDimensions[0].path()[0]
][
preAggregationQueryForSql.timeDimensions[0].path()[1]
].filter((from, to) => `${preAggregationQueryForSql.nowTimestampSql()} < ${this.timeStampCast(to)}`),
label: `(${originalRefreshKey})`
}])}`
);
} else {
// TODO handle WHERE while generating originalRefreshKey
return refreshKeyCube === preAggregationQueryForSql.timeDimensions[0].path()[0] ?
`${originalRefreshKey} WHERE ${preAggregationQueryForSql.timeDimensions[0].filterToWhere()}` :
originalRefreshKey;
}
}
),
{ preAggregationQuery: true }
).queries;
}
return [this.paramAllocator.buildSqlAndParams(
`SELECT ${this.timeGroupedColumn('hour', this.convertTz(this.nowTimestampSql()))} as current_hour`
)];
return preAggregationQueryForSql.evaluateSymbolSqlWithContext(
() => preAggregationQueryForSql.cacheKeyQueries(),
{ preAggregationQuery: true }
).queries;
}

preAggregationStartEndQueries(cube, preAggregation) {
Expand Down
11 changes: 8 additions & 3 deletions packages/cubejs-schema-compiler/compiler/CubeValidator.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,14 @@ const BasePreAggregation = {
const cubeSchema = Joi.object().keys({
name: identifier,
sql: Joi.func().required(),
refreshKey: Joi.object().keys({
sql: Joi.func().required()
}),
refreshKey: Joi.alternatives().try(
Joi.object().keys({
sql: Joi.func().required()
}),
Joi.object().keys({
immutable: Joi.boolean().required()
})
),
fileName: Joi.string().required(),
extends: Joi.func(),
allDefinitions: Joi.func(),
Expand Down
106 changes: 106 additions & 0 deletions packages/cubejs-schema-compiler/test/PreAggregationsTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ describe('PreAggregations', function test() {
})
cube('GoogleVisitors', {
refreshKey: {
immutable: true,
},
extends: visitors,
sql: \`select v.* from \${visitors.sql()} v where v.source = 'google'\`
})
Expand Down Expand Up @@ -338,6 +341,109 @@ describe('PreAggregations', function test() {
});
});

it('immutable partition default refreshKey', () => {
return compiler.compile().then(() => {
const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, {
measures: [
'GoogleVisitors.checkinsTotal'
],
dimensions: [
'GoogleVisitors.source'
],
timeDimensions: [{
dimension: 'GoogleVisitors.createdAt',
granularity: 'day',
dateRange: ['2017-01-01', '2017-01-30']
}],
timezone: 'America/Los_Angeles',
order: [{
id: 'GoogleVisitors.createdAt'
}],
preAggregationsSchema: ''
});

const queryAndParams = query.buildSqlAndParams();
console.log(queryAndParams);
const preAggregationsDescription = query.preAggregations.preAggregationsDescription();
console.log(JSON.stringify(preAggregationsDescription, null, 2));

preAggregationsDescription[0].invalidateKeyQueries[0][0].should.match(/NOW\(\) </)

return dbRunner.testQueries(tempTablePreAggregations(preAggregationsDescription).concat([
query.buildSqlAndParams()
]).map(q => replaceTableName(q, preAggregationsDescription, 101))).then(res => {
res.should.be.deepEqual(
[
{
"google_visitors__source": "google",
"google_visitors__created_at_day": "2017-01-05T00:00:00.000Z",
"google_visitors__checkins_total": "1"
}
]
);
});
});
});

it('mutable partition default refreshKey', () => {
return compiler.compile().then(() => {
const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, {
measures: [
'visitors.checkinsTotal'
],
dimensions: [
'visitors.source'
],
timeDimensions: [{
dimension: 'visitors.createdAt',
granularity: 'day',
dateRange: ['2017-01-01', '2017-01-30']
}],
timezone: 'America/Los_Angeles',
order: [{
id: 'visitors.createdAt'
}],
preAggregationsSchema: ''
});

const queryAndParams = query.buildSqlAndParams();
console.log(queryAndParams);
const preAggregationsDescription = query.preAggregations.preAggregationsDescription();
console.log(JSON.stringify(preAggregationsDescription, null, 2));

preAggregationsDescription[0].invalidateKeyQueries[0][0].should.match(/>=/);

return dbRunner.testQueries(tempTablePreAggregations(preAggregationsDescription).concat([
query.buildSqlAndParams()
]).map(q => replaceTableName(q, preAggregationsDescription, 102))).then(res => {
res.should.be.deepEqual(
[
{
visitors__source: 'some',
visitors__created_at_day: '2017-01-02T00:00:00.000Z',
visitors__checkins_total: '3'
},
{
visitors__source: 'some',
visitors__created_at_day: '2017-01-04T00:00:00.000Z',
visitors__checkins_total: '2'
},
{
visitors__source: 'google',
visitors__created_at_day: '2017-01-05T00:00:00.000Z',
visitors__checkins_total: '1'
},
{
visitors__source: null,
visitors__created_at_day: '2017-01-06T00:00:00.000Z',
visitors__checkins_total: '0'
}
]
);
});
});
});

it('hll bigquery rollup', () => {
return compiler.compile().then(() => {
const query = new BigqueryQuery({ joinGraph, cubeEvaluator, compiler }, {
Expand Down

0 comments on commit bef0626

Please sign in to comment.