diff --git a/packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js b/packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js index c73649a4bc9f..80b844531283 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js +++ b/packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js @@ -45,7 +45,8 @@ const tablesToVersionEntries = (schema, tables) => { }; class PreAggregationLoadCache { - constructor(redisPrefix, clientFactory, queryCache, preAggregations) { + constructor(redisPrefix, clientFactory, queryCache, preAggregations, options) { + options = options || {}; this.redisPrefix = redisPrefix; this.driverFactory = clientFactory; this.queryCache = queryCache; @@ -53,6 +54,7 @@ class PreAggregationLoadCache { this.queryResults = {}; this.cacheDriver = preAggregations.cacheDriver; this.externalDriverFactory = preAggregations.externalDriverFactory; + this.requestId = options.requestId; } async tablesFromCache(preAggregation, forceRenew) { @@ -60,11 +62,12 @@ class PreAggregationLoadCache { if (!tables) { tables = await this.preAggregations.getLoadCacheQueue().executeInQueue( 'query', - preAggregation.preAggregationsSchema, + `Fetch tables for ${preAggregation.preAggregationsSchema}`, { - preAggregation + preAggregation, requestId: this.requestId }, - 0 + 0, + { requestId: this.requestId } ); } return tables; @@ -121,7 +124,8 @@ class PreAggregationLoadCache { 2 * 60, renewalKey: keyQuery, waitForRenew, - priority + priority, + requestId: this.requestId } ); } @@ -358,7 +362,7 @@ class PreAggregationLoader { requestId: this.requestId }, priority, - { stageQueryKey: PreAggregations.preAggregationQueryCacheKey(this.preAggregation) } + { stageQueryKey: PreAggregations.preAggregationQueryCacheKey(this.preAggregation), requestId: this.requestId } ); } @@ -543,7 +547,9 @@ class PreAggregations { loadAllPreAggregationsIfNeeded(queryBody) { const preAggregations = queryBody.preAggregations || []; - const loadCache = new PreAggregationLoadCache(this.redisPrefix, this.driverFactory, this.queryCache, this); + const loadCache = new PreAggregationLoadCache(this.redisPrefix, this.driverFactory, this.queryCache, this, { + requestId: queryBody.requestId + }); return preAggregations.map(p => (preAggregationsTablesToTempTables) => { const loader = new PreAggregationLoader( this.redisPrefix, @@ -579,7 +585,7 @@ class PreAggregations { this, preAggregation, preAggregationsTablesToTempTables, - new PreAggregationLoadCache(this.redisPrefix, this.driverFactory, this.queryCache, this), + new PreAggregationLoadCache(this.redisPrefix, this.driverFactory, this.queryCache, this, { requestId }), { requestId } ); return loader.refresh(newVersionEntry)(client); @@ -598,9 +604,11 @@ class PreAggregations { if (!this.loadCacheQueue) { this.loadCacheQueue = QueryCache.createQueue(`SQL_PRE_AGGREGATIONS_CACHE_${this.redisPrefix}`, this.driverFactory, (client, q) => { const { - preAggregation + preAggregation, + requestId } = q; - const loadCache = new PreAggregationLoadCache(this.redisPrefix, this.driverFactory, this.queryCache, this); + const loadCache = new PreAggregationLoadCache(this.redisPrefix, this.driverFactory, this.queryCache, this, + { requestId }); return loadCache.fetchTables(preAggregation); }, { concurrency: 4, diff --git a/packages/cubejs-query-orchestrator/test/QueryOrchestrator.test.js b/packages/cubejs-query-orchestrator/test/QueryOrchestrator.test.js index fb82f783127b..87fb470c8e96 100644 --- a/packages/cubejs-query-orchestrator/test/QueryOrchestrator.test.js +++ b/packages/cubejs-query-orchestrator/test/QueryOrchestrator.test.js @@ -58,7 +58,8 @@ describe('QueryOrchestrator', () => { loadSql: ["CREATE TABLE stb_pre_aggregations.orders_number_and_count20191101 AS SELECT\n date_trunc('week', (\"orders\".created_at::timestamptz AT TIME ZONE 'UTC')) \"orders__created_at_week\", count(\"orders\".id) \"orders__count\", sum(\"orders\".number) \"orders__number\"\n FROM\n public.orders AS \"orders\"\n WHERE (\"orders\".created_at >= $1::timestamptz AND \"orders\".created_at <= $2::timestamptz) GROUP BY 1", ["2019-11-01T00:00:00Z", "2019-11-30T23:59:59Z"]], invalidateKeyQueries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]] }], - renewQuery: true + renewQuery: true, + requestId: 'basic' }; const result = await queryOrchestrator.fetchQuery(query); console.log(result.data[0]); @@ -83,7 +84,8 @@ describe('QueryOrchestrator', () => { indexName: 'orders_number_and_count_week20191101' }], }], - renewQuery: true + renewQuery: true, + requestId: 'indexes' }; const result = await queryOrchestrator.fetchQuery(query); console.log(result.data[0]); @@ -105,7 +107,8 @@ describe('QueryOrchestrator', () => { loadSql: ["CREATE TABLE stb_pre_aggregations.orders_number_and_count_and_very_very_very_very_very_very_long20191101 AS SELECT\n date_trunc('week', (\"orders\".created_at::timestamptz AT TIME ZONE 'UTC')) \"orders__created_at_week\", count(\"orders\".id) \"orders__count\", sum(\"orders\".number) \"orders__number\"\n FROM\n public.orders AS \"orders\"\n WHERE (\"orders\".created_at >= $1::timestamptz AND \"orders\".created_at <= $2::timestamptz) GROUP BY 1", ["2019-11-01T00:00:00Z", "2019-11-30T23:59:59Z"]], invalidateKeyQueries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]], }], - renewQuery: true + renewQuery: true, + requestId: 'silent truncate' }; let thrown = true; try {