diff --git a/docs/pages/product/apis-integrations/rest-api/reference.mdx b/docs/pages/product/apis-integrations/rest-api/reference.mdx index ea935aa175c31..eb01d4a3d99ef 100644 --- a/docs/pages/product/apis-integrations/rest-api/reference.mdx +++ b/docs/pages/product/apis-integrations/rest-api/reference.mdx @@ -279,6 +279,7 @@ Trigger pre-aggregation build jobs or retrieve statuses of such jobs. | `selector.datasources` | Array of data source names which have pre-aggregations defined | ❌ | | `selector.cubes` | Array of cube names which contain pre-aggregations | ❌ | | `selector.preAggregations` | Array of pre-aggregation names | ❌ | +| `selector.dateRange` | Date Range tuple ['range-date-start', 'range-date-end'] | ❌ | To trigger pre-aggregation builds, send a `POST` request with a payload including `post` as the `action` and `selector` properties. The response will @@ -340,6 +341,27 @@ curl \ https://localhost:4000/cubejs-api/v1/pre-aggregations/jobs ``` +Example request triggering builds of the `main` pre-aggregation defined in the +`orders` cube within date range with some security context data +and an `America/Los_Angeles` timezone: + +```bash{outputLines: 2-13} +curl \ + -d '{ + "action": "post", + "selector": { + "contexts": [{ "securityContext": { "tenantId": "tenant1" } }], + "timezones": ["America/Los_Angeles"], + "preAggregations": ["orders.main"], + "dateRange": ["2020-01-01", "2020-02-01"] + } + }' \ + -H "Authorization: EXAMPLE-API-TOKEN" \ + -H "Content-Type: application/json" \ + -X POST \ + https://localhost:4000/cubejs-api/v1/pre-aggregations/jobs +``` + Example response: ```json diff --git a/packages/cubejs-api-gateway/package.json b/packages/cubejs-api-gateway/package.json index f4c9873774ce4..8bcafdfdd6ca5 100644 --- a/packages/cubejs-api-gateway/package.json +++ b/packages/cubejs-api-gateway/package.json @@ -73,6 +73,7 @@ "jest": { "testEnvironment": "node", "collectCoverage": false, + "coverageReporters": ["text", "html"], "coverageDirectory": 
"coverage/", "collectCoverageFrom": [ "dist/src/**/*.js", diff --git a/packages/cubejs-api-gateway/src/dateParser.js b/packages/cubejs-api-gateway/src/dateParser.js index 9db25453e3939..136f5d86878a8 100644 --- a/packages/cubejs-api-gateway/src/dateParser.js +++ b/packages/cubejs-api-gateway/src/dateParser.js @@ -65,18 +65,19 @@ export function dateParser(dateString, timezone, now = new Date()) { moment.tz(timezone).endOf('day').add(1, 'day') ]; } else if (dateString.match(/^from (.*) to (.*)$/)) { - // eslint-disable-next-line no-unused-vars,@typescript-eslint/no-unused-vars - const [all, from, to] = dateString.match(/^from (.*) to (.*)$/); + let [, from, to] = dateString.match(/^from(.{0,50})to(.{0,50})$/); + from = from.trim(); + to = to.trim(); const current = moment(now).tz(timezone); - const fromResults = parse(from, new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS))); - const toResults = parse(to, new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS))); + const fromResults = parse(from.trim(), new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS))); + const toResults = parse(to.trim(), new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS))); if (!Array.isArray(fromResults) || !fromResults.length) { throw new UserError(`Can't parse date: '${from}'`); } - if (!Array.isArray(fromResults) || !fromResults.length) { + if (!Array.isArray(toResults) || !toResults.length) { throw new UserError(`Can't parse date: '${to}'`); } @@ -88,8 +89,10 @@ export function dateParser(dateString, timezone, now = new Date()) { momentRange = [momentRange[0].startOf(exactGranularity), momentRange[1].endOf(exactGranularity)]; } else { - const results = parse(dateString, new Date(moment().tz(timezone).format(moment.HTML5_FMT.DATETIME_LOCAL_MS))); - if (!results || !results.length) { + const current = moment(now).tz(timezone); + const results = parse(dateString, new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS))); + + if (!results?.length) { throw 
new UserError(`Can't parse date: '${dateString}'`); } diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index 10e8889ccfd8d..d6d90e2e7bde1 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -8,6 +8,7 @@ import structuredClone from '@ungap/structured-clone'; import { getEnv, getRealType, + parseLocalDate, QueryAlias, } from '@cubejs-backend/shared'; import { @@ -81,7 +82,9 @@ import { normalizeQuery, normalizeQueryCancelPreAggregations, normalizeQueryPreAggregationPreview, - normalizeQueryPreAggregations, remapToQueryAdapterFormat, + normalizeQueryPreAggregations, + preAggsJobsRequestSchema, + remapToQueryAdapterFormat, } from './query'; import { cachedHandler } from './cached-handler'; import { createJWKsFetcher } from './jwk'; @@ -768,8 +771,8 @@ class ApiGateway { preAggregations: [{ id: preAggregationId }] } ); - const { partitions } = (preAggregationPartitions && preAggregationPartitions[0] || {}); - const preAggregationPartition = partitions && partitions.find(p => p?.tableName === versionEntry.table_name); + const { partitions } = (preAggregationPartitions?.[0] || {}); + const preAggregationPartition = partitions?.find(p => p?.tableName === versionEntry.table_name); res({ preview: preAggregationPartition && await orchestratorApi.getPreAggregationPreview( @@ -843,7 +846,6 @@ class ApiGateway { * ] * } * ``` - * TODO (buntarb): selector object validator. 
*/ private async preAggregationsJobs(req: Request, res: ExpressResponse) { const response = this.resToResultFn(res); @@ -853,41 +855,18 @@ class ApiGateway { let result; try { await this.assertApiScope('jobs', req?.context?.securityContext); + + if (!query || Object.keys(query).length === 0) { + throw new UserError('No job description provided'); + } + + const { error } = preAggsJobsRequestSchema.validate(query); + if (error) { + throw new UserError(`Invalid Job query format: ${error.message || error.toString()}`); + } + switch (query.action) { case 'post': - if ( - !(query.selector).timezones || - (query.selector).timezones.length === 0 - ) { - throw new UserError( - 'A user\'s selector must contain at least one time zone.' - ); - } - if ( - !(query.selector).contexts || - ( - <{securityContext: any}[]>( - query.selector - ).contexts - ).length === 0 - ) { - throw new UserError( - 'A user\'s selector must contain at least one context element.' - ); - } else { - let e = false; - (<{securityContext: any}[]>( - query.selector - ).contexts).forEach((c) => { - if (!c.securityContext) e = true; - }); - if (e) { - throw new UserError( - 'Every context element must contain the ' + - '\'securityContext\' property.' - ); - } - } result = await this.preAggregationsJobsPOST( context, query.selector @@ -895,7 +874,7 @@ class ApiGateway { if (result.length === 0) { throw new UserError( 'A user\'s selector doesn\'t match any of the ' + - 'pre-aggregations described by the Cube schemas.' + 'pre-aggregations defined in the data model.' 
); } break; @@ -926,30 +905,38 @@ class ApiGateway { selector: PreAggsSelector, ): Promise { let jobs: string[] = []; - if (!selector.contexts?.length) { - jobs = await this.postPreAggregationsBuildJobs( - context, - selector, - ); - } else { - const promise = Promise.all( - selector.contexts.map(async (config) => { - const ctx = { - ...context, - ...config, - }; - const _jobs = await this.postPreAggregationsBuildJobs( - ctx, - selector, - ); - return _jobs; - }) - ); - const resolve = await promise; - resolve.forEach((_jobs) => { - jobs = jobs.concat(_jobs); - }); + + // There might be a few contexts but dateRange if present is still the same + // so let's normalize it only once. + // It's expected that selector.dateRange is provided in local time (without timezone) + // At the same time it is ok to get timestamps with `Z` (in UTC). + if (selector.dateRange) { + const start = parseLocalDate([{ val: selector.dateRange[0] }], 'UTC'); + const end = parseLocalDate([{ val: selector.dateRange[1] }], 'UTC'); + if (!start || !end) { + throw new UserError(`Cannot parse selector date range ${selector.dateRange}`); + } + selector.dateRange = [start, end]; } + + const promise = Promise.all( + selector.contexts.map(async (config) => { + const ctx = { + ...context, + ...config, + }; + const _jobs = await this.postPreAggregationsBuildJobs( + ctx, + selector, + ); + return _jobs; + }) + ); + const resolve = await promise; + resolve.forEach((_jobs) => { + jobs = jobs.concat(_jobs); + }); + return jobs; } @@ -961,7 +948,7 @@ class ApiGateway { selector: PreAggsSelector ): Promise { const compiler = await this.getCompilerApi(context); - const { timezones } = selector; + const { timezones, dateRange } = selector; const preaggs = await compiler.preAggregations({ dataSources: selector.dataSources, cubes: selector.cubes, @@ -977,12 +964,13 @@ class ApiGateway { { metadata: undefined, timezones, + dateRange, preAggregations: preaggs.map(p => ({ id: p.id, - cacheOnly: undefined, // 
boolean + cacheOnly: false, partitions: undefined, // string[] })), - forceBuildPreAggregations: undefined, + forceBuildPreAggregations: false, throwErrors: false, } ); diff --git a/packages/cubejs-api-gateway/src/query.js b/packages/cubejs-api-gateway/src/query.js index 02ea1fa56c27b..a6936f43dcf2c 100644 --- a/packages/cubejs-api-gateway/src/query.js +++ b/packages/cubejs-api-gateway/src/query.js @@ -6,9 +6,10 @@ import { getEnv } from '@cubejs-backend/shared'; import { UserError } from './UserError'; import { dateParser } from './dateParser'; import { QueryType } from './types/enums'; +import { PreAggsJobsRequest } from "./types/request"; const getQueryGranularity = (queries) => R.pipe( - R.map(({ timeDimensions }) => timeDimensions[0] && timeDimensions[0].granularity || null), + R.map(({ timeDimensions }) => timeDimensions[0]?.granularity), R.filter(Boolean), R.uniq )(queries); @@ -145,6 +146,36 @@ const normalizeQueryOrder = order => { return result; }; +export const preAggsJobsRequestSchema = Joi.object({ + action: Joi.string().valid('post', 'get').required(), + selector: Joi.when('action', { + is: 'post', + then: Joi.object({ + contexts: Joi.array().items( + Joi.object({ + securityContext: Joi.required(), + }) + ).min(1).required(), + timezones: Joi.array().items(Joi.string()).min(1).required(), + dataSources: Joi.array().items(Joi.string()), + cubes: Joi.array().items(Joi.string()), + preAggregations: Joi.array().items(Joi.string()), + dateRange: Joi.array().length(2).items(Joi.string()), + }).optional(), + otherwise: Joi.forbidden(), + }), + tokens: Joi.when('action', { + is: 'get', + then: Joi.array().items(Joi.string()).min(1).required(), + otherwise: Joi.forbidden(), + }), + resType: Joi.when('action', { + is: 'get', + then: Joi.string().valid('object').optional(), + otherwise: Joi.forbidden(), + }), +}); + const DateRegex = /^\d\d\d\d-\d\d-\d\d$/; const normalizeQueryFilters = (filter) => ( @@ -196,9 +227,9 @@ const normalizeQuery = (query, persistent) 
=> { if (error) { throw new UserError(`Invalid query format: ${error.message || error.toString()}`); } - const validQuery = query.measures && query.measures.length || - query.dimensions && query.dimensions.length || - query.timeDimensions && query.timeDimensions.filter(td => !!td.granularity).length; + const validQuery = query.measures?.length || + query.dimensions?.length || + query.timeDimensions?.filter(td => !!td.granularity).length; if (!validQuery) { throw new UserError( 'Query should contain either measures, dimensions or timeDimensions with granularities in order to be valid' diff --git a/packages/cubejs-api-gateway/src/types/request.ts b/packages/cubejs-api-gateway/src/types/request.ts index 3393554fd8d65..9ff0ba90121b6 100644 --- a/packages/cubejs-api-gateway/src/types/request.ts +++ b/packages/cubejs-api-gateway/src/types/request.ts @@ -108,7 +108,7 @@ type ResponseResultFn = ( message: (Record | Record[]) | DataResult | ErrorResponse, extra?: { status: number } - ) => void; + ) => void | Promise; /** * Base HTTP request parameters map data type. @@ -148,11 +148,12 @@ type SqlApiRequest = BaseRequest & { * Pre-aggregations selector object. */ type PreAggsSelector = { - contexts?: {securityContext: any}[], + contexts: {securityContext: any}[], timezones: string[], dataSources?: string[], cubes?: string[], preAggregations?: string[], + dateRange?: [string, string], // We expect only single date Range for rebuilding }; /** @@ -177,7 +178,7 @@ type PreAggJob = { * The `/cubejs-system/v1/pre-aggregations/jobs` endpoint object type. 
*/ type PreAggsJobsRequest = { - action: 'post' | 'get' | 'delete', + action: 'post' | 'get', selector?: PreAggsSelector, tokens?: string[] resType?: 'object' | 'array' diff --git a/packages/cubejs-api-gateway/test/dateParser.test.js b/packages/cubejs-api-gateway/test/dateParser.test.js index 4b67a1aed26ed..46acce9fa8d82 100644 --- a/packages/cubejs-api-gateway/test/dateParser.test.js +++ b/packages/cubejs-api-gateway/test/dateParser.test.js @@ -168,4 +168,28 @@ describe('dateParser', () => { Date.now.mockRestore(); }); + + test('throws error on from invalid date to date', () => { + expect(() => dateParser('from invalid to 2020-02-02', 'UTC')).toThrow( + 'Can\'t parse date: \'invalid\'' + ); + }); + + test('throws error on from date to invalid date', () => { + expect(() => dateParser('from 2020-02-02 to invalid', 'UTC')).toThrow( + 'Can\'t parse date: \'invalid\'' + ); + }); + + test('from 12AM till now by hour', () => { + Date.now = jest.fn().mockReturnValue(new Date(2021, 2, 5, 13, 0, 0, 0)); + expect(dateParser('2 weeks ago by hour', 'UTC', new Date(Date.UTC(2021, 2, 5, 13, 0, 0, 0)))).toStrictEqual( + [ + '2021-02-19T13:00:00.000', + '2021-02-19T13:59:59.999' + ] + ); + + Date.now.mockRestore(); + }); }); diff --git a/packages/cubejs-api-gateway/test/index.test.ts b/packages/cubejs-api-gateway/test/index.test.ts index b7436e3093cae..2153442e814ad 100644 --- a/packages/cubejs-api-gateway/test/index.test.ts +++ b/packages/cubejs-api-gateway/test/index.test.ts @@ -17,6 +17,7 @@ import { DataSourceStorageMock, AdapterApiMock } from './mocks'; +import { ApiScopesTuple } from '../src/types/auth'; const logger = (type, message) => console.log({ type, ...message }); @@ -61,6 +62,7 @@ async function createApiGateway( process.env.NODE_ENV = 'unknown'; const app = express(); + app.use(express.json()); apiGateway.initApp(app); return { @@ -982,6 +984,96 @@ describe('API Gateway', () => { }); }); + describe('/v1/pre-aggregations/jobs', () => { + const 
scheduledRefreshContextsFactory = () => ([ + { securityContext: { foo: 'bar' } }, + { securityContext: { bar: 'foo' } } + ]); + + const scheduledRefreshTimeZonesFactory = () => (['UTC', 'America/Los_Angeles']); + + const appPrepareFactory = async (scope: string[]) => { + const playgroundAuthSecret = 'test12345'; + const { app } = await createApiGateway( + new AdapterApiMock(), + new DataSourceStorageMock(), + { + basePath: '/test', + playgroundAuthSecret, + refreshScheduler: () => new RefreshSchedulerMock(), + scheduledRefreshContexts: () => Promise.resolve(scheduledRefreshContextsFactory()), + scheduledRefreshTimeZones: scheduledRefreshTimeZonesFactory, + contextToApiScopes: () => Promise.resolve(scope) + } + ); + const token = generateAuthToken({ uid: 5, scope }, {}, playgroundAuthSecret); + const tokenUser = generateAuthToken({ uid: 5, scope }, {}, API_SECRET); + + return { app, token, tokenUser }; + }; + + test('no input', async () => { + const { app, tokenUser } = await appPrepareFactory(['graphql', 'data', 'meta', 'jobs']); + + const req = request(app).post('/test/v1/pre-aggregations/jobs') + .set('Content-type', 'application/json') + .set('Authorization', `Bearer ${tokenUser}`); + + const res = await req; + expect(res.status).toEqual(400); + expect(res.body.error).toEqual('No job description provided'); + }); + + test('invalid input action', async () => { + const { app, tokenUser } = await appPrepareFactory(['graphql', 'data', 'meta', 'jobs']); + + const req = request(app).post('/test/v1/pre-aggregations/jobs') + .set('Content-type', 'application/json') + .set('Authorization', `Bearer ${tokenUser}`) + .send({ action: 'patch' }); + + const res = await req; + expect(res.status).toEqual(400); + expect(res.body.error.includes('Invalid Job query format')).toBeTruthy(); + }); + + test('invalid input date range', async () => { + const { app, tokenUser } = await appPrepareFactory(['graphql', 'data', 'meta', 'jobs']); + + let req = 
request(app).post('/test/v1/pre-aggregations/jobs') + .set('Content-type', 'application/json') + .set('Authorization', `Bearer ${tokenUser}`) + .send({ + action: 'post', + selector: { + contexts: [{ securityContext: {} }], + timezones: ['UTC', 'America/Los_Angeles'], + dateRange: ['invalid string', '2020-02-20'] + } + }); + + let res = await req; + expect(res.status).toEqual(400); + expect(res.body.error.includes('Cannot parse selector date range')).toBeTruthy(); + + req = request(app).post('/test/v1/pre-aggregations/jobs') + .set('Content-type', 'application/json') + .set('Authorization', `Bearer ${tokenUser}`) + .send({ + action: 'post', + selector: { + contexts: [{ securityContext: {} }], + timezones: ['UTC', 'America/Los_Angeles'], + dateRange: ['2020-02-20', 'invalid string'] + } + }); + + res = await req; + expect(res.status).toEqual(400); + expect(res.body.error.includes('Cannot parse selector date range')).toBeTruthy(); + }); + }); + describe('healtchecks', () => { test('readyz (standalone)', async () => { const { app, adapterApi } = await createApiGateway(); diff --git a/packages/cubejs-backend-shared/package.json b/packages/cubejs-backend-shared/package.json index 35621cee1a544..fb796517bae71 100644 --- a/packages/cubejs-backend-shared/package.json +++ b/packages/cubejs-backend-shared/package.json @@ -63,6 +63,7 @@ "jest": { "testEnvironment": "node", "collectCoverage": false, + "coverageReporters": ["text", "html"], "coverageDirectory": "coverage/", "collectCoverageFrom": [ "dist/src/**/*.js", diff --git a/packages/cubejs-backend-shared/src/time.ts b/packages/cubejs-backend-shared/src/time.ts index 1a39ec5838c94..62ab33b93c3c7 100644 --- a/packages/cubejs-backend-shared/src/time.ts +++ b/packages/cubejs-backend-shared/src/time.ts @@ -251,12 +251,12 @@ export const utcToLocalTimeZone = (timezone: string, timestampFormat: string, ti return moment.tz(timestamp, 'UTC').tz(timezone).format(timestampFormat); }; -export const parseLocalDate = (data: any, 
timezone: string, timestampFormat: string = 'YYYY-MM-DDTHH:mm:ss.SSS'): string | null => { +export const parseLocalDate = (data: { [key: string]: string }[] | null | undefined, timezone: string, timestampFormat: string = 'YYYY-MM-DDTHH:mm:ss.SSS'): string | null => { if (!data) { return null; } data = JSON.parse(JSON.stringify(data)); - const value = data[0] && data[0][Object.keys(data[0])[0]]; + const value = data?.[0]?.[Object.keys(data[0])[0]]; if (!value) { return null; } diff --git a/packages/cubejs-backend-shared/test/time.test.ts b/packages/cubejs-backend-shared/test/time.test.ts index 4b90b45bc4ace..65174395044a9 100644 --- a/packages/cubejs-backend-shared/test/time.test.ts +++ b/packages/cubejs-backend-shared/test/time.test.ts @@ -267,7 +267,6 @@ describe('extractDate', () => { expect(parseLocalDate(null, timezone)).toBeNull(); expect(parseLocalDate(undefined, timezone)).toBeNull(); expect(parseLocalDate([], timezone)).toBeNull(); - expect(parseLocalDate('', timezone)).toBeNull(); }); it('should return null if no valid date is found in data', () => { diff --git a/packages/cubejs-query-orchestrator/package.json b/packages/cubejs-query-orchestrator/package.json index eef1f2e698077..6e5eed0e2fde3 100644 --- a/packages/cubejs-query-orchestrator/package.json +++ b/packages/cubejs-query-orchestrator/package.json @@ -54,6 +54,7 @@ "jest": { "testEnvironment": "node", "collectCoverage": false, + "coverageReporters": ["text", "html"], "coverageDirectory": "coverage/", "collectCoverageFrom": [ "dist/src/**/*.js", diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts index 08f77c79b6a95..3e9d0947cdc2f 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts @@ -1,6 +1,6 @@ import { TableStructure } from 
'@cubejs-backend/base-driver'; import { DriverFactory } from './DriverFactory'; -import { QueryCache, QueryTuple, QueryWithParams } from './QueryCache'; +import { QueryCache, QueryWithParams } from './QueryCache'; import { PreAggregationDescription, PreAggregations, @@ -189,13 +189,13 @@ export class PreAggregationLoadCache { } public async keyQueryResult(sqlQuery: QueryWithParams, waitForRenew: boolean, priority: number) { - const [query, values, queryOptions]: QueryTuple = Array.isArray(sqlQuery) ? sqlQuery : [sqlQuery, [], {}]; + const [query, values, queryOptions]: QueryWithParams = Array.isArray(sqlQuery) ? sqlQuery : [sqlQuery, [], {}]; if (!this.queryResults[this.queryCache.queryRedisKey([query, values])]) { this.queryResults[this.queryCache.queryRedisKey([query, values])] = await this.queryCache.cacheQueryResult( query, - values, - [query, values], + values, + [query, values], 60 * 60, { renewalThreshold: this.queryCache.options.refreshKeyRenewalThreshold diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts index 5de44242dfff3..16703906b76e1 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts @@ -12,7 +12,7 @@ import { UnloadOptions } from '@cubejs-backend/base-driver'; import { DriverFactory } from './DriverFactory'; -import { PreAggTableToTempTableNames, QueryCache, QueryTuple } from './QueryCache'; +import { PreAggTableToTempTableNames, QueryCache, QueryWithParams } from './QueryCache'; import { ContinueWaitError } from './ContinueWaitError'; import { LargeStreamWarning } from './StreamObjectsCounter'; import { @@ -30,7 +30,7 @@ import { PreAggregationLoadCache } from './PreAggregationLoadCache'; type IndexesSql = { sql: [string, unknown[]], indexName: string }[]; -type QueryKey = [QueryTuple, IndexesSql, InvalidationKeys] | 
[QueryTuple, InvalidationKeys]; +type QueryKey = [QueryWithParams, IndexesSql, InvalidationKeys] | [QueryWithParams, InvalidationKeys]; type QueryOptions = { queryKey: QueryKey; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts index b8019ff18d037..f76fd9cc61a99 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts @@ -13,7 +13,7 @@ import { } from '@cubejs-backend/shared'; import { InlineTable, TableStructure } from '@cubejs-backend/base-driver'; import { DriverFactory } from './DriverFactory'; -import { QueryCache, QueryTuple, QueryWithParams } from './QueryCache'; +import { QueryCache, QueryWithParams } from './QueryCache'; import { getLastUpdatedAtTimestamp, LAMBDA_TABLE_PREFIX, @@ -83,8 +83,8 @@ export class PreAggregationPartitionRangeLoader { this.compilerCacheFn = options.compilerCacheFn || ((subKey, cacheFn) => cacheFn()); } - private async loadRangeQuery(rangeQuery: QueryTuple, partitionRange?: QueryDateRange) { - const [query, values, queryOptions]: QueryTuple = rangeQuery; + private async loadRangeQuery(rangeQuery: QueryWithParams, partitionRange?: QueryDateRange) { + const [query, values, queryOptions]: QueryWithParams = rangeQuery; const invalidate = this.preAggregation.invalidateKeyQueries?.[0] ? 
this.preAggregation.invalidateKeyQueries[0].slice(0, 2) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index a3f4ef7deeef4..7f2360b95be65 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -9,7 +9,7 @@ import { InlineTables, CacheDriverInterface, TableStructure, - DriverInterface, + DriverInterface, QueryKey, } from '@cubejs-backend/base-driver'; import { QueryQueue } from './QueryQueue'; @@ -28,14 +28,12 @@ type QueryOptions = { incremental?: boolean; }; -export type QueryTuple = [ +export type QueryWithParams = [ sql: string, - params: unknown[], + params: string[], options?: QueryOptions ]; -export type QueryWithParams = QueryTuple; - export type Query = { requestId?: string; dataSource: string; @@ -84,7 +82,7 @@ export type PreAggTableToTempTable = [ export type PreAggTableToTempTableNames = [string, { targetTableName: string; }]; -export type CacheKeyItem = string | string[] | QueryTuple | QueryTuple[] | undefined; +export type CacheKeyItem = string | string[] | QueryWithParams | QueryWithParams[] | undefined; export type CacheKey = [CacheKeyItem, CacheKeyItem] | @@ -382,7 +380,7 @@ export class QueryCache { public static replacePreAggregationTableNames( queryAndParams: string | QueryWithParams, preAggregationsTablesToTempTables: PreAggTableToTempTableNames[], - ): string | QueryTuple { + ): string | QueryWithParams { const [keyQuery, params, queryOptions] = Array.isArray(queryAndParams) ? queryAndParams : [queryAndParams, []]; @@ -404,7 +402,7 @@ export class QueryCache { * queries and with the `stream.Writable` instance for the persistent. 
*/ public async queryWithRetryAndRelease( - query: string | QueryTuple, + query: string | QueryWithParams, values: string[], { cacheKey, @@ -665,9 +663,9 @@ export class QueryCache { } public startRenewCycle( - query: string | QueryTuple, + query: string | QueryWithParams, values: string[], - cacheKeyQueries: (string | QueryTuple)[], + cacheKeyQueries: (string | QueryWithParams)[], expireSecs: number, cacheKey: CacheKey, renewalThreshold: any, @@ -700,9 +698,9 @@ export class QueryCache { } public renewQuery( - query: string | QueryTuple, + query: string | QueryWithParams, values: string[], - cacheKeyQueries: (string | QueryTuple)[], + cacheKeyQueries: (string | QueryWithParams)[], expireSecs: number, cacheKey: CacheKey, renewalThreshold: any, @@ -719,7 +717,7 @@ export class QueryCache { ) { options = options || { dataSource: 'default' }; return Promise.all( - this.loadRefreshKeys(cacheKeyQueries, expireSecs, options), + this.loadRefreshKeys(cacheKeyQueries, expireSecs, options), ) .catch(e => { if (e instanceof ContinueWaitError) { @@ -782,11 +780,11 @@ export class QueryCache { } ) { return cacheKeyQueries.map((q) => { - const [query, values, queryOptions]: QueryTuple = Array.isArray(q) ? q : [q, [], {}]; + const [query, values, queryOptions]: QueryWithParams = Array.isArray(q) ? 
q : [q, [], {}]; return this.cacheQueryResult( query, - values, - [query, values], + values, + [query, values], expireSecs, { renewalThreshold: this.options.refreshKeyRenewalThreshold || queryOptions?.renewalThreshold || 2 * 60, @@ -808,7 +806,7 @@ export class QueryCache { ) => this.cacheDriver.withLock(`lock:${key}`, callback, ttl, true); public async cacheQueryResult( - query: string | QueryTuple, + query: string | QueryWithParams, values: string[], cacheKey: CacheKey, expiration: number, @@ -867,7 +865,14 @@ export class QueryCache { }); }).catch(e => { if (!(e instanceof ContinueWaitError)) { - this.logger('Dropping Cache', { cacheKey, error: e.stack || e, requestId: options.requestId, spanId, primaryQuery, renewCycle }); + this.logger('Dropping Cache', { + cacheKey, + error: e.stack || e, + requestId: options.requestId, + spanId, + primaryQuery, + renewCycle + }); this.cacheDriver.remove(redisKey) .catch(err => this.logger('Error removing key', { cacheKey, @@ -990,7 +995,7 @@ export class QueryCache { return null; } - public queryRedisKey(cacheKey): string { + public queryRedisKey(cacheKey: CacheKey): string { return this.getKey('SQL_QUERY_RESULT', getCacheHash(cacheKey) as any); } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts index 1b54cdff03b16..20a4a24c5b3ae 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts @@ -379,7 +379,7 @@ export class QueryOrchestrator { preAggregations.map(p => { const { preAggregation } = p.preAggregation; const partition = p.partitions[0]; - preAggregation.dataSource = (partition && partition.dataSource) || 'default'; + preAggregation.dataSource = partition?.dataSource || 'default'; preAggregation.preAggregationsSchema = preAggregationsSchema; return preAggregation; }), diff --git 
a/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts b/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts index 5cb71266f1e69..1e9c366a09a61 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts @@ -2,6 +2,7 @@ import crypto from 'crypto'; import { getProcessUid } from '@cubejs-backend/shared'; import { QueryKey, QueryKeyHash } from '@cubejs-backend/base-driver'; +import { CacheKey } from './QueryCache'; /** * Unique process ID regexp. @@ -11,13 +12,13 @@ export const processUidRE = /^[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}- /** * Returns query hash by specified `queryKey`. */ -export function getCacheHash(queryKey: QueryKey, processUid?: string): QueryKeyHash { +export function getCacheHash(queryKey: QueryKey | CacheKey, processUid?: string): QueryKeyHash { processUid = processUid || getProcessUid(); if (typeof queryKey === 'string' && queryKey.length < 256) { return queryKey as any; } - if (typeof queryKey === 'object' && queryKey.persistent) { + if (typeof queryKey === 'object' && 'persistent' in queryKey && queryKey.persistent) { return `${crypto .createHash('md5') .update(JSON.stringify(queryKey)) diff --git a/packages/cubejs-schema-compiler/package.json b/packages/cubejs-schema-compiler/package.json index 131d280ab4140..2955734674e07 100644 --- a/packages/cubejs-schema-compiler/package.json +++ b/packages/cubejs-schema-compiler/package.json @@ -91,6 +91,7 @@ "jest": { "testEnvironment": "node", "collectCoverage": false, + "coverageReporters": ["text", "html"], "coverageDirectory": "coverage/", "collectCoverageFrom": [ "dist/src/**/*.js", diff --git a/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts b/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts index abcc94ad603c5..c58823b4ac9c6 100644 --- a/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts +++ 
b/packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts @@ -58,6 +58,13 @@ export type MeasureDefinition = { timeShiftReferences?: TimeShiftDefinitionReference[], }; +export type PreAggregationFilters = { + dataSources?: string[], + cubes?: string[], + preAggregationIds?: string[], + scheduled?: boolean, +}; + export class CubeEvaluator extends CubeSymbols { public evaluatedCubes: Record = {}; @@ -484,15 +491,8 @@ export class CubeEvaluator extends CubeSymbols { /** * Returns pre-aggregations filtered by the specified selector. - * @param {{ - * scheduled: boolean, - * dataSource: Array, - * cubes: Array, - * preAggregationIds: Array - * }} filter pre-aggregations selector - * @returns {*} */ - public preAggregations(filter) { + public preAggregations(filter: PreAggregationFilters) { const { scheduled, dataSources, cubes, preAggregationIds } = filter || {}; const idFactory = ({ cube, preAggregationName }) => `${cube}.${preAggregationName}`; @@ -601,9 +601,7 @@ export class CubeEvaluator extends CubeSymbols { public isInstanceOfType(type: 'measures' | 'dimensions' | 'segments', path: string | string[]): boolean { const cubeAndName = Array.isArray(path) ? 
path : path.split('.'); - const symbol = this.evaluatedCubes[cubeAndName[0]] && - this.evaluatedCubes[cubeAndName[0]][type] && - this.evaluatedCubes[cubeAndName[0]][type][cubeAndName[1]]; + const symbol = this.evaluatedCubes[cubeAndName[0]]?.[type]?.[cubeAndName[1]]; return symbol !== undefined; } @@ -655,7 +653,7 @@ export class CubeEvaluator extends CubeSymbols { } public isRbacEnabledForCube(cube: any): boolean { - return cube.accessPolicy && cube.accessPolicy.length; + return cube.accessPolicy?.length; } public isRbacEnabled(): boolean { diff --git a/packages/cubejs-server-core/package.json b/packages/cubejs-server-core/package.json index 7aeeecb879193..f56643175ea06 100644 --- a/packages/cubejs-server-core/package.json +++ b/packages/cubejs-server-core/package.json @@ -85,6 +85,7 @@ "/dist/test/setup.js" ], "collectCoverage": false, + "coverageReporters": ["text", "html"], "coverageDirectory": "coverage/", "collectCoverageFrom": [ "dist/src/**/*.js", diff --git a/packages/cubejs-server-core/src/core/RefreshScheduler.ts b/packages/cubejs-server-core/src/core/RefreshScheduler.ts index 942c47f299596..777ed886899a4 100644 --- a/packages/cubejs-server-core/src/core/RefreshScheduler.ts +++ b/packages/cubejs-server-core/src/core/RefreshScheduler.ts @@ -3,7 +3,10 @@ import pLimit from 'p-limit'; import { v4 as uuidv4 } from 'uuid'; import crypto from 'crypto'; import { Required } from '@cubejs-backend/shared'; -import { PreAggregationDescription } from '@cubejs-backend/query-orchestrator'; +import { + PreAggregationDescription, + PreAggregationPartitionRangeLoader +} from '@cubejs-backend/query-orchestrator'; import { CubejsServerCore } from './server'; import { CompilerApi } from './CompilerApi'; @@ -30,6 +33,7 @@ type ScheduledRefreshQueryingOptions = Required { const orchestratorApi = await this.serverCore.getOrchestratorApi(context); const preAggregations = await this.preAggregationPartitions(context, queryingOptions); + if (queryingOptions.dateRange) { + 
preAggregations.forEach(preAggregation => { + preAggregation.partitions = preAggregation.partitions + .filter(p => { + if (!p.buildRangeStart && !p.buildRangeEnd) { + return true; // If there is no range specified - we should include it like rebuild in any way + } + + return PreAggregationPartitionRangeLoader.intersectDateRanges( + [p.buildRangeStart, p.buildRangeEnd], + queryingOptions.dateRange, + ); + }); + preAggregation.partitionsWithDependencies.forEach(pd => { + pd.partitions = pd.partitions.filter(p => { + if (!p.buildRangeStart && !p.buildRangeEnd) { + return true; // If there is no range specified - we should include it like rebuild in any way + } + + return PreAggregationPartitionRangeLoader.intersectDateRanges( + [p.buildRangeStart, p.buildRangeEnd], + queryingOptions.dateRange, + ); + }); + + pd.dependencies = pd.dependencies.filter(p => { + if (!p.buildRangeStart && !p.buildRangeEnd) { + return true; // If there is no range specified - we should include it like rebuild in any way + } + + return PreAggregationPartitionRangeLoader.intersectDateRanges( + [p.buildRangeStart, p.buildRangeEnd], + queryingOptions.dateRange, + ); + }); + }); + }); + } + const preAggregationsLoadCacheByDataSource = {}; const jobsPromise = Promise.all( - preAggregations.map(async (p: any) => { - const { partitionsWithDependencies } = p; - return Promise.all( - partitionsWithDependencies.map(({ partitions, dependencies }) => ( - Promise.all( - partitions.map( - async (partition): Promise => { - const job = await orchestratorApi.executeQuery({ - preAggregations: dependencies.concat([partition]), - continueWait: true, - renewQuery: false, - forceBuildPreAggregations: true, - orphanedTimeout: 60 * 60, - requestId: context.requestId, - timezone: partition.timezone, - scheduledRefresh: false, - preAggregationsLoadCacheByDataSource, - metadata: queryingOptions.metadata, - isJob: true, - }); - job[0].dataSource = partition.dataSource; - job[0].timezone = partition.timezone; - return 
job; - } + preAggregations + // Filter out pre-aggs without partitions + .filter(p => p.partitions.length) + .map(async (p: any) => { + const { partitionsWithDependencies } = p; + return Promise.all( + partitionsWithDependencies.map(({ partitions, dependencies }) => ( + Promise.all( + partitions.map( + async (partition): Promise => { + const job = await orchestratorApi.executeQuery({ + preAggregations: dependencies.concat([partition]), + continueWait: true, + renewQuery: false, + forceBuildPreAggregations: true, + orphanedTimeout: 60 * 60, + requestId: context.requestId, + timezone: partition.timezone, + scheduledRefresh: false, + preAggregationsLoadCacheByDataSource, + metadata: queryingOptions.metadata, + isJob: true, + }); + job[0].dataSource = partition.dataSource; + job[0].timezone = partition.timezone; + return job; + } + ) ) - ) - )) - ); - }) + )) + ); + }) ); const jobedPAs = await jobsPromise; diff --git a/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts b/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts index 6e2f42bb7ba3e..fa0bd8cc0e8ac 100644 --- a/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts +++ b/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts @@ -1,9 +1,7 @@ import R from 'ramda'; import { BaseDriver } from '@cubejs-backend/query-orchestrator'; -import { pausePromise, SchemaFileRepository } from '@cubejs-backend/shared'; -import { CubejsServerCore } from '../../src'; -import { RefreshScheduler } from '../../src/core/RefreshScheduler'; -import { CompilerApi } from '../../src/core/CompilerApi'; +import { pausePromise, SchemaFileRepository, createPromiseLock } from '@cubejs-backend/shared'; +import { CubejsServerCore, CompilerApi, RefreshScheduler } from '../../src'; const schemaContent = ` cube('Foo', { @@ -245,7 +243,8 @@ class MockDriver extends BaseDriver { public cancelledQueries: any[] = []; - private tablesQueryDelay: any; + // FIXME: With small or absent delay 'Manual pre-aggregations 
rebuild via postBuildJobs' tests fail with incorrect results. + private tablesQueryDelay: any = 200; private schema: any; @@ -395,7 +394,7 @@ const setupScheduler = ({ repository, useOriginalSqlPreAggregations, skipAssertS jest.spyOn(serverCore, 'getCompilerApi').mockImplementation(async () => compilerApi); const refreshScheduler = new RefreshScheduler(serverCore); - return { refreshScheduler, compilerApi, mockDriver }; + return { refreshScheduler, compilerApi, mockDriver, serverCore }; }; describe('Refresh Scheduler', () => { @@ -819,6 +818,174 @@ describe('Refresh Scheduler', () => { expect(refreshResult.finished).toEqual(true); }); + describe('Manual pre-aggregations rebuild via postBuildJobs', () => { + test('All pre-aggregations', async () => { + process.env.CUBEJS_EXTERNAL_DEFAULT = 'false'; + process.env.CUBEJS_SCHEDULED_REFRESH_DEFAULT = 'true'; + + const { + refreshScheduler, mockDriver, serverCore + } = setupScheduler({ repository: repositoryWithPreAggregations }); + + const ctx = { authInfo: { tenantId: 'tenant1' }, securityContext: { tenantId: 'tenant1' }, requestId: 'XXX' }; + + let finish = false; + let jobs: string[]; + + while (!finish) { + try { + jobs = await refreshScheduler.postBuildJobs( + ctx, + { + metadata: undefined, + preAggregations: [], + timezones: ['UTC', 'America/Los_Angeles'], + forceBuildPreAggregations: false, + throwErrors: false, + preAggregationLoadConcurrency: 1, + } + ); + finish = true; + } catch (err: any) { + if (err.error !== 'Continue wait') { + throw err; + } + } + } + + const lock = createPromiseLock(); + const orchestrator = await serverCore.getOrchestratorApi(ctx); + + const interval = setInterval(async () => { + const queuedList = await orchestrator.getPreAggregationQueueStates(); + + if (queuedList.length === 0) { + lock.resolve(); + } + }, 500); + + await lock.promise; + clearInterval(interval); + + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 
'UTC').length).toEqual(5); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'America/Los_Angeles').length).toEqual(5); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_orphaned') && o.timezone === 'UTC').length).toEqual(5); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_orphaned') && o.timezone === 'America/Los_Angeles').length).toEqual(5); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_second') && o.timezone === 'UTC').length).toEqual(5); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_second') && o.timezone === 'America/Los_Angeles').length).toEqual(5); + + // Let's also test the getCachedBuildJobs() + const buildJobs = await refreshScheduler.getCachedBuildJobs(ctx, jobs); + const allTokensExist = jobs.every(token => buildJobs.some(job => job.token === token)); + expect(allTokensExist).toBeTruthy(); + }); + + test('Only `first` pre-aggregation', async () => { + process.env.CUBEJS_EXTERNAL_DEFAULT = 'false'; + process.env.CUBEJS_SCHEDULED_REFRESH_DEFAULT = 'true'; + + const { + refreshScheduler, mockDriver, serverCore + } = setupScheduler({ repository: repositoryWithPreAggregations }); + + const ctx = { authInfo: { tenantId: 'tenant1' }, securityContext: { tenantId: 'tenant1' }, requestId: 'XXX' }; + + let finish = false; + + while (!finish) { + try { + await refreshScheduler.postBuildJobs( + ctx, + { + metadata: undefined, + preAggregations: [{ id: 'Foo.first' }], + timezones: ['UTC', 'America/Los_Angeles'], + forceBuildPreAggregations: false, + throwErrors: false, + } + ); + finish = true; + } catch (err: any) { + if (err.error !== 'Continue wait') { + throw err; + } + } + } + + const lock = createPromiseLock(); + const orchestrator = await serverCore.getOrchestratorApi(ctx); + + const interval = setInterval(async () => { + const queuedList = await orchestrator.getPreAggregationQueueStates(); + + if (queuedList.length 
=== 0) { + lock.resolve(); + } + }, 500); + + await lock.promise; + clearInterval(interval); + + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'UTC').length).toEqual(5); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'America/Los_Angeles').length).toEqual(5); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_orphaned')).length).toEqual(0); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_second')).length).toEqual(0); + }); + + test('Only `first` pre-aggregation with dateRange', async () => { + process.env.CUBEJS_EXTERNAL_DEFAULT = 'false'; + process.env.CUBEJS_SCHEDULED_REFRESH_DEFAULT = 'true'; + + const { + refreshScheduler, mockDriver, serverCore + } = setupScheduler({ repository: repositoryWithPreAggregations }); + + const ctx = { authInfo: { tenantId: 'tenant1' }, securityContext: { tenantId: 'tenant1' }, requestId: 'XXX' }; + + let finish = false; + + while (!finish) { + try { + await refreshScheduler.postBuildJobs( + ctx, + { + metadata: undefined, + preAggregations: [{ id: 'Foo.first' }], + timezones: ['UTC', 'America/Los_Angeles'], + dateRange: ['2020-12-29T00:00:00.000', '2021-01-01T00:00:00.000'], + forceBuildPreAggregations: false, + throwErrors: false, + } + ); + finish = true; + } catch (err: any) { + if (err.error !== 'Continue wait') { + throw err; + } + } + } + + const lock = createPromiseLock(); + const orchestrator = await serverCore.getOrchestratorApi(ctx); + + const interval = setInterval(async () => { + const queuedList = await orchestrator.getPreAggregationQueueStates(); + + if (queuedList.length === 0) { + lock.resolve(); + } + }, 500); + + await lock.promise; + clearInterval(interval); + + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'UTC').length).toEqual(3); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && 
o.timezone === 'America/Los_Angeles').length).toEqual(3); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_orphaned')).length).toEqual(0); + expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_second')).length).toEqual(0); + }); + }); + test('Iterator waits before advance', async () => { process.env.CUBEJS_EXTERNAL_DEFAULT = 'false'; process.env.CUBEJS_SCHEDULED_REFRESH_DEFAULT = 'true';