Skip to content

Commit

Permalink
fix: Do not renew historical refresh keys during scheduled refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Jan 14, 2021
1 parent 925f813 commit e5fbb12
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 3 deletions.
26 changes: 24 additions & 2 deletions packages/cubejs-schema-compiler/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -1985,6 +1985,22 @@ class BaseQuery {
}]);
}

incrementalRefreshKeyRenewalThreshold(query, originalThreshold, updateWindow) {
const timeDimension = query.timeDimensions[0];
if (
updateWindow
) {
const dateToDate = this.inIntegrationTimeZone(timeDimension.dateToFormatted())
.add(this.parseSecondDuration(updateWindow), 'second')
.toDate();
if (dateToDate < new Date()) {
// if dateTo passed just moments ago we want to renew it earlier in case of server and db clock don't match
return Math.min(Math.round((new Date().getTime() - dateToDate.getTime()) / 1000), 24 * 60 * 60);
}
}
return originalThreshold;
}

defaultRefreshKeyRenewalThreshold() {
return 10;
}
Expand All @@ -2011,6 +2027,7 @@ class BaseQuery {
}

let refreshKey = this.everyRefreshKeySql(preAggregation.refreshKey);
let renewalThreshold = this.refreshKeyRenewalThresholdForInterval(preAggregation.refreshKey);
if (preAggregation.refreshKey.incremental) {
if (!preAggregation.partitionGranularity) {
throw new UserError('Incremental refresh key can only be used for partitioned pre-aggregations');
Expand All @@ -2026,12 +2043,17 @@ class BaseQuery {
refreshKey,
{ window: preAggregation.refreshKey.updateWindow }
);
renewalThreshold = this.incrementalRefreshKeyRenewalThreshold(
preAggregationQueryForSql,
renewalThreshold,
preAggregation.refreshKey.updateWindow
);
}
}
}Do
if (preAggregation.refreshKey.every || preAggregation.refreshKey.incremental) {
return {
queries: [this.paramAllocator.buildSqlAndParams(`SELECT ${refreshKey}`)],
refreshKeyRenewalThresholds: [this.refreshKeyRenewalThresholdForInterval(preAggregation.refreshKey)]
refreshKeyRenewalThresholds: [renewalThreshold]
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,12 @@ describe('PreAggregations', function test() {
dimensionReferences: [source],
timeDimensionReference: createdAt,
granularity: 'day',
partitionGranularity: 'month'
partitionGranularity: 'month',
refreshKey: {
every: '1 hour',
incremental: true,
updateWindow: '7 day'
}
},
partitionedHourly: {
type: 'rollup',
Expand Down Expand Up @@ -822,6 +827,53 @@ describe('PreAggregations', function test() {
});
}));

it('incremental renewal threshold', () => compiler.compile().then(() => {
const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, {
measures: [
'visitors.checkinsTotal'
],
dimensions: [
'visitors.source'
],
timezone: 'America/Los_Angeles',
preAggregationsSchema: '',
timeDimensions: [{
dimension: 'visitors.createdAt',
granularity: 'day',
dateRange: [
new Date(new Date().getTime() - 60 * 24 * 60 * 60 * 1000).toJSON().substring(0, 10),
new Date().toJSON().substring(0, 10)
]
}],
order: [{
id: 'visitors.createdAt'
}],
});

const queryAndParams = query.buildSqlAndParams();
console.log(queryAndParams);
const preAggregationsDescription = query.preAggregations.preAggregationsDescription();
console.log(JSON.stringify(preAggregationsDescription, null, 2));
const partitionedTables = preAggregationsDescription
.filter(({ tableName }) => tableName.indexOf('visitors_partitioned') === 0);

partitionedTables[0].refreshKeyRenewalThresholds[0].should.be.equal(86400);
partitionedTables[partitionedTables.length - 1].refreshKeyRenewalThresholds[0].should.be.equal(300);

const queries = tempTablePreAggregations(preAggregationsDescription);

console.log(JSON.stringify(queries.concat(queryAndParams)));

return dbRunner.testQueries(
queries.concat([queryAndParams]).map(q => replaceTableName(q, preAggregationsDescription, 1042))
).then(res => {
console.log(JSON.stringify(res));
res.should.be.deepEqual(
[]
);
});
}));

it('partitioned', () => compiler.compile().then(() => {
const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, {
measures: [
Expand Down

0 comments on commit e5fbb12

Please sign in to comment.