Skip to content

Commit 931fb7c

Browse files
committed
feat: Serve pre-aggregated data right from external database without hitting main one if pre-aggregation is available
1 parent 0db2282 commit 931fb7c

File tree

1 file changed

+29
-0
lines changed

1 file changed

+29
-0
lines changed

packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ class PreAggregationLoadCache {
122122
return this.queryResults[this.queryCache.queryRedisKey(keyQuery)];
123123
}
124124

125+
hasKeyQueryResult(keyQuery) {
126+
return !!this.queryResults[this.queryCache.queryRedisKey(keyQuery)];
127+
}
128+
125129
async getQueryStage(stageQueryKey) {
126130
const queue = this.preAggregations.getQueue();
127131
if (!this.queryStageState) {
@@ -164,6 +168,30 @@ class PreAggregationLoader {
164168
}
165169

166170
async loadPreAggregation() {
171+
const notLoadedKey = (this.preAggregation.invalidateKeyQueries || [])
172+
.find(keyQuery => !this.loadCache.hasKeyQueryResult(keyQuery));
173+
if (notLoadedKey && !this.waitForRenew) {
174+
const structureVersion = version(this.preAggregation.loadSql);
175+
const versionEntries = await this.loadCache.getVersionEntries(this.preAggregation);
176+
const versionEntryByStructureVersion = versionEntries.find(
177+
v => v.table_name === this.preAggregation.tableName && v.structure_version === structureVersion
178+
);
179+
if (versionEntryByStructureVersion) {
180+
this.loadPreAggregationWithKeys().catch(e => {
181+
if (!(e instanceof ContinueWaitError)) {
182+
this.logger('Error loading pre-aggregation', { error: (e.stack || e), preAggregation: this.preAggregation });
183+
}
184+
});
185+
return this.targetTableName(versionEntryByStructureVersion);
186+
} else {
187+
return this.loadPreAggregationWithKeys();
188+
}
189+
} else {
190+
return this.loadPreAggregationWithKeys();
191+
}
192+
}
193+
194+
async loadPreAggregationWithKeys() {
167195
const invalidationKeys = await Promise.all(
168196
(this.preAggregation.invalidateKeyQueries || [])
169197
.map(keyQuery => this.loadCache.keyQueryResult(keyQuery))
@@ -182,6 +210,7 @@ class PreAggregationLoader {
182210
return this.targetTableName(versionEntryByContentVersion);
183211
}
184212

213+
// TODO this check can be redundant due to structure version is already checked in loadPreAggregation()
185214
if (
186215
!this.waitForRenew &&
187216
await this.loadCache.getQueryStage(PreAggregations.preAggregationQueryCacheKey(this.preAggregation))

0 commit comments

Comments
 (0)