3 changes: 1 addition & 2 deletions package.json
@@ -73,8 +73,7 @@
},
"resolutions": {
"@types/node": "^12",
"@types/ramda": "0.27.40",
"rc-tree": "4.1.5"
"@types/ramda": "0.27.40"
},
"license": "MIT",
"packageManager": "yarn@1.22.19"
3 changes: 2 additions & 1 deletion packages/cubejs-backend-shared/package.json
@@ -47,7 +47,8 @@
"moment-timezone": "^0.5.33",
"node-fetch": "^2.6.1",
"shelljs": "^0.8.5",
"throttle-debounce": "^3.0.1"
"throttle-debounce": "^3.0.1",
"uuid": "^8.3.2"
},
"publishConfig": {
"access": "public"
1 change: 1 addition & 0 deletions packages/cubejs-backend-shared/src/index.ts
@@ -17,3 +17,4 @@ export * from './http-utils';
export * from './cli';
export * from './proxy';
export * from './time';
export * from './process';
13 changes: 13 additions & 0 deletions packages/cubejs-backend-shared/src/process.ts
@@ -0,0 +1,13 @@
import { v1, v5 } from 'uuid';

/**
* Unique process ID (aka 00000000-0000-0000-0000-000000000000).
*/
const processUid = v5(v1(), v1()).toString();

/**
* Returns unique process ID.
*/
export function getProcessUid(): string {
return processUid;
}
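For reference, a quick usage sketch of the new helper in TypeScript; the assertions about call-to-call stability are assumptions based on the module-level constant above, not part of the diff:

    import { getProcessUid } from '@cubejs-backend/shared';

    // The UID is computed once at module load, so every call within a process
    // returns the same value; another process (or a restart) yields a new one.
    const first = getProcessUid();
    const second = getProcessUid();

    console.log(first === second); // true
    console.log(/^[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}$/.test(first)); // true: standard UUID shape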
4 changes: 3 additions & 1 deletion packages/cubejs-base-driver/src/queue-driver.interface.ts
@@ -1,5 +1,7 @@
export type QueryDef = unknown;
export type QueryKey = string | [string, any[]];
export type QueryKey = (string | [string, any[]]) & {
persistent?: true,
};

export type AddToQueueResponse = [added: number, _b: any, _c: any, queueSize: number, addedToQueueTime: number];
export type QueryStageStateResponse = [active: string[], toProcess: string[]] | [active: string[], toProcess: string[], defs: Record<string, QueryDef>];
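A minimal sketch of a key that matches the widened type, with the query name and values made up for illustration:

    import type { QueryKey } from '@cubejs-backend/base-driver';

    // The tuple form is kept; the `persistent` marker is attached as a property
    // so that downstream hashing can tell streaming queries apart.
    const queryKey: QueryKey = Object.assign(
      ['stream_orders', [{ limit: 100 }]] as [string, any[]],
      { persistent: true as const },
    );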
19 changes: 13 additions & 6 deletions packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts
@@ -1,3 +1,4 @@
import crypto from 'crypto';
import {
QueueDriverInterface,
QueueDriverConnectionInterface,
@@ -8,12 +9,18 @@ import {
AddToQueueQuery,
AddToQueueOptions, AddToQueueResponse, QueryKey,
} from '@cubejs-backend/base-driver';
import { getProcessUid } from '@cubejs-backend/shared';

import crypto from 'crypto';
import { CubeStoreDriver } from './CubeStoreDriver';

function hashQueryKey(queryKey: QueryKey) {
return crypto.createHash('md5').update(JSON.stringify(queryKey)).digest('hex');
const hash = crypto.createHash('md5').update(JSON.stringify(queryKey)).digest('hex');

if (typeof queryKey === 'object' && queryKey.persistent) {
return `${hash}@${getProcessUid()}`;
}

return hash;
}

class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
@@ -237,14 +244,14 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
// nothing to release
}

public async retrieveForProcessing(queryKey: string, _processingId: string): Promise<RetrieveForProcessingResponse> {
public async retrieveForProcessing(queryKeyHashed: string, _processingId: string): Promise<RetrieveForProcessingResponse> {
const rows = await this.driver.query('QUEUE RETRIEVE CONCURRENCY ? ?', [
this.options.concurrency,
this.prefixKey(queryKey),
this.prefixKey(queryKeyHashed),
]);
if (rows && rows.length) {
const addedCount = 1;
const active = [this.redisHash(queryKey)];
const active = [queryKeyHashed];
const toProcess = 0;
const lockAcquired = true;
const def = this.decodeQueryDefFromRow(rows[0], 'retrieveForProcessing');
@@ -272,7 +279,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
public async setResultAndRemoveQuery(queryKey: string, executionResult: any, _processingId: any): Promise<boolean> {
await this.driver.query('QUEUE ACK ? ? ', [
this.prefixKey(queryKey),
JSON.stringify(executionResult)
executionResult ? JSON.stringify(executionResult) : executionResult
]);

return true;
3 changes: 1 addition & 2 deletions packages/cubejs-query-orchestrator/package.json
@@ -43,8 +43,7 @@
"moment-range": "^4.0.2",
"moment-timezone": "^0.5.33",
"ramda": "^0.27.2",
"redis": "^3.0.2",
"uuid": "^8.3.2"
"redis": "^3.0.2"
},
"devDependencies": {
"@cubejs-backend/linter": "^0.31.0",
@@ -477,11 +477,11 @@ export class QueryCache {
if (!persistent) {
return queue.executeInQueue('query', cacheKey, _query, priority, opt);
} else {
const _stream = queue.getQueryStream(cacheKey, aliasNameToMember);
const stream = queue.setQueryStream(cacheKey, aliasNameToMember);
// we don't want to handle error here as we want it to bubble up
// to the api gateway
queue.executeInQueue('stream', cacheKey, _query, priority, opt);
return _stream;
return stream;
}
}

76 changes: 44 additions & 32 deletions packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js
@@ -1,13 +1,12 @@
import R from 'ramda';
import { getEnv } from '@cubejs-backend/shared';
import { getEnv, getProcessUid } from '@cubejs-backend/shared';
import { QueueDriverInterface } from '@cubejs-backend/base-driver';
import { CubeStoreDriver, CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver';
import { CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver';

import { TimeoutError } from './TimeoutError';
import { ContinueWaitError } from './ContinueWaitError';
import { RedisQueueDriver } from './RedisQueueDriver';
import { LocalQueueDriver } from './LocalQueueDriver';
import { getProcessUid } from './utils';
import { QueryStream } from './QueryStream';

/**
@@ -140,20 +139,30 @@ export class QueryQueue {
/**
* Returns stream object which will be used to pipe data from data source.
*
* @param {*} queryKeyHash
*/
getQueryStream(queryKeyHash) {
if (!this.streams.queued.has(queryKeyHash)) {
throw new Error(`Unable to find stream for persisted query with id: ${queryKeyHash}`);
}

return this.streams.queued.get(queryKeyHash);
}

/**
* @param {*} queryKey
* @param {{ [alias: string]: string }} aliasNameToMember
*/
getQueryStream(queryKey, aliasNameToMember) {
setQueryStream(queryKey, aliasNameToMember) {
const key = this.redisHash(queryKey);
if (!this.streams.queued.has(key)) {
const _stream = new QueryStream({
key,
maps: this.streams,
aliasNameToMember,
});
this.streams.queued.set(key, _stream);
}
return this.streams.queued.get(key);
const stream = new QueryStream({
key,
maps: this.streams,
aliasNameToMember,
});
this.streams.queued.set(key, stream);

return stream;
}

/**
@@ -559,7 +568,7 @@ export class QueryQueue {
R.pipe(
R.filter(p => {
if (active.indexOf(p) === -1) {
const subKeys = p.split('::');
const subKeys = p.split('@');
if (subKeys.length === 1) {
// common queries
return true;
@@ -729,10 +738,10 @@ export class QueryQueue {
* Processing query specified by the `queryKey`. This method incapsulate most
* of the logic related with the queues updates, heartbeating, etc.
*
* @param {string} queryKey
* @param {string} queryKeyHashed
* @return {Promise<{ result: undefined | Object, error: string | undefined }>}
*/
async processQuery(queryKey) {
async processQuery(queryKeyHashed) {
const queueConnection = await this.queueDriver.createConnection();

let insertedCount;
@@ -743,15 +752,15 @@ export class QueryQueue {
let processingLockAcquired;
try {
const processingId = await queueConnection.getNextProcessingId();
const retrieveResult = await queueConnection.retrieveForProcessing(queryKey, processingId);
const retrieveResult = await queueConnection.retrieveForProcessing(queryKeyHashed, processingId);

if (retrieveResult) {
[insertedCount, _removedCount, activeKeys, queueSize, query, processingLockAcquired] = retrieveResult;
}

const activated = activeKeys && activeKeys.indexOf(this.redisHash(queryKey)) !== -1;
const activated = activeKeys && activeKeys.indexOf(queryKeyHashed) !== -1;
if (!query) {
query = await queueConnection.getQueryDef(this.redisHash(queryKey));
query = await queueConnection.getQueryDef(queryKeyHashed);
}

if (query && insertedCount && activated && processingLockAcquired) {
@@ -771,19 +780,22 @@ export class QueryQueue {
preAggregation: query.query?.preAggregation,
addedToQueueTime: query.addedToQueueTime,
});
await queueConnection.optimisticQueryUpdate(queryKey, { startQueryTime }, processingId);
await queueConnection.optimisticQueryUpdate(queryKeyHashed, { startQueryTime }, processingId);

const heartBeatTimer = setInterval(
() => queueConnection.updateHeartBeat(queryKey),
() => queueConnection.updateHeartBeat(queryKeyHashed),
this.heartBeatInterval * 1000
);
try {
const handler = query?.queryHandler;
let target;
switch (handler) {
case 'stream':
target = this.getQueryStream(this.redisHash(queryKey));
await this.queryTimeout(this.queryHandlers.stream(query.query, target));
await this.queryTimeout(
Review comment (Member Author): changed due to linter.
this.queryHandlers.stream(query.query, this.getQueryStream(queryKeyHashed))
);

// CubeStore has special handling for null
executionResult = null;
break;
default:
executionResult = {
@@ -792,7 +804,7 @@ export class QueryQueue {
query.query,
async (cancelHandler) => {
try {
return queueConnection.optimisticQueryUpdate(queryKey, { cancelHandler }, processingId);
return queueConnection.optimisticQueryUpdate(queryKeyHashed, { cancelHandler }, processingId);
} catch (e) {
this.logger('Error while query update', {
queryKey: query.queryKey,
@@ -848,7 +860,7 @@ export class QueryQueue {
error: (e.stack || e).toString()
});
if (e instanceof TimeoutError) {
const queryWithCancelHandle = await queueConnection.getQueryDef(queryKey);
const queryWithCancelHandle = await queueConnection.getQueryDef(queryKeyHashed);
if (queryWithCancelHandle) {
this.logger('Cancelling query due to timeout', {
processingId,
@@ -868,7 +880,7 @@ export class QueryQueue {

clearInterval(heartBeatTimer);

if (!(await queueConnection.setResultAndRemoveQuery(queryKey, executionResult, processingId))) {
if (!(await queueConnection.setResultAndRemoveQuery(queryKeyHashed, executionResult, processingId))) {
this.logger('Orphaned execution result', {
processingId,
warn: 'Result for query was not set due to processing lock wasn\'t acquired',
@@ -887,7 +899,7 @@ export class QueryQueue {
} else {
this.logger('Skip processing', {
processingId,
queryKey: query && query.queryKey || queryKey,
queryKey: query && query.queryKey || queryKeyHashed,
requestId: query && query.requestId,
queuePrefix: this.redisQueuePrefix,
processingLockAcquired,
@@ -899,15 +911,15 @@ export class QueryQueue {
});
// closing stream
if (query?.queryHandler === 'stream') {
const stream = this.getQueryStream(this.redisHash(queryKey));
const stream = this.getQueryStream(queryKeyHashed);
stream.destroy();
}
const currentProcessingId = await queueConnection.freeProcessingLock(queryKey, processingId, activated);
const currentProcessingId = await queueConnection.freeProcessingLock(queryKeyHashed, processingId, activated);
if (currentProcessingId) {
this.logger('Skipping free processing lock', {
processingId,
currentProcessingId,
queryKey: query && query.queryKey || queryKey,
queryKey: query && query.queryKey || queryKeyHashed,
requestId: query && query.requestId,
queuePrefix: this.redisQueuePrefix,
processingLockAcquired,
Expand All @@ -921,7 +933,7 @@ export class QueryQueue {
}
} catch (e) {
this.logger('Queue storage error', {
queryKey: query && query.queryKey || queryKey,
queryKey: query && query.queryKey || queryKeyHashed,
requestId: query && query.requestId,
error: (e.stack || e).toString(),
queuePrefix: this.redisQueuePrefix
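The `split('@')` above pairs with the process-UID suffix now appended to hashed query keys. A sketch of the filtering idea, under the assumption that suffixed keys should only be picked up by the process whose UID matches; the helper name and key values are made up:

    import { getProcessUid } from '@cubejs-backend/shared';

    // Hashes without a suffix are common queries any node may process; hashes of
    // the form `${md5}@${processUid}` belong to the node that opened the stream.
    function shouldProcessHere(queryKeyHashed: string): boolean {
      const subKeys = queryKeyHashed.split('@');
      return subKeys.length === 1 || subKeys[1] === getProcessUid();
    }

    shouldProcessHere('0cc175b9c0f1b6a831c399e269772661');                    // true on any node
    shouldProcessHere(`0cc175b9c0f1b6a831c399e269772661@${getProcessUid()}`); // true on this node only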
17 changes: 3 additions & 14 deletions packages/cubejs-query-orchestrator/src/orchestrator/utils.ts
@@ -1,8 +1,9 @@
/* eslint-disable no-restricted-syntax */
import * as querystring from 'querystring';
import { v1, v5 } from 'uuid';
import crypto from 'crypto';

import { getProcessUid } from '@cubejs-backend/shared';

function parseHostPort(addr: string): { host: string, port: number } {
if (addr.includes(':')) {
const parts = addr.split(':');
@@ -178,18 +179,6 @@ export function parseRedisUrl(url: Readonly<string>): RedisParsedResult {
return parseUrl(url, result, parseHostPartBasic);
}

/**
* Unique process ID (aka 00000000-0000-0000-0000-000000000000).
*/
const processUid = v5(v1(), v1()).toString();

/**
* Returns unique process ID.
*/
export function getProcessUid(): string {
return processUid;
}

/**
* Unique process ID regexp.
*/
@@ -208,7 +197,7 @@ export function getCacheHash(queryKey) {
.digest('hex')
}${
typeof queryKey === 'object' && queryKey.persistent
? `::${getProcessUid()}`
? `@${getProcessUid()}`
: ''
}`;
}
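Taken together, cache hashes now come in two shapes, and the separator change from '::' to '@' matches the split('@') in QueryQueue. A small illustrative check; the relative import path is an assumption:

    import { getProcessUid } from '@cubejs-backend/shared';
    import { getCacheHash } from './utils';

    // Query values are placeholders; only the shape of the result matters here.
    const plain = getCacheHash(['SELECT 1', []]);
    const persistent = getCacheHash(Object.assign(['SELECT 1', []], { persistent: true }));

    console.log(plain.includes('@'));                        // false: md5 hex only
    console.log(persistent.endsWith(`@${getProcessUid()}`)); // true: md5 hex + '@' + process UID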