Skip to content

Commit

Permalink
feat: Introduce CubeStoreCacheDriver (#5511)
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed Dec 31, 2022
1 parent ae5e8f8 commit fb39776
Show file tree
Hide file tree
Showing 30 changed files with 369 additions and 92 deletions.
57 changes: 57 additions & 0 deletions .github/workflows/push.yml
Expand Up @@ -299,6 +299,63 @@ jobs:
CUBEJS_REDIS_USE_IOREDIS: true
CUBEJS_REDIS_SENTINEL: "redis+sentinel://localhost:5000,localhost:5001,localhost:5002/mymaster/0"

integration-cubestore:
needs: [unit, lint, latest-tag-sha]
runs-on: ubuntu-20.04
timeout-minutes: 60
if: (needs['latest-tag-sha'].outputs.sha != github.sha)

strategy:
matrix:
node-version: [14.x]
fail-fast: false

steps:
- name: Checkout
uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2022-03-08
override: true
components: rustfmt
- name: Install Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}
- name: Get yarn cache directory path
id: yarn-cache-dir-path
run: echo "::set-output name=dir::$(yarn cache dir)"
- name: Restore lerna
uses: actions/cache@v2
with:
path: |
${{ steps.yarn-cache-dir-path.outputs.dir }}
node_modules
rust/cubestore/node_modules
packages/*/node_modules
key: ${{ runner.os }}-workspace-main-${{ matrix.node-version }}-${{ hashFiles('**/yarn.lock') }}
restore-keys: |
${{ runner.os }}-workspace-main-${{ matrix.node-version }}-
- name: Set Yarn version
run: yarn policies set-version v1.22.5
- name: Yarn install
uses: nick-invision/retry@v2
env:
CUBESTORE_SKIP_POST_INSTALL: true
with:
max_attempts: 3
retry_on: error
retry_wait_seconds: 15
timeout_minutes: 20
command: yarn install --frozen-lockfile
- name: Lerna tsc
run: yarn tsc
- name: Run Cubestore Integration
timeout-minutes: 10
run: |
yarn lerna run --concurrency 1 --stream --no-prefix integration:cubestore
integration:
needs: [unit, lint, latest-tag-sha]
runs-on: ubuntu-20.04
Expand Down
2 changes: 2 additions & 0 deletions packages/cubejs-base-driver/src/index.ts
@@ -1,3 +1,5 @@
export * from './BaseDriver';
export * from './utils';
export * from './driver.interface';
export * from './queue-driver.interface';
export * from './cache-driver.interface';
23 changes: 23 additions & 0 deletions packages/cubejs-base-driver/src/queue-driver.interface.ts
@@ -0,0 +1,23 @@
export interface QueueDriverConnectionInterface {
getResultBlocking(queryKey: string): Promise<unknown>;
getResult(queryKey: string): Promise<unknown>;
addToQueue(queryKey: string): Promise<unknown>;
getToProcessQueries(): Promise<unknown>;
getActiveQueries(): Promise<unknown>;
getOrphanedQueries(): Promise<unknown>;
getStalledQueries(): Promise<unknown>;
getQueryStageState(onlyKeys: any): Promise<unknown>;
updateHeartBeat(queryKey: string): Promise<void>;
getNextProcessingId(): Promise<string>;
retrieveForProcessing(queryKey: string, processingId: string): Promise<unknown>;
freeProcessingLock(queryKe: string, processingId: string, activated: unknown): Promise<unknown>;
optimisticQueryUpdate(queryKey, toUpdate, processingId): Promise<unknown>;
cancelQuery(queryKey: string): Promise<unknown>;
setResultAndRemoveQuery(queryKey: string, executionResult: any, processingId: any): Promise<unknown>;
release(): Promise<void>;
}

export interface QueueDriverInterface {
createConnection(): Promise<QueueDriverConnectionInterface>;
release(connection: QueueDriverConnectionInterface): Promise<void>;
}
18 changes: 8 additions & 10 deletions packages/cubejs-cubestore-driver/index.js
@@ -1,17 +1,15 @@
const fromExports = require('./dist/src');
const { CubeStoreDriver } = require('./dist/src/CubeStoreDriver');
const { CubeStoreDevDriver } = require('./dist/src/CubeStoreDevDriver');
const { isCubeStoreSupported, CubeStoreHandler } = require('./dist/src/rexport');

/**
* After 5 years working with TypeScript, now I know
* that commonjs and nodejs require is not compatibility with using export default
*/
module.exports = CubeStoreDriver;
const toExport = CubeStoreDriver;

