From e20981dd9c6e1e43068cf5c6351e47301d8f3f90 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 10 Oct 2022 16:49:00 +0300 Subject: [PATCH 01/10] feat(query-orchestrator): Introduce CubeStoreCacheDriver --- .../src}/cache-driver.interface.ts | 0 packages/cubejs-base-driver/src/index.ts | 2 + .../src/queue-driver.interface.ts | 23 ++++++ packages/cubejs-cubestore-driver/index.js | 18 ++--- packages/cubejs-cubestore-driver/package.json | 1 - .../src/CubeStoreCacheDriver.ts | 76 +++++++++++++++++++ .../src/CubeStoreDriver.ts | 9 +-- packages/cubejs-cubestore-driver/src/index.ts | 2 +- .../test/CubeStoreQuery.test.ts | 31 ++++---- .../cubejs-query-orchestrator/package.json | 4 +- .../src/orchestrator/BaseQueueDriver.ts | 7 +- .../src/orchestrator/LocalCacheDriver.ts | 3 +- .../src/orchestrator/PreAggregations.ts | 2 +- .../src/orchestrator/QueryCache.ts | 23 ++++-- .../src/orchestrator/QueryOrchestrator.ts | 4 +- .../src/orchestrator/QueryQueue.js | 34 ++++++--- .../src/orchestrator/RedisCacheDriver.ts | 2 +- .../cubestore/QueryCacheCubestore.test.ts | 8 ++ .../{ => redis}/QueryCacheRedis.test.ts | 4 +- .../{ => redis}/QueryQueueRedis.test.ts | 4 +- .../src/adapter}/CubeStoreQuery.ts | 4 +- .../src/adapter/QueryBuilder.js | 2 + .../src/adapter/index.ts | 1 + .../src/core/OptsHandler.ts | 6 +- tsconfig.json | 12 +-- 25 files changed, 208 insertions(+), 74 deletions(-) rename packages/{cubejs-query-orchestrator/src/orchestrator => cubejs-base-driver/src}/cache-driver.interface.ts (100%) create mode 100644 packages/cubejs-base-driver/src/queue-driver.interface.ts create mode 100644 packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts create mode 100644 packages/cubejs-query-orchestrator/test/integration/cubestore/QueryCacheCubestore.test.ts rename packages/cubejs-query-orchestrator/test/integration/{ => redis}/QueryCacheRedis.test.ts (83%) rename packages/cubejs-query-orchestrator/test/integration/{ => redis}/QueryQueueRedis.test.ts (83%) rename packages/{cubejs-cubestore-driver/src => cubejs-schema-compiler/src/adapter}/CubeStoreQuery.ts (98%) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/cache-driver.interface.ts b/packages/cubejs-base-driver/src/cache-driver.interface.ts similarity index 100% rename from packages/cubejs-query-orchestrator/src/orchestrator/cache-driver.interface.ts rename to packages/cubejs-base-driver/src/cache-driver.interface.ts diff --git a/packages/cubejs-base-driver/src/index.ts b/packages/cubejs-base-driver/src/index.ts index ef9ec2c4b1939..84e8d26c5e63b 100644 --- a/packages/cubejs-base-driver/src/index.ts +++ b/packages/cubejs-base-driver/src/index.ts @@ -1,3 +1,5 @@ export * from './BaseDriver'; export * from './utils'; export * from './driver.interface'; +export * from './queue-driver.interface'; +export * from './cache-driver.interface'; diff --git a/packages/cubejs-base-driver/src/queue-driver.interface.ts b/packages/cubejs-base-driver/src/queue-driver.interface.ts new file mode 100644 index 0000000000000..1f3d9790abe9f --- /dev/null +++ b/packages/cubejs-base-driver/src/queue-driver.interface.ts @@ -0,0 +1,23 @@ +export interface LocalQueueDriverConnectionInterface { + getResultBlocking(queryKey: string): Promise; + getResult(queryKey: string): Promise; + addToQueue(queryKey: string): Promise; + getToProcessQueries(): Promise; + getActiveQueries(): Promise; + getOrphanedQueries(): Promise; + getStalledQueries(): Promise; + getQueryStageState(onlyKeys: any): Promise; + updateHeartBeat(queryKey: string): Promise; + getNextProcessingId(): Promise; + retrieveForProcessing(queryKey: string, processingId: string): Promise; + freeProcessingLock(queryKe: string, processingId: string, activated: unknown): Promise; + optimisticQueryUpdate(queryKey, toUpdate, processingId): Promise; + cancelQuery(queryKey: string): Promise; + setResultAndRemoveQuery(queryKey: string, executionResult: any, processingId: any): Promise; + release(): Promise; +} + +export interface QueueDriverInterface { + createConnection(): Promise; + release(connection: LocalQueueDriverConnectionInterface): Promise; +} diff --git a/packages/cubejs-cubestore-driver/index.js b/packages/cubejs-cubestore-driver/index.js index 8437347185946..3a614c05b3beb 100644 --- a/packages/cubejs-cubestore-driver/index.js +++ b/packages/cubejs-cubestore-driver/index.js @@ -1,17 +1,15 @@ +const fromExports = require('./dist/src'); const { CubeStoreDriver } = require('./dist/src/CubeStoreDriver'); -const { CubeStoreDevDriver } = require('./dist/src/CubeStoreDevDriver'); -const { isCubeStoreSupported, CubeStoreHandler } = require('./dist/src/rexport'); /** * After 5 years working with TypeScript, now I know * that commonjs and nodejs require is not compatibility with using export default */ -module.exports = CubeStoreDriver; +const toExport = CubeStoreDriver; -/** - * It's needed to move our CLI to destructing style on import - * Please sync this file with src/index.ts - */ -module.exports.CubeStoreDevDriver = CubeStoreDevDriver; -module.exports.isCubeStoreSupported = isCubeStoreSupported; -module.exports.CubeStoreHandler = CubeStoreHandler; +// eslint-disable-next-line no-restricted-syntax +for (const [key, module] of Object.entries(fromExports)) { + toExport[key] = module; +} + +module.exports = toExport; diff --git a/packages/cubejs-cubestore-driver/package.json b/packages/cubejs-cubestore-driver/package.json index b5c27a4cf82b3..bd1cff4bcdfa7 100644 --- a/packages/cubejs-cubestore-driver/package.json +++ b/packages/cubejs-cubestore-driver/package.json @@ -31,7 +31,6 @@ "dependencies": { "@cubejs-backend/base-driver": "^0.31.32", "@cubejs-backend/cubestore": "^0.31.32", - "@cubejs-backend/schema-compiler": "^0.31.32", "@cubejs-backend/shared": "^0.31.32", "csv-write-stream": "^2.0.0", "flatbuffers": "^1.12.0", diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts new file mode 100644 index 0000000000000..377d375f991d7 --- /dev/null +++ b/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts @@ -0,0 +1,76 @@ +import { createCancelablePromise, MaybeCancelablePromise } from '@cubejs-backend/shared'; +import { CacheDriverInterface } from '@cubejs-backend/base-driver'; + +import { CubeStoreDriver } from './CubeStoreDriver'; + +interface CubeStoreCacheDriverOptions {} + +export class CubeStoreCacheDriver implements CacheDriverInterface { + protected readonly connection: CubeStoreDriver; + + public constructor(options: CubeStoreCacheDriverOptions) { + this.connection = new CubeStoreDriver({}); + } + + public withLock = ( + key: string, + cb: () => MaybeCancelablePromise, + expiration: number = 60, + freeAfter: boolean = true, + ) => createCancelablePromise(async (tkn) => { + if (tkn.isCanceled()) { + return false; + } + + const rows = await this.connection.query(`CACHE SET NX TTL ${expiration} ? ?`, [key, '1']); + if (rows && rows.length === 1 && rows[0]?.success === 'true') { + try { + await tkn.with(cb()); + } finally { + if (freeAfter) { + await this.connection.query(`CACHE REMOVE "${key}"`, []); + } + } + + return true; + } + + return false; + }); + + public async get(key: string) { + const rows = await this.connection.query(`CACHE GET "${key}"`, []); + if (rows && rows.length === 1) { + return JSON.parse(rows[0].value); + } + + return null; + } + + public async set(key: string, value, expiration) { + const strValue = JSON.stringify(value); + await this.connection.query(`CACHE SET TTL ${expiration} ? ?`, [key, strValue]); + + return { + key, + bytes: Buffer.byteLength(strValue), + }; + } + + public async remove(key: string) { + await this.connection.query(`CACHE REMOVE "${key}"`, []); + } + + public async keysStartingWith(prefix: string) { + const rows = await this.connection.query(`CACHE KEYS "${prefix}"`, []); + return rows.map((row) => row.key); + } + + public async cleanup(): Promise { + // + } + + public async testConnection(): Promise { + return this.connection.testConnection(); + } +} diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts index 9caf54f665f2e..71f780a2c2e55 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts @@ -10,15 +10,14 @@ import { ExternalCreateTableOptions, DownloadTableMemoryData, DriverInterface, IndexesSQL, CreateTableIndex, StreamTableData, - DriverCapabilities, StreamingSourceTableData, QueryOptions, + ExternalDriverCompatibilities, } from '@cubejs-backend/base-driver'; import { getEnv } from '@cubejs-backend/shared'; import { format as formatSql, escape } from 'sqlstring'; import fetch from 'node-fetch'; -import { CubeStoreQuery } from './CubeStoreQuery'; import { ConnectionConfig } from './types'; import { WebSocketConnection } from './WebSocketConnection'; @@ -390,11 +389,7 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface { return this.createTableWithOptions(table, columns, options, queryTracingObj); } - public static dialectClass() { - return CubeStoreQuery; - } - - public capabilities(): DriverCapabilities { + public capabilities(): ExternalDriverCompatibilities { return { csvImport: true, streamImport: true, diff --git a/packages/cubejs-cubestore-driver/src/index.ts b/packages/cubejs-cubestore-driver/src/index.ts index 77fdd6ac28627..3783bc7eb435f 100644 --- a/packages/cubejs-cubestore-driver/src/index.ts +++ b/packages/cubejs-cubestore-driver/src/index.ts @@ -1,4 +1,4 @@ -export * from './CubeStoreQuery'; +export * from './CubeStoreCacheDriver'; export * from './CubeStoreDriver'; export * from './CubeStoreDevDriver'; export * from './rexport'; diff --git a/packages/cubejs-cubestore-driver/test/CubeStoreQuery.test.ts b/packages/cubejs-cubestore-driver/test/CubeStoreQuery.test.ts index e9ba296d9b529..9c81aac56d46b 100644 --- a/packages/cubejs-cubestore-driver/test/CubeStoreQuery.test.ts +++ b/packages/cubejs-cubestore-driver/test/CubeStoreQuery.test.ts @@ -1,17 +1,14 @@ -import { createQueryTestCase, CubeStoreDBRunner, QueryTestAbstract } from '@cubejs-backend/testing-shared'; -import { CubeStoreDriver, CubeStoreQuery } from '../src'; - -class CubeStoreQueryTest extends QueryTestAbstract { - public getQueryClass() { - return CubeStoreQuery; - } -} - -createQueryTestCase(new CubeStoreQueryTest(), { - name: 'CubeStore', - connectionFactory: (container) => new CubeStoreDriver({ - host: container.getHost(), - port: container.getMappedPort(3030) - }), - DbRunnerClass: CubeStoreDBRunner, -}); +// import { createQueryTestCase, CubeStoreDBRunner, QueryTestAbstract } from '@cubejs-backend/testing-shared'; +// import { CubeStoreDriver } from '../src'; +// +// class CubeStoreQueryTest extends QueryTestAbstract { +// } +// +// createQueryTestCase(new CubeStoreQueryTest(), { +// name: 'CubeStore', +// connectionFactory: (container) => new CubeStoreDriver({ +// host: container.getHost(), +// port: container.getMappedPort(3030) +// }), +// DbRunnerClass: CubeStoreDBRunner, +// }); diff --git a/packages/cubejs-query-orchestrator/package.json b/packages/cubejs-query-orchestrator/package.json index 72d4a07b1b4c9..29cd34fc9784d 100644 --- a/packages/cubejs-query-orchestrator/package.json +++ b/packages/cubejs-query-orchestrator/package.json @@ -20,7 +20,8 @@ "test": "npm run unit && npm run integration", "unit": "jest --runInBand --coverage --verbose dist/test/unit", "integration": "npm run integration:redis dist/test/integration", - "integration:redis": "jest --runInBand --verbose dist/test/integration", + "integration:redis": "jest --runInBand --verbose dist/test/integration/redis", + "integration:cubestore": "jest --runInBand --verbose dist/test/integration/cubestore", "lint": "eslint src/* test/* --ext .ts,.js", "lint:fix": "eslint --fix src/* test/* --ext .ts,.js" }, @@ -33,6 +34,7 @@ "dependencies": { "@cubejs-backend/base-driver": "^0.31.32", "@cubejs-backend/shared": "^0.31.32", + "@cubejs-backend/cubestore-driver": "^0.31.0", "csv-write-stream": "^2.0.0", "es5-ext": "0.10.53", "generic-pool": "^3.7.1", diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts b/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts index cfbf3f109e8dc..91930b02fee3a 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts @@ -1,7 +1,12 @@ import { getCacheHash } from './utils'; +import { LocalQueueDriverConnectionInterface, QueueDriverInterface } from '@cubejs-backend/base-driver'; -export abstract class BaseQueueDriver { +export abstract class BaseQueueDriver implements QueueDriverInterface { public redisHash(queryKey) { return getCacheHash(queryKey); } + + abstract createConnection(): Promise; + + abstract release(connection: LocalQueueDriverConnectionInterface): Promise; } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/LocalCacheDriver.ts b/packages/cubejs-query-orchestrator/src/orchestrator/LocalCacheDriver.ts index 2bb1d9ca55244..49877b9894807 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/LocalCacheDriver.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/LocalCacheDriver.ts @@ -1,6 +1,5 @@ import { createCancelablePromise, MaybeCancelablePromise } from '@cubejs-backend/shared'; - -import { CacheDriverInterface } from './cache-driver.interface'; +import { CacheDriverInterface } from '@cubejs-backend/base-driver'; interface ItemBucket { value: any, diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts index 83093f44be98d..678ef35b56319 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts @@ -23,7 +23,7 @@ import { InlineTable, SaveCancelFn, StreamOptions, - UnloadOptions, + UnloadOptions } from '@cubejs-backend/base-driver'; import { PreAggTableToTempTable, Query, QueryBody, QueryCache, QueryTuple, QueryWithParams } from './QueryCache'; import { ContinueWaitError } from './ContinueWaitError'; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 2e4f119c0c762..d354e7179ecaf 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -1,13 +1,13 @@ import csvWriter from 'csv-write-stream'; import LRUCache from 'lru-cache'; import { MaybeCancelablePromise, streamToArray } from '@cubejs-backend/shared'; +import { CubeStoreCacheDriver } from '@cubejs-backend/cubestore-driver'; +import { BaseDriver, InlineTables, CacheDriverInterface } from '@cubejs-backend/base-driver'; -import { BaseDriver, InlineTables } from '@cubejs-backend/base-driver'; import { QueryQueue } from './QueryQueue'; import { ContinueWaitError } from './ContinueWaitError'; import { RedisCacheDriver } from './RedisCacheDriver'; import { LocalCacheDriver } from './LocalCacheDriver'; -import { CacheDriverInterface } from './cache-driver.interface'; import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory'; import { PreAggregationDescription } from './PreAggregations'; import { getCacheHash } from './utils'; @@ -129,14 +129,25 @@ export class QueryCache { }>; redisPool?: any; continueWaitTimeout?: number; - cacheAndQueueDriver?: 'redis' | 'memory'; + cacheAndQueueDriver?: 'redis' | 'memory' | 'cubestore'; maxInMemoryCacheEntries?: number; skipExternalCacheAndQueue?: boolean; } = {} ) { - this.cacheDriver = options.cacheAndQueueDriver === 'redis' ? - new RedisCacheDriver({ pool: options.redisPool }) : - new LocalCacheDriver(); + switch (options.cacheAndQueueDriver || 'memory') { + case 'redis': + this.cacheDriver = new RedisCacheDriver({ pool: options.redisPool }); + break; + case 'memory': + this.cacheDriver = new LocalCacheDriver(); + break; + case 'cubestore': + this.cacheDriver = new CubeStoreCacheDriver({}); + break; + default: + throw new Error(`Unknown cache driver: ${options.cacheAndQueueDriver}`); + } + this.memoryCache = new LRUCache({ max: options.maxInMemoryCacheEntries || 10000 }); diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts index e9f1cfa96ac51..fee68aba666ab 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts @@ -55,8 +55,8 @@ export class QueryOrchestrator { ); this.cacheAndQueueDriver = cacheAndQueueDriver; - if (!['redis', 'memory'].includes(cacheAndQueueDriver)) { - throw new Error('Only \'redis\' or \'memory\' are supported for cacheAndQueueDriver option'); + if (!['redis', 'memory', 'cubestore'].includes(cacheAndQueueDriver)) { + throw new Error('Only \'redis\', \'memory\' or \'cubestore\' are supported for cacheAndQueueDriver option'); } const redisPool = cacheAndQueueDriver === 'redis' ? new RedisPool(options.redisPoolOptions) : undefined; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js index 5c80e30f7ec46..f48114c11b2fe 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js @@ -83,12 +83,28 @@ export class QueryQueue { getQueueEventsBus: options.getQueueEventsBus }; - /** - * @type {LocalQueueDriver | RedisQueueDriver} - */ - this.queueDriver = options.cacheAndQueueDriver === 'redis' ? - new RedisQueueDriver(queueDriverOptions) : - new LocalQueueDriver(queueDriverOptions); + switch (options.cacheAndQueueDriver || 'memory') { + case 'redis': + /** + * @type {LocalQueueDriver | RedisQueueDriver} + */ + this.queueDriver = new RedisQueueDriver(queueDriverOptions); + break; + case 'memory': + /** + * @type {LocalQueueDriver | RedisQueueDriver} + */ + this.queueDriver = new LocalQueueDriver(queueDriverOptions); + break; + case 'cubestore': + /** + * @type {LocalQueueDriver | RedisQueueDriver} + */ + this.queueDriver = new LocalQueueDriver(queueDriverOptions); + break; + default: + throw new Error(`Unknown queue driver: ${options.cacheAndQueueDriver}`); + } /** * @type {boolean} @@ -151,7 +167,7 @@ export class QueryQueue { // query (initialized by the /cubejs-system/v1/pre-aggregations/jobs // endpoint). let result = !query.forceBuild && await redisClient.getResult(queryKey); - + if (result) { return this.parseResult(result); } @@ -208,7 +224,7 @@ export class QueryQueue { // Result here won't be fetched for a jobed build query (initialized by // the /cubejs-system/v1/pre-aggregations/jobs endpoint). result = !query.isJob && await redisClient.getResultBlocking(queryKey); - + // We don't want to throw the ContinueWaitError for a jobed build query. if (!query.isJob && !result) { throw new ContinueWaitError(); @@ -312,7 +328,7 @@ export class QueryQueue { status: [] }; } - + obj[query.queryKey].status.push(status); }); return obj; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/RedisCacheDriver.ts b/packages/cubejs-query-orchestrator/src/orchestrator/RedisCacheDriver.ts index cc8d840b151c0..17039ad0659c0 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/RedisCacheDriver.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/RedisCacheDriver.ts @@ -1,7 +1,7 @@ import { createCancelablePromise, MaybeCancelablePromise } from '@cubejs-backend/shared'; +import { CacheDriverInterface } from '@cubejs-backend/base-driver'; import { RedisPool } from './RedisPool'; -import { CacheDriverInterface } from './cache-driver.interface'; interface RedisCacheDriverOptions { pool: RedisPool, diff --git a/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryCacheCubestore.test.ts b/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryCacheCubestore.test.ts new file mode 100644 index 0000000000000..c717b140b27ae --- /dev/null +++ b/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryCacheCubestore.test.ts @@ -0,0 +1,8 @@ +import { QueryCacheTest } from '../../unit/QueryCache.abstract'; + +QueryCacheTest( + 'Cubestore Cache Driver', + { + cacheAndQueueDriver: 'cubestore', + } +); diff --git a/packages/cubejs-query-orchestrator/test/integration/QueryCacheRedis.test.ts b/packages/cubejs-query-orchestrator/test/integration/redis/QueryCacheRedis.test.ts similarity index 83% rename from packages/cubejs-query-orchestrator/test/integration/QueryCacheRedis.test.ts rename to packages/cubejs-query-orchestrator/test/integration/redis/QueryCacheRedis.test.ts index 1851a003039e6..c1822f7dae96f 100644 --- a/packages/cubejs-query-orchestrator/test/integration/QueryCacheRedis.test.ts +++ b/packages/cubejs-query-orchestrator/test/integration/redis/QueryCacheRedis.test.ts @@ -1,6 +1,6 @@ import { getEnv } from '@cubejs-backend/shared'; -import { QueryCacheTest } from '../unit/QueryCache.abstract'; -import { RedisPool } from '../../src/orchestrator/RedisPool'; +import { QueryCacheTest } from '../../unit/QueryCache.abstract'; +import { RedisPool } from '../../../src/orchestrator/RedisPool'; function doRedisTest(useIORedis: boolean) { process.env.CUBEJS_REDIS_USE_IOREDIS = useIORedis; diff --git a/packages/cubejs-query-orchestrator/test/integration/QueryQueueRedis.test.ts b/packages/cubejs-query-orchestrator/test/integration/redis/QueryQueueRedis.test.ts similarity index 83% rename from packages/cubejs-query-orchestrator/test/integration/QueryQueueRedis.test.ts rename to packages/cubejs-query-orchestrator/test/integration/redis/QueryQueueRedis.test.ts index 6970074b2525e..e362810e677dc 100644 --- a/packages/cubejs-query-orchestrator/test/integration/QueryQueueRedis.test.ts +++ b/packages/cubejs-query-orchestrator/test/integration/redis/QueryQueueRedis.test.ts @@ -1,6 +1,6 @@ import { getEnv } from '@cubejs-backend/shared'; -import { QueryQueueTest } from '../unit/QueryQueue.abstract'; -import { RedisPool } from '../../src/orchestrator/RedisPool'; +import { QueryQueueTest } from '../../unit/QueryQueue.abstract'; +import { RedisPool } from '../../../src/orchestrator/RedisPool'; function doRedisTest(useIORedis: boolean) { process.env.CUBEJS_REDIS_USE_IOREDIS = useIORedis; diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreQuery.ts b/packages/cubejs-schema-compiler/src/adapter/CubeStoreQuery.ts similarity index 98% rename from packages/cubejs-cubestore-driver/src/CubeStoreQuery.ts rename to packages/cubejs-schema-compiler/src/adapter/CubeStoreQuery.ts index 74e07c0764947..f146f34c5fe00 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreQuery.ts +++ b/packages/cubejs-schema-compiler/src/adapter/CubeStoreQuery.ts @@ -1,5 +1,7 @@ import moment from 'moment-timezone'; -import { BaseFilter, BaseQuery, BaseMeasure } from '@cubejs-backend/schema-compiler'; +import { BaseQuery } from './BaseQuery'; +import { BaseFilter } from './BaseFilter'; +import { BaseMeasure } from './BaseMeasure'; const GRANULARITY_TO_INTERVAL: Record = { day: 'day', diff --git a/packages/cubejs-schema-compiler/src/adapter/QueryBuilder.js b/packages/cubejs-schema-compiler/src/adapter/QueryBuilder.js index 9d25187d0be11..d3bf931980c6c 100644 --- a/packages/cubejs-schema-compiler/src/adapter/QueryBuilder.js +++ b/packages/cubejs-schema-compiler/src/adapter/QueryBuilder.js @@ -14,6 +14,7 @@ import { OracleQuery } from './OracleQuery'; import { SqliteQuery } from './SqliteQuery'; import { AWSElasticSearchQuery } from './AWSElasticSearchQuery'; import { ElasticSearchQuery } from './ElasticSearchQuery'; +import { CubeStoreQuery } from './CubeStoreQuery'; const ADAPTERS = { postgres: PostgresQuery, @@ -36,6 +37,7 @@ const ADAPTERS = { awselasticsearch: AWSElasticSearchQuery, elasticsearch: ElasticSearchQuery, materialize: PostgresQuery, + cubestore: CubeStoreQuery, }; export const queryClass = (dbType, dialectClass) => dialectClass || ADAPTERS[dbType]; diff --git a/packages/cubejs-schema-compiler/src/adapter/index.ts b/packages/cubejs-schema-compiler/src/adapter/index.ts index 7ed33bfa3ac89..94aeb4a7ef199 100644 --- a/packages/cubejs-schema-compiler/src/adapter/index.ts +++ b/packages/cubejs-schema-compiler/src/adapter/index.ts @@ -10,6 +10,7 @@ export * from './PreAggregations'; export * from './QueryBuilder'; export * from './QueryCache'; export * from './QueryFactory'; +export * from './CubeStoreQuery'; // Base queries that can be re-used across different drivers export * from './MysqlQuery'; diff --git a/packages/cubejs-server-core/src/core/OptsHandler.ts b/packages/cubejs-server-core/src/core/OptsHandler.ts index 9bdb8f4e661eb..8a802658a36c8 100644 --- a/packages/cubejs-server-core/src/core/OptsHandler.ts +++ b/packages/cubejs-server-core/src/core/OptsHandler.ts @@ -86,7 +86,7 @@ export class OptsHandler { 'must be specified' ); } - + // TODO (buntarb): this assertion should be restored after documentation // will be added. // @@ -372,7 +372,7 @@ export class OptsHandler { }) ); - let externalDialectFactory = () => ( + const externalDialectFactory = () => ( typeof externalDbType === 'string' && lookupDriverClass(externalDbType).dialectClass && lookupDriverClass(externalDbType).dialectClass() @@ -430,8 +430,6 @@ export class OptsHandler { // Lazy loading for Cube Store externalDriverFactory = () => new cubeStorePackage.CubeStoreDevDriver(cubeStoreHandler); - externalDialectFactory = - () => cubeStorePackage.CubeStoreDevDriver.dialectClass(); } else { this.core.logger('Cube Store is not supported on your system', { warning: ( diff --git a/tsconfig.json b/tsconfig.json index e558bd13be41a..373304a355013 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -22,6 +22,12 @@ { "path": "packages/cubejs-base-driver" }, + { + "path": "rust/cubestore" + }, + { + "path": "packages/cubejs-cubestore-driver" + }, { "path": "packages/cubejs-query-orchestrator" }, @@ -88,12 +94,6 @@ { "path": "packages/cubejs-trino-driver" }, - { - "path": "rust/cubestore" - }, - { - "path": "packages/cubejs-cubestore-driver" - }, { "path": "packages/cubejs-server-core" }, From 25bcccfa3c84d4aa9ea7bcdc5b89dd214d798252 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 23 Dec 2022 21:51:50 +0300 Subject: [PATCH 02/10] chore: fixes --- packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts | 4 ++++ .../src/orchestrator/QueryOrchestrator.ts | 2 +- .../src/orchestrator/RedisCacheDriver.ts | 4 ++++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts index 377d375f991d7..46f216c9d8d7e 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts @@ -24,6 +24,10 @@ export class CubeStoreCacheDriver implements CacheDriverInterface { const rows = await this.connection.query(`CACHE SET NX TTL ${expiration} ? ?`, [key, '1']); if (rows && rows.length === 1 && rows[0]?.success === 'true') { + if (tkn.isCanceled()) { + return false; + } + try { await tkn.with(cb()); } finally { diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts index fee68aba666ab..cd3541b27296a 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts @@ -8,7 +8,7 @@ import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory'; import { RedisQueueEventsBus } from './RedisQueueEventsBus'; import { LocalQueueEventsBus } from './LocalQueueEventsBus'; -export type CacheAndQueryDriverType = 'redis' | 'memory'; +export type CacheAndQueryDriverType = 'redis' | 'memory' | 'cubestore'; export enum DriverType { External = 'external', diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/RedisCacheDriver.ts b/packages/cubejs-query-orchestrator/src/orchestrator/RedisCacheDriver.ts index 17039ad0659c0..4586ea626aa84 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/RedisCacheDriver.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/RedisCacheDriver.ts @@ -52,6 +52,10 @@ export class RedisCacheDriver implements CacheDriverInterface { ); if (response === 'OK') { + if (tkn.isCanceled()) { + return false; + } + try { await tkn.with(cb()); } finally { From 43e0ac6801478f1ae96657121a2d206dd8540dca Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Sun, 25 Dec 2022 22:23:49 +0300 Subject: [PATCH 03/10] chore: testing --- .../src/CubeStoreCacheDriver.ts | 12 ++--- packages/cubejs-query-orchestrator/.gitignore | 1 + .../src/orchestrator/BaseQueueDriver.ts | 2 +- .../src/orchestrator/PreAggregations.ts | 3 +- .../src/orchestrator/QueryCache.ts | 46 +++++++++++-------- .../src/orchestrator/QueryOrchestrator.ts | 5 ++ .../cubestore/QueryCacheCubestore.test.ts | 20 +++++++- .../test/unit/QueryCache.abstract.ts | 19 +++++++- .../cubejs-testing-shared/src/db/mssql.ts | 2 +- 9 files changed, 76 insertions(+), 34 deletions(-) diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts index 46f216c9d8d7e..788a6dfd2d64f 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts @@ -3,14 +3,10 @@ import { CacheDriverInterface } from '@cubejs-backend/base-driver'; import { CubeStoreDriver } from './CubeStoreDriver'; -interface CubeStoreCacheDriverOptions {} - export class CubeStoreCacheDriver implements CacheDriverInterface { - protected readonly connection: CubeStoreDriver; - - public constructor(options: CubeStoreCacheDriverOptions) { - this.connection = new CubeStoreDriver({}); - } + public constructor( + protected readonly connection: CubeStoreDriver + ) {} public withLock = ( key: string, @@ -22,7 +18,7 @@ export class CubeStoreCacheDriver implements CacheDriverInterface { return false; } - const rows = await this.connection.query(`CACHE SET NX TTL ${expiration} ? ?`, [key, '1']); + const rows = await this.connection.query('CACHE SET NX TTL ? ? ?', [expiration, key, '1']); if (rows && rows.length === 1 && rows[0]?.success === 'true') { if (tkn.isCanceled()) { return false; diff --git a/packages/cubejs-query-orchestrator/.gitignore b/packages/cubejs-query-orchestrator/.gitignore index 1521c8b7652b1..835ba6e1cc667 100644 --- a/packages/cubejs-query-orchestrator/.gitignore +++ b/packages/cubejs-query-orchestrator/.gitignore @@ -1 +1,2 @@ dist +.cubestore diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts b/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts index 91930b02fee3a..c15bd99a4efeb 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts @@ -1,5 +1,5 @@ -import { getCacheHash } from './utils'; import { LocalQueueDriverConnectionInterface, QueueDriverInterface } from '@cubejs-backend/base-driver'; +import { getCacheHash } from './utils'; export abstract class BaseQueueDriver implements QueueDriverInterface { public redisHash(queryKey) { diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts index 678ef35b56319..ddf11f4f4c77a 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts @@ -30,6 +30,7 @@ import { ContinueWaitError } from './ContinueWaitError'; import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory'; import { QueryQueue } from './QueryQueue'; import { LargeStreamWarning } from './StreamObjectsCounter'; +import { CacheAndQueryDriverType } from './QueryOrchestrator'; /// Name of the inline table containing the lambda rows. export const LAMBDA_TABLE_PREFIX = 'lambda'; @@ -1832,7 +1833,7 @@ type PreAggregationsOptions = { }>; redisPool?: any; continueWaitTimeout?: number; - cacheAndQueueDriver?: 'redis' | 'memory'; + cacheAndQueueDriver?: CacheAndQueryDriverType; skipExternalCacheAndQueue?: boolean; }; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index d354e7179ecaf..91b8fce2c4c83 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -1,7 +1,7 @@ import csvWriter from 'csv-write-stream'; import LRUCache from 'lru-cache'; import { MaybeCancelablePromise, streamToArray } from '@cubejs-backend/shared'; -import { CubeStoreCacheDriver } from '@cubejs-backend/cubestore-driver'; +import { CubeStoreCacheDriver, CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; import { BaseDriver, InlineTables, CacheDriverInterface } from '@cubejs-backend/base-driver'; import { QueryQueue } from './QueryQueue'; @@ -11,6 +11,7 @@ import { LocalCacheDriver } from './LocalCacheDriver'; import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory'; import { PreAggregationDescription } from './PreAggregations'; import { getCacheHash } from './utils'; +import { CacheAndQueryDriverType } from './QueryOrchestrator'; type QueryOptions = { external?: boolean; @@ -102,6 +103,26 @@ type CacheEntry = { renewalKey: string; }; +export interface QueryCacheOptions { + refreshKeyRenewalThreshold?: number; + externalQueueOptions?: any; + externalDriverFactory?: DriverFactory; + backgroundRenew?: Boolean; + queueOptions?: (dataSource: string) => Promise<{ + concurrency: number; + continueWaitTimeout?: number; + executionTimeout?: number; + orphanedTimeout?: number; + heartBeatInterval?: number; + }>; + redisPool?: any; + cubeStoreDriver?: CubeStoreDriver, + continueWaitTimeout?: number; + cacheAndQueueDriver?: CacheAndQueryDriverType; + maxInMemoryCacheEntries?: number; + skipExternalCacheAndQueue?: boolean; +} + export class QueryCache { protected readonly cacheDriver: CacheDriverInterface; @@ -115,24 +136,7 @@ export class QueryCache { protected readonly redisPrefix: string, protected readonly driverFactory: DriverFactoryByDataSource, protected readonly logger: any, - public readonly options: { - refreshKeyRenewalThreshold?: number; - externalQueueOptions?: any; - externalDriverFactory?: DriverFactory; - backgroundRenew?: Boolean; - queueOptions?: (dataSource: string) => Promise<{ - concurrency: number; - continueWaitTimeout?: number; - executionTimeout?: number; - orphanedTimeout?: number; - heartBeatInterval?: number; - }>; - redisPool?: any; - continueWaitTimeout?: number; - cacheAndQueueDriver?: 'redis' | 'memory' | 'cubestore'; - maxInMemoryCacheEntries?: number; - skipExternalCacheAndQueue?: boolean; - } = {} + public readonly options: QueryCacheOptions = {} ) { switch (options.cacheAndQueueDriver || 'memory') { case 'redis': @@ -142,7 +146,9 @@ export class QueryCache { this.cacheDriver = new LocalCacheDriver(); break; case 'cubestore': - this.cacheDriver = new CubeStoreCacheDriver({}); + this.cacheDriver = new CubeStoreCacheDriver( + options.cubeStoreDriver || new CubeStoreDriver({}) + ); break; default: throw new Error(`Unknown cache driver: ${options.cacheAndQueueDriver}`); diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts index cd3541b27296a..71d5cb2f5da66 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts @@ -61,6 +61,10 @@ export class QueryOrchestrator { const redisPool = cacheAndQueueDriver === 'redis' ? new RedisPool(options.redisPoolOptions) : undefined; this.redisPool = redisPool; + + // TODO: Re-use connection from external database + const cubeStoreDriver = undefined; + const { externalDriverFactory, continueWaitTimeout, skipExternalCacheAndQueue } = options; this.queryCache = new QueryCache( @@ -71,6 +75,7 @@ export class QueryOrchestrator { externalDriverFactory, cacheAndQueueDriver, redisPool, + cubeStoreDriver, continueWaitTimeout, skipExternalCacheAndQueue, ...options.queryCacheOptions, diff --git a/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryCacheCubestore.test.ts b/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryCacheCubestore.test.ts index c717b140b27ae..accad2bc647e4 100644 --- a/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryCacheCubestore.test.ts +++ b/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryCacheCubestore.test.ts @@ -1,8 +1,26 @@ +import { CubeStoreDevDriver, CubeStoreHandler } from '@cubejs-backend/cubestore-driver'; import { QueryCacheTest } from '../../unit/QueryCache.abstract'; +const cubeStoreHandler = new CubeStoreHandler({ + stdout: (data) => { + console.log(data.toString().trim()); + }, + stderr: (data) => { + console.log(data.toString().trim()); + }, + onRestart: (code) => console.log({ + warning: `Instance exit with ${code}, restarting`, + }), +}); + QueryCacheTest( - 'Cubestore Cache Driver', + 'CubeStore Cache Driver', { cacheAndQueueDriver: 'cubestore', + cubeStoreDriver: new CubeStoreDevDriver(cubeStoreHandler), + beforeAll: async () => { + await cubeStoreHandler.acquire(); + }, + afterAll: async () => cubeStoreHandler.release(true) } ); diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts index 8561ee699dcca..3e7af2c955103 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts @@ -1,9 +1,14 @@ import crypto from 'crypto'; import { createCancelablePromise, pausePromise } from '@cubejs-backend/shared'; -import { QueryCache } from '../../src'; +import { QueryCache, QueryCacheOptions } from '../../src'; -export const QueryCacheTest = (name: string, options?: any) => { +export type QueryCacheTestOptions = QueryCacheOptions & { + beforeAll?: () => Promise, + afterAll?: () => Promise, +}; + +export const QueryCacheTest = (name: string, options?: QueryCacheTestOptions) => { describe(`QueryQueue${name}`, () => { const cache = new QueryCache( crypto.randomBytes(16).toString('hex'), @@ -14,8 +19,18 @@ export const QueryCacheTest = (name: string, options?: any) => { options, ); + beforeAll(async () => { + if (options?.beforeAll) { + await options?.beforeAll(); + } + }); + afterAll(async () => { await cache.cleanup(); + + if (options?.afterAll) { + await options?.afterAll(); + } }); it('withLock', async () => { diff --git a/packages/cubejs-testing-shared/src/db/mssql.ts b/packages/cubejs-testing-shared/src/db/mssql.ts index fecb199fb7e27..277707a2f7e04 100644 --- a/packages/cubejs-testing-shared/src/db/mssql.ts +++ b/packages/cubejs-testing-shared/src/db/mssql.ts @@ -14,7 +14,7 @@ export class MssqlDbRunner extends DbRunnerAbstract { .withEnv('ACCEPT_EULA', 'Y') .withEnv('MSSQL_SA_PASSWORD', process.env.TEST_DB_PASSWORD || 'Test1test') .withExposedPorts(1433) - .withWaitStrategy(Wait.forLogMessage("Service Broker manager has started")) + .withWaitStrategy(Wait.forLogMessage('Service Broker manager has started')) .withStartupTimeout(30 * 1000); if (options.volumes) { From 509197b11400e0e184619473971e95403ad13b46 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Sun, 25 Dec 2022 23:17:15 +0300 Subject: [PATCH 04/10] chore: fix lock & cancelation --- .../src/CubeStoreCacheDriver.ts | 4 ++ .../src/orchestrator/RedisCacheDriver.ts | 5 ++- .../cubestore/QueryCacheCubestore.test.ts | 44 ++++++++++++------- 3 files changed, 35 insertions(+), 18 deletions(-) diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts index 788a6dfd2d64f..f2c1b1b6089a4 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts @@ -21,6 +21,10 @@ export class CubeStoreCacheDriver implements CacheDriverInterface { const rows = await this.connection.query('CACHE SET NX TTL ? ? ?', [expiration, key, '1']); if (rows && rows.length === 1 && rows[0]?.success === 'true') { if (tkn.isCanceled()) { + if (freeAfter) { + await this.connection.query(`CACHE REMOVE "${key}"`, []); + } + return false; } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/RedisCacheDriver.ts b/packages/cubejs-query-orchestrator/src/orchestrator/RedisCacheDriver.ts index 4586ea626aa84..c3a460406829f 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/RedisCacheDriver.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/RedisCacheDriver.ts @@ -50,9 +50,12 @@ export class RedisCacheDriver implements CacheDriverInterface { 'EX', expiration ); - if (response === 'OK') { if (tkn.isCanceled()) { + if (freeAfter) { + await client.delAsync(key); + } + return false; } diff --git a/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryCacheCubestore.test.ts b/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryCacheCubestore.test.ts index accad2bc647e4..756fa1b9f8cee 100644 --- a/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryCacheCubestore.test.ts +++ b/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryCacheCubestore.test.ts @@ -1,26 +1,36 @@ -import { CubeStoreDevDriver, CubeStoreHandler } from '@cubejs-backend/cubestore-driver'; +import { CubeStoreDevDriver, CubeStoreDriver, CubeStoreHandler } from '@cubejs-backend/cubestore-driver'; import { QueryCacheTest } from '../../unit/QueryCache.abstract'; -const cubeStoreHandler = new CubeStoreHandler({ - stdout: (data) => { - console.log(data.toString().trim()); - }, - stderr: (data) => { - console.log(data.toString().trim()); - }, - onRestart: (code) => console.log({ - warning: `Instance exit with ${code}, restarting`, - }), -}); +let beforeAll; +let afterAll; +let cubeStoreDriver = new CubeStoreDriver({}); + +if ((process.env.CUBEJS_TESTING_CUBESTORE_AUTO_PROVISIONING || 'true') === 'true') { + const cubeStoreHandler = new CubeStoreHandler({ + stdout: (data) => { + console.log(data.toString().trim()); + }, + stderr: (data) => { + console.log(data.toString().trim()); + }, + onRestart: (code) => console.log({ + warning: `Instance exit with ${code}, restarting`, + }), + }); + + beforeAll = async () => { + await cubeStoreHandler.acquire(); + }; + afterAll = async () => cubeStoreHandler.release(true); + cubeStoreDriver = new CubeStoreDevDriver(cubeStoreHandler); +} QueryCacheTest( 'CubeStore Cache Driver', { cacheAndQueueDriver: 'cubestore', - cubeStoreDriver: new CubeStoreDevDriver(cubeStoreHandler), - beforeAll: async () => { - await cubeStoreHandler.acquire(); - }, - afterAll: async () => cubeStoreHandler.release(true) + cubeStoreDriver, + beforeAll, + afterAll } ); From 4a0897370995d24f53998452bac3fa8e153597aa Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Sun, 25 Dec 2022 23:20:27 +0300 Subject: [PATCH 05/10] chore(ci): run integration tests --- .github/workflows/push.yml | 57 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index fe2522fa866c4..56d300a8e61e8 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -299,6 +299,63 @@ jobs: CUBEJS_REDIS_USE_IOREDIS: true CUBEJS_REDIS_SENTINEL: "redis+sentinel://localhost:5000,localhost:5001,localhost:5002/mymaster/0" + integration-cubestore: + needs: [unit, lint, latest-tag-sha] + runs-on: ubuntu-20.04 + timeout-minutes: 60 + if: (needs['latest-tag-sha'].outputs.sha != github.sha) + + strategy: + matrix: + node-version: [14.x] + fail-fast: false + + steps: + - name: Checkout + uses: actions/checkout@v2 + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: nightly-2022-03-08 + override: true + components: rustfmt + - name: Install Node.js ${{ matrix.node-version }} + uses: actions/setup-node@v1 + with: + node-version: ${{ matrix.node-version }} + - name: Get yarn cache directory path + id: yarn-cache-dir-path + run: echo "::set-output name=dir::$(yarn cache dir)" + - name: Restore lerna + uses: actions/cache@v2 + with: + path: | + ${{ steps.yarn-cache-dir-path.outputs.dir }} + node_modules + rust/cubestore/node_modules + packages/*/node_modules + key: ${{ runner.os }}-workspace-main-${{ matrix.node-version }}-${{ hashFiles('**/yarn.lock') }} + restore-keys: | + ${{ runner.os }}-workspace-main-${{ matrix.node-version }}- + - name: Set Yarn version + run: yarn policies set-version v1.22.5 + - name: Yarn install + uses: nick-invision/retry@v2 + env: + CUBESTORE_SKIP_POST_INSTALL: true + with: + max_attempts: 3 + retry_on: error + retry_wait_seconds: 15 + timeout_minutes: 20 + command: yarn install --frozen-lockfile + - name: Lerna tsc + run: yarn tsc + - name: Run Cubestore Integration + timeout-minutes: 10 + run: | + yarn lerna run --concurrency 1 --stream --no-prefix integration:cubestore + integration: needs: [unit, lint, latest-tag-sha] runs-on: ubuntu-20.04 From 317d1e2d897e44fc319d4fd2f51cd98e738f10e8 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Tue, 27 Dec 2022 23:43:45 +0300 Subject: [PATCH 06/10] chore: move cubestore integration tests to testing (cycle deps problem) --- packages/cubejs-cubestore-driver/package.json | 5 +---- .../test/CubeStoreQuery.test.ts | 14 -------------- packages/cubejs-testing/package.json | 1 + .../test/driver-cubestore.test.ts | 18 ++++++++++++++++++ 4 files changed, 20 insertions(+), 18 deletions(-) delete mode 100644 packages/cubejs-cubestore-driver/test/CubeStoreQuery.test.ts create mode 100644 packages/cubejs-testing/test/driver-cubestore.test.ts diff --git a/packages/cubejs-cubestore-driver/package.json b/packages/cubejs-cubestore-driver/package.json index bd1cff4bcdfa7..3454155c24201 100644 --- a/packages/cubejs-cubestore-driver/package.json +++ b/packages/cubejs-cubestore-driver/package.json @@ -23,10 +23,7 @@ "tsc": "tsc", "watch": "tsc -w", "lint": "eslint src/*.ts", - "lint:fix": "eslint --fix src/*.ts", - "test": "yarn integration", - "integration": "jest", - "integration:cubestore": "jest" + "lint:fix": "eslint --fix src/*.ts" }, "dependencies": { "@cubejs-backend/base-driver": "^0.31.32", diff --git a/packages/cubejs-cubestore-driver/test/CubeStoreQuery.test.ts b/packages/cubejs-cubestore-driver/test/CubeStoreQuery.test.ts deleted file mode 100644 index 9c81aac56d46b..0000000000000 --- a/packages/cubejs-cubestore-driver/test/CubeStoreQuery.test.ts +++ /dev/null @@ -1,14 +0,0 @@ -// import { createQueryTestCase, CubeStoreDBRunner, QueryTestAbstract } from '@cubejs-backend/testing-shared'; -// import { CubeStoreDriver } from '../src'; -// -// class CubeStoreQueryTest extends QueryTestAbstract { -// } -// -// createQueryTestCase(new CubeStoreQueryTest(), { -// name: 'CubeStore', -// connectionFactory: (container) => new CubeStoreDriver({ -// host: container.getHost(), -// port: container.getMappedPort(3030) -// }), -// DbRunnerClass: CubeStoreDBRunner, -// }); diff --git a/packages/cubejs-testing/package.json b/packages/cubejs-testing/package.json index bb68ea0d7f723..9750b073e01b7 100644 --- a/packages/cubejs-testing/package.json +++ b/packages/cubejs-testing/package.json @@ -46,6 +46,7 @@ "driver:questdb:snap": "jest --verbose --updateSnapshot -i dist/test/driver-questdb.test.js", "driver:databricks": "jest --verbose -i dist/test/driver-databricks.test.js", "driver:databricks:snap": "jest --verbose --updateSnapshot -i dist/test/driver-databricks.test.js", + "integration:cubestore": "jest --verbose --updateSnapshot -i dist/test/driver-cubestore.test.js", "rest:postgres": "yarn tsc && clear && jest --verbose -i dist/test/rest-postgres.test.js", "smoke": "jest --verbose -i 'dist/test/smoke-(?!.*?(redshift|bigquery|firebolt))'", "smoke:athena": "jest --verbose -i dist/test/smoke-athena.test.js", diff --git a/packages/cubejs-testing/test/driver-cubestore.test.ts b/packages/cubejs-testing/test/driver-cubestore.test.ts new file mode 100644 index 0000000000000..bec293266fdca --- /dev/null +++ b/packages/cubejs-testing/test/driver-cubestore.test.ts @@ -0,0 +1,18 @@ +import { createQueryTestCase, CubeStoreDBRunner, QueryTestAbstract } from '@cubejs-backend/testing-shared'; +import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; +import { CubeStoreQuery } from '@cubejs-backend/schema-compiler'; + +class CubeStoreQueryTest extends QueryTestAbstract { + public getQueryClass(): any { + return CubeStoreQuery; + } +} + +createQueryTestCase(new CubeStoreQueryTest(), { + name: 'CubeStore', + connectionFactory: (container) => new CubeStoreDriver({ + host: container.getHost(), + port: container.getMappedPort(3030) + }), + DbRunnerClass: CubeStoreDBRunner, +}); From 3ca56af64a1d1c353b60c415243eecb4d4f3db58 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 30 Dec 2022 19:48:36 +0300 Subject: [PATCH 07/10] chore: cyclic deps --- packages/cubejs-cubestore-driver/package.json | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/cubejs-cubestore-driver/package.json b/packages/cubejs-cubestore-driver/package.json index 3454155c24201..51698096bb3fa 100644 --- a/packages/cubejs-cubestore-driver/package.json +++ b/packages/cubejs-cubestore-driver/package.json @@ -42,7 +42,6 @@ }, "devDependencies": { "@cubejs-backend/linter": "^0.31.0", - "@cubejs-backend/testing-shared": "^0.31.32", "@types/flatbuffers": "^1.10.0", "@types/generic-pool": "^3.1.9", "@types/mysql": "^2.15.17", From a72f37cf678fc484bfe1692d6e8e6e3f9c885a88 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 30 Dec 2022 19:49:24 +0300 Subject: [PATCH 08/10] chore: prefix supports for cubestore driver --- .../src/orchestrator/PreAggregations.ts | 6 +++--- .../src/orchestrator/QueryCache.ts | 12 ++++++++++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts index ddf11f4f4c77a..46bf50c42be89 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts @@ -327,7 +327,7 @@ class PreAggregationLoadCache { } public tablesRedisKey(preAggregation: PreAggregationDescription) { - return `SQL_PRE_AGGREGATIONS_TABLES_${this.redisPrefix}_${preAggregation.dataSource}${preAggregation.preAggregationsSchema}${preAggregation.external ? '_EXT' : ''}`; + return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES', `${preAggregation.dataSource}${preAggregation.preAggregationsSchema}${preAggregation.external ? '_EXT' : ''}`); } protected async getTablesQuery(preAggregation) { @@ -1884,12 +1884,12 @@ export class PreAggregations { protected tablesUsedRedisKey(tableName) { // TODO add dataSource? - return `SQL_PRE_AGGREGATIONS_${this.redisPrefix}_TABLES_USED_${tableName}`; + return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES_USED', tableName); } protected tablesTouchRedisKey(tableName) { // TODO add dataSource? - return `SQL_PRE_AGGREGATIONS_${this.redisPrefix}_TABLES_TOUCH_${tableName}`; + return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES_TOUCH', tableName); } public async addTableUsed(tableName) { diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 91b8fce2c4c83..8b1c39e094aa1 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -166,6 +166,14 @@ export class QueryCache { return this.cacheDriver; } + public getKey(catalog: string, key: string): string { + if (this.cacheDriver instanceof CubeStoreCacheDriver) { + return `${this.redisPrefix}#${catalog}:${key}`; + } else { + return `${catalog}_${this.redisPrefix}_${key}`; + } + } + /** * Force reconcile queue logic to be executed. */ @@ -857,8 +865,8 @@ export class QueryCache { return null; } - public queryRedisKey(cacheKey) { - return `SQL_QUERY_RESULT_${this.redisPrefix}_${getCacheHash(cacheKey)}`; + public queryRedisKey(cacheKey): string { + return this.getKey('SQL_QUERY_RESULT', getCacheHash(cacheKey)); } public async cleanup() { From a25cd8545e79bf7f1983574b303dd7930f0d67c6 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 30 Dec 2022 22:34:32 +0300 Subject: [PATCH 09/10] chore: better name for interface --- packages/cubejs-base-driver/src/queue-driver.interface.ts | 6 +++--- .../src/orchestrator/BaseQueueDriver.ts | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/cubejs-base-driver/src/queue-driver.interface.ts b/packages/cubejs-base-driver/src/queue-driver.interface.ts index 1f3d9790abe9f..4e66e509b7d8f 100644 --- a/packages/cubejs-base-driver/src/queue-driver.interface.ts +++ b/packages/cubejs-base-driver/src/queue-driver.interface.ts @@ -1,4 +1,4 @@ -export interface LocalQueueDriverConnectionInterface { +export interface QueueDriverConnectionInterface { getResultBlocking(queryKey: string): Promise; getResult(queryKey: string): Promise; addToQueue(queryKey: string): Promise; @@ -18,6 +18,6 @@ export interface LocalQueueDriverConnectionInterface { } export interface QueueDriverInterface { - createConnection(): Promise; - release(connection: LocalQueueDriverConnectionInterface): Promise; + createConnection(): Promise; + release(connection: QueueDriverConnectionInterface): Promise; } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts b/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts index c15bd99a4efeb..1e3e0091098d5 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts @@ -1,4 +1,4 @@ -import { LocalQueueDriverConnectionInterface, QueueDriverInterface } from '@cubejs-backend/base-driver'; +import { QueueDriverConnectionInterface, QueueDriverInterface } from '@cubejs-backend/base-driver'; import { getCacheHash } from './utils'; export abstract class BaseQueueDriver implements QueueDriverInterface { @@ -6,7 +6,7 @@ export abstract class BaseQueueDriver implements QueueDriverInterface { return getCacheHash(queryKey); } - abstract createConnection(): Promise; + abstract createConnection(): Promise; - abstract release(connection: LocalQueueDriverConnectionInterface): Promise; + abstract release(connection: QueueDriverConnectionInterface): Promise; } From a6f0e83376509453de231f638e51321a3fcd5a45 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Sat, 31 Dec 2022 12:42:19 +0300 Subject: [PATCH 10/10] chore: use placeholders everywhere --- .../src/CubeStoreCacheDriver.ts | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts index f2c1b1b6089a4..fc0a586a35e70 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts @@ -22,7 +22,9 @@ export class CubeStoreCacheDriver implements CacheDriverInterface { if (rows && rows.length === 1 && rows[0]?.success === 'true') { if (tkn.isCanceled()) { if (freeAfter) { - await this.connection.query(`CACHE REMOVE "${key}"`, []); + await this.connection.query('CACHE REMOVE ?', [ + key + ]); } return false; @@ -32,7 +34,9 @@ export class CubeStoreCacheDriver implements CacheDriverInterface { await tkn.with(cb()); } finally { if (freeAfter) { - await this.connection.query(`CACHE REMOVE "${key}"`, []); + await this.connection.query('CACHE REMOVE ?', [ + key + ]); } } @@ -43,7 +47,9 @@ export class CubeStoreCacheDriver implements CacheDriverInterface { }); public async get(key: string) { - const rows = await this.connection.query(`CACHE GET "${key}"`, []); + const rows = await this.connection.query('CACHE GET ?', [ + key + ]); if (rows && rows.length === 1) { return JSON.parse(rows[0].value); } @@ -53,7 +59,7 @@ export class CubeStoreCacheDriver implements CacheDriverInterface { public async set(key: string, value, expiration) { const strValue = JSON.stringify(value); - await this.connection.query(`CACHE SET TTL ${expiration} ? ?`, [key, strValue]); + await this.connection.query('CACHE SET TTL ? ? ?', [expiration, key, strValue]); return { key, @@ -62,11 +68,15 @@ export class CubeStoreCacheDriver implements CacheDriverInterface { } public async remove(key: string) { - await this.connection.query(`CACHE REMOVE "${key}"`, []); + await this.connection.query('CACHE REMOVE ?', [ + key + ]); } public async keysStartingWith(prefix: string) { - const rows = await this.connection.query(`CACHE KEYS "${prefix}"`, []); + const rows = await this.connection.query('CACHE KEYS ?', [ + prefix + ]); return rows.map((row) => row.key); }