From 1e591f022a45362abc3d6b133de49acc35378d2d Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 11 Dec 2023 16:45:56 +0100 Subject: [PATCH] feat(query-orchestrator): Reduce number of touches for pre-aggregations with LRU cache chore: fix --- packages/cubejs-backend-shared/src/env.ts | 31 +++++++++++++- .../src/orchestrator/PreAggregations.ts | 42 ++++++++++++++++--- 2 files changed, 66 insertions(+), 7 deletions(-) diff --git a/packages/cubejs-backend-shared/src/env.ts b/packages/cubejs-backend-shared/src/env.ts index ab765e50cd940..03b2fddb99752 100644 --- a/packages/cubejs-backend-shared/src/env.ts +++ b/packages/cubejs-backend-shared/src/env.ts @@ -574,12 +574,41 @@ const variables: Record any> = { .default(8192) .asInt(), + /** + * Max number of elements + */ + touchPreAggregationCacheMaxCount: (): number => get('CUBEJS_TOUCH_PRE_AGG_CACHE_MAX_COUNT') + .default(8192) + .asInt(), + + /** + * Max cache + */ + touchPreAggregationCacheMaxAge: (): number => { + // eslint-disable-next-line no-use-before-define + const touchPreAggregationTimeout = getEnv('touchPreAggregationTimeout'); + + const maxAge = get('CUBEJS_TOUCH_PRE_AGG_CACHE_MAX_AGE') + .default(Math.round(touchPreAggregationTimeout / 2)) + .asIntPositive(); + + if (maxAge > touchPreAggregationTimeout) { + throw new InvalidConfiguration( + 'CUBEJS_TOUCH_PRE_AGG_CACHE_MAX_AGE', + maxAge, + `Must be less or equal then CUBEJS_TOUCH_PRE_AGG_TIMEOUT (${touchPreAggregationTimeout}).` + ); + } + + return maxAge; + }, + /** * Expire time for touch records */ touchPreAggregationTimeout: (): number => get('CUBEJS_TOUCH_PRE_AGG_TIMEOUT') .default(60 * 60 * 24) - .asInt(), + .asIntPositive(), /** * Expire time for touch records diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts index cb4707ed86672..ce08deeafb89f 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts @@ -26,6 +26,8 @@ import { UnloadOptions, } from '@cubejs-backend/base-driver'; import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; +import LRUCache from 'lru-cache'; + import { PreAggTableToTempTable, Query, QueryBody, QueryCache, QueryTuple, QueryWithParams } from './QueryCache'; import { ContinueWaitError } from './ContinueWaitError'; import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory'; @@ -1968,6 +1970,8 @@ export class PreAggregations { private readonly getQueueEventsBus: any; + private readonly touchCache: LRUCache; + public constructor( private readonly redisPrefix: string, private readonly driverFactory: DriverFactoryByDataSource, @@ -1984,14 +1988,20 @@ export class PreAggregations { this.usedTablePersistTime = options.usedTablePersistTime || getEnv('dbQueryTimeout'); this.externalRefresh = options.externalRefresh; this.getQueueEventsBus = options.getQueueEventsBus; + this.touchCache = new LRUCache({ + max: getEnv('touchPreAggregationCacheMaxCount'), + maxAge: getEnv('touchPreAggregationCacheMaxAge') * 1000, + stale: false, + updateAgeOnGet: false + }); } - protected tablesUsedRedisKey(tableName) { + protected tablesUsedRedisKey(tableName: string): string { // TODO add dataSource? return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES_USED', tableName); } - protected tablesTouchRedisKey(tableName) { + protected tablesTouchRedisKey(tableName: string): string { // TODO add dataSource? return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES_TOUCH', tableName); } @@ -2001,8 +2011,12 @@ export class PreAggregations { return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_REFRESH_END_REACHED', ''); } - public async addTableUsed(tableName) { - return this.queryCache.getCacheDriver().set(this.tablesUsedRedisKey(tableName), true, this.usedTablePersistTime); + public async addTableUsed(tableName: string): Promise { + await this.queryCache.getCacheDriver().set( + this.tablesUsedRedisKey(tableName), + true, + this.usedTablePersistTime + ); } public async tablesUsed() { @@ -2010,8 +2024,24 @@ export class PreAggregations { .map(k => k.replace(this.tablesUsedRedisKey(''), '')); } - public async updateLastTouch(tableName) { - return this.queryCache.getCacheDriver().set(this.tablesTouchRedisKey(tableName), new Date().getTime(), this.touchTablePersistTime); + public async updateLastTouch(tableName: string): Promise { + if (this.touchCache.has(tableName)) { + return; + } + + try { + this.touchCache.set(tableName, true); + + await this.queryCache.getCacheDriver().set( + this.tablesTouchRedisKey(tableName), + new Date().getTime(), + this.touchTablePersistTime + ); + } catch (e: unknown) { + this.touchCache.del(tableName); + + throw e; + } } public async tablesTouched() {