diff --git a/packages/cubejs-server-core/core/RefreshScheduler.js b/packages/cubejs-server-core/core/RefreshScheduler.js index bbe57778ea2d..8cda31bfaeab 100644 --- a/packages/cubejs-server-core/core/RefreshScheduler.js +++ b/packages/cubejs-server-core/core/RefreshScheduler.js @@ -1,4 +1,5 @@ const uuid = require('uuid/v4'); +const R = require('ramda'); class RefreshScheduler { constructor(serverCore) { @@ -84,7 +85,7 @@ class RefreshScheduler { } async runScheduledRefresh(context, queryingOptions) { - queryingOptions = { timezone: 'UTC', ...queryingOptions }; + queryingOptions = { timezones: [(queryingOptions && queryingOptions.timezone) || 'UTC'], ...queryingOptions }; /* queryingOptions is optional; a single legacy `timezone` seeds the default `timezones` list */ const { throwErrors, ...restOptions } = queryingOptions; context = { requestId: `scheduler-${context && context.requestId || uuid()}`, ...context }; this.serverCore.logger('Refresh Scheduler Run', { @@ -125,51 +126,129 @@ class RefreshScheduler { if (measuresCount === 0 && dimensionsCount === 0) { return; } - const query = { - ...queryingOptions, - ...( - measuresCount && - { measures: [`${cube}.${Object.keys(cubeFromPath.measures)[0]}`] } - ), - ...( - dimensionsCount && - { dimensions: [`${cube}.${Object.keys(cubeFromPath.dimensions)[0]}`] } - ) - }; - const sqlQuery = await compilerApi.getSql(query); - const orchestratorApi = this.serverCore.getOrchestratorApi(context); - await orchestratorApi.executeQuery({ - ...sqlQuery, - preAggregations: [], - query: 'SELECT 1', // TODO get rid off it - continueWait: true, - renewQuery: true, - requestId: context.requestId - }); + await Promise.all(queryingOptions.timezones.map(async timezone => { + const query = { + ...queryingOptions, + ...( + measuresCount && + { measures: [`${cube}.${Object.keys(cubeFromPath.measures)[0]}`] } + ), + ...( + dimensionsCount && + { dimensions: [`${cube}.${Object.keys(cubeFromPath.dimensions)[0]}`] } + ), + timezone + }; + const sqlQuery = await compilerApi.getSql(query); + const orchestratorApi = this.serverCore.getOrchestratorApi(context); + 
await orchestratorApi.executeQuery({ + ...sqlQuery, + preAggregations: [], + query: 'SELECT 1', // TODO get rid off it + continueWait: true, + renewQuery: true, + requestId: context.requestId + }); + })); })); } - async refreshPreAggregations(context, compilerApi, queryingOptions) { + async roundRobinRefreshPreAggregationsQueryIterator(context, compilerApi, queryingOptions) { + const { timezones } = queryingOptions; const scheduledPreAggregations = await compilerApi.scheduledPreAggregations(); - await Promise.all(scheduledPreAggregations.map(async preAggregation => { - const queries = await this.refreshQueriesForPreAggregation( - context, compilerApi, preAggregation, queryingOptions - ); - for (let i = queries.length - 1; i >= 0; i--) { - const query = queries[i]; + + let preAggregationCursor = null; + let timezoneCursor = 0; + let partitionCursor = 0; + + const queriesCache = {}; + const finishedPartitions = {}; + scheduledPreAggregations.forEach((p, pi) => { + timezones.forEach((t, ti) => { + finishedPartitions[`${pi}_${ti}`] = false; + }); + }); + const queriesForPreAggregation = async (preAggregationIndex, timezone) => { + const key = `${preAggregationIndex}_${timezone}`; + if (!queriesCache[key]) { + const preAggregation = scheduledPreAggregations[preAggregationIndex]; + queriesCache[key] = this.refreshQueriesForPreAggregation( + context, compilerApi, preAggregation, { ...queryingOptions, timezone } + ); + } + return queriesCache[key]; + }; + + const advance = async () => { + preAggregationCursor = preAggregationCursor != null ? 
preAggregationCursor + 1 : 0; + if (preAggregationCursor >= scheduledPreAggregations.length) { + preAggregationCursor = 0; + timezoneCursor += 1; + } + + if (timezoneCursor >= timezones.length) { + timezoneCursor = 0; + partitionCursor += 1; + } + + const queries = await queriesForPreAggregation(preAggregationCursor, timezones[timezoneCursor]); + if (partitionCursor < queries.length) { + const queryCursor = queries.length - 1 - partitionCursor; + const query = queries[queryCursor]; const sqlQuery = await compilerApi.getSql(query); - const orchestratorApi = this.serverCore.getOrchestratorApi(context); - await orchestratorApi.executeQuery({ + return { ...sqlQuery, preAggregations: sqlQuery.preAggregations.map( - (p) => ({ ...p, priority: i - queries.length }) + (p) => ({ ...p, priority: queryCursor - queries.length }) ), continueWait: true, renewQuery: true, - requestId: context.requestId - }); + requestId: context.requestId, + timezone: timezones[timezoneCursor] + }; + } else { + finishedPartitions[`${preAggregationCursor}_${timezoneCursor}`] = true; + return null; } - })); + }; + + return { + next: async () => { + let next; + while (Object.keys(finishedPartitions).find(k => !finishedPartitions[k])) { + next = await advance(); + if (next) { + return next; + } + } + return null; + } + }; + } + + async refreshPreAggregations(context, compilerApi, queryingOptions) { + let { concurrency, workerIndices } = queryingOptions; + concurrency = concurrency || queryingOptions.scheduledRefreshConcurrency || 1; /* the server core passes the configured value as `scheduledRefreshConcurrency`; honour it before defaulting to a single worker */ + workerIndices = workerIndices || R.range(0, concurrency); + return Promise.all(R.range(0, concurrency) + .filter(workerIndex => workerIndices.indexOf(workerIndex) !== -1) + .map(async workerIndex => { + const queryIterator = await this.roundRobinRefreshPreAggregationsQueryIterator( + context, compilerApi, queryingOptions + ); + for (;;) { + for (let i = 0; i < concurrency; i++) { + const nextQuery = await queryIterator.next(); + if (!nextQuery) { + return; + } + if (i === workerIndex) { + const orchestratorApi = 
this.serverCore.getOrchestratorApi(context); + await orchestratorApi.executeQuery(nextQuery); + } + } + } + })); } } diff --git a/packages/cubejs-server-core/core/RefreshScheduler.test.js b/packages/cubejs-server-core/core/RefreshScheduler.test.js new file mode 100644 index 000000000000..4c1fb413cc65 --- /dev/null +++ b/packages/cubejs-server-core/core/RefreshScheduler.test.js @@ -0,0 +1,267 @@ +/* eslint-disable no-new */ +/* globals describe,test,expect,jest,beforeAll */ + +const R = require('ramda'); +const RefreshScheduler = require('./RefreshScheduler'); +const CubejsServerCore = require('./index'); +const CompilerApi = require('./CompilerApi'); + +const schemaContent = ` +cube('Foo', { + sql: 'select * from foo', + + measures: { + count: { + type: 'count' + }, + + total: { + sql: 'amount', + type: 'sum' + }, + }, + + dimensions: { + time: { + sql: 'timestamp', + type: 'time' + } + }, + + preAggregations: { + first: { + type: 'rollup', + measureReferences: [count], + timeDimensionReference: time, + granularity: 'day', + partitionGranularity: 'day', + scheduledRefresh: true, + refreshKey: { + every: '1 hour', + updateWindow: '1 day', + incremental: true + } + }, + second: { + type: 'rollup', + measureReferences: [total], + timeDimensionReference: time, + granularity: 'day', + partitionGranularity: 'day', + scheduledRefresh: true, + refreshKey: { + every: '1 hour', + updateWindow: '1 day', + incremental: true + } + }, + } +}); + +cube('Bar', { + sql: 'select * from bar', + + measures: { + count: { + type: 'count' + } + }, + + dimensions: { + time: { + sql: 'timestamp', + type: 'time' + } + }, + + preAggregations: { + first: { + type: 'rollup', + measureReferences: [count], + timeDimensionReference: time, + granularity: 'day', + partitionGranularity: 'day', + scheduledRefresh: true, + refreshKey: { + every: '1 hour', + updateWindow: '1 day', + incremental: true + } + } + } +}); +`; + +const repository = { + localPath: () => __dirname, + dataSchemaFiles: () => 
Promise.resolve([ + { fileName: 'main.js', content: schemaContent }, + ]), +}; + +class OrchestratorApiMock { + constructor() { + this.createdTables = []; + } + + async executeQuery(query) { + console.log('Executing query', query); + if (query.query && query.query.match(/min\(.*timestamp.*foo/)) { + return { + data: [{ + min: '2020-12-27T00:00:00.000', + }], + }; + } else if (query.query && query.query.match(/max\(.*timestamp.*/)) { + return { + data: [{ + max: '2020-12-31T00:00:00.000', + }], + }; + } else if (query.query && query.query.match(/min\(.*timestamp.*bar/)) { + return { + data: [{ + min: '2020-12-29T00:00:00.000', + }], + }; + } else if (query.query && query.query.match(/max\(.*timestamp.*bar/)) { + return { + data: [{ + max: '2020-12-31T00:00:00.000', + }], + }; + } + + if (query.preAggregations) { + query.preAggregations.forEach(p => { + const timezone = p.loadSql[0].match(/AT TIME ZONE '(.*?)'/)[1]; + if (!this.createdTables.find(t => t.tableName === p.tableName && t.timezone === timezone)) { + this.createdTables.push({ tableName: p.tableName, timezone }); + // eslint-disable-next-line no-throw-literal + throw { error: 'Continue wait' }; + } + }); + } + + return { + data: [], + }; + } +} + +describe('Refresh Scheduler', () => { + const setupScheduler = () => { + const serverCore = new CubejsServerCore({ + dbType: 'postgres', + apiSecret: 'foo', + }); + const compilerApi = new CompilerApi(repository, 'postgres', { + compileContext: {}, + logger: (msg, params) => { + console.log(msg, params); + }, + }); + const orchestratorApi = new OrchestratorApiMock(); + jest.spyOn(serverCore, 'getCompilerApi').mockImplementation(() => compilerApi); + jest.spyOn(serverCore, 'getOrchestratorApi').mockImplementation(() => orchestratorApi); + const refreshScheduler = new RefreshScheduler(serverCore); + return { refreshScheduler, orchestratorApi }; + }; + + test('Round robin pre-aggregation refresh by history priority', async () => { + const { refreshScheduler, 
orchestratorApi } = setupScheduler(); + const result = [ + { tableName: 'stb_pre_aggregations.foo_first20201231', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.foo_second20201231', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.bar_first20201231', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.foo_first20201230', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.foo_second20201230', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.bar_first20201230', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.foo_first20201229', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.foo_second20201229', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.bar_first20201229', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.foo_first20201228', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.foo_second20201228', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.foo_first20201227', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.foo_second20201227', timezone: 'UTC' }, + ]; + for (let i = 0; i < 1000; i++) { + const refreshResult = await refreshScheduler.runScheduledRefresh(null, { concurrency: 2, workerIndices: [0] }); + expect(orchestratorApi.createdTables).toEqual(R.take((i + 1) * 2, result).filter((x, qi) => qi % 2 === 0)); + if (refreshResult.finished) { + break; + } + } + + for (let i = 0; i < 1000; i++) { + const refreshResult = await refreshScheduler.runScheduledRefresh(null, { concurrency: 2, workerIndices: [1] }); + const prevWorkerResult = result.filter((x, qi) => qi % 2 === 0); + expect(orchestratorApi.createdTables).toEqual( + prevWorkerResult.concat(R.take((i + 1) * 2, result).filter((x, qi) => qi % 2 === 1)) + ); + if (refreshResult.finished) { + break; + } + } + }); + + test('Round robin pre-aggregation with timezones', async () => { + const { refreshScheduler, orchestratorApi } = setupScheduler(); + const result = [ + { tableName: 'stb_pre_aggregations.foo_first20201231', timezone: 
'UTC' }, + { tableName: 'stb_pre_aggregations.foo_second20201231', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.bar_first20201231', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.foo_first20201231', timezone: 'America/Los_Angeles' }, + { tableName: 'stb_pre_aggregations.foo_second20201231', timezone: 'America/Los_Angeles' }, + { tableName: 'stb_pre_aggregations.bar_first20201231', timezone: 'America/Los_Angeles' }, + + { tableName: 'stb_pre_aggregations.foo_first20201230', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.foo_second20201230', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.bar_first20201230', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.foo_first20201230', timezone: 'America/Los_Angeles' }, + { tableName: 'stb_pre_aggregations.foo_second20201230', timezone: 'America/Los_Angeles' }, + { tableName: 'stb_pre_aggregations.bar_first20201230', timezone: 'America/Los_Angeles' }, + + { tableName: 'stb_pre_aggregations.foo_first20201229', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.foo_second20201229', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.bar_first20201229', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.foo_first20201229', timezone: 'America/Los_Angeles' }, + { tableName: 'stb_pre_aggregations.foo_second20201229', timezone: 'America/Los_Angeles' }, + { tableName: 'stb_pre_aggregations.bar_first20201229', timezone: 'America/Los_Angeles' }, + + { tableName: 'stb_pre_aggregations.foo_first20201228', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.foo_second20201228', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.foo_first20201228', timezone: 'America/Los_Angeles' }, + { tableName: 'stb_pre_aggregations.foo_second20201228', timezone: 'America/Los_Angeles' }, + + { tableName: 'stb_pre_aggregations.foo_first20201227', timezone: 'UTC' }, + { tableName: 'stb_pre_aggregations.foo_second20201227', timezone: 'UTC' }, + { tableName: 
'stb_pre_aggregations.foo_first20201227', timezone: 'America/Los_Angeles' }, + { tableName: 'stb_pre_aggregations.foo_second20201227', timezone: 'America/Los_Angeles' }, + ]; + for (let i = 0; i < 1000; i++) { + const refreshResult = await refreshScheduler.runScheduledRefresh( + null, + { concurrency: 2, workerIndices: [0], timezones: ['UTC', 'America/Los_Angeles'] } + ); + expect(orchestratorApi.createdTables).toEqual(R.take((i + 1) * 2, result).filter((x, qi) => qi % 2 === 0)); + if (refreshResult.finished) { + break; + } + } + + for (let i = 0; i < 1000; i++) { + const refreshResult = await refreshScheduler.runScheduledRefresh( + null, + { concurrency: 2, workerIndices: [1], timezones: ['UTC', 'America/Los_Angeles'] } + ); + const prevWorkerResult = result.filter((x, qi) => qi % 2 === 0); + expect(orchestratorApi.createdTables).toEqual( + prevWorkerResult.concat(R.take((i + 1) * 2, result).filter((x, qi) => qi % 2 === 1)) + ); + if (refreshResult.finished) { + break; + } + } + }); +}); diff --git a/packages/cubejs-server-core/core/index.js b/packages/cubejs-server-core/core/index.js index 3e9e458b1672..774ca96ddecc 100644 --- a/packages/cubejs-server-core/core/index.js +++ b/packages/cubejs-server-core/core/index.js @@ -185,6 +185,7 @@ class CubejsServerCore { process.env.CUBEJS_SCHEDULED_REFRESH_TIMEZONES.split(',').map(t => t.trim()), scheduledRefreshContexts: async () => [null], basePath: '/cubejs-api', + scheduledRefreshConcurrency: parseInt(process.env.CUBEJS_SCHEDULED_REFRESH_CONCURRENCY, 10), ...options }; if ( @@ -280,14 +281,11 @@ class CubejsServerCore { }); } await Promise.all(contexts.map(async context => { + const queryingOptions = { scheduledRefreshConcurrency: this.options.scheduledRefreshConcurrency }; if (this.scheduledRefreshTimeZones) { - // eslint-disable-next-line no-restricted-syntax - for (const timezone of this.scheduledRefreshTimeZones) { - await this.runScheduledRefresh(context, { timezone }); - } - } else { - await 
this.runScheduledRefresh(context); + queryingOptions.timezones = this.scheduledRefreshTimeZones; } + await this.runScheduledRefresh(context, queryingOptions); })); }, this.scheduledRefreshTimer diff --git a/packages/cubejs-server-core/core/index.test.js b/packages/cubejs-server-core/core/index.test.js index a2d4619a9d5c..1f348d701898 100644 --- a/packages/cubejs-server-core/core/index.test.js +++ b/packages/cubejs-server-core/core/index.test.js @@ -87,6 +87,7 @@ describe('index.test', () => { updateCompilerCacheKeepAlive: true, telemetry: false, allowUngroupedWithoutPrimaryKey: true, + scheduledRefreshConcurrency: 4, orchestratorOptions: { redisPrefix: 'some-prefix', queryCacheOptions: { diff --git a/packages/cubejs-server-core/core/optionsValidate.js b/packages/cubejs-server-core/core/optionsValidate.js index 7bb8331f683d..df26c91ac606 100644 --- a/packages/cubejs-server-core/core/optionsValidate.js +++ b/packages/cubejs-server-core/core/optionsValidate.js @@ -49,6 +49,7 @@ const schemaOptions = Joi.object().keys({ ), compilerCacheSize: Joi.number().min(0).integer(), maxCompilerCacheKeepAlive: Joi.number().min(0).integer(), + scheduledRefreshConcurrency: Joi.number().min(1).integer(), updateCompilerCacheKeepAlive: Joi.boolean(), telemetry: Joi.boolean(), allowUngroupedWithoutPrimaryKey: Joi.boolean(),