Skip to content

Commit

Permalink
fix: continueWaitTimeout is ignored: expose single centralized contin…
Browse files Browse the repository at this point in the history
…ueWaitTimeout option (#2120)

* fix: continueWaitTimeout is ignored: expose single centralized continueWaitTimeout option

Fixes #225

* chore: Fix intermittent test fail
  • Loading branch information
paveltiunov committed Feb 17, 2021
1 parent 0d97357 commit 2735d2c
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 5 deletions.
2 changes: 1 addition & 1 deletion packages/cubejs-backend-shared/test/promises.test.ts
Expand Up @@ -338,7 +338,7 @@ describe('asyncMemoize', () => {

expect(called).toEqual(2);

await pausePromise(500);
await pausePromise(800);

expect(await memCall('test') !== firstCallRandomValue).toEqual(true);

Expand Down
Expand Up @@ -773,7 +773,8 @@ type PreAggregationsOptions = {
loadCacheQueueOptions?: any;
queueOptions?: object | ((dataSource: String) => object);
redisPool?: any;
cacheAndQueueDriver?: 'redis' | 'memory'
continueWaitTimeout?: number;
cacheAndQueueDriver?: 'redis' | 'memory';
};

export class PreAggregations {
Expand Down Expand Up @@ -895,6 +896,8 @@ export class PreAggregations {
logger: this.logger,
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
redisPool: this.options.redisPool,
// Centralized continueWaitTimeout that can be overridden in queueOptions
continueWaitTimeout: this.options.continueWaitTimeout,
...(typeof this.options.queueOptions === 'function' ?
this.options.queueOptions(dataSource) :
this.options.queueOptions
Expand Down
Expand Up @@ -31,6 +31,7 @@ export class QueryCache {
backgroundRenew?: Boolean;
queueOptions?: object | ((dataSource: String) => object);
redisPool?: any;
continueWaitTimeout?: number;
cacheAndQueueDriver?: 'redis' | 'memory';
} = {}
) {
Expand Down Expand Up @@ -202,6 +203,8 @@ export class QueryCache {
logger: this.logger,
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
redisPool: this.options.redisPool,
// Centralized continueWaitTimeout that can be overridden in queueOptions
continueWaitTimeout: this.options.continueWaitTimeout,
...(typeof this.options.queueOptions === 'function' ?
this.options.queueOptions(dataSource) :
this.options.queueOptions
Expand All @@ -227,6 +230,8 @@ export class QueryCache {
logger: this.logger,
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
redisPool: this.options.redisPool,
// Centralized continueWaitTimeout that can be overridden in queueOptions
continueWaitTimeout: this.options.continueWaitTimeout,
...this.options.externalQueueOptions
}
);
Expand Down
Expand Up @@ -13,6 +13,7 @@ interface QueryOrchestratorOptions {
queryCacheOptions?: any;
preAggregationsOptions?: any;
rollupOnlyMode?: boolean;
continueWaitTimeout?: number;
}

export class QueryOrchestrator {
Expand Down Expand Up @@ -45,7 +46,7 @@ export class QueryOrchestrator {
}

const redisPool = cacheAndQueueDriver === 'redis' ? new RedisPool(options.redisPoolOptions) : undefined;
const { externalDriverFactory } = options;
const { externalDriverFactory, continueWaitTimeout } = options;

this.driverFactory = driverFactory;

Expand All @@ -54,6 +55,7 @@ export class QueryOrchestrator {
externalDriverFactory,
cacheAndQueueDriver,
redisPool,
continueWaitTimeout,
...options.queryCacheOptions,
}
);
Expand All @@ -63,6 +65,7 @@ export class QueryOrchestrator {
externalDriverFactory,
cacheAndQueueDriver,
redisPool,
continueWaitTimeout,
...options.preAggregationsOptions
}
);
Expand Down
6 changes: 4 additions & 2 deletions packages/cubejs-server-core/src/core/OrchestratorApi.ts
Expand Up @@ -8,10 +8,12 @@ export class OrchestratorApi {
protected readonly orchestrator: QueryOrchestrator;

protected readonly externalDriverFactory: any;

protected readonly continueWaitTimeout: number;

public constructor(protected driverFactory, protected logger, protected options: any = {}) {
const { externalDriverFactory } = options;

this.continueWaitTimeout = this.options.continueWaitTimeout || 5;
this.orchestrator = new QueryOrchestrator(options.redisPrefix || 'STANDALONE', driverFactory, logger, options);
this.driverFactory = driverFactory;
this.externalDriverFactory = externalDriverFactory;
Expand All @@ -33,7 +35,7 @@ export class OrchestratorApi {
this.orchestrator.loadRefreshKeys(query) :
this.orchestrator.fetchQuery(query);

fetchQueryPromise = pt.timeout(fetchQueryPromise, 5 * 1000);
fetchQueryPromise = pt.timeout(fetchQueryPromise, this.continueWaitTimeout * 1000);

const data = await fetchQueryPromise;

Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-server-core/src/core/optionsValidate.ts
Expand Up @@ -70,6 +70,7 @@ const schemaOptions = Joi.object().keys({
Joi.func(),
Joi.object().keys({
redisPrefix: Joi.string().allow(''),
continueWaitTimeout: Joi.number().min(0).integer(),
queryCacheOptions: Joi.object().keys({
refreshKeyRenewalThreshold: Joi.number().min(0).integer(),
backgroundRenew: Joi.boolean(),
Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-server-core/test/unit/index.test.ts
Expand Up @@ -100,6 +100,7 @@ describe('index.test', () => {
allowUngroupedWithoutPrimaryKey: true,
scheduledRefreshConcurrency: 4,
orchestratorOptions: {
continueWaitTimeout: 10,
redisPrefix: 'some-prefix',
queryCacheOptions: {
refreshKeyRenewalThreshold: 1000,
Expand Down

0 comments on commit 2735d2c

Please sign in to comment.