Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,63 @@ jobs:
CUBEJS_REDIS_USE_IOREDIS: true
CUBEJS_REDIS_SENTINEL: "redis+sentinel://localhost:5000,localhost:5001,localhost:5002/mymaster/0"

integration-cubestore:
needs: [unit, lint, latest-tag-sha]
runs-on: ubuntu-20.04
timeout-minutes: 60
if: (needs['latest-tag-sha'].outputs.sha != github.sha)

strategy:
matrix:
node-version: [14.x]
fail-fast: false

steps:
- name: Checkout
uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2022-03-08
override: true
components: rustfmt
- name: Install Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}
- name: Get yarn cache directory path
id: yarn-cache-dir-path
run: echo "::set-output name=dir::$(yarn cache dir)"
- name: Restore lerna
uses: actions/cache@v2
with:
path: |
${{ steps.yarn-cache-dir-path.outputs.dir }}
node_modules
rust/cubestore/node_modules
packages/*/node_modules
key: ${{ runner.os }}-workspace-main-${{ matrix.node-version }}-${{ hashFiles('**/yarn.lock') }}
restore-keys: |
${{ runner.os }}-workspace-main-${{ matrix.node-version }}-
- name: Set Yarn version
run: yarn policies set-version v1.22.5
- name: Yarn install
uses: nick-invision/retry@v2
env:
CUBESTORE_SKIP_POST_INSTALL: true
with:
max_attempts: 3
retry_on: error
retry_wait_seconds: 15
timeout_minutes: 20
command: yarn install --frozen-lockfile
- name: Lerna tsc
run: yarn tsc
- name: Run Cubestore Integration
timeout-minutes: 10
run: |
yarn lerna run --concurrency 1 --stream --no-prefix integration:cubestore

integration:
needs: [unit, lint, latest-tag-sha]
runs-on: ubuntu-20.04
Expand Down
2 changes: 2 additions & 0 deletions packages/cubejs-base-driver/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export * from './BaseDriver';
export * from './utils';
export * from './driver.interface';
export * from './queue-driver.interface';
export * from './cache-driver.interface';
23 changes: 23 additions & 0 deletions packages/cubejs-base-driver/src/queue-driver.interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
export interface QueueDriverConnectionInterface {
getResultBlocking(queryKey: string): Promise<unknown>;
getResult(queryKey: string): Promise<unknown>;
addToQueue(queryKey: string): Promise<unknown>;
getToProcessQueries(): Promise<unknown>;
getActiveQueries(): Promise<unknown>;
getOrphanedQueries(): Promise<unknown>;
getStalledQueries(): Promise<unknown>;
getQueryStageState(onlyKeys: any): Promise<unknown>;
updateHeartBeat(queryKey: string): Promise<void>;
getNextProcessingId(): Promise<string>;
retrieveForProcessing(queryKey: string, processingId: string): Promise<unknown>;
freeProcessingLock(queryKe: string, processingId: string, activated: unknown): Promise<unknown>;
optimisticQueryUpdate(queryKey, toUpdate, processingId): Promise<unknown>;
cancelQuery(queryKey: string): Promise<unknown>;
setResultAndRemoveQuery(queryKey: string, executionResult: any, processingId: any): Promise<unknown>;
release(): Promise<void>;
}

export interface QueueDriverInterface {
createConnection(): Promise<QueueDriverConnectionInterface>;
release(connection: QueueDriverConnectionInterface): Promise<void>;
}
18 changes: 8 additions & 10 deletions packages/cubejs-cubestore-driver/index.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
const fromExports = require('./dist/src');
const { CubeStoreDriver } = require('./dist/src/CubeStoreDriver');
const { CubeStoreDevDriver } = require('./dist/src/CubeStoreDevDriver');
const { isCubeStoreSupported, CubeStoreHandler } = require('./dist/src/rexport');

/**
* After 5 years working with TypeScript, now I know
* that commonjs and nodejs require is not compatibility with using export default
*/
module.exports = CubeStoreDriver;
const toExport = CubeStoreDriver;

/**
* It's needed to move our CLI to destructing style on import
* Please sync this file with src/index.ts
*/
module.exports.CubeStoreDevDriver = CubeStoreDevDriver;
module.exports.isCubeStoreSupported = isCubeStoreSupported;
module.exports.CubeStoreHandler = CubeStoreHandler;
// eslint-disable-next-line no-restricted-syntax
for (const [key, module] of Object.entries(fromExports)) {
toExport[key] = module;
}