/**
* It's needed to move our CLI to destructing style on import
* Please sync this file with src/index.ts
*/
module.exports.CubeStoreDevDriver = CubeStoreDevDriver;
module.exports.isCubeStoreSupported = isCubeStoreSupported;
module.exports.CubeStoreHandler = CubeStoreHandler;
// eslint-disable-next-line no-restricted-syntax
for (const [key, module] of Object.entries(fromExports)) {
toExport[key] = module;
}

module.exports = toExport;
7 changes: 1 addition & 6 deletions packages/cubejs-cubestore-driver/package.json
Expand Up @@ -23,15 +23,11 @@
"tsc": "tsc",
"watch": "tsc -w",
"lint": "eslint src/*.ts",
"lint:fix": "eslint --fix src/*.ts",
"test": "yarn integration",
"integration": "jest",
"integration:cubestore": "jest"
"lint:fix": "eslint --fix src/*.ts"
},
"dependencies": {
"@cubejs-backend/base-driver": "^0.31.32",
"@cubejs-backend/cubestore": "^0.31.32",
"@cubejs-backend/schema-compiler": "^0.31.32",
"@cubejs-backend/shared": "^0.31.32",
"csv-write-stream": "^2.0.0",
"flatbuffers": "^1.12.0",
Expand All @@ -46,7 +42,6 @@
},
"devDependencies": {
"@cubejs-backend/linter": "^0.31.0",
"@cubejs-backend/testing-shared": "^0.31.32",
"@types/flatbuffers": "^1.10.0",
"@types/generic-pool": "^3.1.9",
"@types/mysql": "^2.15.17",
Expand Down
90 changes: 90 additions & 0 deletions packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts
@@ -0,0 +1,90 @@
import { createCancelablePromise, MaybeCancelablePromise } from '@cubejs-backend/shared';
import { CacheDriverInterface } from '@cubejs-backend/base-driver';

import { CubeStoreDriver } from './CubeStoreDriver';

export class CubeStoreCacheDriver implements CacheDriverInterface {
public constructor(
protected readonly connection: CubeStoreDriver
) {}

public withLock = (
key: string,
cb: () => MaybeCancelablePromise<any>,
expiration: number = 60,
freeAfter: boolean = true,
) => createCancelablePromise(async (tkn) => {
if (tkn.isCanceled()) {
return false;
}

const rows = await this.connection.query('CACHE SET NX TTL ? ? ?', [expiration, key, '1']);
if (rows && rows.length === 1 && rows[0]?.success === 'true') {
if (tkn.isCanceled()) {
if (freeAfter) {
await this.connection.query('CACHE REMOVE ?', [
key
]);
}

return false;
}

try {
await tkn.with(cb());
} finally {
if (freeAfter) {
await this.connection.query('CACHE REMOVE ?', [
key
]);
}
}

return true;
}

return false;
});

public async get(key: string) {
const rows = await this.connection.query('CACHE GET ?', [
key
]);
if (rows && rows.length === 1) {
return JSON.parse(rows[0].value);
}

return null;
}

public async set(key: string, value, expiration) {
const strValue = JSON.stringify(value);
await this.connection.query('CACHE SET TTL ? ? ?', [expiration, key, strValue]);

return {
key,
bytes: Buffer.byteLength(strValue),
};
}

public async remove(key: string) {
await this.connection.query('CACHE REMOVE ?', [
key
]);
}

public async keysStartingWith(prefix: string) {
const rows = await this.connection.query('CACHE KEYS ?', [
prefix
]);
return rows.map((row) => row.key);
}

public async cleanup(): Promise<void> {
//
}

public async testConnection(): Promise<void> {
return this.connection.testConnection();
}
}
9 changes: 2 additions & 7 deletions packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts
Expand Up @@ -10,15 +10,14 @@ import {
ExternalCreateTableOptions,
DownloadTableMemoryData, DriverInterface, IndexesSQL, CreateTableIndex,
StreamTableData,
DriverCapabilities,
StreamingSourceTableData,
QueryOptions,
ExternalDriverCompatibilities,
} from '@cubejs-backend/base-driver';
import { getEnv } from '@cubejs-backend/shared';
import { format as formatSql, escape } from 'sqlstring';
import fetch from 'node-fetch';

import { CubeStoreQuery } from './CubeStoreQuery';
import { ConnectionConfig } from './types';
import { WebSocketConnection } from './WebSocketConnection';

Expand Down Expand Up @@ -390,11 +389,7 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
return this.createTableWithOptions(table, columns, options, queryTracingObj);
}

public static dialectClass() {
return CubeStoreQuery;
}

