Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion packages/cubejs-backend-shared/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -574,12 +574,41 @@ const variables: Record<string, (...args: any) => any> = {
.default(8192)
.asInt(),

/**
* Max number of elements
*/
touchPreAggregationCacheMaxCount: (): number => get('CUBEJS_TOUCH_PRE_AGG_CACHE_MAX_COUNT')
.default(8192)
.asInt(),

/**
* Max cache
*/
touchPreAggregationCacheMaxAge: (): number => {
// eslint-disable-next-line no-use-before-define
const touchPreAggregationTimeout = getEnv('touchPreAggregationTimeout');

const maxAge = get('CUBEJS_TOUCH_PRE_AGG_CACHE_MAX_AGE')
.default(Math.round(touchPreAggregationTimeout / 2))
.asIntPositive();

if (maxAge > touchPreAggregationTimeout) {
throw new InvalidConfiguration(
'CUBEJS_TOUCH_PRE_AGG_CACHE_MAX_AGE',
maxAge,
`Must be less or equal then CUBEJS_TOUCH_PRE_AGG_TIMEOUT (${touchPreAggregationTimeout}).`
);
}

return maxAge;
},

/**
* Expire time for touch records
*/
touchPreAggregationTimeout: (): number => get('CUBEJS_TOUCH_PRE_AGG_TIMEOUT')
.default(60 * 60 * 24)
.asInt(),
.asIntPositive(),

/**
* Expire time for touch records
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import {
UnloadOptions,
} from '@cubejs-backend/base-driver';
import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver';
import LRUCache from 'lru-cache';

import { PreAggTableToTempTable, Query, QueryBody, QueryCache, QueryTuple, QueryWithParams } from './QueryCache';
import { ContinueWaitError } from './ContinueWaitError';
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
Expand Down Expand Up @@ -1968,6 +1970,8 @@ export class PreAggregations {

private readonly getQueueEventsBus: any;

private readonly touchCache: LRUCache<string, true>;

public constructor(
private readonly redisPrefix: string,
private readonly driverFactory: DriverFactoryByDataSource,
Expand All @@ -1984,14 +1988,20 @@ export class PreAggregations {
this.usedTablePersistTime = options.usedTablePersistTime || getEnv('dbQueryTimeout');
this.externalRefresh = options.externalRefresh;
this.getQueueEventsBus = options.getQueueEventsBus;
this.touchCache = new LRUCache({
max: getEnv('touchPreAggregationCacheMaxCount'),
maxAge: getEnv('touchPreAggregationCacheMaxAge') * 1000,
stale: false,
updateAgeOnGet: false
});
}

protected tablesUsedRedisKey(tableName) {
protected tablesUsedRedisKey(tableName: string): string {
// TODO add dataSource?
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES_USED', tableName);
}

protected tablesTouchRedisKey(tableName) {
protected tablesTouchRedisKey(tableName: string): string {
// TODO add dataSource?
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES_TOUCH', tableName);
}
Expand All @@ -2001,17 +2011,37 @@ export class PreAggregations {
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_REFRESH_END_REACHED', '');
}

public async addTableUsed(tableName) {
return this.queryCache.getCacheDriver().set(this.tablesUsedRedisKey(tableName), true, this.usedTablePersistTime);
public async addTableUsed(tableName: string): Promise<void> {
await this.queryCache.getCacheDriver().set(
this.tablesUsedRedisKey(tableName),
true,
this.usedTablePersistTime
);
}

public async tablesUsed() {
return (await this.queryCache.getCacheDriver().keysStartingWith(this.tablesUsedRedisKey('')))
.map(k => k.replace(this.tablesUsedRedisKey(''), ''));
}

public async updateLastTouch(tableName) {
return this.queryCache.getCacheDriver().set(this.tablesTouchRedisKey(tableName), new Date().getTime(), this.touchTablePersistTime);
public async updateLastTouch(tableName: string): Promise<void> {
if (this.touchCache.has(tableName)) {
return;
}

try {
this.touchCache.set(tableName, true);

await this.queryCache.getCacheDriver().set(
this.tablesTouchRedisKey(tableName),
new Date().getTime(),
this.touchTablePersistTime
);
} catch (e: unknown) {
this.touchCache.del(tableName);

throw e;
}
}

public async tablesTouched() {
Expand Down