From e322e932e3086c7ad5202621ed449b45a3c679bf Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 23 Jan 2023 22:56:15 +0300 Subject: [PATCH 01/10] feat(cubestore-driver): Support persistent flag for query --- package.json | 3 +-- packages/cubejs-backend-shared/package.json | 3 ++- packages/cubejs-backend-shared/src/index.ts | 1 + packages/cubejs-backend-shared/src/process.ts | 13 +++++++++++++ .../src/queue-driver.interface.ts | 4 +++- .../src/CubeStoreQueueDriver.ts | 11 +++++++++-- packages/cubejs-query-orchestrator/package.json | 3 +-- .../src/orchestrator/QueryQueue.js | 7 +++---- .../src/orchestrator/utils.ts | 17 +++-------------- .../test/unit/QueryQueue.abstract.ts | 17 +++++++++++------ yarn.lock | 11 ----------- 11 files changed, 47 insertions(+), 43 deletions(-) create mode 100644 packages/cubejs-backend-shared/src/process.ts diff --git a/package.json b/package.json index e64fc283eb3cc..6f028f4050327 100644 --- a/package.json +++ b/package.json @@ -73,8 +73,7 @@ }, "resolutions": { "@types/node": "^12", - "@types/ramda": "0.27.40", - "rc-tree": "4.1.5" + "@types/ramda": "0.27.40" }, "license": "MIT", "packageManager": "yarn@1.22.19" diff --git a/packages/cubejs-backend-shared/package.json b/packages/cubejs-backend-shared/package.json index 7a74aa87fd2f9..79a2e4769e0f8 100644 --- a/packages/cubejs-backend-shared/package.json +++ b/packages/cubejs-backend-shared/package.json @@ -47,7 +47,8 @@ "moment-timezone": "^0.5.33", "node-fetch": "^2.6.1", "shelljs": "^0.8.5", - "throttle-debounce": "^3.0.1" + "throttle-debounce": "^3.0.1", + "uuid": "^8.3.2" }, "publishConfig": { "access": "public" diff --git a/packages/cubejs-backend-shared/src/index.ts b/packages/cubejs-backend-shared/src/index.ts index 7f250060ddfe7..26e3116249779 100644 --- a/packages/cubejs-backend-shared/src/index.ts +++ b/packages/cubejs-backend-shared/src/index.ts @@ -17,3 +17,4 @@ export * from './http-utils'; export * from './cli'; export * from './proxy'; export * from './time'; +export * from './process'; diff --git a/packages/cubejs-backend-shared/src/process.ts b/packages/cubejs-backend-shared/src/process.ts new file mode 100644 index 0000000000000..c37b2c185e474 --- /dev/null +++ b/packages/cubejs-backend-shared/src/process.ts @@ -0,0 +1,13 @@ +import { v1, v5 } from 'uuid'; + +/** + * Unique process ID (aka 00000000-0000-0000-0000-000000000000). + */ +const processUid = v5(v1(), v1()).toString(); + +/** + * Returns unique process ID. 
+ */ +export function getProcessUid(): string { + return processUid; +} diff --git a/packages/cubejs-base-driver/src/queue-driver.interface.ts b/packages/cubejs-base-driver/src/queue-driver.interface.ts index 44bdf41156ed7..b64d7f84e199c 100644 --- a/packages/cubejs-base-driver/src/queue-driver.interface.ts +++ b/packages/cubejs-base-driver/src/queue-driver.interface.ts @@ -1,5 +1,7 @@ export type QueryDef = unknown; -export type QueryKey = string | [string, any[]]; +export type QueryKey = (string | [string, any[]]) & { + persistent?: true, +}; export type AddToQueueResponse = [added: number, _b: any, _c: any, queueSize: number, addedToQueueTime: number]; export type QueryStageStateResponse = [active: string[], toProcess: string[]] | [active: string[], toProcess: string[], defs: Record]; diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts index 90e5013305925..405d066c3b80b 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts @@ -1,3 +1,4 @@ +import crypto from 'crypto'; import { QueueDriverInterface, QueueDriverConnectionInterface, @@ -8,12 +9,18 @@ import { AddToQueueQuery, AddToQueueOptions, AddToQueueResponse, QueryKey, } from '@cubejs-backend/base-driver'; +import { getProcessUid } from '@cubejs-backend/shared'; -import crypto from 'crypto'; import { CubeStoreDriver } from './CubeStoreDriver'; function hashQueryKey(queryKey: QueryKey) { - return crypto.createHash('md5').update(JSON.stringify(queryKey)).digest('hex'); + const hash = crypto.createHash('md5').update(JSON.stringify(queryKey)).digest('hex'); + + if (typeof queryKey === 'object' && queryKey.persistent) { + return `${hash}@${getProcessUid()}`; + } + + return hash; } class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { diff --git a/packages/cubejs-query-orchestrator/package.json b/packages/cubejs-query-orchestrator/package.json index c8ec3f422a76d..1acfe9d6a1519 100644 --- a/packages/cubejs-query-orchestrator/package.json +++ b/packages/cubejs-query-orchestrator/package.json @@ -43,8 +43,7 @@ "moment-range": "^4.0.2", "moment-timezone": "^0.5.33", "ramda": "^0.27.2", - "redis": "^3.0.2", - "uuid": "^8.3.2" + "redis": "^3.0.2" }, "devDependencies": { "@cubejs-backend/linter": "^0.31.0", diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js index 1636da15a3aec..b74028452a94a 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js @@ -1,13 +1,12 @@ import R from 'ramda'; -import { getEnv } from '@cubejs-backend/shared'; +import { getEnv, getProcessUid } from '@cubejs-backend/shared'; import { QueueDriverInterface } from '@cubejs-backend/base-driver'; -import { CubeStoreDriver, CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver'; +import { CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver'; import { TimeoutError } from './TimeoutError'; import { ContinueWaitError } from './ContinueWaitError'; import { RedisQueueDriver } from './RedisQueueDriver'; import { LocalQueueDriver } from './LocalQueueDriver'; -import { getProcessUid } from './utils'; import { QueryStream } from './QueryStream'; /** @@ -559,7 +558,7 @@ export class QueryQueue { R.pipe( R.filter(p => { if (active.indexOf(p) === -1) { - const subKeys = p.split('::'); + 
const subKeys = p.split('@'); if (subKeys.length === 1) { // common queries return true; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts b/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts index 0910323178825..b98119949a914 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts @@ -1,8 +1,9 @@ /* eslint-disable no-restricted-syntax */ import * as querystring from 'querystring'; -import { v1, v5 } from 'uuid'; import crypto from 'crypto'; +import { getProcessUid } from '@cubejs-backend/shared'; + function parseHostPort(addr: string): { host: string, port: number } { if (addr.includes(':')) { const parts = addr.split(':'); @@ -178,18 +179,6 @@ export function parseRedisUrl(url: Readonly): RedisParsedResult { return parseUrl(url, result, parseHostPartBasic); } -/** - * Unique process ID (aka 00000000-0000-0000-0000-000000000000). - */ -const processUid = v5(v1(), v1()).toString(); - -/** - * Returns unique process ID. - */ -export function getProcessUid(): string { - return processUid; -} - /** * Unique process ID regexp. */ @@ -208,7 +197,7 @@ export function getCacheHash(queryKey) { .digest('hex') }${ typeof queryKey === 'object' && queryKey.persistent - ? `::${getProcessUid()}` + ? `@${getProcessUid()}` : '' }`; } diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts index a294b60ac9bdb..6b02cd1933074 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts @@ -171,8 +171,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} await queue.executeInQueue('delay', '114', { delay: 50, result: '4' }, 0); }); - // TODO: CubeStore queue support - nonCubeStoreTest('queue hash process persistent flag properly', () => { + test('queue hash process persistent flag properly', () => { const query = ['select * from table']; const key1 = queue.redisHash(query); // @ts-ignore @@ -184,11 +183,17 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} const key4 = queue.redisHash(query); expect(key1).toEqual(key2); - expect(key1.split('::').length).toBe(1); + expect(key1.split('@').length).toBe(1); + expect(key3).toEqual(key4); - expect(key3.split('::').length).toBe(2); - expect(processUidRE.test(key3.split('::')[1])).toBeTruthy(); - expect(queue.redisHash('string')).toBe('string'); + expect(key3.split('@').length).toBe(2); + expect(processUidRE.test(key3.split('@')[1])).toBeTruthy(); + + if (options.cacheAndQueueDriver === 'cubestore') { + expect(queue.redisHash('string')).toBe('095d71cf12556b9d5e330ad575b3df5d'); + } else { + expect(queue.redisHash('string')).toBe('string'); + } }); test('removed before reconciled', async () => { diff --git a/yarn.lock b/yarn.lock index 46433d354fd39..2efb702f6af5f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -23889,17 +23889,6 @@ rc-tree-select@~4.3.0: rc-tree "^4.0.0" rc-util "^5.0.5" -rc-tree@4.1.5, rc-tree@^4.0.0, rc-tree@~4.2.1: - version "4.1.5" - resolved "https://registry.yarnpkg.com/rc-tree/-/rc-tree-4.1.5.tgz#734ab1bfe835e78791be41442ca0e571147ab6fa" - integrity sha512-q2vjcmnBDylGZ9/ZW4F9oZMKMJdbFWC7um+DAQhZG1nqyg1iwoowbBggUDUaUOEryJP+08bpliEAYnzJXbI5xQ== - dependencies: - "@babel/runtime" "^7.10.1" - classnames "2.x" - rc-motion "^2.0.1" - rc-util "^5.0.0" - rc-virtual-list "^3.0.1" - 
rc-trigger@^5.0.0, rc-trigger@^5.0.4, rc-trigger@^5.1.2, rc-trigger@^5.2.10: version "5.2.10" resolved "https://registry.yarnpkg.com/rc-trigger/-/rc-trigger-5.2.10.tgz#8a0057a940b1b9027eaa33beec8a6ecd85cce2b1" From c95cd738bf49657565ff1643c0e4aec5a23bde79 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 25 Jan 2023 19:57:59 +0300 Subject: [PATCH 02/10] chore: dont use redisHash twice --- .../src/orchestrator/QueryQueue.js | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js index b74028452a94a..1a879f6a301e8 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js @@ -750,7 +750,7 @@ export class QueryQueue { const activated = activeKeys && activeKeys.indexOf(this.redisHash(queryKey)) !== -1; if (!query) { - query = await queueConnection.getQueryDef(this.redisHash(queryKey)); + query = await queueConnection.getQueryDef(queryKey); } if (query && insertedCount && activated && processingLockAcquired) { @@ -778,10 +778,9 @@ export class QueryQueue { ); try { const handler = query?.queryHandler; - let target; switch (handler) { case 'stream': - target = this.getQueryStream(this.redisHash(queryKey)); + const target = this.getQueryStream(queryKey); await this.queryTimeout(this.queryHandlers.stream(query.query, target)); break; default: @@ -898,7 +897,7 @@ export class QueryQueue { }); // closing stream if (query?.queryHandler === 'stream') { - const stream = this.getQueryStream(this.redisHash(queryKey)); + const stream = this.getQueryStream(queryKey); stream.destroy(); } const currentProcessingId = await queueConnection.freeProcessingLock(queryKey, processingId, activated); From e8b565f2430561af2b7931858a0ef0fd7dd9dbab Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 25 Jan 2023 19:57:59 +0300 Subject: [PATCH 03/10] chore: dont use redisHash twice --- .../src/CubeStoreQueueDriver.ts | 6 +- .../src/orchestrator/QueryCache.ts | 4 +- .../src/orchestrator/QueryQueue.js | 65 +++++++++++-------- 3 files changed, 43 insertions(+), 32 deletions(-) diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts index 405d066c3b80b..fc5e5ddb0b213 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts @@ -244,14 +244,14 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { // nothing to release } - public async retrieveForProcessing(queryKey: string, _processingId: string): Promise { + public async retrieveForProcessing(queryKeyHashed: string, _processingId: string): Promise { const rows = await this.driver.query('QUEUE RETRIEVE CONCURRENCY ? 
?', [ this.options.concurrency, - this.prefixKey(queryKey), + this.prefixKey(queryKeyHashed), ]); if (rows && rows.length) { const addedCount = 1; - const active = [this.redisHash(queryKey)]; + const active = [queryKeyHashed]; const toProcess = 0; const lockAcquired = true; const def = this.decodeQueryDefFromRow(rows[0], 'retrieveForProcessing'); diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index b50ea960b01ab..f0c07908b0264 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -477,11 +477,11 @@ export class QueryCache { if (!persistent) { return queue.executeInQueue('query', cacheKey, _query, priority, opt); } else { - const _stream = queue.getQueryStream(cacheKey, aliasNameToMember); + const stream = queue.setQueryStream(cacheKey, aliasNameToMember); // we don't want to handle error here as we want it to bubble up // to the api gateway queue.executeInQueue('stream', cacheKey, _query, priority, opt); - return _stream; + return stream; } } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js index 1a879f6a301e8..d41c9e47af7f7 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js @@ -139,20 +139,30 @@ export class QueryQueue { /** * Returns stream object which will be used to pipe data from data source. * + * @param {*} queryKeyHash + */ + getQueryStream(queryKeyHash) { + if (!this.streams.queued.has(queryKeyHash)) { + throw new Error(`Unable to find stream for persisted query with id: ${queryKeyHash}`); + } + + return this.streams.queued.get(queryKeyHash); + } + + /** * @param {*} queryKey * @param {{ [alias: string]: string }} aliasNameToMember */ - getQueryStream(queryKey, aliasNameToMember) { + setQueryStream(queryKey, aliasNameToMember) { const key = this.redisHash(queryKey); - if (!this.streams.queued.has(key)) { - const _stream = new QueryStream({ - key, - maps: this.streams, - aliasNameToMember, - }); - this.streams.queued.set(key, _stream); - } - return this.streams.queued.get(key); + const stream = new QueryStream({ + key, + maps: this.streams, + aliasNameToMember, + }); + this.streams.queued.set(key, stream); + + return stream; } /** @@ -728,10 +738,10 @@ export class QueryQueue { * Processing query specified by the `queryKey`. This method incapsulate most * of the logic related with the queues updates, heartbeating, etc. 
* - * @param {string} queryKey + * @param {string} queryKeyHashed * @return {Promise<{ result: undefined | Object, error: string | undefined }>} */ - async processQuery(queryKey) { + async processQuery(queryKeyHashed) { const queueConnection = await this.queueDriver.createConnection(); let insertedCount; @@ -742,15 +752,15 @@ export class QueryQueue { let processingLockAcquired; try { const processingId = await queueConnection.getNextProcessingId(); - const retrieveResult = await queueConnection.retrieveForProcessing(queryKey, processingId); + const retrieveResult = await queueConnection.retrieveForProcessing(queryKeyHashed, processingId); if (retrieveResult) { [insertedCount, _removedCount, activeKeys, queueSize, query, processingLockAcquired] = retrieveResult; } - const activated = activeKeys && activeKeys.indexOf(this.redisHash(queryKey)) !== -1; + const activated = activeKeys && activeKeys.indexOf(queryKeyHashed) !== -1; if (!query) { - query = await queueConnection.getQueryDef(queryKey); + query = await queueConnection.getQueryDef(queryKeyHashed); } if (query && insertedCount && activated && processingLockAcquired) { @@ -770,18 +780,19 @@ export class QueryQueue { preAggregation: query.query?.preAggregation, addedToQueueTime: query.addedToQueueTime, }); - await queueConnection.optimisticQueryUpdate(queryKey, { startQueryTime }, processingId); + await queueConnection.optimisticQueryUpdate(queryKeyHashed, { startQueryTime }, processingId); const heartBeatTimer = setInterval( - () => queueConnection.updateHeartBeat(queryKey), + () => queueConnection.updateHeartBeat(queryKeyHashed), this.heartBeatInterval * 1000 ); try { const handler = query?.queryHandler; switch (handler) { case 'stream': - const target = this.getQueryStream(queryKey); - await this.queryTimeout(this.queryHandlers.stream(query.query, target)); + await this.queryTimeout( + this.queryHandlers.stream(query.query, this.getQueryStream(queryKeyHashed)) + ); break; default: executionResult = { @@ -790,7 +801,7 @@ export class QueryQueue { query.query, async (cancelHandler) => { try { - return queueConnection.optimisticQueryUpdate(queryKey, { cancelHandler }, processingId); + return queueConnection.optimisticQueryUpdate(queryKeyHashed, { cancelHandler }, processingId); } catch (e) { this.logger('Error while query update', { queryKey: query.queryKey, @@ -846,7 +857,7 @@ export class QueryQueue { error: (e.stack || e).toString() }); if (e instanceof TimeoutError) { - const queryWithCancelHandle = await queueConnection.getQueryDef(queryKey); + const queryWithCancelHandle = await queueConnection.getQueryDef(queryKeyHashed); if (queryWithCancelHandle) { this.logger('Cancelling query due to timeout', { processingId, @@ -866,7 +877,7 @@ export class QueryQueue { clearInterval(heartBeatTimer); - if (!(await queueConnection.setResultAndRemoveQuery(queryKey, executionResult, processingId))) { + if (!(await queueConnection.setResultAndRemoveQuery(queryKeyHashed, executionResult, processingId))) { this.logger('Orphaned execution result', { processingId, warn: 'Result for query was not set due to processing lock wasn\'t acquired', @@ -885,7 +896,7 @@ export class QueryQueue { } else { this.logger('Skip processing', { processingId, - queryKey: query && query.queryKey || queryKey, + queryKey: query && query.queryKey || queryKeyHashed, requestId: query && query.requestId, queuePrefix: this.redisQueuePrefix, processingLockAcquired, @@ -897,15 +908,15 @@ export class QueryQueue { }); // closing stream if (query?.queryHandler === 'stream') { - 
const stream = this.getQueryStream(queryKey); + const stream = this.getQueryStream(queryKeyHashed); stream.destroy(); } - const currentProcessingId = await queueConnection.freeProcessingLock(queryKey, processingId, activated); + const currentProcessingId = await queueConnection.freeProcessingLock(queryKeyHashed, processingId, activated); if (currentProcessingId) { this.logger('Skipping free processing lock', { processingId, currentProcessingId, - queryKey: query && query.queryKey || queryKey, + queryKey: query && query.queryKey || queryKeyHashed, requestId: query && query.requestId, queuePrefix: this.redisQueuePrefix, processingLockAcquired, @@ -919,7 +930,7 @@ export class QueryQueue { } } catch (e) { this.logger('Queue storage error', { - queryKey: query && query.queryKey || queryKey, + queryKey: query && query.queryKey || queryKeyHashed, requestId: query && query.requestId, error: (e.stack || e).toString(), queuePrefix: this.redisQueuePrefix From e6ab38fe8a0f8b8703303257ad8dffccf68156c4 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 25 Jan 2023 21:14:45 +0300 Subject: [PATCH 04/10] fix: specify empty object as result for streams --- .../cubejs-query-orchestrator/src/orchestrator/QueryQueue.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js index d41c9e47af7f7..3113effe36876 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js @@ -793,6 +793,8 @@ export class QueryQueue { await this.queryTimeout( this.queryHandlers.stream(query.query, this.getQueryStream(queryKeyHashed)) ); + + executionResult = {}; break; default: executionResult = { From 3bd230362ed0c2fcf4fba03c2e4de9b4b513bdc8 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 25 Jan 2023 21:57:07 +0300 Subject: [PATCH 05/10] feat(cubestore): special handling ack with null --- .../src/CubeStoreQueueDriver.ts | 2 +- .../src/orchestrator/QueryQueue.js | 3 +- .../src/cachestore/cache_rocksstore.rs | 82 +++++++++++++------ .../cubestore/src/cachestore/lazy.rs | 2 +- .../cubestore/src/cachestore/listener.rs | 6 +- .../cubestore/src/cachestore/queue_item.rs | 9 +- rust/cubestore/cubestore/src/sql/mod.rs | 20 +---- rust/cubestore/cubestore/src/sql/parser.rs | 16 ++-- 8 files changed, 84 insertions(+), 56 deletions(-) diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts index fc5e5ddb0b213..9d77547990696 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts @@ -279,7 +279,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { public async setResultAndRemoveQuery(queryKey: string, executionResult: any, _processingId: any): Promise { await this.driver.query('QUEUE ACK ? ? ', [ this.prefixKey(queryKey), - JSON.stringify(executionResult) + executionResult ? 
JSON.stringify(executionResult) : executionResult ]); return true; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js index 3113effe36876..78ae3e1bee2fb 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js @@ -794,7 +794,8 @@ export class QueryQueue { this.queryHandlers.stream(query.query, this.getQueryStream(queryKeyHashed)) ); - executionResult = {}; + // CubeStore has special handling for null + executionResult = null; break; default: executionResult = { diff --git a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs index 7662da003df0a..df7f4be241b4b 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs @@ -3,7 +3,7 @@ use crate::cachestore::cache_item::{ }; use crate::cachestore::queue_item::{ QueueItem, QueueItemIndexKey, QueueItemRocksIndex, QueueItemRocksTable, QueueItemStatus, - QueueResultAckEvent, + QueueResultAckEvent, QueueResultAckEventResult, }; use crate::cachestore::queue_result::{ QueueResultIndexKey, QueueResultRocksIndex, QueueResultRocksTable, @@ -28,6 +28,7 @@ use rocksdb::{Options, DB}; use crate::cachestore::compaction::CompactionPreloadedState; use crate::cachestore::listener::RocksCacheStoreListener; +use crate::table::{Row, TableValue}; use chrono::Utc; use itertools::Itertools; use log::trace; @@ -254,7 +255,7 @@ impl RocksCacheStore { result_schema.try_delete(queue_result.get_id(), batch_pipe)?; Ok(Some(QueueResultResponse::Success { - value: queue_result.row.value, + value: Some(queue_result.row.value), })) } else { Ok(None) @@ -283,7 +284,22 @@ pub struct QueueAddResponse { #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] pub enum QueueResultResponse { - Success { value: String }, + Success { value: Option }, +} + +impl QueueResultResponse { + pub fn into_queue_result_row(self) -> Row { + match self { + QueueResultResponse::Success { value } => Row::new(vec![ + if let Some(v) = value { + TableValue::String(v) + } else { + TableValue::Null + }, + TableValue::String("success".to_string()), + ]), + } + } } #[cuberpc::service] @@ -325,7 +341,7 @@ pub trait CacheStore: DIService + Send + Sync { key: String, allow_concurrency: u32, ) -> Result>, CubeError>; - async fn queue_ack(&self, key: String, result: String) -> Result<(), CubeError>; + async fn queue_ack(&self, key: String, result: Option) -> Result<(), CubeError>; async fn queue_result(&self, key: String) -> Result, CubeError>; async fn queue_result_blocking( &self, @@ -696,7 +712,7 @@ impl CacheStore for RocksCacheStore { .await } - async fn queue_ack(&self, path: String, result: String) -> Result<(), CubeError> { + async fn queue_ack(&self, path: String, result: Option) -> Result<(), CubeError> { self.store .write_operation(move |db_ref, batch_pipe| { let queue_schema = QueueItemRocksTable::new(db_ref.clone()); @@ -708,14 +724,23 @@ impl CacheStore for RocksCacheStore { if let Some(item_row) = item_row { queue_schema.delete(item_row.get_id(), batch_pipe)?; - let queue_result = QueueResult::new(path.clone(), result.clone()); - let result_row = result_schema.insert(queue_result, batch_pipe)?; - - batch_pipe.add_event(MetaStoreEvent::AckQueueItem(QueueResultAckEvent { - row_id: result_row.get_id(), - path, - result, - })); + if let Some(result) = 
result { + let queue_result = QueueResult::new(path.clone(), result.clone()); + let result_row = result_schema.insert(queue_result, batch_pipe)?; + + batch_pipe.add_event(MetaStoreEvent::AckQueueItem(QueueResultAckEvent { + path, + result: QueueResultAckEventResult::WithResult { + row_id: result_row.get_id(), + result, + }, + })); + } else { + batch_pipe.add_event(MetaStoreEvent::AckQueueItem(QueueResultAckEvent { + path, + result: QueueResultAckEventResult::Empty {}, + })); + } Ok(()) } else { @@ -750,18 +775,23 @@ impl CacheStore for RocksCacheStore { if let Ok(res) = fut.await { match res { - Ok(Some(ack_event)) => { - self.store - .write_operation(move |db_ref, batch_pipe| { - let queue_schema = QueueResultRocksTable::new(db_ref.clone()); - queue_schema.try_delete(ack_event.row_id, batch_pipe)?; - - Ok(Some(QueueResultResponse::Success { - value: ack_event.result, - })) - }) - .await - } + Ok(Some(ack_event)) => match ack_event.result { + QueueResultAckEventResult::Empty => { + Ok(Some(QueueResultResponse::Success { value: None })) + } + QueueResultAckEventResult::WithResult { row_id, result } => { + self.store + .write_operation(move |db_ref, batch_pipe| { + let queue_schema = QueueResultRocksTable::new(db_ref.clone()); + queue_schema.try_delete(row_id, batch_pipe)?; + + Ok(Some(QueueResultResponse::Success { + value: Some(result), + })) + }) + .await + } + }, Ok(None) => Ok(None), Err(e) => Err(e), } @@ -899,7 +929,7 @@ impl CacheStore for ClusterCacheStoreClient { panic!("CacheStore cannot be used on the worker node! queue_retrieve was used.") } - async fn queue_ack(&self, _key: String, _result: String) -> Result<(), CubeError> { + async fn queue_ack(&self, _key: String, _result: Option) -> Result<(), CubeError> { panic!("CacheStore cannot be used on the worker node! 
queue_ack was used.") } diff --git a/rust/cubestore/cubestore/src/cachestore/lazy.rs b/rust/cubestore/cubestore/src/cachestore/lazy.rs index 1821b22d1143a..310e2abc59cfb 100644 --- a/rust/cubestore/cubestore/src/cachestore/lazy.rs +++ b/rust/cubestore/cubestore/src/cachestore/lazy.rs @@ -259,7 +259,7 @@ impl CacheStore for LazyRocksCacheStore { .await } - async fn queue_ack(&self, key: String, result: String) -> Result<(), CubeError> { + async fn queue_ack(&self, key: String, result: Option) -> Result<(), CubeError> { self.init().await?.queue_ack(key, result).await } diff --git a/rust/cubestore/cubestore/src/cachestore/listener.rs b/rust/cubestore/cubestore/src/cachestore/listener.rs index 63f93a13decf4..a044138235ae7 100644 --- a/rust/cubestore/cubestore/src/cachestore/listener.rs +++ b/rust/cubestore/cubestore/src/cachestore/listener.rs @@ -18,9 +18,9 @@ impl RocksCacheStoreListener { ) -> Result, CubeError> { loop { let event = self.receiver.recv().await?; - if let MetaStoreEvent::AckQueueItem(payload) = event { - if payload.path == path { - return Ok(Some(payload)); + if let MetaStoreEvent::AckQueueItem(ack_event) = event { + if ack_event.path == path { + return Ok(Some(ack_event)); } } } diff --git a/rust/cubestore/cubestore/src/cachestore/queue_item.rs b/rust/cubestore/cubestore/src/cachestore/queue_item.rs index 32a3d78103759..58c1b656ed77e 100644 --- a/rust/cubestore/cubestore/src/cachestore/queue_item.rs +++ b/rust/cubestore/cubestore/src/cachestore/queue_item.rs @@ -25,11 +25,16 @@ fn merge(a: serde_json::Value, b: serde_json::Value) -> Option { let ack_result = self.cachestore.queue_result(key.value).await?; let rows = if let Some(ack_result) = ack_result { - match ack_result { - QueueResultResponse::Success { value } => { - vec![Row::new(vec![ - TableValue::String(value), - TableValue::String("success".to_string()), - ])] - } - } + vec![ack_result.into_queue_result_row()] } else { vec![] }; @@ -1366,14 +1359,7 @@ impl SqlService for SqlServiceImpl { .await?; let rows = if let Some(ack_result) = ack_result { - match ack_result { - QueueResultResponse::Success { value } => { - vec![Row::new(vec![ - TableValue::String(value), - TableValue::String("success".to_string()), - ])] - } - } + vec![ack_result.into_queue_result_row()] } else { vec![] }; diff --git a/rust/cubestore/cubestore/src/sql/parser.rs b/rust/cubestore/cubestore/src/sql/parser.rs index 3e4c466b22404..6d016086a7908 100644 --- a/rust/cubestore/cubestore/src/sql/parser.rs +++ b/rust/cubestore/cubestore/src/sql/parser.rs @@ -119,7 +119,7 @@ pub enum QueueCommand { }, Ack { key: Ident, - result: String, + result: Option, }, MergeExtra { key: Ident, @@ -362,10 +362,16 @@ impl<'a> CubeStoreParser<'a> { "heartbeat" => QueueCommand::Heartbeat { key: self.parser.parse_identifier()?, }, - "ack" => QueueCommand::Ack { - key: self.parser.parse_identifier()?, - result: self.parser.parse_literal_string()?, - }, + "ack" => { + let key = self.parser.parse_identifier()?; + let result = if self.parser.parse_keyword(Keyword::NULL) { + None + } else { + Some(self.parser.parse_literal_string()?) 
+ }; + + QueueCommand::Ack { key, result } + } "merge_extra" => QueueCommand::MergeExtra { key: self.parser.parse_identifier()?, payload: self.parser.parse_literal_string()?, From 7e642852f73628fbd852272d359e76b8b58a9456 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 26 Jan 2023 01:15:56 +0300 Subject: [PATCH 06/10] test(query-orchestrator): stream --- .../test/unit/QueryQueue.abstract.ts | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts index 6b02cd1933074..97dcfc64b9e7d 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts @@ -1,5 +1,6 @@ import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; import { pausePromise } from '@cubejs-backend/shared'; +import type { QueryKey } from '@cubejs-backend/base-driver'; import { QueryQueue } from '../../src'; import { processUidRE } from '../../src/orchestrator/utils'; @@ -14,7 +15,11 @@ export type QueryQueueTestOptions = { export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}) => { describe(`QueryQueue${name}`, () => { let delayCount = 0; + let streamCount = 0; + const delayFn = (result, delay) => new Promise(resolve => setTimeout(() => resolve(result), delay)); + const logger = jest.fn((message, event) => console.log(`${message} ${JSON.stringify(event)}`)); + let cancelledQuery; const queue = new QueryQueue('test_query_queue', { queryHandlers: { @@ -24,7 +29,10 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} delayCount += 1; await setCancelHandler(result); return delayFn(result, query.delay); - } + }, + stream: async () => { + streamCount++; + }, }, cancelHandlers: { delay: (query) => { @@ -36,7 +44,8 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} executionTimeout: 2, orphanedTimeout: 2, concurrency: 1, - ...options + ...options, + logger, }); if (options?.cacheAndQueueDriver === 'cubestore') { @@ -44,6 +53,8 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} afterEach(async () => { await queue.shutdown(); await pausePromise(2500); + + logger.mockClear(); }); } @@ -196,6 +207,22 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} } }); + test('stream handler', async () => { + const key: QueryKey = ['select * from table', []]; + key.persistent = true; + + queue.setQueryStream(key, {}); + + await queue.executeInQueue('stream', key, { }, 0); + await queue.shutdown(); + // TODO(ovr): await with shutdown + await pausePromise(1500); + + expect(streamCount).toEqual(1); + expect(logger.mock.calls.length).toEqual(3); + expect(logger.mock.calls[2][0]).toEqual('Performing query completed'); + }); + test('removed before reconciled', async () => { const query = ['select * from']; const key = queue.redisHash(query); From c139999239e167798572f8086a7c735ce5fff74c Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 26 Jan 2023 02:02:31 +0300 Subject: [PATCH 07/10] test --- .../test/unit/QueryQueue.abstract.ts | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts index 97dcfc64b9e7d..f371076a29694 100644 --- 
a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts @@ -48,15 +48,19 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} logger, }); - if (options?.cacheAndQueueDriver === 'cubestore') { - // TODO: Find all problems with queue - afterEach(async () => { + afterEach(async () => { + if (options?.cacheAndQueueDriver === 'cubestore') { await queue.shutdown(); + // TODO(ovr): Await with shutdown await pausePromise(2500); + } - logger.mockClear(); - }); - } + logger.mockClear(); + }); + + beforeEach(async () => { + logger.mockClear(); + }); afterAll(async () => { await queue.shutdown(); From 35546c99a25279ee0bf303eb6b45a6fc47ea9365 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 26 Jan 2023 11:21:10 +0300 Subject: [PATCH 08/10] test: correct await processing for queue tests --- .../test/unit/QueryQueue.abstract.ts | 50 +++++++++++-------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts index f371076a29694..91c44992fac35 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts @@ -1,6 +1,6 @@ import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; -import { pausePromise } from '@cubejs-backend/shared'; import type { QueryKey } from '@cubejs-backend/base-driver'; +import { pausePromise } from '@cubejs-backend/shared'; import { QueryQueue } from '../../src'; import { processUidRE } from '../../src/orchestrator/utils'; @@ -14,13 +14,15 @@ export type QueryQueueTestOptions = { export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}) => { describe(`QueryQueue${name}`, () => { - let delayCount = 0; - let streamCount = 0; - const delayFn = (result, delay) => new Promise(resolve => setTimeout(() => resolve(result), delay)); const logger = jest.fn((message, event) => console.log(`${message} ${JSON.stringify(event)}`)); + let delayCount = 0; + let streamCount = 0; + let processMessagePromises = []; + let processCancelPromises = []; let cancelledQuery; + const queue = new QueryQueue('test_query_queue', { queryHandlers: { foo: async (query) => `${query[0]} bar`, @@ -34,6 +36,12 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} streamCount++; }, }, + sendProcessMessageFn: async (queryKeyHashed) => { + processMessagePromises.push(queue.processQuery.bind(queue)(queryKeyHashed)); + }, + sendCancelMessageFn: async (query) => { + processCancelPromises.push(queue.processCancel.bind(queue)(query)); + }, cancelHandlers: { delay: (query) => { console.log(`cancel call: ${JSON.stringify(query)}`); @@ -48,22 +56,29 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} logger, }); - afterEach(async () => { - if (options?.cacheAndQueueDriver === 'cubestore') { - await queue.shutdown(); - // TODO(ovr): Await with shutdown - await pausePromise(2500); - } + async function awaitProcessing() { + await queue.shutdown(); + await Promise.all(processMessagePromises); + await Promise.all(processCancelPromises); + // stdout conflict with console.log + await pausePromise(100); - logger.mockClear(); + processMessagePromises = []; + processCancelPromises = []; + } + + afterEach(async () => { + await awaitProcessing(); }); - beforeEach(async () => { + 
beforeEach(() => { logger.mockClear(); + delayCount = 0; + streamCount = 0; }); afterAll(async () => { - await queue.shutdown(); + await awaitProcessing(); if (options?.afterAll) { await options?.afterAll(); @@ -93,7 +108,6 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} }); test('priority', async () => { - delayCount = 0; const result = await Promise.all([ queue.executeInQueue('delay', '11', { delay: 600, result: '1' }, 1), queue.executeInQueue('delay', '12', { delay: 100, result: '2' }, 0), @@ -103,7 +117,6 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} }); test('timeout', async () => { - delayCount = 0; const query = ['select * from 2']; let errorString = ''; @@ -124,7 +137,6 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} }); test('stage reporting', async () => { - delayCount = 0; const resultPromise = queue.executeInQueue('delay', '1', { delay: 200, result: '1' }, 0, { stageQueryKey: '1' }); await delayFn(null, 50); expect((await queue.getQueryStage('1')).stage).toBe('Executing query'); @@ -133,7 +145,6 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} }); test('priority stage reporting', async () => { - delayCount = 0; const resultPromise1 = queue.executeInQueue('delay', '31', { delay: 200, result: '1' }, 20, { stageQueryKey: '12' }); await delayFn(null, 50); const resultPromise2 = queue.executeInQueue('delay', '32', { delay: 200, result: '1' }, 10, { stageQueryKey: '12' }); @@ -146,7 +157,6 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} }); test('negative priority', async () => { - delayCount = 0; const results = []; queue.executeInQueue('delay', '31', { delay: 400, result: '4' }, -10); @@ -218,9 +228,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} queue.setQueryStream(key, {}); await queue.executeInQueue('stream', key, { }, 0); - await queue.shutdown(); - // TODO(ovr): await with shutdown - await pausePromise(1500); + await awaitProcessing(); expect(streamCount).toEqual(1); expect(logger.mock.calls.length).toEqual(3); From fd1914594b1d6d5e5de5fc87e50d65c63e2bc473 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 26 Jan 2023 11:46:48 +0300 Subject: [PATCH 09/10] test: move timing --- .../test/unit/QueryQueue.abstract.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts index 91c44992fac35..a8a40ef578c2b 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts @@ -60,8 +60,6 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} await queue.shutdown(); await Promise.all(processMessagePromises); await Promise.all(processCancelPromises); - // stdout conflict with console.log - await pausePromise(100); processMessagePromises = []; processCancelPromises = []; @@ -79,6 +77,8 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} afterAll(async () => { await awaitProcessing(); + // stdout conflict with console.log + await pausePromise(100); if (options?.afterAll) { await options?.afterAll(); From 1af10e34391f3a6080a353467d6469d7f4687f9d Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 26 Jan 2023 11:48:23 +0300 Subject: [PATCH 10/10] chore: cube store 
supports orphaned test --- .../test/unit/QueryQueue.abstract.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts index a8a40ef578c2b..8861037093036 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts @@ -172,10 +172,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} expect(results.map(r => parseInt(r[0], 10) - parseInt(results[0][0], 10))).toEqual([0, 1, 2]); }); - const nonCubeStoreTest = options.cacheAndQueueDriver !== 'cubestore' ? test : xtest; - - // TODO: CubeStore queue support - nonCubeStoreTest('orphaned', async () => { + test('orphaned', async () => { for (let i = 1; i <= 4; i++) { await queue.executeInQueue('delay', `11${i}`, { delay: 50, result: `${i}` }, 0); } @@ -243,6 +240,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} expect(result).toBe('select * from bar'); }); + const nonCubeStoreTest = options.cacheAndQueueDriver !== 'cubestore' ? test : xtest; nonCubeStoreTest('queue driver lock obtain race condition', async () => { const redisClient: any = await queue.queueDriver.createConnection(); const redisClient2: any = await queue.queueDriver.createConnection();
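
The common thread across these commits: a query key may carry a `persistent: true` marker, and persistent (streaming) queries are hashed with the owning process's UID appended after an `@` separator, so their stream and result stay bound to the worker that registered them, while plain queries keep a bare md5 hash that any worker may pick up (QueryQueue splits on `@` and treats single-segment keys as common queries). The sketch below is illustrative only — it is not part of the patch. It restates that behaviour in a self-contained form, using the same `uuid` and Node `crypto` APIs the diff relies on; the process UID is inlined here instead of being imported from `@cubejs-backend/shared`.

import crypto from 'crypto';
import { v1, v5 } from 'uuid';

type QueryKey = (string | [string, any[]]) & { persistent?: true };

// One UID per process lifetime, mirroring packages/cubejs-backend-shared/src/process.ts.
const processUid = v5(v1(), v1()).toString();

function hashQueryKey(queryKey: QueryKey): string {
  const hash = crypto.createHash('md5').update(JSON.stringify(queryKey)).digest('hex');

  // Persistent queries are pinned to this process: append the process UID after '@'
  // so the queue can split on '@' and tell common queries (one segment) apart from
  // persistent ones (md5 hash + process UID).
  if (typeof queryKey === 'object' && queryKey.persistent) {
    return `${hash}@${processUid}`;
  }

  return hash;
}

// Usage, mirroring the new test in QueryQueue.abstract.ts: mark the key as persistent
// before handing it to the queue's 'stream' handler.
const key: QueryKey = ['select * from table', []];
key.persistent = true;
console.log(hashQueryKey(key)); // '<md5>@<process-uid>'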