public capabilities(): DriverCapabilities {
public capabilities(): ExternalDriverCompatibilities {
return {
csvImport: true,
streamImport: true,
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-cubestore-driver/src/index.ts
@@ -1,4 +1,4 @@
export * from './CubeStoreQuery';
export * from './CubeStoreCacheDriver';
export * from './CubeStoreDriver';
export * from './CubeStoreDevDriver';
export * from './rexport';
1 change: 1 addition & 0 deletions packages/cubejs-query-orchestrator/.gitignore
@@ -1 +1,2 @@
dist
.cubestore
4 changes: 3 additions & 1 deletion packages/cubejs-query-orchestrator/package.json
Expand Up @@ -20,7 +20,8 @@
"test": "npm run unit && npm run integration",
"unit": "jest --runInBand --coverage --verbose dist/test/unit",
"integration": "npm run integration:redis dist/test/integration",
"integration:redis": "jest --runInBand --verbose dist/test/integration",
"integration:redis": "jest --runInBand --verbose dist/test/integration/redis",
"integration:cubestore": "jest --runInBand --verbose dist/test/integration/cubestore",
"lint": "eslint src/* test/* --ext .ts,.js",
"lint:fix": "eslint --fix src/* test/* --ext .ts,.js"
},
Expand All @@ -33,6 +34,7 @@
"dependencies": {
"@cubejs-backend/base-driver": "^0.31.32",
"@cubejs-backend/shared": "^0.31.32",
"@cubejs-backend/cubestore-driver": "^0.31.0",
"csv-write-stream": "^2.0.0",
"es5-ext": "0.10.53",
"generic-pool": "^3.7.1",
Expand Down
@@ -1,7 +1,12 @@
import { QueueDriverConnectionInterface, QueueDriverInterface } from '@cubejs-backend/base-driver';
import { getCacheHash } from './utils';

export abstract class BaseQueueDriver {
export abstract class BaseQueueDriver implements QueueDriverInterface {
public redisHash(queryKey) {
return getCacheHash(queryKey);
}

abstract createConnection(): Promise<QueueDriverConnectionInterface>;

abstract release(connection: QueueDriverConnectionInterface): Promise<void>;
}
@@ -1,6 +1,5 @@
import { createCancelablePromise, MaybeCancelablePromise } from '@cubejs-backend/shared';

import { CacheDriverInterface } from './cache-driver.interface';
import { CacheDriverInterface } from '@cubejs-backend/base-driver';

interface ItemBucket {
value: any,
Expand Down
Expand Up @@ -23,13 +23,14 @@ import {
InlineTable,
SaveCancelFn,
StreamOptions,
UnloadOptions,
UnloadOptions
} from '@cubejs-backend/base-driver';
import { PreAggTableToTempTable, Query, QueryBody, QueryCache, QueryTuple, QueryWithParams } from './QueryCache';
import { ContinueWaitError } from './ContinueWaitError';
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
import { QueryQueue } from './QueryQueue';
import { LargeStreamWarning } from './StreamObjectsCounter';
import { CacheAndQueryDriverType } from './QueryOrchestrator';

/// Name of the inline table containing the lambda rows.
export const LAMBDA_TABLE_PREFIX = 'lambda';
Expand Down Expand Up @@ -326,7 +327,7 @@ class PreAggregationLoadCache {
}

public tablesRedisKey(preAggregation: PreAggregationDescription) {
return `SQL_PRE_AGGREGATIONS_TABLES_${this.redisPrefix}_${preAggregation.dataSource}${preAggregation.preAggregationsSchema}${preAggregation.external ? '_EXT' : ''}`;
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES', `${preAggregation.dataSource}${preAggregation.preAggregationsSchema}${preAggregation.external ? '_EXT' : ''}`);
}

protected async getTablesQuery(preAggregation) {
Expand Down Expand Up @@ -1832,7 +1833,7 @@ type PreAggregationsOptions = {
}>;
redisPool?: any;
continueWaitTimeout?: number;
cacheAndQueueDriver?: 'redis' | 'memory';
cacheAndQueueDriver?: CacheAndQueryDriverType;
skipExternalCacheAndQueue?: boolean;
};

Expand Down Expand Up @@ -1883,12 +1884,12 @@ export class PreAggregations {

protected tablesUsedRedisKey(tableName) {
// TODO add dataSource?
return `SQL_PRE_AGGREGATIONS_${this.redisPrefix}_TABLES_USED_${tableName}`;
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES_USED', tableName);
}

protected tablesTouchRedisKey(tableName) {
// TODO add dataSource?
return `SQL_PRE_AGGREGATIONS_${this.redisPrefix}_TABLES_TOUCH_${tableName}`;
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES_TOUCH', tableName);
}

public async addTableUsed(tableName) {
Expand Down

0 comments on commit fb39776

Please sign in to comment.