Skip to content

Commit

Permalink
feat: Concurrency controls for scheduled refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Dec 26, 2020
1 parent 1e5bdf5 commit 2132f0d
Show file tree
Hide file tree
Showing 5 changed files with 387 additions and 41 deletions.
149 changes: 114 additions & 35 deletions packages/cubejs-server-core/core/RefreshScheduler.js
@@ -1,4 +1,5 @@
const uuid = require('uuid/v4');
const R = require('ramda');

class RefreshScheduler {
constructor(serverCore) {
Expand Down Expand Up @@ -84,7 +85,7 @@ class RefreshScheduler {
}

async runScheduledRefresh(context, queryingOptions) {
queryingOptions = { timezone: 'UTC', ...queryingOptions };
queryingOptions = { timezones: [queryingOptions.timezone || 'UTC'], ...queryingOptions };
const { throwErrors, ...restOptions } = queryingOptions;
context = { requestId: `scheduler-${context && context.requestId || uuid()}`, ...context };
this.serverCore.logger('Refresh Scheduler Run', {
Expand Down Expand Up @@ -125,51 +126,129 @@ class RefreshScheduler {
if (measuresCount === 0 && dimensionsCount === 0) {
return;
}
const query = {
...queryingOptions,
...(
measuresCount &&
{ measures: [`${cube}.${Object.keys(cubeFromPath.measures)[0]}`] }
),
...(
dimensionsCount &&
{ dimensions: [`${cube}.${Object.keys(cubeFromPath.dimensions)[0]}`] }
)
};
const sqlQuery = await compilerApi.getSql(query);
const orchestratorApi = this.serverCore.getOrchestratorApi(context);
await orchestratorApi.executeQuery({
...sqlQuery,
preAggregations: [],
query: 'SELECT 1', // TODO get rid off it
continueWait: true,
renewQuery: true,
requestId: context.requestId
});
await Promise.all(queryingOptions.timezones.map(async timezone => {
const query = {
...queryingOptions,
...(
measuresCount &&
{ measures: [`${cube}.${Object.keys(cubeFromPath.measures)[0]}`] }
),
...(
dimensionsCount &&
{ dimensions: [`${cube}.${Object.keys(cubeFromPath.dimensions)[0]}`] }
),
timezone
};
const sqlQuery = await compilerApi.getSql(query);
const orchestratorApi = this.serverCore.getOrchestratorApi(context);
await orchestratorApi.executeQuery({
...sqlQuery,
preAggregations: [],
query: 'SELECT 1', // TODO get rid off it
continueWait: true,
renewQuery: true,
requestId: context.requestId
});
}));
}));
}

async refreshPreAggregations(context, compilerApi, queryingOptions) {
async roundRobinRefreshPreAggregationsQueryIterator(context, compilerApi, queryingOptions) {
const { timezones } = queryingOptions;
const scheduledPreAggregations = await compilerApi.scheduledPreAggregations();
await Promise.all(scheduledPreAggregations.map(async preAggregation => {
const queries = await this.refreshQueriesForPreAggregation(
context, compilerApi, preAggregation, queryingOptions
);
for (let i = queries.length - 1; i >= 0; i--) {
const query = queries[i];

let preAggregationCursor = null;
let timezoneCursor = 0;
let partitionCursor = 0;

const queriesCache = {};
const finishedPartitions = {};
scheduledPreAggregations.forEach((p, pi) => {
timezones.forEach((t, ti) => {
finishedPartitions[`${pi}_${ti}`] = false;
});
});
const queriesForPreAggregation = async (preAggregationIndex, timezone) => {
const key = `${preAggregationIndex}_${timezone}`;
if (!queriesCache[key]) {
const preAggregation = scheduledPreAggregations[preAggregationIndex];
queriesCache[key] = this.refreshQueriesForPreAggregation(
context, compilerApi, preAggregation, { ...queryingOptions, timezone }
);
}
return queriesCache[key];
};

const advance = async () => {
preAggregationCursor = preAggregationCursor != null ? preAggregationCursor + 1 : 0;
if (preAggregationCursor >= scheduledPreAggregations.length) {
preAggregationCursor = 0;
timezoneCursor += 1;
}

if (timezoneCursor >= timezones.length) {
timezoneCursor = 0;
partitionCursor += 1;
}

const queries = await queriesForPreAggregation(preAggregationCursor, timezones[timezoneCursor]);
if (partitionCursor < queries.length) {
const queryCursor = queries.length - 1 - partitionCursor;
const query = queries[queryCursor];
const sqlQuery = await compilerApi.getSql(query);
const orchestratorApi = this.serverCore.getOrchestratorApi(context);
await orchestratorApi.executeQuery({
return {
...sqlQuery,
preAggregations: sqlQuery.preAggregations.map(
(p) => ({ ...p, priority: i - queries.length })
(p) => ({ ...p, priority: queryCursor - queries.length })
),
continueWait: true,
renewQuery: true,
requestId: context.requestId
});
requestId: context.requestId,
timezone: timezones[timezoneCursor]
};
} else {
finishedPartitions[`${preAggregationCursor}_${timezoneCursor}`] = true;
return null;
}
}));
};

return {
next: async () => {
let next;
while (Object.keys(finishedPartitions).find(k => !finishedPartitions[k])) {
next = await advance();
if (next) {
return next;
}
}
return null;
}
};
}

async refreshPreAggregations(context, compilerApi, queryingOptions) {
let { concurrency, workerIndices } = queryingOptions;
concurrency = concurrency || 1;
workerIndices = workerIndices || R.range(0, concurrency);
return Promise.all(R.range(0, concurrency)
.filter(workerIndex => workerIndices.indexOf(workerIndex) !== -1)
.map(async workerIndex => {
const queryIterator = await this.roundRobinRefreshPreAggregationsQueryIterator(
context, compilerApi, queryingOptions
);
for (;;) {
for (let i = 0; i < concurrency; i++) {
const nextQuery = await queryIterator.next();
if (!nextQuery) {
return;
}
if (i === workerIndex) {
const orchestratorApi = this.serverCore.getOrchestratorApi(context);
await orchestratorApi.executeQuery(nextQuery);
}
}
}
}));
}
}

Expand Down

0 comments on commit 2132f0d

Please sign in to comment.