Skip to content

Commit

Permalink
feat(query-orchestrator): Queue - improve performance (#7983)
Browse files Browse the repository at this point in the history
Small optimization: Skipping getQueryDef in case when it's a new item in the queue.
It reduces one request to the Cube Store on each call for executeInQueue.
  • Loading branch information
ovr committed Mar 22, 2024
1 parent d3b21c4 commit 56d48fb
Show file tree
Hide file tree
Showing 5 changed files with 323 additions and 21 deletions.
18 changes: 18 additions & 0 deletions packages/cubejs-backend-shared/src/promises.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,24 @@ import crypto from 'crypto';

import { Optional } from './type-helpers';

export type PromiseLock = {
promise: Promise<void>,
resolve: () => void,
};

export function createPromiseLock(): PromiseLock {
let resolve: any = null;

return {
promise: new Promise<void>((resolver) => {
resolve = resolver;
}),
resolve: () => {
resolve();
}
};
}

export type CancelablePromiseCancel = (waitExecution?: boolean) => Promise<any>;

export interface CancelablePromise<T> extends Promise<T> {
Expand Down
55 changes: 34 additions & 21 deletions packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,12 @@ export class QueryQueue {
processUid: this.processUid,
};

const queueDriverFactory = options.queueDriverFactory || factoryQueueDriver;

/**
* @type {QueueDriverInterface}
*/
this.queueDriver = factoryQueueDriver(options.cacheAndQueueDriver, queueDriverOptions);
this.queueDriver = queueDriverFactory(options.cacheAndQueueDriver, queueDriverOptions);
/**
* @protected
* @type {boolean}
Expand Down Expand Up @@ -264,6 +266,15 @@ export class QueryQueue {
);

if (added > 0) {
waitingContext = {
queueId,
spanId: options.spanId,
queryKey,
queuePrefix: this.redisQueuePrefix,
requestId: options.requestId,
waitingForRequestId: options.requestId
};

this.logger('Added to queue', {
queueId,
spanId: options.spanId,
Expand All @@ -284,28 +295,30 @@ export class QueryQueue {

await this.reconcileQueue();

const queryDef = await queueConnection.getQueryDef(queryKeyHash, queueId);
const [active, toProcess] = await queueConnection.getQueryStageState(true);
if (!added) {
const queryDef = await queueConnection.getQueryDef(queryKeyHash, queueId);
if (queryDef) {
waitingContext = {
queueId,
spanId: options.spanId,
queryKey: queryDef.queryKey,
queuePrefix: this.redisQueuePrefix,
requestId: options.requestId,
waitingForRequestId: queryDef.requestId
};
}
}

if (queryDef) {
waitingContext = {
queueId,
spanId: options.spanId,
queryKey: queryDef.queryKey,
queuePrefix: this.redisQueuePrefix,
requestId: options.requestId,
waitingForRequestId: queryDef.requestId
};
const [active, toProcess] = await queueConnection.getQueryStageState(true);

this.logger('Waiting for query', {
...waitingContext,
queueSize,
activeQueryKeys: active,
toProcessQueryKeys: toProcess,
active: active.indexOf(queryKeyHash) !== -1,
queueIndex: toProcess.indexOf(queryKeyHash),
});
}
this.logger('Waiting for query', {
...waitingContext,
queueSize,
activeQueryKeys: active,
toProcessQueryKeys: toProcess,
active: active.indexOf(queryKeyHash) !== -1,
queueIndex: toProcess.indexOf(queryKeyHash),
});

// Stream processing goes here under assumption there's no way of a stream close just after it was added to the `streams` map.
// Otherwise `streamStarted` event listener should go before the `reconcileQueue` call.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
import { CubeStoreDriver, CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver';
import crypto from 'crypto';
import { createPromiseLock, pausePromise } from '@cubejs-backend/shared';
import { QueueDriverConnectionInterface, QueueDriverInterface, } from '@cubejs-backend/base-driver';
import { LocalQueueDriver, QueryQueue } from '../../src';

export type QueryQueueTestOptions = {
cacheAndQueueDriver?: string,
redisPool?: any,
cubeStoreDriverFactory?: () => Promise<CubeStoreDriver>,
beforeAll?: () => Promise<void>,
afterAll?: () => Promise<void>,
};

function patchQueueDriverConnectionForTrack(connection: QueueDriverConnectionInterface, counters: any): QueueDriverConnectionInterface {
function wrapAsyncMethod(methodName: string): any {
return async function (...args) {
if (!(methodName in counters.methods)) {
counters.methods[methodName] = {
started: 1,
finished: 0,
};
} else {
counters.methods[methodName].started++;
}

const result = await connection[methodName](...args);
counters.methods[methodName].finished++;

return result;
};
}

return {
...connection,
addToQueue: wrapAsyncMethod('addToQueue'),
getResult: wrapAsyncMethod('getResult'),
getQueriesToCancel: wrapAsyncMethod('getQueriesToCancel'),
getActiveAndToProcess: wrapAsyncMethod('getActiveAndToProcess'),
retrieveForProcessing: wrapAsyncMethod('retrieveForProcessing'),
getQueryDef: wrapAsyncMethod('getQueryDef'),
setResultAndRemoveQuery: wrapAsyncMethod('setResultAndRemoveQuery'),
getQueryStageState: wrapAsyncMethod('getQueryStageState'),
getResultBlocking: wrapAsyncMethod('getResultBlocking'),
freeProcessingLock: wrapAsyncMethod('freeProcessingLock'),
optimisticQueryUpdate: wrapAsyncMethod('optimisticQueryUpdate'),
getQueryAndRemove: wrapAsyncMethod('getQueryAndRemove'),
getNextProcessingId: wrapAsyncMethod('getNextProcessingId'),
release: connection.release,
};
}

function patchQueueDriverForTrack(driver: QueueDriverInterface, counters: any): QueueDriverInterface {
return {
...driver,
createConnection: async () => {
counters.connections++;

return patchQueueDriverConnectionForTrack(await driver.createConnection(), counters);
},
redisHash: (...args) => driver.redisHash(...args),
release: async (...args) => {
counters.connections--;

return driver.release(...args);
},
};
}

export function QueryQueueBenchmark(name: string, options: QueryQueueTestOptions = {}) {
(async () => {
if (options.beforeAll) {
await options.beforeAll();
}

const createBenchmark = async (benchSettings: { totalQueries: number, queueResponseSize: number, queuePayloadSize: number, currency: number }) => {
const counters = {
connections: 0,
methods: {},
events: {},
queueStarted: 0,
queueResolved: 0,
handlersStarted: 0,
handlersFinished: 0,
queueDriverQueriesStarted: 0,
};

const queueDriverFactory = (driverType, queueDriverOptions) => {
switch (driverType) {
case 'memory':
return patchQueueDriverForTrack(
new LocalQueueDriver(
queueDriverOptions
) as any,
counters
);
case 'cubestore':
return patchQueueDriverForTrack(
new CubeStoreQueueDriver(
async () => options.cubeStoreDriverFactory(),
queueDriverOptions
),
counters
);
default:
throw new Error(`Unsupported driver: ${driverType}`);
}
};

const tenantPrefix = crypto.randomBytes(6).toString('hex');
const queue = new QueryQueue(`${tenantPrefix}#test_query_queue`, {
queryHandlers: {
query: async (_query) => {
counters.handlersStarted++;
await pausePromise(1500);
counters.handlersFinished++;

return {
payload: 'a'.repeat(benchSettings.queueResponseSize),
};
},
},
continueWaitTimeout: 60 * 2,
executionTimeout: 20,
orphanedTimeout: 60 * 5,
concurrency: benchSettings.currency,
logger: (event, _params) => {
// console.log(event, _params);
// console.log(event);

if (event in counters.events) {
counters.events[event]++;
} else {
counters.events[event] = 1;
}
},
queueDriverFactory,
...options
});

const processingPromisses = [];

async function awaitProcessing() {
// process query can call reconcileQueue
while (await queue.shutdown() || processingPromisses.length) {
console.log('awaitProcessing', {
counters,
processingPromisses: processingPromisses.length
});
await Promise.all(processingPromisses.splice(0));
}
}

const progressIntervalId = setInterval(() => {
console.log('running', {
...counters,
processingPromisses: processingPromisses.length
});
}, 1000);

const lock = createPromiseLock();

const pusherIntervalId = setInterval(async () => {
if (counters.queueStarted >= benchSettings.totalQueries) {
lock.resolve();
clearInterval(pusherIntervalId);

return;
}

counters.queueStarted++;

const queueId = crypto.randomBytes(12).toString('hex');
const running = (async () => {
await queue.executeInQueue('query', queueId, {
// eslint-disable-next-line no-bitwise
payload: 'a'.repeat(benchSettings.queuePayloadSize)
}, 1, {

});

counters.queueResolved++;

// loosing memory for result
return null;
})();

processingPromisses.push(running);
await running;
}, 10);

await lock.promise;
await awaitProcessing();
clearInterval(progressIntervalId);

console.log('Result', {
benchSettings,
...counters,
});
};

await createBenchmark({
currency: 50,
totalQueries: 1_000,
// eslint-disable-next-line no-bitwise
queueResponseSize: 5 << 20,
queuePayloadSize: 256 * 1024,
});

if (options.afterAll) {
await options.afterAll();
}
})();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// eslint-disable-next-line import/no-extraneous-dependencies
import 'source-map-support/register';

import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver';
import { QueryQueueBenchmark } from './QueueBench.abstract';

let cubeStoreDriver;

const afterAll = async () => {
if (cubeStoreDriver) {
await cubeStoreDriver.release();
}
};

const cubeStoreDriverFactory = async () => {
if (cubeStoreDriver) {
return cubeStoreDriver;
}

// eslint-disable-next-line no-return-assign
return cubeStoreDriver = new CubeStoreDriver({});
};

const beforeAll = async () => {
await (await cubeStoreDriverFactory()).query('QUEUE TRUNCATE');
};

QueryQueueBenchmark(
'CubeStore Queue',
{
cacheAndQueueDriver: 'cubestore',
cubeStoreDriverFactory,
beforeAll,
afterAll
}
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// eslint-disable-next-line import/no-extraneous-dependencies
import 'source-map-support/register';

import { QueryQueueBenchmark } from './QueueBench.abstract';

const afterAll = async () => {
// nothing to do
};

const beforeAll = async () => {
// nothing to do
};

QueryQueueBenchmark(
'Memory Queue',
{
cacheAndQueueDriver: 'memory',
beforeAll,
afterAll
}
);

0 comments on commit 56d48fb

Please sign in to comment.