From 280a99cc11398fccc3c7545cff20b38c926f5788 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 15 Jan 2024 15:03:18 +0100 Subject: [PATCH] feat(query-orchestrator): Queue - reduce trafic for processing (Cube Store only) Signed-off-by: Dmitry Patsura --- .../src/queue-driver.interface.ts | 1 + .../src/CubeStoreQueueDriver.ts | 35 ++++--------------- .../src/orchestrator/QueryQueue.js | 7 ++++ 3 files changed, 15 insertions(+), 28 deletions(-) diff --git a/packages/cubejs-base-driver/src/queue-driver.interface.ts b/packages/cubejs-base-driver/src/queue-driver.interface.ts index 15620e1b3bad6..3d07c42387563 100644 --- a/packages/cubejs-base-driver/src/queue-driver.interface.ts +++ b/packages/cubejs-base-driver/src/queue-driver.interface.ts @@ -95,6 +95,7 @@ export interface QueueDriverConnectionInterface { release(): void; // getQueriesToCancel(): Promise + // @deprecated getActiveAndToProcess(): Promise; } diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts index ae42f1f17131b..bdbb1f33872fe 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts @@ -136,34 +136,13 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { } public async getActiveAndToProcess(): Promise { - const rows = await this.driver.query('QUEUE LIST ?', [ - this.options.redisQueuePrefix - ]); - if (rows.length) { - const active: QueryKeysTuple[] = []; - const toProcess: QueryKeysTuple[] = []; - - for (const row of rows) { - if (row.status === 'active') { - active.push([ - row.id as QueryKeyHash, - row.queue_id ? parseInt(row.queue_id, 10) : null, - ]); - } else { - toProcess.push([ - row.id as QueryKeyHash, - row.queue_id ? parseInt(row.queue_id, 10) : null, - ]); - } - } - - return [ - active, - toProcess, - ]; - } - - return [[], []]; + return [ + // We don't return active queries, because it's useless + // There is only one place where it's used, and it's QueryQueue.reconcileQueueImpl + // Cube Store provides strict guarantees that queue item cannot be active & pending in the same time + [], + await this.getToProcessQueries() + ]; } public async getNextProcessingId(): Promise { diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js index 7a37e20e5a00f..9c0dc41b1fb84 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js @@ -547,6 +547,13 @@ export class QueryQueue { } })); + /** + * There is a bug somewhere in Redis (maybe in memory too?), + * which doesn't remove queue item from pending, while it's in active state + * + * TODO(ovr): Check LocalQueueDriver for strict guarantees that item cannot be in active & pending in the same time + * TODO(ovr): Migrate to getToProcessQueries after removal of Redis + */ const [active, toProcess] = await queueConnection.getActiveAndToProcess(); await Promise.all(