diff --git a/packages/cubejs-base-driver/src/queue-driver.interface.ts b/packages/cubejs-base-driver/src/queue-driver.interface.ts index 15620e1b3bad6..3d07c42387563 100644 --- a/packages/cubejs-base-driver/src/queue-driver.interface.ts +++ b/packages/cubejs-base-driver/src/queue-driver.interface.ts @@ -95,6 +95,7 @@ export interface QueueDriverConnectionInterface { release(): void; // getQueriesToCancel(): Promise + // @deprecated getActiveAndToProcess(): Promise; } diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts index ae42f1f17131b..bdbb1f33872fe 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts @@ -136,34 +136,13 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { } public async getActiveAndToProcess(): Promise { - const rows = await this.driver.query('QUEUE LIST ?', [ - this.options.redisQueuePrefix - ]); - if (rows.length) { - const active: QueryKeysTuple[] = []; - const toProcess: QueryKeysTuple[] = []; - - for (const row of rows) { - if (row.status === 'active') { - active.push([ - row.id as QueryKeyHash, - row.queue_id ? parseInt(row.queue_id, 10) : null, - ]); - } else { - toProcess.push([ - row.id as QueryKeyHash, - row.queue_id ? parseInt(row.queue_id, 10) : null, - ]); - } - } - - return [ - active, - toProcess, - ]; - } - - return [[], []]; + return [ + // We don't return active queries, because it's useless + // There is only one place where it's used, and it's QueryQueue.reconcileQueueImpl + // Cube Store provides strict guarantees that queue item cannot be active & pending in the same time + [], + await this.getToProcessQueries() + ]; } public async getNextProcessingId(): Promise { diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js index 7a37e20e5a00f..9c0dc41b1fb84 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js @@ -547,6 +547,13 @@ export class QueryQueue { } })); + /** + * There is a bug somewhere in Redis (maybe in memory too?), + * which doesn't remove queue item from pending, while it's in active state + * + * TODO(ovr): Check LocalQueueDriver for strict guarantees that item cannot be in active & pending in the same time + * TODO(ovr): Migrate to getToProcessQueries after removal of Redis + */ const [active, toProcess] = await queueConnection.getActiveAndToProcess(); await Promise.all(