Skip to content

Commit

Permalink
fix: requestId isn't propagating to all pre-aggregations messages
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Mar 15, 2020
1 parent 217895f commit 650dd6e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 13 deletions.
28 changes: 18 additions & 10 deletions packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js
Expand Up @@ -45,26 +45,29 @@ const tablesToVersionEntries = (schema, tables) => {
};

class PreAggregationLoadCache {
constructor(redisPrefix, clientFactory, queryCache, preAggregations) {
constructor(redisPrefix, clientFactory, queryCache, preAggregations, options) {
options = options || {};
this.redisPrefix = redisPrefix;
this.driverFactory = clientFactory;
this.queryCache = queryCache;
this.preAggregations = preAggregations;
this.queryResults = {};
this.cacheDriver = preAggregations.cacheDriver;
this.externalDriverFactory = preAggregations.externalDriverFactory;
this.requestId = options.requestId;
}

async tablesFromCache(preAggregation, forceRenew) {
let tables = forceRenew ? null : await this.cacheDriver.get(this.tablesRedisKey(preAggregation));
if (!tables) {
tables = await this.preAggregations.getLoadCacheQueue().executeInQueue(
'query',
preAggregation.preAggregationsSchema,
`Fetch tables for ${preAggregation.preAggregationsSchema}`,
{
preAggregation
preAggregation, requestId: this.requestId
},
0
0,
{ requestId: this.requestId }
);
}
return tables;
Expand Down Expand Up @@ -121,7 +124,8 @@ class PreAggregationLoadCache {
2 * 60,
renewalKey: keyQuery,
waitForRenew,
priority
priority,
requestId: this.requestId
}
);
}
Expand Down Expand Up @@ -358,7 +362,7 @@ class PreAggregationLoader {
requestId: this.requestId
},
priority,
{ stageQueryKey: PreAggregations.preAggregationQueryCacheKey(this.preAggregation) }
{ stageQueryKey: PreAggregations.preAggregationQueryCacheKey(this.preAggregation), requestId: this.requestId }
);
}

Expand Down Expand Up @@ -543,7 +547,9 @@ class PreAggregations {

loadAllPreAggregationsIfNeeded(queryBody) {
const preAggregations = queryBody.preAggregations || [];
const loadCache = new PreAggregationLoadCache(this.redisPrefix, this.driverFactory, this.queryCache, this);
const loadCache = new PreAggregationLoadCache(this.redisPrefix, this.driverFactory, this.queryCache, this, {
requestId: queryBody.requestId
});
return preAggregations.map(p => (preAggregationsTablesToTempTables) => {
const loader = new PreAggregationLoader(
this.redisPrefix,
Expand Down Expand Up @@ -579,7 +585,7 @@ class PreAggregations {
this,
preAggregation,
preAggregationsTablesToTempTables,
new PreAggregationLoadCache(this.redisPrefix, this.driverFactory, this.queryCache, this),
new PreAggregationLoadCache(this.redisPrefix, this.driverFactory, this.queryCache, this, { requestId }),
{ requestId }
);
return loader.refresh(newVersionEntry)(client);
Expand All @@ -598,9 +604,11 @@ class PreAggregations {
if (!this.loadCacheQueue) {
this.loadCacheQueue = QueryCache.createQueue(`SQL_PRE_AGGREGATIONS_CACHE_${this.redisPrefix}`, this.driverFactory, (client, q) => {
const {
preAggregation
preAggregation,
requestId
} = q;
const loadCache = new PreAggregationLoadCache(this.redisPrefix, this.driverFactory, this.queryCache, this);
const loadCache = new PreAggregationLoadCache(this.redisPrefix, this.driverFactory, this.queryCache, this,
{ requestId });
return loadCache.fetchTables(preAggregation);
}, {
concurrency: 4,
Expand Down
Expand Up @@ -58,7 +58,8 @@ describe('QueryOrchestrator', () => {
loadSql: ["CREATE TABLE stb_pre_aggregations.orders_number_and_count20191101 AS SELECT\n date_trunc('week', (\"orders\".created_at::timestamptz AT TIME ZONE 'UTC')) \"orders__created_at_week\", count(\"orders\".id) \"orders__count\", sum(\"orders\".number) \"orders__number\"\n FROM\n public.orders AS \"orders\"\n WHERE (\"orders\".created_at >= $1::timestamptz AND \"orders\".created_at <= $2::timestamptz) GROUP BY 1", ["2019-11-01T00:00:00Z", "2019-11-30T23:59:59Z"]],
invalidateKeyQueries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]]
}],
renewQuery: true
renewQuery: true,
requestId: 'basic'
};
const result = await queryOrchestrator.fetchQuery(query);
console.log(result.data[0]);
Expand All @@ -83,7 +84,8 @@ describe('QueryOrchestrator', () => {
indexName: 'orders_number_and_count_week20191101'
}],
}],
renewQuery: true
renewQuery: true,
requestId: 'indexes'
};
const result = await queryOrchestrator.fetchQuery(query);
console.log(result.data[0]);
Expand All @@ -105,7 +107,8 @@ describe('QueryOrchestrator', () => {
loadSql: ["CREATE TABLE stb_pre_aggregations.orders_number_and_count_and_very_very_very_very_very_very_long20191101 AS SELECT\n date_trunc('week', (\"orders\".created_at::timestamptz AT TIME ZONE 'UTC')) \"orders__created_at_week\", count(\"orders\".id) \"orders__count\", sum(\"orders\".number) \"orders__number\"\n FROM\n public.orders AS \"orders\"\n WHERE (\"orders\".created_at >= $1::timestamptz AND \"orders\".created_at <= $2::timestamptz) GROUP BY 1", ["2019-11-01T00:00:00Z", "2019-11-30T23:59:59Z"]],
invalidateKeyQueries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]],
}],
renewQuery: true
renewQuery: true,
requestId: 'silent truncate'
};
let thrown = true;
try {
Expand Down

0 comments on commit 650dd6e

Please sign in to comment.