module.exports = toExport;
7 changes: 1 addition & 6 deletions packages/cubejs-cubestore-driver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,11 @@
"tsc": "tsc",
"watch": "tsc -w",
"lint": "eslint src/*.ts",
"lint:fix": "eslint --fix src/*.ts",
"test": "yarn integration",
"integration": "jest",
"integration:cubestore": "jest"
"lint:fix": "eslint --fix src/*.ts"
},
"dependencies": {
"@cubejs-backend/base-driver": "^0.31.32",
"@cubejs-backend/cubestore": "^0.31.32",
"@cubejs-backend/schema-compiler": "^0.31.32",
"@cubejs-backend/shared": "^0.31.32",
"csv-write-stream": "^2.0.0",
"flatbuffers": "^1.12.0",
Expand All @@ -46,7 +42,6 @@
},
"devDependencies": {
"@cubejs-backend/linter": "^0.31.0",
"@cubejs-backend/testing-shared": "^0.31.32",
"@types/flatbuffers": "^1.10.0",
"@types/generic-pool": "^3.1.9",
"@types/mysql": "^2.15.17",
Expand Down
90 changes: 90 additions & 0 deletions packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { createCancelablePromise, MaybeCancelablePromise } from '@cubejs-backend/shared';
import { CacheDriverInterface } from '@cubejs-backend/base-driver';

import { CubeStoreDriver } from './CubeStoreDriver';

export class CubeStoreCacheDriver implements CacheDriverInterface {
public constructor(
protected readonly connection: CubeStoreDriver
) {}

public withLock = (
key: string,
cb: () => MaybeCancelablePromise<any>,
expiration: number = 60,
freeAfter: boolean = true,
) => createCancelablePromise(async (tkn) => {
if (tkn.isCanceled()) {
return false;
}

const rows = await this.connection.query('CACHE SET NX TTL ? ? ?', [expiration, key, '1']);
if (rows && rows.length === 1 && rows[0]?.success === 'true') {
if (tkn.isCanceled()) {
if (freeAfter) {
await this.connection.query('CACHE REMOVE ?', [
key
]);
}

return false;
}

try {
await tkn.with(cb());
} finally {
if (freeAfter) {
await this.connection.query('CACHE REMOVE ?', [
key
]);
}
}

return true;
}

return false;
});

public async get(key: string) {
const rows = await this.connection.query('CACHE GET ?', [
key
]);
if (rows && rows.length === 1) {
return JSON.parse(rows[0].value);
}

return null;
}

public async set(key: string, value, expiration) {
const strValue = JSON.stringify(value);
await this.connection.query('CACHE SET TTL ? ? ?', [expiration, key, strValue]);

return {
key,
bytes: Buffer.byteLength(strValue),
};
}

public async remove(key: string) {
await this.connection.query('CACHE REMOVE ?', [
key
]);
}

public async keysStartingWith(prefix: string) {
const rows = await this.connection.query('CACHE KEYS ?', [
prefix
]);
return rows.map((row) => row.key);
}

public async cleanup(): Promise<void> {
//
}

public async testConnection(): Promise<void> {
return this.connection.testConnection();
}
}
9 changes: 2 additions & 7 deletions packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ import {
ExternalCreateTableOptions,
DownloadTableMemoryData, DriverInterface, IndexesSQL, CreateTableIndex,
StreamTableData,
DriverCapabilities,
StreamingSourceTableData,
QueryOptions,
ExternalDriverCompatibilities,
} from '@cubejs-backend/base-driver';
import { getEnv } from '@cubejs-backend/shared';
import { format as formatSql, escape } from 'sqlstring';
import fetch from 'node-fetch';

import { CubeStoreQuery } from './CubeStoreQuery';
import { ConnectionConfig } from './types';
import { WebSocketConnection } from './WebSocketConnection';

Expand Down Expand Up @@ -390,11 +389,7 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
return this.createTableWithOptions(table, columns, options, queryTracingObj);
}

public static dialectClass() {
return CubeStoreQuery;
}

