From 4e8400bae7cb33719b3e200c56e71b1d632d862f Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Mar 2025 15:31:10 +0200 Subject: [PATCH 01/35] spelling --- packages/cubejs-api-gateway/src/gateway.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index 10e8889ccfd8d..b5da22c011c13 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -895,7 +895,7 @@ class ApiGateway { if (result.length === 0) { throw new UserError( 'A user\'s selector doesn\'t match any of the ' + - 'pre-aggregations described by the Cube schemas.' + 'pre-aggregations defined by the Cube schemas.' ); } break; From 3dd807505d85a657e7de216efad7c919be089737 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Mar 2025 15:31:25 +0200 Subject: [PATCH 02/35] add dateRange to PreAggsJobsRequest --- packages/cubejs-api-gateway/src/types/request.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/cubejs-api-gateway/src/types/request.ts b/packages/cubejs-api-gateway/src/types/request.ts index 3393554fd8d65..827a4a9e76cac 100644 --- a/packages/cubejs-api-gateway/src/types/request.ts +++ b/packages/cubejs-api-gateway/src/types/request.ts @@ -153,6 +153,7 @@ type PreAggsSelector = { dataSources?: string[], cubes?: string[], preAggregations?: string[], + dateRange?: [string, string], // We expect only single date Range for rebuilding }; /** From 5fc7e353878bbce7ef427600f19bea8493ec54d4 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Mar 2025 15:49:43 +0200 Subject: [PATCH 03/35] pass dateRange to preAggregations(filter) --- packages/cubejs-api-gateway/src/gateway.ts | 46 +++++++++---------- .../src/compiler/CubeEvaluator.ts | 16 +++---- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index b5da22c011c13..2aa92add1cf38 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -926,30 +926,30 @@ class ApiGateway { selector: PreAggsSelector, ): Promise { let jobs: string[] = []; - if (!selector.contexts?.length) { - jobs = await this.postPreAggregationsBuildJobs( - context, - selector, - ); - } else { - const promise = Promise.all( - selector.contexts.map(async (config) => { - const ctx = { - ...context, - ...config, - }; - const _jobs = await this.postPreAggregationsBuildJobs( - ctx, - selector, - ); - return _jobs; - }) - ); - const resolve = await promise; - resolve.forEach((_jobs) => { - jobs = jobs.concat(_jobs); - }); + + // For the sake of type check, as contexts are checked in preAggregationsJobs() + if (!selector.contexts) { + return jobs; } + + const promise = Promise.all( + selector.contexts.map(async (config) => { + const ctx = { + ...context, + ...config, + }; + const _jobs = await this.postPreAggregationsBuildJobs( + ctx, + selector, + ); + return _jobs; + }) + ); + const resolve = await promise; + resolve.forEach((_jobs) => { + jobs = jobs.concat(_jobs); + }); + return jobs; } diff --git a/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts b/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts index abcc94ad603c5..1deacf997bb2e 100644 --- a/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts +++ b/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts @@ -58,6 +58,13 @@ export type MeasureDefinition = { timeShiftReferences?: TimeShiftDefinitionReference[], }; +export type PreAggregationFilters = { + dataSources?: string[], + cubes?: string[], + preAggregationIds?: string[], + scheduled?: boolean, +}; + export class CubeEvaluator extends CubeSymbols { public evaluatedCubes: Record = {}; @@ -484,15 +491,8 @@ export class CubeEvaluator extends CubeSymbols { /** * Returns pre-aggregations filtered by the specified selector. - * @param {{ - * scheduled: boolean, - * dataSource: Array, - * cubes: Array, - * preAggregationIds: Array - * }} filter pre-aggregations selector - * @returns {*} */ - public preAggregations(filter) { + public preAggregations(filter: PreAggregationFilters) { const { scheduled, dataSources, cubes, preAggregationIds } = filter || {}; const idFactory = ({ cube, preAggregationName }) => `${cube}.${preAggregationName}`; From e1bf5af137acbaa6d79bde2cbce1a25f0937d464 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Mar 2025 15:51:43 +0200 Subject: [PATCH 04/35] a bit of code polishment --- .../cubejs-schema-compiler/src/compiler/CubeEvaluator.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts b/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts index 1deacf997bb2e..c58823b4ac9c6 100644 --- a/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts +++ b/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts @@ -601,9 +601,7 @@ export class CubeEvaluator extends CubeSymbols { public isInstanceOfType(type: 'measures' | 'dimensions' | 'segments', path: string | string[]): boolean { const cubeAndName = Array.isArray(path) ? path : path.split('.'); - const symbol = this.evaluatedCubes[cubeAndName[0]] && - this.evaluatedCubes[cubeAndName[0]][type] && - this.evaluatedCubes[cubeAndName[0]][type][cubeAndName[1]]; + const symbol = this.evaluatedCubes[cubeAndName[0]]?.[type]?.[cubeAndName[1]]; return symbol !== undefined; } @@ -655,7 +653,7 @@ export class CubeEvaluator extends CubeSymbols { } public isRbacEnabledForCube(cube: any): boolean { - return cube.accessPolicy && cube.accessPolicy.length; + return cube.accessPolicy?.length; } public isRbacEnabled(): boolean { From a6555248f477f99bb3d29c538d995c82273fa520 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Mar 2025 17:58:16 +0200 Subject: [PATCH 05/35] add preAggsJobsRequestSchema validator --- packages/cubejs-api-gateway/src/gateway.ts | 58 +++++++++------------- packages/cubejs-api-gateway/src/query.js | 25 ++++++++-- 2 files changed, 44 insertions(+), 39 deletions(-) diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index 2aa92add1cf38..60dbef62920eb 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -8,6 +8,7 @@ import structuredClone from '@ungap/structured-clone'; import { getEnv, getRealType, + parseLocalDate, QueryAlias, } from '@cubejs-backend/shared'; import { @@ -81,7 +82,9 @@ import { normalizeQuery, normalizeQueryCancelPreAggregations, normalizeQueryPreAggregationPreview, - normalizeQueryPreAggregations, remapToQueryAdapterFormat, + normalizeQueryPreAggregations, + preAggsJobsRequestSchema, + remapToQueryAdapterFormat, } from './query'; import { cachedHandler } from './cached-handler'; import { createJWKsFetcher } from './jwk'; @@ -843,7 +846,6 @@ class ApiGateway { * ] * } * ``` - * TODO (buntarb): selector object validator. */ private async preAggregationsJobs(req: Request, res: ExpressResponse) { const response = this.resToResultFn(res); @@ -853,41 +855,14 @@ class ApiGateway { let result; try { await this.assertApiScope('jobs', req?.context?.securityContext); + + const { error } = preAggsJobsRequestSchema.validate(query); + if (error) { + throw new UserError(`Invalid Job query format: ${error.message || error.toString()}`); + } + switch (query.action) { case 'post': - if ( - !(query.selector).timezones || - (query.selector).timezones.length === 0 - ) { - throw new UserError( - 'A user\'s selector must contain at least one time zone.' - ); - } - if ( - !(query.selector).contexts || - ( - <{securityContext: any}[]>( - query.selector - ).contexts - ).length === 0 - ) { - throw new UserError( - 'A user\'s selector must contain at least one context element.' - ); - } else { - let e = false; - (<{securityContext: any}[]>( - query.selector - ).contexts).forEach((c) => { - if (!c.securityContext) e = true; - }); - if (e) { - throw new UserError( - 'Every context element must contain the ' + - '\'securityContext\' property.' - ); - } - } result = await this.preAggregationsJobsPOST( context, query.selector @@ -932,6 +907,19 @@ class ApiGateway { return jobs; } + // There might be a few contexts but dateRange if present is still the same + // so let's normalize it only once. + // It's expected that selector.dateRange is provided in local time (without timezone) + // At the same time it is ok to get timestamps with `Z` (in UTC). + if (selector.dateRange) { + const start = parseLocalDate(selector.dateRange[0], 'UTC'); + const end = parseLocalDate(selector.dateRange[1], 'UTC'); + if (!start || !end) { + throw new Error(`Cannot parse selector date range ${selector.dateRange}`); + } + selector.dateRange = [start, end]; + } + const promise = Promise.all( selector.contexts.map(async (config) => { const ctx = { diff --git a/packages/cubejs-api-gateway/src/query.js b/packages/cubejs-api-gateway/src/query.js index 02ea1fa56c27b..77ca482b08451 100644 --- a/packages/cubejs-api-gateway/src/query.js +++ b/packages/cubejs-api-gateway/src/query.js @@ -6,9 +6,10 @@ import { getEnv } from '@cubejs-backend/shared'; import { UserError } from './UserError'; import { dateParser } from './dateParser'; import { QueryType } from './types/enums'; +import { PreAggsJobsRequest } from "./types/request"; const getQueryGranularity = (queries) => R.pipe( - R.map(({ timeDimensions }) => timeDimensions[0] && timeDimensions[0].granularity || null), + R.map(({ timeDimensions }) => timeDimensions[0]?.granularity), R.filter(Boolean), R.uniq )(queries); @@ -145,6 +146,22 @@ const normalizeQueryOrder = order => { return result; }; +export const preAggsJobsRequestSchema = Joi.object().keys({ + action: Joi.string().valid('post', 'get').required(), + selector: Joi.object().keys({ + contexts: Joi.array().items( + Joi.object().keys({ + securityContext: Joi.required(), + }) + ).min(1).required(), + timezones: Joi.array().items(Joi.string()).min(1).required(), + dataSources: Joi.array().items(Joi.string()), + cubes: Joi.array().items(Joi.string()), + preAggregations: Joi.array().items(Joi.string()), + dateRange: Joi.array().length(2).items(Joi.string()), + }).optional(), +}); + const DateRegex = /^\d\d\d\d-\d\d-\d\d$/; const normalizeQueryFilters = (filter) => ( @@ -196,9 +213,9 @@ const normalizeQuery = (query, persistent) => { if (error) { throw new UserError(`Invalid query format: ${error.message || error.toString()}`); } - const validQuery = query.measures && query.measures.length || - query.dimensions && query.dimensions.length || - query.timeDimensions && query.timeDimensions.filter(td => !!td.granularity).length; + const validQuery = query.measures?.length || + query.dimensions?.length || + query.timeDimensions?.filter(td => !!td.granularity).length; if (!validQuery) { throw new UserError( 'Query should contain either measures, dimensions or timeDimensions with granularities in order to be valid' From c5fce7d11ebd7205db885a2365e65208be2919af Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Mar 2025 17:58:57 +0200 Subject: [PATCH 06/35] fix PreAggsJobsRequest types --- packages/cubejs-api-gateway/src/gateway.ts | 7 ++++--- packages/cubejs-api-gateway/src/types/request.ts | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index 60dbef62920eb..2ae7469aa6666 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -949,7 +949,7 @@ class ApiGateway { selector: PreAggsSelector ): Promise { const compiler = await this.getCompilerApi(context); - const { timezones } = selector; + const { timezones, dateRange } = selector; const preaggs = await compiler.preAggregations({ dataSources: selector.dataSources, cubes: selector.cubes, @@ -965,12 +965,13 @@ class ApiGateway { { metadata: undefined, timezones, + dateRange, preAggregations: preaggs.map(p => ({ id: p.id, - cacheOnly: undefined, // boolean + cacheOnly: false, partitions: undefined, // string[] })), - forceBuildPreAggregations: undefined, + forceBuildPreAggregations: false, throwErrors: false, } ); diff --git a/packages/cubejs-api-gateway/src/types/request.ts b/packages/cubejs-api-gateway/src/types/request.ts index 827a4a9e76cac..4cc3d23d8d6f1 100644 --- a/packages/cubejs-api-gateway/src/types/request.ts +++ b/packages/cubejs-api-gateway/src/types/request.ts @@ -148,7 +148,7 @@ type SqlApiRequest = BaseRequest & { * Pre-aggregations selector object. */ type PreAggsSelector = { - contexts?: {securityContext: any}[], + contexts: {securityContext: any}[], timezones: string[], dataSources?: string[], cubes?: string[], @@ -178,7 +178,7 @@ type PreAggJob = { * The `/cubejs-system/v1/pre-aggregations/jobs` endpoint object type. */ type PreAggsJobsRequest = { - action: 'post' | 'get' | 'delete', + action: 'post' | 'get', selector?: PreAggsSelector, tokens?: string[] resType?: 'object' | 'array' From 3f67b7fbd3e5198062350e18048e562c7696560d Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Mar 2025 18:19:56 +0200 Subject: [PATCH 07/35] specify type for parseLocalDate() --- packages/cubejs-backend-shared/src/time.ts | 4 ++-- packages/cubejs-backend-shared/test/time.test.ts | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/cubejs-backend-shared/src/time.ts b/packages/cubejs-backend-shared/src/time.ts index 1a39ec5838c94..62ab33b93c3c7 100644 --- a/packages/cubejs-backend-shared/src/time.ts +++ b/packages/cubejs-backend-shared/src/time.ts @@ -251,12 +251,12 @@ export const utcToLocalTimeZone = (timezone: string, timestampFormat: string, ti return moment.tz(timestamp, 'UTC').tz(timezone).format(timestampFormat); }; -export const parseLocalDate = (data: any, timezone: string, timestampFormat: string = 'YYYY-MM-DDTHH:mm:ss.SSS'): string | null => { +export const parseLocalDate = (data: { [key: string]: string }[] | null | undefined, timezone: string, timestampFormat: string = 'YYYY-MM-DDTHH:mm:ss.SSS'): string | null => { if (!data) { return null; } data = JSON.parse(JSON.stringify(data)); - const value = data[0] && data[0][Object.keys(data[0])[0]]; + const value = data?.[0]?.[Object.keys(data[0])[0]]; if (!value) { return null; } diff --git a/packages/cubejs-backend-shared/test/time.test.ts b/packages/cubejs-backend-shared/test/time.test.ts index 4b90b45bc4ace..65174395044a9 100644 --- a/packages/cubejs-backend-shared/test/time.test.ts +++ b/packages/cubejs-backend-shared/test/time.test.ts @@ -267,7 +267,6 @@ describe('extractDate', () => { expect(parseLocalDate(null, timezone)).toBeNull(); expect(parseLocalDate(undefined, timezone)).toBeNull(); expect(parseLocalDate([], timezone)).toBeNull(); - expect(parseLocalDate('', timezone)).toBeNull(); }); it('should return null if no valid date is found in data', () => { From d74ba883334ff40d2572b9fe5ab41eb037386097 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Mar 2025 18:28:27 +0200 Subject: [PATCH 08/35] refresh only matched partitions --- packages/cubejs-api-gateway/src/gateway.ts | 4 +- .../src/core/RefreshScheduler.ts | 75 ++++++++++++------- 2 files changed, 48 insertions(+), 31 deletions(-) diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index 2ae7469aa6666..b8b437e88b2da 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -912,8 +912,8 @@ class ApiGateway { // It's expected that selector.dateRange is provided in local time (without timezone) // At the same time it is ok to get timestamps with `Z` (in UTC). if (selector.dateRange) { - const start = parseLocalDate(selector.dateRange[0], 'UTC'); - const end = parseLocalDate(selector.dateRange[1], 'UTC'); + const start = parseLocalDate([{ val: selector.dateRange[0] }], 'UTC'); + const end = parseLocalDate([{ val: selector.dateRange[1] }], 'UTC'); if (!start || !end) { throw new Error(`Cannot parse selector date range ${selector.dateRange}`); } diff --git a/packages/cubejs-server-core/src/core/RefreshScheduler.ts b/packages/cubejs-server-core/src/core/RefreshScheduler.ts index 942c47f299596..f24300511cd42 100644 --- a/packages/cubejs-server-core/src/core/RefreshScheduler.ts +++ b/packages/cubejs-server-core/src/core/RefreshScheduler.ts @@ -3,7 +3,10 @@ import pLimit from 'p-limit'; import { v4 as uuidv4 } from 'uuid'; import crypto from 'crypto'; import { Required } from '@cubejs-backend/shared'; -import { PreAggregationDescription } from '@cubejs-backend/query-orchestrator'; +import { + PreAggregationDescription, + PreAggregationPartitionRangeLoader +} from '@cubejs-backend/query-orchestrator'; import { CubejsServerCore } from './server'; import { CompilerApi } from './CompilerApi'; @@ -30,6 +33,7 @@ type ScheduledRefreshQueryingOptions = Required { const orchestratorApi = await this.serverCore.getOrchestratorApi(context); const preAggregations = await this.preAggregationPartitions(context, queryingOptions); + if (queryingOptions.dateRange) { + preAggregations.forEach(preAggregation => { + preAggregation.partitions = preAggregation.partitions + .filter(p => PreAggregationPartitionRangeLoader.intersectDateRanges( + [p.buildRangeStart, p.buildRangeEnd], + queryingOptions.dateRange, + )); + }); + } + const preAggregationsLoadCacheByDataSource = {}; const jobsPromise = Promise.all( - preAggregations.map(async (p: any) => { - const { partitionsWithDependencies } = p; - return Promise.all( - partitionsWithDependencies.map(({ partitions, dependencies }) => ( - Promise.all( - partitions.map( - async (partition): Promise => { - const job = await orchestratorApi.executeQuery({ - preAggregations: dependencies.concat([partition]), - continueWait: true, - renewQuery: false, - forceBuildPreAggregations: true, - orphanedTimeout: 60 * 60, - requestId: context.requestId, - timezone: partition.timezone, - scheduledRefresh: false, - preAggregationsLoadCacheByDataSource, - metadata: queryingOptions.metadata, - isJob: true, - }); - job[0].dataSource = partition.dataSource; - job[0].timezone = partition.timezone; - return job; - } + preAggregations + // Filter out pre-aggs without partitions + .filter(p => p.partitions.length) + .map(async (p: any) => { + const { partitionsWithDependencies } = p; + return Promise.all( + partitionsWithDependencies.map(({ partitions, dependencies }) => ( + Promise.all( + partitions.map( + async (partition): Promise => { + const job = await orchestratorApi.executeQuery({ + preAggregations: dependencies.concat([partition]), + continueWait: true, + renewQuery: false, + forceBuildPreAggregations: true, + orphanedTimeout: 60 * 60, + requestId: context.requestId, + timezone: partition.timezone, + scheduledRefresh: false, + preAggregationsLoadCacheByDataSource, + metadata: queryingOptions.metadata, + isJob: true, + }); + job[0].dataSource = partition.dataSource; + job[0].timezone = partition.timezone; + return job; + } + ) ) - ) - )) - ); - }) + )) + ); + }) ); const jobedPAs = await jobsPromise; From 2be7cbe947e6f1d94fdff1d215d4f42e91c9ad4f Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Mar 2025 18:41:14 +0200 Subject: [PATCH 09/35] add html reporters --- packages/cubejs-api-gateway/package.json | 1 + packages/cubejs-backend-shared/package.json | 1 + packages/cubejs-query-orchestrator/package.json | 1 + 3 files changed, 3 insertions(+) diff --git a/packages/cubejs-api-gateway/package.json b/packages/cubejs-api-gateway/package.json index f4c9873774ce4..8bcafdfdd6ca5 100644 --- a/packages/cubejs-api-gateway/package.json +++ b/packages/cubejs-api-gateway/package.json @@ -73,6 +73,7 @@ "jest": { "testEnvironment": "node", "collectCoverage": false, + "coverageReporters": ["text", "html"], "coverageDirectory": "coverage/", "collectCoverageFrom": [ "dist/src/**/*.js", diff --git a/packages/cubejs-backend-shared/package.json b/packages/cubejs-backend-shared/package.json index 35621cee1a544..fb796517bae71 100644 --- a/packages/cubejs-backend-shared/package.json +++ b/packages/cubejs-backend-shared/package.json @@ -63,6 +63,7 @@ "jest": { "testEnvironment": "node", "collectCoverage": false, + "coverageReporters": ["text", "html"], "coverageDirectory": "coverage/", "collectCoverageFrom": [ "dist/src/**/*.js", diff --git a/packages/cubejs-query-orchestrator/package.json b/packages/cubejs-query-orchestrator/package.json index eef1f2e698077..6e5eed0e2fde3 100644 --- a/packages/cubejs-query-orchestrator/package.json +++ b/packages/cubejs-query-orchestrator/package.json @@ -54,6 +54,7 @@ "jest": { "testEnvironment": "node", "collectCoverage": false, + "coverageReporters": ["text", "html"], "coverageDirectory": "coverage/", "collectCoverageFrom": [ "dist/src/**/*.js", From 21a1cdd598ff3a0777ed0fd0964b9f43ed0b5e04 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Mar 2025 18:43:12 +0200 Subject: [PATCH 10/35] some code polishment --- packages/cubejs-api-gateway/src/dateParser.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/cubejs-api-gateway/src/dateParser.js b/packages/cubejs-api-gateway/src/dateParser.js index 9db25453e3939..4e9c97bfbeaaf 100644 --- a/packages/cubejs-api-gateway/src/dateParser.js +++ b/packages/cubejs-api-gateway/src/dateParser.js @@ -65,8 +65,7 @@ export function dateParser(dateString, timezone, now = new Date()) { moment.tz(timezone).endOf('day').add(1, 'day') ]; } else if (dateString.match(/^from (.*) to (.*)$/)) { - // eslint-disable-next-line no-unused-vars,@typescript-eslint/no-unused-vars - const [all, from, to] = dateString.match(/^from (.*) to (.*)$/); + const [, from, to] = dateString.match(/^from (.*) to (.*)$/); const current = moment(now).tz(timezone); const fromResults = parse(from, new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS))); @@ -89,7 +88,7 @@ export function dateParser(dateString, timezone, now = new Date()) { momentRange = [momentRange[0].startOf(exactGranularity), momentRange[1].endOf(exactGranularity)]; } else { const results = parse(dateString, new Date(moment().tz(timezone).format(moment.HTML5_FMT.DATETIME_LOCAL_MS))); - if (!results || !results.length) { + if (!results?.length) { throw new UserError(`Can't parse date: '${dateString}'`); } From 03c36345a524ffdc759796b304e03cc7c20ff5a3 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Mar 2025 19:02:05 +0200 Subject: [PATCH 11/35] fix/add some tests to dateParser() --- packages/cubejs-api-gateway/src/dateParser.js | 2 +- .../test/dateParser.test.js | 24 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/packages/cubejs-api-gateway/src/dateParser.js b/packages/cubejs-api-gateway/src/dateParser.js index 4e9c97bfbeaaf..2452f4c700f85 100644 --- a/packages/cubejs-api-gateway/src/dateParser.js +++ b/packages/cubejs-api-gateway/src/dateParser.js @@ -75,7 +75,7 @@ export function dateParser(dateString, timezone, now = new Date()) { throw new UserError(`Can't parse date: '${from}'`); } - if (!Array.isArray(fromResults) || !fromResults.length) { + if (!Array.isArray(toResults) || !toResults.length) { throw new UserError(`Can't parse date: '${to}'`); } diff --git a/packages/cubejs-api-gateway/test/dateParser.test.js b/packages/cubejs-api-gateway/test/dateParser.test.js index 4b67a1aed26ed..ab082de2fb833 100644 --- a/packages/cubejs-api-gateway/test/dateParser.test.js +++ b/packages/cubejs-api-gateway/test/dateParser.test.js @@ -168,4 +168,28 @@ describe('dateParser', () => { Date.now.mockRestore(); }); + + test('throws error on from invalid date to date', () => { + expect(() => dateParser('from invalid to 2020-02-02', 'UTC')).toThrow( + 'Can\'t parse date: \'invalid\'' + ); + }); + + test('throws error on from date to invalid date', () => { + expect(() => dateParser('from 2020-02-02 to invalid', 'UTC')).toThrow( + 'Can\'t parse date: \'invalid\'' + ); + }); + + test('from 12AM till now by 1 hour', () => { + Date.now = jest.fn().mockReturnValue(new Date(2021, 2, 5, 13, 0, 0, 0)); + expect(dateParser('from 12AM till now by 1 hour', 'UTC', new Date(2021, 2, 5, 13, 0, 0, 0))).toStrictEqual( + [ + '2021-03-05T00:00:00.000', + '2021-03-05T11:59:59.999' + ] + ); + + Date.now.mockRestore(); + }); }); From 625e46612398096dabd4139d343531a38226818b Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Mar 2025 19:18:19 +0200 Subject: [PATCH 12/35] code polishment --- packages/cubejs-api-gateway/src/types/request.ts | 2 +- .../src/orchestrator/QueryOrchestrator.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/cubejs-api-gateway/src/types/request.ts b/packages/cubejs-api-gateway/src/types/request.ts index 4cc3d23d8d6f1..9ff0ba90121b6 100644 --- a/packages/cubejs-api-gateway/src/types/request.ts +++ b/packages/cubejs-api-gateway/src/types/request.ts @@ -108,7 +108,7 @@ type ResponseResultFn = ( message: (Record | Record[]) | DataResult | ErrorResponse, extra?: { status: number } - ) => void; + ) => void | Promise; /** * Base HTTP request parameters map data type. diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts index 1b54cdff03b16..20a4a24c5b3ae 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts @@ -379,7 +379,7 @@ export class QueryOrchestrator { preAggregations.map(p => { const { preAggregation } = p.preAggregation; const partition = p.partitions[0]; - preAggregation.dataSource = (partition && partition.dataSource) || 'default'; + preAggregation.dataSource = partition?.dataSource || 'default'; preAggregation.preAggregationsSchema = preAggregationsSchema; return preAggregation; }), From 3a2b6f7a840f7f1a285a8f1af50362f26fb3be3a Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Mar 2025 20:35:51 +0200 Subject: [PATCH 13/35] add another check --- packages/cubejs-api-gateway/src/gateway.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index b8b437e88b2da..cbad75c39ac51 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -856,6 +856,10 @@ class ApiGateway { try { await this.assertApiScope('jobs', req?.context?.securityContext); + if (!query) { + throw new UserError('No job description provided'); + } + const { error } = preAggsJobsRequestSchema.validate(query); if (error) { throw new UserError(`Invalid Job query format: ${error.message || error.toString()}`); From 2aa4f1b6e72cbc42d49babed41be5c1000a1e69d Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 7 Mar 2025 20:47:48 +0200 Subject: [PATCH 14/35] add tests for api-gw preagg jobs endpoint --- packages/cubejs-api-gateway/src/gateway.ts | 9 +- .../cubejs-api-gateway/test/index.test.ts | 92 +++++++++++++++++++ 2 files changed, 94 insertions(+), 7 deletions(-) diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index cbad75c39ac51..0529a773e9ed2 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -856,7 +856,7 @@ class ApiGateway { try { await this.assertApiScope('jobs', req?.context?.securityContext); - if (!query) { + if (!query || Object.keys(query).length === 0) { throw new UserError('No job description provided'); } @@ -906,11 +906,6 @@ class ApiGateway { ): Promise { let jobs: string[] = []; - // For the sake of type check, as contexts are checked in preAggregationsJobs() - if (!selector.contexts) { - return jobs; - } - // There might be a few contexts but dateRange if present is still the same // so let's normalize it only once. // It's expected that selector.dateRange is provided in local time (without timezone) @@ -919,7 +914,7 @@ class ApiGateway { const start = parseLocalDate([{ val: selector.dateRange[0] }], 'UTC'); const end = parseLocalDate([{ val: selector.dateRange[1] }], 'UTC'); if (!start || !end) { - throw new Error(`Cannot parse selector date range ${selector.dateRange}`); + throw new UserError(`Cannot parse selector date range ${selector.dateRange}`); } selector.dateRange = [start, end]; } diff --git a/packages/cubejs-api-gateway/test/index.test.ts b/packages/cubejs-api-gateway/test/index.test.ts index b7436e3093cae..2153442e814ad 100644 --- a/packages/cubejs-api-gateway/test/index.test.ts +++ b/packages/cubejs-api-gateway/test/index.test.ts @@ -17,6 +17,7 @@ import { DataSourceStorageMock, AdapterApiMock } from './mocks'; +import { ApiScopesTuple } from '../src/types/auth'; const logger = (type, message) => console.log({ type, ...message }); @@ -61,6 +62,7 @@ async function createApiGateway( process.env.NODE_ENV = 'unknown'; const app = express(); + app.use(express.json()); apiGateway.initApp(app); return { @@ -982,6 +984,96 @@ describe('API Gateway', () => { }); }); + describe('/v1/pre-aggregations/jobs', () => { + const scheduledRefreshContextsFactory = () => ([ + { securityContext: { foo: 'bar' } }, + { securityContext: { bar: 'foo' } } + ]); + + const scheduledRefreshTimeZonesFactory = () => (['UTC', 'America/Los_Angeles']); + + const appPrepareFactory = async (scope: string[]) => { + const playgroundAuthSecret = 'test12345'; + const { app } = await createApiGateway( + new AdapterApiMock(), + new DataSourceStorageMock(), + { + basePath: '/test', + playgroundAuthSecret, + refreshScheduler: () => new RefreshSchedulerMock(), + scheduledRefreshContexts: () => Promise.resolve(scheduledRefreshContextsFactory()), + scheduledRefreshTimeZones: scheduledRefreshTimeZonesFactory, + contextToApiScopes: () => Promise.resolve(scope) + } + ); + const token = generateAuthToken({ uid: 5, scope }, {}, playgroundAuthSecret); + const tokenUser = generateAuthToken({ uid: 5, scope }, {}, API_SECRET); + + return { app, token, tokenUser }; + }; + + test('no input', async () => { + const { app, tokenUser } = await appPrepareFactory(['graphql', 'data', 'meta', 'jobs']); + + const req = request(app).post('/test/v1/pre-aggregations/jobs') + .set('Content-type', 'application/json') + .set('Authorization', `Bearer ${tokenUser}`); + + const res = await req; + expect(res.status).toEqual(400); + expect(res.body.error).toEqual('No job description provided'); + }); + + test('invalid input action', async () => { + const { app, tokenUser } = await appPrepareFactory(['graphql', 'data', 'meta', 'jobs']); + + const req = request(app).post('/test/v1/pre-aggregations/jobs') + .set('Content-type', 'application/json') + .set('Authorization', `Bearer ${tokenUser}`) + .send({ action: 'patch' }); + + const res = await req; + expect(res.status).toEqual(400); + expect(res.body.error.includes('Invalid Job query format')).toBeTruthy(); + }); + + test('invalid input date range', async () => { + const { app, tokenUser } = await appPrepareFactory(['graphql', 'data', 'meta', 'jobs']); + + let req = request(app).post('/test/v1/pre-aggregations/jobs') + .set('Content-type', 'application/json') + .set('Authorization', `Bearer ${tokenUser}`) + .send({ + action: 'post', + selector: { + contexts: [{ securityContext: {} }], + timezones: ['UTC', 'America/Los_Angeles'], + dateRange: ['invalid string', '2020-02-20'] + } + }); + + let res = await req; + expect(res.status).toEqual(400); + expect(res.body.error.includes('Cannot parse selector date range')).toBeTruthy(); + + req = request(app).post('/test/v1/pre-aggregations/jobs') + .set('Content-type', 'application/json') + .set('Authorization', `Bearer ${tokenUser}`) + .send({ + action: 'post', + selector: { + contexts: [{ securityContext: {} }], + timezones: ['UTC', 'America/Los_Angeles'], + dateRange: ['2020-02-20', 'invalid string'] + } + }); + + res = await req; + expect(res.status).toEqual(400); + expect(res.body.error.includes('Cannot parse selector date range')).toBeTruthy(); + }); + }); + describe('healtchecks', () => { test('readyz (standalone)', async () => { const { app, adapterApi } = await createApiGateway(); From c4cf6b2a793abacf9b0f15899736372bc139428c Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Tue, 11 Mar 2025 12:56:41 +0200 Subject: [PATCH 15/35] align refactor some types --- .../orchestrator/PreAggregationLoadCache.ts | 4 +-- .../src/orchestrator/PreAggregationLoader.ts | 4 +-- .../PreAggregationPartitionRangeLoader.ts | 6 ++-- .../src/orchestrator/QueryCache.ts | 34 +++++++++---------- .../src/orchestrator/utils.ts | 5 +-- 5 files changed, 26 insertions(+), 27 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts index 08f77c79b6a95..098d20337a972 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts @@ -1,6 +1,6 @@ import { TableStructure } from '@cubejs-backend/base-driver'; import { DriverFactory } from './DriverFactory'; -import { QueryCache, QueryTuple, QueryWithParams } from './QueryCache'; +import { QueryCache, QueryWithParams } from './QueryCache'; import { PreAggregationDescription, PreAggregations, @@ -189,7 +189,7 @@ export class PreAggregationLoadCache { } public async keyQueryResult(sqlQuery: QueryWithParams, waitForRenew: boolean, priority: number) { - const [query, values, queryOptions]: QueryTuple = Array.isArray(sqlQuery) ? sqlQuery : [sqlQuery, [], {}]; + const [query, values, queryOptions]: QueryWithParams = Array.isArray(sqlQuery) ? sqlQuery : [sqlQuery, [], {}]; if (!this.queryResults[this.queryCache.queryRedisKey([query, values])]) { this.queryResults[this.queryCache.queryRedisKey([query, values])] = await this.queryCache.cacheQueryResult( diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts index 5de44242dfff3..16703906b76e1 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts @@ -12,7 +12,7 @@ import { UnloadOptions } from '@cubejs-backend/base-driver'; import { DriverFactory } from './DriverFactory'; -import { PreAggTableToTempTableNames, QueryCache, QueryTuple } from './QueryCache'; +import { PreAggTableToTempTableNames, QueryCache, QueryWithParams } from './QueryCache'; import { ContinueWaitError } from './ContinueWaitError'; import { LargeStreamWarning } from './StreamObjectsCounter'; import { @@ -30,7 +30,7 @@ import { PreAggregationLoadCache } from './PreAggregationLoadCache'; type IndexesSql = { sql: [string, unknown[]], indexName: string }[]; -type QueryKey = [QueryTuple, IndexesSql, InvalidationKeys] | [QueryTuple, InvalidationKeys]; +type QueryKey = [QueryWithParams, IndexesSql, InvalidationKeys] | [QueryWithParams, InvalidationKeys]; type QueryOptions = { queryKey: QueryKey; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts index b8019ff18d037..f76fd9cc61a99 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts @@ -13,7 +13,7 @@ import { } from '@cubejs-backend/shared'; import { InlineTable, TableStructure } from '@cubejs-backend/base-driver'; import { DriverFactory } from './DriverFactory'; -import { QueryCache, QueryTuple, QueryWithParams } from './QueryCache'; +import { QueryCache, QueryWithParams } from './QueryCache'; import { getLastUpdatedAtTimestamp, LAMBDA_TABLE_PREFIX, @@ -83,8 +83,8 @@ export class PreAggregationPartitionRangeLoader { this.compilerCacheFn = options.compilerCacheFn || ((subKey, cacheFn) => cacheFn()); } - private async loadRangeQuery(rangeQuery: QueryTuple, partitionRange?: QueryDateRange) { - const [query, values, queryOptions]: QueryTuple = rangeQuery; + private async loadRangeQuery(rangeQuery: QueryWithParams, partitionRange?: QueryDateRange) { + const [query, values, queryOptions]: QueryWithParams = rangeQuery; const invalidate = this.preAggregation.invalidateKeyQueries?.[0] ? this.preAggregation.invalidateKeyQueries[0].slice(0, 2) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index a3f4ef7deeef4..52eb696dd21e5 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -9,7 +9,7 @@ import { InlineTables, CacheDriverInterface, TableStructure, - DriverInterface, + DriverInterface, QueryKey, } from '@cubejs-backend/base-driver'; import { QueryQueue } from './QueryQueue'; @@ -28,14 +28,12 @@ type QueryOptions = { incremental?: boolean; }; -export type QueryTuple = [ +export type QueryWithParams = [ sql: string, - params: unknown[], + params: string[], options?: QueryOptions ]; -export type QueryWithParams = QueryTuple; - export type Query = { requestId?: string; dataSource: string; @@ -84,7 +82,7 @@ export type PreAggTableToTempTable = [ export type PreAggTableToTempTableNames = [string, { targetTableName: string; }]; -export type CacheKeyItem = string | string[] | QueryTuple | QueryTuple[] | undefined; +export type CacheKeyItem = string | string[] | QueryWithParams | QueryWithParams[] | undefined; export type CacheKey = [CacheKeyItem, CacheKeyItem] | @@ -382,7 +380,7 @@ export class QueryCache { public static replacePreAggregationTableNames( queryAndParams: string | QueryWithParams, preAggregationsTablesToTempTables: PreAggTableToTempTableNames[], - ): string | QueryTuple { + ): string | QueryWithParams { const [keyQuery, params, queryOptions] = Array.isArray(queryAndParams) ? queryAndParams : [queryAndParams, []]; @@ -404,7 +402,7 @@ export class QueryCache { * queries and with the `stream.Writable` instance for the persistent. */ public async queryWithRetryAndRelease( - query: string | QueryTuple, + query: string | QueryWithParams, values: string[], { cacheKey, @@ -665,9 +663,9 @@ export class QueryCache { } public startRenewCycle( - query: string | QueryTuple, + query: string | QueryWithParams, values: string[], - cacheKeyQueries: (string | QueryTuple)[], + cacheKeyQueries: (string | QueryWithParams)[], expireSecs: number, cacheKey: CacheKey, renewalThreshold: any, @@ -700,9 +698,9 @@ export class QueryCache { } public renewQuery( - query: string | QueryTuple, + query: string | QueryWithParams, values: string[], - cacheKeyQueries: (string | QueryTuple)[], + cacheKeyQueries: (string | QueryWithParams)[], expireSecs: number, cacheKey: CacheKey, renewalThreshold: any, @@ -719,7 +717,7 @@ export class QueryCache { ) { options = options || { dataSource: 'default' }; return Promise.all( - this.loadRefreshKeys(cacheKeyQueries, expireSecs, options), + this.loadRefreshKeys(cacheKeyQueries, expireSecs, options), ) .catch(e => { if (e instanceof ContinueWaitError) { @@ -782,11 +780,11 @@ export class QueryCache { } ) { return cacheKeyQueries.map((q) => { - const [query, values, queryOptions]: QueryTuple = Array.isArray(q) ? q : [q, [], {}]; + const [query, values, queryOptions]: QueryWithParams = Array.isArray(q) ? q : [q, [], {}]; return this.cacheQueryResult( query, - values, - [query, values], + values, + [query, values], expireSecs, { renewalThreshold: this.options.refreshKeyRenewalThreshold || queryOptions?.renewalThreshold || 2 * 60, @@ -808,7 +806,7 @@ export class QueryCache { ) => this.cacheDriver.withLock(`lock:${key}`, callback, ttl, true); public async cacheQueryResult( - query: string | QueryTuple, + query: string | QueryWithParams, values: string[], cacheKey: CacheKey, expiration: number, @@ -990,7 +988,7 @@ export class QueryCache { return null; } - public queryRedisKey(cacheKey): string { + public queryRedisKey(cacheKey: CacheKey): string { return this.getKey('SQL_QUERY_RESULT', getCacheHash(cacheKey) as any); } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts b/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts index 5cb71266f1e69..1e9c366a09a61 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts @@ -2,6 +2,7 @@ import crypto from 'crypto'; import { getProcessUid } from '@cubejs-backend/shared'; import { QueryKey, QueryKeyHash } from '@cubejs-backend/base-driver'; +import { CacheKey } from './QueryCache'; /** * Unique process ID regexp. @@ -11,13 +12,13 @@ export const processUidRE = /^[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}- /** * Returns query hash by specified `queryKey`. */ -export function getCacheHash(queryKey: QueryKey, processUid?: string): QueryKeyHash { +export function getCacheHash(queryKey: QueryKey | CacheKey, processUid?: string): QueryKeyHash { processUid = processUid || getProcessUid(); if (typeof queryKey === 'string' && queryKey.length < 256) { return queryKey as any; } - if (typeof queryKey === 'object' && queryKey.persistent) { + if (typeof queryKey === 'object' && 'persistent' in queryKey && queryKey.persistent) { return `${crypto .createHash('md5') .update(JSON.stringify(queryKey)) From 036dc795465af4742b02d06499d08f11bc975498 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Tue, 11 Mar 2025 20:09:03 +0200 Subject: [PATCH 16/35] code polish --- packages/cubejs-api-gateway/src/gateway.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index 0529a773e9ed2..69e7db825aaf0 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -771,8 +771,8 @@ class ApiGateway { preAggregations: [{ id: preAggregationId }] } ); - const { partitions } = (preAggregationPartitions && preAggregationPartitions[0] || {}); - const preAggregationPartition = partitions && partitions.find(p => p?.tableName === versionEntry.table_name); + const { partitions } = (preAggregationPartitions?.[0] || {}); + const preAggregationPartition = partitions?.find(p => p?.tableName === versionEntry.table_name); res({ preview: preAggregationPartition && await orchestratorApi.getPreAggregationPreview( From a2ef014e7cb38c6c75d81fe47f6ac4aed293ce3a Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Tue, 11 Mar 2025 20:12:34 +0200 Subject: [PATCH 17/35] refresh only matched partitions fix --- .../src/core/RefreshScheduler.ts | 37 +++++++++++++++++-- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/packages/cubejs-server-core/src/core/RefreshScheduler.ts b/packages/cubejs-server-core/src/core/RefreshScheduler.ts index f24300511cd42..777ed886899a4 100644 --- a/packages/cubejs-server-core/src/core/RefreshScheduler.ts +++ b/packages/cubejs-server-core/src/core/RefreshScheduler.ts @@ -688,10 +688,39 @@ export class RefreshScheduler { if (queryingOptions.dateRange) { preAggregations.forEach(preAggregation => { preAggregation.partitions = preAggregation.partitions - .filter(p => PreAggregationPartitionRangeLoader.intersectDateRanges( - [p.buildRangeStart, p.buildRangeEnd], - queryingOptions.dateRange, - )); + .filter(p => { + if (!p.buildRangeStart && !p.buildRangeEnd) { + return true; // If there is no range specified - we should include it like rebuild in anyway + } + + return PreAggregationPartitionRangeLoader.intersectDateRanges( + [p.buildRangeStart, p.buildRangeEnd], + queryingOptions.dateRange, + ); + }); + preAggregation.partitionsWithDependencies.forEach(pd => { + pd.partitions = pd.partitions.filter(p => { + if (!p.buildRangeStart && !p.buildRangeEnd) { + return true; // If there is no range specified - we should include it like rebuild in any way + } + + return PreAggregationPartitionRangeLoader.intersectDateRanges( + [p.buildRangeStart, p.buildRangeEnd], + queryingOptions.dateRange, + ); + }); + + pd.dependencies = pd.dependencies.filter(p => { + if (!p.buildRangeStart && !p.buildRangeEnd) { + return true; // If there is no range specified - we should include it like rebuild in any way + } + + return PreAggregationPartitionRangeLoader.intersectDateRanges( + [p.buildRangeStart, p.buildRangeEnd], + queryingOptions.dateRange, + ); + }); + }); }); } From f1b7c9df7b065158a501d22ece2f8a8823659c56 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Tue, 11 Mar 2025 20:13:26 +0200 Subject: [PATCH 18/35] code polish --- .../src/orchestrator/PreAggregationLoadCache.ts | 4 ++-- .../src/orchestrator/QueryCache.ts | 9 ++++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts index 098d20337a972..3e9d0947cdc2f 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts @@ -194,8 +194,8 @@ export class PreAggregationLoadCache { if (!this.queryResults[this.queryCache.queryRedisKey([query, values])]) { this.queryResults[this.queryCache.queryRedisKey([query, values])] = await this.queryCache.cacheQueryResult( query, - values, - [query, values], + values, + [query, values], 60 * 60, { renewalThreshold: this.queryCache.options.refreshKeyRenewalThreshold diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 52eb696dd21e5..7f2360b95be65 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -865,7 +865,14 @@ export class QueryCache { }); }).catch(e => { if (!(e instanceof ContinueWaitError)) { - this.logger('Dropping Cache', { cacheKey, error: e.stack || e, requestId: options.requestId, spanId, primaryQuery, renewCycle }); + this.logger('Dropping Cache', { + cacheKey, + error: e.stack || e, + requestId: options.requestId, + spanId, + primaryQuery, + renewCycle + }); this.cacheDriver.remove(redisKey) .catch(err => this.logger('Error removing key', { cacheKey, From 73c584f8a4d4a6304f948225b8ee895b28209158 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Tue, 11 Mar 2025 20:14:23 +0200 Subject: [PATCH 19/35] add tests for pre-agg jobs --- .../test/unit/RefreshScheduler.test.ts | 179 +++++++++++++++++- 1 file changed, 175 insertions(+), 4 deletions(-) diff --git a/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts b/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts index 6e2f42bb7ba3e..5ec1ccd933716 100644 --- a/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts +++ b/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts @@ -1,9 +1,8 @@ import R from 'ramda'; import { BaseDriver } from '@cubejs-backend/query-orchestrator'; import { pausePromise, SchemaFileRepository } from '@cubejs-backend/shared'; -import { CubejsServerCore } from '../../src'; -import { RefreshScheduler } from '../../src/core/RefreshScheduler'; -import { CompilerApi } from '../../src/core/CompilerApi'; +import { clearInterval } from 'timers'; +import { CubejsServerCore, CompilerApi, RefreshScheduler } from '../../src'; const schemaContent = ` cube('Foo', { @@ -234,6 +233,16 @@ cube('Bar', { ]), }; +function createDeferred() { + let resolve; + let reject; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + class MockDriver extends BaseDriver { public tables: any[] = []; @@ -395,7 +404,7 @@ const setupScheduler = ({ repository, useOriginalSqlPreAggregations, skipAssertS jest.spyOn(serverCore, 'getCompilerApi').mockImplementation(async () => compilerApi); const refreshScheduler = new RefreshScheduler(serverCore); - return { refreshScheduler, compilerApi, mockDriver }; + return { refreshScheduler, compilerApi, mockDriver, serverCore }; }; describe('Refresh Scheduler', () => { @@ -819,6 +828,168 @@ describe('Refresh Scheduler', () => { expect(refreshResult.finished).toEqual(true); }); + describe('Manual pre-aggregations rebuild via postBuildJobs', () => { + test('All pre-aggregations', async () => { + process.env.CUBEJS_EXTERNAL_DEFAULT = 'false'; + process.env.CUBEJS_SCHEDULED_REFRESH_DEFAULT = 'true'; + + const { + refreshScheduler, mockDriver, serverCore + } = setupScheduler({ repository: repositoryWithPreAggregations }); + + const ctx = { authInfo: { tenantId: 'tenant1' }, securityContext: { tenantId: 'tenant1' }, requestId: 'XXX' }; + + let finish = false; + + while (!finish) { + try { + await refreshScheduler.postBuildJobs( + ctx, + { + metadata: undefined, + preAggregations: [], + timezones: ['UTC', 'America/Los_Angeles'], + forceBuildPreAggregations: false, + throwErrors: false, + preAggregationLoadConcurrency: 1, + } + ); + finish = true; + } catch (err: any) { + if (err.error !== 'Continue wait') { + throw err; + } + } + } + + const deferred = createDeferred(); + const orchestrator = await serverCore.getOrchestratorApi(ctx); + + const interval = setInterval(async () => { + const queuedList = await orchestrator.getPreAggregationQueueStates(); + + if (queuedList.length === 0) { + deferred.resolve(); + } + }, 500); + + await deferred.promise; + clearInterval(interval); + + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'UTC').length).toEqual(5); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'America/Los_Angeles').length).toEqual(5); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_orphaned') && o.timezone === 'UTC').length).toEqual(5); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_orphaned') && o.timezone === 'America/Los_Angeles').length).toEqual(5); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_second') && o.timezone === 'UTC').length).toEqual(5); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_second') && o.timezone === 'America/Los_Angeles').length).toEqual(5); + }); + + test('Only `first` pre-aggregation', async () => { + process.env.CUBEJS_EXTERNAL_DEFAULT = 'false'; + process.env.CUBEJS_SCHEDULED_REFRESH_DEFAULT = 'true'; + + const { + refreshScheduler, mockDriver, serverCore + } = setupScheduler({ repository: repositoryWithPreAggregations }); + + const ctx = { authInfo: { tenantId: 'tenant1' }, securityContext: { tenantId: 'tenant1' }, requestId: 'XXX' }; + + let finish = false; + + while (!finish) { + try { + await refreshScheduler.postBuildJobs( + ctx, + { + metadata: undefined, + preAggregations: [{ id: 'Foo.first' }], + timezones: ['UTC', 'America/Los_Angeles'], + forceBuildPreAggregations: false, + throwErrors: false, + } + ); + finish = true; + } catch (err: any) { + if (err.error !== 'Continue wait') { + throw err; + } + } + } + + const deferred = createDeferred(); + const orchestrator = await serverCore.getOrchestratorApi(ctx); + + const interval = setInterval(async () => { + const queuedList = await orchestrator.getPreAggregationQueueStates(); + + if (queuedList.length === 0) { + deferred.resolve(); + } + }, 500); + + await deferred.promise; + clearInterval(interval); + + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'UTC').length).toEqual(5); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'America/Los_Angeles').length).toEqual(5); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_orphaned')).length).toEqual(0); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_second')).length).toEqual(0); + }); + + test('Only `first` pre-aggregation with dateRange', async () => { + process.env.CUBEJS_EXTERNAL_DEFAULT = 'false'; + process.env.CUBEJS_SCHEDULED_REFRESH_DEFAULT = 'true'; + + const { + refreshScheduler, mockDriver, serverCore + } = setupScheduler({ repository: repositoryWithPreAggregations }); + + const ctx = { authInfo: { tenantId: 'tenant1' }, securityContext: { tenantId: 'tenant1' }, requestId: 'XXX' }; + + let finish = false; + + while (!finish) { + try { + await refreshScheduler.postBuildJobs( + ctx, + { + metadata: undefined, + preAggregations: [{ id: 'Foo.first' }], + timezones: ['UTC', 'America/Los_Angeles'], + dateRange: ['2020-12-29T00:00:00.000', '2021-01-01T00:00:00.000'], + forceBuildPreAggregations: false, + throwErrors: false, + } + ); + finish = true; + } catch (err: any) { + if (err.error !== 'Continue wait') { + throw err; + } + } + } + + const deferred = createDeferred(); + const orchestrator = await serverCore.getOrchestratorApi(ctx); + + const interval = setInterval(async () => { + const queuedList = await orchestrator.getPreAggregationQueueStates(); + + if (queuedList.length === 0) { + deferred.resolve(); + } + }, 500); + + await deferred.promise; + clearInterval(interval); + + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'UTC').length).toEqual(3); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'America/Los_Angeles').length).toEqual(3); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_orphaned')).length).toEqual(0); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_second')).length).toEqual(0); + }); + }); + test('Iterator waits before advance', async () => { process.env.CUBEJS_EXTERNAL_DEFAULT = 'false'; process.env.CUBEJS_SCHEDULED_REFRESH_DEFAULT = 'true'; From 44d88f8b201dccd4ab51d7f8be2674037502a2f4 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Tue, 11 Mar 2025 21:40:01 +0200 Subject: [PATCH 20/35] jest reporters --- packages/cubejs-server-core/package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/cubejs-server-core/package.json b/packages/cubejs-server-core/package.json index 7aeeecb879193..f56643175ea06 100644 --- a/packages/cubejs-server-core/package.json +++ b/packages/cubejs-server-core/package.json @@ -85,6 +85,7 @@ "/dist/test/setup.js" ], "collectCoverage": false, + "coverageReporters": ["text", "html"], "coverageDirectory": "coverage/", "collectCoverageFrom": [ "dist/src/**/*.js", From e72697f4a562c272c350e7cc8385b6090bca1bd4 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Tue, 11 Mar 2025 22:00:57 +0200 Subject: [PATCH 21/35] add tests for refreshScheduler.getCachedBuildJobs() --- .../cubejs-server-core/test/unit/RefreshScheduler.test.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts b/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts index 5ec1ccd933716..de8fbfae24227 100644 --- a/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts +++ b/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts @@ -840,10 +840,11 @@ describe('Refresh Scheduler', () => { const ctx = { authInfo: { tenantId: 'tenant1' }, securityContext: { tenantId: 'tenant1' }, requestId: 'XXX' }; let finish = false; + let jobs: string[]; while (!finish) { try { - await refreshScheduler.postBuildJobs( + jobs = await refreshScheduler.postBuildJobs( ctx, { metadata: undefined, @@ -882,6 +883,11 @@ describe('Refresh Scheduler', () => { expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_orphaned') && o.timezone === 'America/Los_Angeles').length).toEqual(5); expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_second') && o.timezone === 'UTC').length).toEqual(5); expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_second') && o.timezone === 'America/Los_Angeles').length).toEqual(5); + + // Let's also test the getCachedBuildJobs() + const buildJobs = await refreshScheduler.getCachedBuildJobs(ctx, jobs); + const allTokensExist = jobs.every(token => buildJobs.some(job => job.token === token)); + expect(allTokensExist).toBeTruthy(); }); test('Only `first` pre-aggregation', async () => { From 37527701352289e8af532004fd9e50a1838e7275 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Wed, 12 Mar 2025 16:52:35 +0200 Subject: [PATCH 22/35] improve caching in pre-agg-load-cache --- .../orchestrator/PreAggregationLoadCache.ts | 51 +++++++++++++++++-- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts index 3e9d0947cdc2f..710a876493f58 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts @@ -16,6 +16,16 @@ type PreAggregationLoadCacheOptions = { tablePrefixes?: string[], }; +function createDeferred() { + let resolve; + let reject; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + export class PreAggregationLoadCache { private readonly driverFactory: DriverFactory; @@ -25,6 +35,8 @@ export class PreAggregationLoadCache { private readonly queryResults: any; + private queryResultRequests: { [redisKey: string]: { resolve: CallableFunction, reject: CallableFunction }[]} = {}; + private readonly externalDriverFactory: any; private readonly requestId: any; @@ -190,9 +202,26 @@ export class PreAggregationLoadCache { public async keyQueryResult(sqlQuery: QueryWithParams, waitForRenew: boolean, priority: number) { const [query, values, queryOptions]: QueryWithParams = Array.isArray(sqlQuery) ? sqlQuery : [sqlQuery, [], {}]; + const queryKey = this.queryCache.queryRedisKey([query, values]); - if (!this.queryResults[this.queryCache.queryRedisKey([query, values])]) { - this.queryResults[this.queryCache.queryRedisKey([query, values])] = await this.queryCache.cacheQueryResult( + // Have result in cache + if (this.queryResults[queryKey]) { + return this.queryResults[queryKey]; + } + + // There is ongoing request + if (this.queryResultRequests[queryKey]) { + const { promise, resolve, reject } = createDeferred(); + this.queryResultRequests[queryKey].push({ resolve, reject }); + + return promise; + } + + // Making query for a first time + this.queryResultRequests[queryKey] = []; + + try { + this.queryResults[queryKey] = await this.queryCache.cacheQueryResult( query, values, [query, values], @@ -209,8 +238,24 @@ export class PreAggregationLoadCache { external: queryOptions?.external } ); + + let r = (this.queryResultRequests[queryKey] || []).pop(); + while (r) { + r.resolve(this.queryResults[queryKey]); + r = this.queryResultRequests[queryKey].pop(); + } + + return this.queryResults[queryKey]; + } catch (err) { + let r = (this.queryResultRequests[queryKey] || []).pop(); + while (r) { + r.reject(err); + r = this.queryResultRequests[queryKey].pop(); + } + throw err; + } finally { + this.queryResultRequests[queryKey] = null; } - return this.queryResults[this.queryCache.queryRedisKey([query, values])]; } public hasKeyQueryResult(keyQuery) { From 4f15cf1a5dd07ed06b56cd289b72effbb3a9f378 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Wed, 12 Mar 2025 21:23:20 +0200 Subject: [PATCH 23/35] increase delay in tests to pass --- packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts b/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts index de8fbfae24227..b400724af0c58 100644 --- a/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts +++ b/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts @@ -254,7 +254,8 @@ class MockDriver extends BaseDriver { public cancelledQueries: any[] = []; - private tablesQueryDelay: any; + // FIXME: With small or absent delay 'Manual pre-aggregations rebuild via postBuildJobs' tests fails with incorrect results. + private tablesQueryDelay: any = 200; private schema: any; From c3f26f29a6037c725d00a07eb297eb7af642462e Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Wed, 12 Mar 2025 22:37:52 +0200 Subject: [PATCH 24/35] use asyncDebounce for caching in pre-agg-load-cache --- .../orchestrator/PreAggregationLoadCache.ts | 77 +++++-------------- 1 file changed, 21 insertions(+), 56 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts index 710a876493f58..3c47f9362547b 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts @@ -1,4 +1,5 @@ import { TableStructure } from '@cubejs-backend/base-driver'; +import { asyncDebounce } from '@cubejs-backend/shared'; import { DriverFactory } from './DriverFactory'; import { QueryCache, QueryWithParams } from './QueryCache'; import { @@ -16,16 +17,6 @@ type PreAggregationLoadCacheOptions = { tablePrefixes?: string[], }; -function createDeferred() { - let resolve; - let reject; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - return { promise, resolve, reject }; -} - export class PreAggregationLoadCache { private readonly driverFactory: DriverFactory; @@ -35,8 +26,6 @@ export class PreAggregationLoadCache { private readonly queryResults: any; - private queryResultRequests: { [redisKey: string]: { resolve: CallableFunction, reject: CallableFunction }[]} = {}; - private readonly externalDriverFactory: any; private readonly requestId: any; @@ -56,6 +45,8 @@ export class PreAggregationLoadCache { private readonly tablePrefixes: string[] | null; + private readonly cacheQueryResultDebounced: Function; + public constructor( clientFactory: DriverFactory, queryCache, @@ -73,6 +64,7 @@ export class PreAggregationLoadCache { this.versionEntries = {}; this.tables = {}; this.tableColumnTypes = {}; + this.cacheQueryResultDebounced = asyncDebounce(this.queryCache.cacheQueryResult.bind(this.queryCache)); } protected async tablesFromCache(preAggregation, forceRenew: boolean = false) { @@ -209,53 +201,26 @@ export class PreAggregationLoadCache { return this.queryResults[queryKey]; } - // There is ongoing request - if (this.queryResultRequests[queryKey]) { - const { promise, resolve, reject } = createDeferred(); - this.queryResultRequests[queryKey].push({ resolve, reject }); - - return promise; - } - - // Making query for a first time - this.queryResultRequests[queryKey] = []; - try { - this.queryResults[queryKey] = await this.queryCache.cacheQueryResult( - query, - values, - [query, values], - 60 * 60, - { - renewalThreshold: this.queryCache.options.refreshKeyRenewalThreshold - || queryOptions?.renewalThreshold || 2 * 60, - renewalKey: [query, values], - waitForRenew, - priority, - requestId: this.requestId, - dataSource: this.dataSource, - useInMemory: true, - external: queryOptions?.external - } - ); - - let r = (this.queryResultRequests[queryKey] || []).pop(); - while (r) { - r.resolve(this.queryResults[queryKey]); - r = this.queryResultRequests[queryKey].pop(); + this.queryResults[queryKey] = await this.cacheQueryResultDebounced( + query, + values, + [query, values], + 60 * 60, + { + renewalThreshold: this.queryCache.options.refreshKeyRenewalThreshold + || queryOptions?.renewalThreshold || 2 * 60, + renewalKey: [query, values], + waitForRenew, + priority, + requestId: this.requestId, + dataSource: this.dataSource, + useInMemory: true, + external: queryOptions?.external } + ); - return this.queryResults[queryKey]; - } catch (err) { - let r = (this.queryResultRequests[queryKey] || []).pop(); - while (r) { - r.reject(err); - r = this.queryResultRequests[queryKey].pop(); - } - throw err; - } finally { - this.queryResultRequests[queryKey] = null; - } + return this.queryResults[queryKey]; } public hasKeyQueryResult(keyQuery) { From fa60a5177890f30ad987851363456c6d6eec7dba Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Wed, 12 Mar 2025 23:14:32 +0200 Subject: [PATCH 25/35] Revert "use asyncDebounce for caching in pre-agg-load-cache" This reverts commit a353cf153b62a580d611255ecb97d6344de64abb. --- .../orchestrator/PreAggregationLoadCache.ts | 77 ++++++++++++++----- 1 file changed, 56 insertions(+), 21 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts index 3c47f9362547b..710a876493f58 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts @@ -1,5 +1,4 @@ import { TableStructure } from '@cubejs-backend/base-driver'; -import { asyncDebounce } from '@cubejs-backend/shared'; import { DriverFactory } from './DriverFactory'; import { QueryCache, QueryWithParams } from './QueryCache'; import { @@ -17,6 +16,16 @@ type PreAggregationLoadCacheOptions = { tablePrefixes?: string[], }; +function createDeferred() { + let resolve; + let reject; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + export class PreAggregationLoadCache { private readonly driverFactory: DriverFactory; @@ -26,6 +35,8 @@ export class PreAggregationLoadCache { private readonly queryResults: any; + private queryResultRequests: { [redisKey: string]: { resolve: CallableFunction, reject: CallableFunction }[]} = {}; + private readonly externalDriverFactory: any; private readonly requestId: any; @@ -45,8 +56,6 @@ export class PreAggregationLoadCache { private readonly tablePrefixes: string[] | null; - private readonly cacheQueryResultDebounced: Function; - public constructor( clientFactory: DriverFactory, queryCache, @@ -64,7 +73,6 @@ export class PreAggregationLoadCache { this.versionEntries = {}; this.tables = {}; this.tableColumnTypes = {}; - this.cacheQueryResultDebounced = asyncDebounce(this.queryCache.cacheQueryResult.bind(this.queryCache)); } protected async tablesFromCache(preAggregation, forceRenew: boolean = false) { @@ -201,26 +209,53 @@ export class PreAggregationLoadCache { return this.queryResults[queryKey]; } + // There is ongoing request + if (this.queryResultRequests[queryKey]) { + const { promise, resolve, reject } = createDeferred(); + this.queryResultRequests[queryKey].push({ resolve, reject }); + + return promise; + } + + // Making query for a first time + this.queryResultRequests[queryKey] = []; - this.queryResults[queryKey] = await this.cacheQueryResultDebounced( - query, - values, - [query, values], - 60 * 60, - { - renewalThreshold: this.queryCache.options.refreshKeyRenewalThreshold - || queryOptions?.renewalThreshold || 2 * 60, - renewalKey: [query, values], - waitForRenew, - priority, - requestId: this.requestId, - dataSource: this.dataSource, - useInMemory: true, - external: queryOptions?.external + try { + this.queryResults[queryKey] = await this.queryCache.cacheQueryResult( + query, + values, + [query, values], + 60 * 60, + { + renewalThreshold: this.queryCache.options.refreshKeyRenewalThreshold + || queryOptions?.renewalThreshold || 2 * 60, + renewalKey: [query, values], + waitForRenew, + priority, + requestId: this.requestId, + dataSource: this.dataSource, + useInMemory: true, + external: queryOptions?.external + } + ); + + let r = (this.queryResultRequests[queryKey] || []).pop(); + while (r) { + r.resolve(this.queryResults[queryKey]); + r = this.queryResultRequests[queryKey].pop(); } - ); - return this.queryResults[queryKey]; + return this.queryResults[queryKey]; + } catch (err) { + let r = (this.queryResultRequests[queryKey] || []).pop(); + while (r) { + r.reject(err); + r = this.queryResultRequests[queryKey].pop(); + } + throw err; + } finally { + this.queryResultRequests[queryKey] = null; + } } public hasKeyQueryResult(keyQuery) { From 99013fd06148387fda0153357f4a627058502e7b Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Wed, 12 Mar 2025 23:14:42 +0200 Subject: [PATCH 26/35] Revert "improve caching in pre-agg-load-cache" This reverts commit f8f640cc45ffb24f949f0d54ed8ebad6f9f9cacf. --- .../orchestrator/PreAggregationLoadCache.ts | 51 ++----------------- 1 file changed, 3 insertions(+), 48 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts index 710a876493f58..3e9d0947cdc2f 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts @@ -16,16 +16,6 @@ type PreAggregationLoadCacheOptions = { tablePrefixes?: string[], }; -function createDeferred() { - let resolve; - let reject; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - return { promise, resolve, reject }; -} - export class PreAggregationLoadCache { private readonly driverFactory: DriverFactory; @@ -35,8 +25,6 @@ export class PreAggregationLoadCache { private readonly queryResults: any; - private queryResultRequests: { [redisKey: string]: { resolve: CallableFunction, reject: CallableFunction }[]} = {}; - private readonly externalDriverFactory: any; private readonly requestId: any; @@ -202,26 +190,9 @@ export class PreAggregationLoadCache { public async keyQueryResult(sqlQuery: QueryWithParams, waitForRenew: boolean, priority: number) { const [query, values, queryOptions]: QueryWithParams = Array.isArray(sqlQuery) ? sqlQuery : [sqlQuery, [], {}]; - const queryKey = this.queryCache.queryRedisKey([query, values]); - // Have result in cache - if (this.queryResults[queryKey]) { - return this.queryResults[queryKey]; - } - - // There is ongoing request - if (this.queryResultRequests[queryKey]) { - const { promise, resolve, reject } = createDeferred(); - this.queryResultRequests[queryKey].push({ resolve, reject }); - - return promise; - } - - // Making query for a first time - this.queryResultRequests[queryKey] = []; - - try { - this.queryResults[queryKey] = await this.queryCache.cacheQueryResult( + if (!this.queryResults[this.queryCache.queryRedisKey([query, values])]) { + this.queryResults[this.queryCache.queryRedisKey([query, values])] = await this.queryCache.cacheQueryResult( query, values, [query, values], @@ -238,24 +209,8 @@ export class PreAggregationLoadCache { external: queryOptions?.external } ); - - let r = (this.queryResultRequests[queryKey] || []).pop(); - while (r) { - r.resolve(this.queryResults[queryKey]); - r = this.queryResultRequests[queryKey].pop(); - } - - return this.queryResults[queryKey]; - } catch (err) { - let r = (this.queryResultRequests[queryKey] || []).pop(); - while (r) { - r.reject(err); - r = this.queryResultRequests[queryKey].pop(); - } - throw err; - } finally { - this.queryResultRequests[queryKey] = null; } + return this.queryResults[this.queryCache.queryRedisKey([query, values])]; } public hasKeyQueryResult(keyQuery) { From d91bd5a7158f1057214c4fb02ae7b7b17d98bf42 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Wed, 12 Mar 2025 23:20:17 +0200 Subject: [PATCH 27/35] set coverageReporters --- packages/cubejs-schema-compiler/package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/cubejs-schema-compiler/package.json b/packages/cubejs-schema-compiler/package.json index 131d280ab4140..2955734674e07 100644 --- a/packages/cubejs-schema-compiler/package.json +++ b/packages/cubejs-schema-compiler/package.json @@ -91,6 +91,7 @@ "jest": { "testEnvironment": "node", "collectCoverage": false, + "coverageReporters": ["text", "html"], "coverageDirectory": "coverage/", "collectCoverageFrom": [ "dist/src/**/*.js", From 74667333026c8c449b3d1c6d6cbc85da3ff7f500 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Thu, 13 Mar 2025 11:50:06 +0200 Subject: [PATCH 28/35] add docs --- .../apis-integrations/rest-api/reference.mdx | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/docs/pages/product/apis-integrations/rest-api/reference.mdx b/docs/pages/product/apis-integrations/rest-api/reference.mdx index ea935aa175c31..eb01d4a3d99ef 100644 --- a/docs/pages/product/apis-integrations/rest-api/reference.mdx +++ b/docs/pages/product/apis-integrations/rest-api/reference.mdx @@ -279,6 +279,7 @@ Trigger pre-aggregation build jobs or retrieve statuses of such jobs. | `selector.datasources` | Array of data source names which have pre-aggregations defined | ❌ | | `selector.cubes` | Array of cube names which contain pre-aggregations | ❌ | | `selector.preAggregations` | Array of pre-aggregation names | ❌ | +| `selector.dateRange` | Date Range tuple ['range-date-start', 'range-date-end'] | ❌ | To trigger pre-aggregation builds, send a `POST` request with a payload including `post` as the `action` and `selector` properties. The response will @@ -340,6 +341,27 @@ curl \ https://localhost:4000/cubejs-api/v1/pre-aggregations/jobs ``` +Example request triggering builds of the `main` pre-aggregation defined in the +`orders` cube within date range with some security context data +and an `America/Los_Angeles` timezone: + +```bash{outputLines: 2-13} +curl \ + -d '{ + "action": "post", + "selector": { + "contexts": [{ "securityContext": { "tenantId": "tenant1" } }], + "timezones": ["America/Los_Angeles"], + "preAggregations": ["orders.main"], + "dateRange": ["2020-01-01", "2020-02-01"] + } + }' \ + -H "Authorization: EXAMPLE-API-TOKEN" \ + -H "Content-Type: application/json" \ + -X POST \ + https://localhost:4000/cubejs-api/v1/pre-aggregations/jobs +``` + Example response: ```json From 3b039746e305007028a88ce0b4a98bea2ae9d251 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Thu, 13 Mar 2025 12:06:51 +0200 Subject: [PATCH 29/35] fix regexp --- packages/cubejs-api-gateway/src/dateParser.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cubejs-api-gateway/src/dateParser.js b/packages/cubejs-api-gateway/src/dateParser.js index 2452f4c700f85..db437fe14da0a 100644 --- a/packages/cubejs-api-gateway/src/dateParser.js +++ b/packages/cubejs-api-gateway/src/dateParser.js @@ -65,7 +65,7 @@ export function dateParser(dateString, timezone, now = new Date()) { moment.tz(timezone).endOf('day').add(1, 'day') ]; } else if (dateString.match(/^from (.*) to (.*)$/)) { - const [, from, to] = dateString.match(/^from (.*) to (.*)$/); + const [, from, to] = dateString.match(/^from\s{1,3}(.{0,50})\s{1,3}to\s{1,3}(.{0,50})$/); const current = moment(now).tz(timezone); const fromResults = parse(from, new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS))); From 2cfecf5bd8dab2b2b049bf7179d916c93ed30c1a Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Thu, 13 Mar 2025 13:38:38 +0200 Subject: [PATCH 30/35] fix dateParser + tests --- packages/cubejs-api-gateway/src/dateParser.js | 4 +++- packages/cubejs-api-gateway/test/dateParser.test.js | 8 ++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/packages/cubejs-api-gateway/src/dateParser.js b/packages/cubejs-api-gateway/src/dateParser.js index db437fe14da0a..aafb676559a77 100644 --- a/packages/cubejs-api-gateway/src/dateParser.js +++ b/packages/cubejs-api-gateway/src/dateParser.js @@ -87,7 +87,9 @@ export function dateParser(dateString, timezone, now = new Date()) { momentRange = [momentRange[0].startOf(exactGranularity), momentRange[1].endOf(exactGranularity)]; } else { - const results = parse(dateString, new Date(moment().tz(timezone).format(moment.HTML5_FMT.DATETIME_LOCAL_MS))); + const current = moment(now).tz(timezone); + const results = parse(dateString, new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS))); + if (!results?.length) { throw new UserError(`Can't parse date: '${dateString}'`); } diff --git a/packages/cubejs-api-gateway/test/dateParser.test.js b/packages/cubejs-api-gateway/test/dateParser.test.js index ab082de2fb833..46acce9fa8d82 100644 --- a/packages/cubejs-api-gateway/test/dateParser.test.js +++ b/packages/cubejs-api-gateway/test/dateParser.test.js @@ -181,12 +181,12 @@ describe('dateParser', () => { ); }); - test('from 12AM till now by 1 hour', () => { + test('from 12AM till now by hour', () => { Date.now = jest.fn().mockReturnValue(new Date(2021, 2, 5, 13, 0, 0, 0)); - expect(dateParser('from 12AM till now by 1 hour', 'UTC', new Date(2021, 2, 5, 13, 0, 0, 0))).toStrictEqual( + expect(dateParser('2 weeks ago by hour', 'UTC', new Date(Date.UTC(2021, 2, 5, 13, 0, 0, 0)))).toStrictEqual( [ - '2021-03-05T00:00:00.000', - '2021-03-05T11:59:59.999' + '2021-02-19T13:00:00.000', + '2021-02-19T13:59:59.999' ] ); From d25d1d9be502e9d34796ad00b44e984043a199fe Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Thu, 13 Mar 2025 15:43:23 +0200 Subject: [PATCH 31/35] fix preAggsJobsRequestSchema validator --- packages/cubejs-api-gateway/src/query.js | 40 ++++++++++++++++-------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/packages/cubejs-api-gateway/src/query.js b/packages/cubejs-api-gateway/src/query.js index 77ca482b08451..a6936f43dcf2c 100644 --- a/packages/cubejs-api-gateway/src/query.js +++ b/packages/cubejs-api-gateway/src/query.js @@ -146,20 +146,34 @@ const normalizeQueryOrder = order => { return result; }; -export const preAggsJobsRequestSchema = Joi.object().keys({ +export const preAggsJobsRequestSchema = Joi.object({ action: Joi.string().valid('post', 'get').required(), - selector: Joi.object().keys({ - contexts: Joi.array().items( - Joi.object().keys({ - securityContext: Joi.required(), - }) - ).min(1).required(), - timezones: Joi.array().items(Joi.string()).min(1).required(), - dataSources: Joi.array().items(Joi.string()), - cubes: Joi.array().items(Joi.string()), - preAggregations: Joi.array().items(Joi.string()), - dateRange: Joi.array().length(2).items(Joi.string()), - }).optional(), + selector: Joi.when('action', { + is: 'post', + then: Joi.object({ + contexts: Joi.array().items( + Joi.object({ + securityContext: Joi.required(), + }) + ).min(1).required(), + timezones: Joi.array().items(Joi.string()).min(1).required(), + dataSources: Joi.array().items(Joi.string()), + cubes: Joi.array().items(Joi.string()), + preAggregations: Joi.array().items(Joi.string()), + dateRange: Joi.array().length(2).items(Joi.string()), + }).optional(), + otherwise: Joi.forbidden(), + }), + tokens: Joi.when('action', { + is: 'get', + then: Joi.array().items(Joi.string()).min(1).required(), + otherwise: Joi.forbidden(), + }), + resType: Joi.when('action', { + is: 'get', + then: Joi.string().valid('object').optional(), + otherwise: Joi.forbidden(), + }), }); const DateRegex = /^\d\d\d\d-\d\d-\d\d$/; From db45d387bdc3d7d68f558ac8248eb2ca529b2f61 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 14 Mar 2025 11:09:20 +0200 Subject: [PATCH 32/35] fix error message Co-authored-by: Igor Lukanin --- packages/cubejs-api-gateway/src/gateway.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index 69e7db825aaf0..d6d90e2e7bde1 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -874,7 +874,7 @@ class ApiGateway { if (result.length === 0) { throw new UserError( 'A user\'s selector doesn\'t match any of the ' + - 'pre-aggregations defined by the Cube schemas.' + 'pre-aggregations defined in the data model.' ); } break; From 1cfc7db91b350c8dbd7f2b653a78a6caa8249a02 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 14 Mar 2025 16:15:00 +0200 Subject: [PATCH 33/35] simplify regexp in dateParser --- packages/cubejs-api-gateway/src/dateParser.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/cubejs-api-gateway/src/dateParser.js b/packages/cubejs-api-gateway/src/dateParser.js index aafb676559a77..dab7896779ad8 100644 --- a/packages/cubejs-api-gateway/src/dateParser.js +++ b/packages/cubejs-api-gateway/src/dateParser.js @@ -65,11 +65,11 @@ export function dateParser(dateString, timezone, now = new Date()) { moment.tz(timezone).endOf('day').add(1, 'day') ]; } else if (dateString.match(/^from (.*) to (.*)$/)) { - const [, from, to] = dateString.match(/^from\s{1,3}(.{0,50})\s{1,3}to\s{1,3}(.{0,50})$/); + const [, from, to] = dateString.match(/^from(.{0,50})to(.{0,50})$/); const current = moment(now).tz(timezone); - const fromResults = parse(from, new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS))); - const toResults = parse(to, new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS))); + const fromResults = parse(from.trim(), new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS))); + const toResults = parse(to.trim(), new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS))); if (!Array.isArray(fromResults) || !fromResults.length) { throw new UserError(`Can't parse date: '${from}'`); From e1541d78a75fbb8dc85a51bc4402e5d8b49eb582 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 14 Mar 2025 16:15:06 +0200 Subject: [PATCH 34/35] tests polish --- .../test/unit/RefreshScheduler.test.ts | 31 ++++++------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts b/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts index b400724af0c58..fa0bd8cc0e8ac 100644 --- a/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts +++ b/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts @@ -1,7 +1,6 @@ import R from 'ramda'; import { BaseDriver } from '@cubejs-backend/query-orchestrator'; -import { pausePromise, SchemaFileRepository } from '@cubejs-backend/shared'; -import { clearInterval } from 'timers'; +import { pausePromise, SchemaFileRepository, createPromiseLock } from '@cubejs-backend/shared'; import { CubejsServerCore, CompilerApi, RefreshScheduler } from '../../src'; const schemaContent = ` @@ -233,16 +232,6 @@ cube('Bar', { ]), }; -function createDeferred() { - let resolve; - let reject; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - return { promise, resolve, reject }; -} - class MockDriver extends BaseDriver { public tables: any[] = []; @@ -864,18 +853,18 @@ describe('Refresh Scheduler', () => { } } - const deferred = createDeferred(); + const lock = createPromiseLock(); const orchestrator = await serverCore.getOrchestratorApi(ctx); const interval = setInterval(async () => { const queuedList = await orchestrator.getPreAggregationQueueStates(); if (queuedList.length === 0) { - deferred.resolve(); + lock.resolve(); } }, 500); - await deferred.promise; + await lock.promise; clearInterval(interval); expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'UTC').length).toEqual(5); @@ -923,18 +912,18 @@ describe('Refresh Scheduler', () => { } } - const deferred = createDeferred(); + const lock = createPromiseLock(); const orchestrator = await serverCore.getOrchestratorApi(ctx); const interval = setInterval(async () => { const queuedList = await orchestrator.getPreAggregationQueueStates(); if (queuedList.length === 0) { - deferred.resolve(); + lock.resolve(); } }, 500); - await deferred.promise; + await lock.promise; clearInterval(interval); expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'UTC').length).toEqual(5); @@ -976,18 +965,18 @@ describe('Refresh Scheduler', () => { } } - const deferred = createDeferred(); + const lock = createPromiseLock(); const orchestrator = await serverCore.getOrchestratorApi(ctx); const interval = setInterval(async () => { const queuedList = await orchestrator.getPreAggregationQueueStates(); if (queuedList.length === 0) { - deferred.resolve(); + lock.resolve(); } }, 500); - await deferred.promise; + await lock.promise; clearInterval(interval); expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'UTC').length).toEqual(3); From 6b904a110ac9cd87337aca0735952c65c9bba1dc Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Mon, 17 Mar 2025 11:00:00 +0200 Subject: [PATCH 35/35] little fix --- packages/cubejs-api-gateway/src/dateParser.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/cubejs-api-gateway/src/dateParser.js b/packages/cubejs-api-gateway/src/dateParser.js index dab7896779ad8..136f5d86878a8 100644 --- a/packages/cubejs-api-gateway/src/dateParser.js +++ b/packages/cubejs-api-gateway/src/dateParser.js @@ -65,7 +65,9 @@ export function dateParser(dateString, timezone, now = new Date()) { moment.tz(timezone).endOf('day').add(1, 'day') ]; } else if (dateString.match(/^from (.*) to (.*)$/)) { - const [, from, to] = dateString.match(/^from(.{0,50})to(.{0,50})$/); + let [, from, to] = dateString.match(/^from(.{0,50})to(.{0,50})$/); + from = from.trim(); + to = to.trim(); const current = moment(now).tz(timezone); const fromResults = parse(from.trim(), new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS)));