public capabilities(): DriverCapabilities {
public capabilities(): ExternalDriverCompatibilities {
return {
csvImport: true,
streamImport: true,
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-cubestore-driver/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export * from './CubeStoreQuery';
export * from './CubeStoreCacheDriver';
export * from './CubeStoreDriver';
export * from './CubeStoreDevDriver';
export * from './rexport';
1 change: 1 addition & 0 deletions packages/cubejs-query-orchestrator/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
dist
.cubestore
4 changes: 3 additions & 1 deletion packages/cubejs-query-orchestrator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
"test": "npm run unit && npm run integration",
"unit": "jest --runInBand --coverage --verbose dist/test/unit",
"integration": "npm run integration:redis dist/test/integration",
"integration:redis": "jest --runInBand --verbose dist/test/integration",
"integration:redis": "jest --runInBand --verbose dist/test/integration/redis",
"integration:cubestore": "jest --runInBand --verbose dist/test/integration/cubestore",
"lint": "eslint src/* test/* --ext .ts,.js",
"lint:fix": "eslint --fix src/* test/* --ext .ts,.js"
},
Expand All @@ -33,6 +34,7 @@
"dependencies": {
"@cubejs-backend/base-driver": "^0.31.32",
"@cubejs-backend/shared": "^0.31.32",
"@cubejs-backend/cubestore-driver": "^0.31.0",
"csv-write-stream": "^2.0.0",
"es5-ext": "0.10.53",
"generic-pool": "^3.7.1",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import { QueueDriverConnectionInterface, QueueDriverInterface } from '@cubejs-backend/base-driver';
import { getCacheHash } from './utils';

export abstract class BaseQueueDriver {
export abstract class BaseQueueDriver implements QueueDriverInterface {
public redisHash(queryKey) {
return getCacheHash(queryKey);
}

abstract createConnection(): Promise<QueueDriverConnectionInterface>;

abstract release(connection: QueueDriverConnectionInterface): Promise<void>;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { createCancelablePromise, MaybeCancelablePromise } from '@cubejs-backend/shared';

import { CacheDriverInterface } from './cache-driver.interface';
import { CacheDriverInterface } from '@cubejs-backend/base-driver';

interface ItemBucket {
value: any,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ import {
InlineTable,
SaveCancelFn,
StreamOptions,
UnloadOptions,
UnloadOptions
} from '@cubejs-backend/base-driver';
import { PreAggTableToTempTable, Query, QueryBody, QueryCache, QueryTuple, QueryWithParams } from './QueryCache';
import { ContinueWaitError } from './ContinueWaitError';
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
import { QueryQueue } from './QueryQueue';
import { LargeStreamWarning } from './StreamObjectsCounter';
import { CacheAndQueryDriverType } from './QueryOrchestrator';

/// Name of the inline table containing the lambda rows.
export const LAMBDA_TABLE_PREFIX = 'lambda';
Expand Down Expand Up @@ -326,7 +327,7 @@ class PreAggregationLoadCache {
}

public tablesRedisKey(preAggregation: PreAggregationDescription) {
return `SQL_PRE_AGGREGATIONS_TABLES_${this.redisPrefix}_${preAggregation.dataSource}${preAggregation.preAggregationsSchema}${preAggregation.external ? '_EXT' : ''}`;
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES', `${preAggregation.dataSource}${preAggregation.preAggregationsSchema}${preAggregation.external ? '_EXT' : ''}`);
}

protected async getTablesQuery(preAggregation) {
Expand Down Expand Up @@ -1832,7 +1833,7 @@ type PreAggregationsOptions = {
}>;
redisPool?: any;
continueWaitTimeout?: number;
cacheAndQueueDriver?: 'redis' | 'memory';
cacheAndQueueDriver?: CacheAndQueryDriverType;
skipExternalCacheAndQueue?: boolean;
};

Expand Down Expand Up @@ -1883,12 +1884,12 @@ export class PreAggregations {

protected tablesUsedRedisKey(tableName) {
// TODO add dataSource?
return `SQL_PRE_AGGREGATIONS_${this.redisPrefix}_TABLES_USED_${tableName}`;
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES_USED', tableName);
}

protected tablesTouchRedisKey(tableName) {
// TODO add dataSource?
return `SQL_PRE_AGGREGATIONS_${this.redisPrefix}_TABLES_TOUCH_${tableName}`;
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES_TOUCH', tableName);
}

public async addTableUsed(tableName) {
Expand Down
Loading