Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
4e8400b
spelling
KSDaemon Mar 7, 2025
3dd8075
add dateRange to PreAggsJobsRequest
KSDaemon Mar 7, 2025
5fc7e35
pass dateRange to preAggregations(filter)
KSDaemon Mar 7, 2025
e1bf5af
a bit of code polishment
KSDaemon Mar 7, 2025
a655524
add preAggsJobsRequestSchema validator
KSDaemon Mar 7, 2025
c5fce7d
fix PreAggsJobsRequest types
KSDaemon Mar 7, 2025
3f67b7f
specify type for parseLocalDate()
KSDaemon Mar 7, 2025
d74ba88
refresh only matched partitions
KSDaemon Mar 7, 2025
2be7cbe
add html reporters
KSDaemon Mar 7, 2025
21a1cdd
some code polishment
KSDaemon Mar 7, 2025
03c3634
fix/add some tests to dateParser()
KSDaemon Mar 7, 2025
625e466
code polishment
KSDaemon Mar 7, 2025
3a2b6f7
add another check
KSDaemon Mar 7, 2025
2aa4f1b
add tests for api-gw preagg jobs endpoint
KSDaemon Mar 7, 2025
c4cf6b2
align refactor some types
KSDaemon Mar 11, 2025
036dc79
code polish
KSDaemon Mar 11, 2025
a2ef014
refresh only matched partitions fix
KSDaemon Mar 11, 2025
f1b7c9d
code polish
KSDaemon Mar 11, 2025
73c584f
add tests for pre-agg jobs
KSDaemon Mar 11, 2025
44d88f8
jest reporters
KSDaemon Mar 11, 2025
e72697f
add tests for refreshScheduler.getCachedBuildJobs()
KSDaemon Mar 11, 2025
3752770
improve caching in pre-agg-load-cache
KSDaemon Mar 12, 2025
4f15cf1
increase delay in tests to pass
KSDaemon Mar 12, 2025
c3f26f2
use asyncDebounce for caching in pre-agg-load-cache
KSDaemon Mar 12, 2025
fa60a51
Revert "use asyncDebounce for caching in pre-agg-load-cache"
KSDaemon Mar 12, 2025
99013fd
Revert "improve caching in pre-agg-load-cache"
KSDaemon Mar 12, 2025
d91bd5a
set coverageReporters
KSDaemon Mar 12, 2025
7466733
add docs
KSDaemon Mar 13, 2025
3b03974
fix regexp
KSDaemon Mar 13, 2025
2cfecf5
fix dateParser + tests
KSDaemon Mar 13, 2025
d25d1d9
fix preAggsJobsRequestSchema validator
KSDaemon Mar 13, 2025
db45d38
fix error message
KSDaemon Mar 14, 2025
1cfc7db
simplify regexp in dateParser
KSDaemon Mar 14, 2025
e1541d7
tests polish
KSDaemon Mar 14, 2025
6b904a1
little fix
KSDaemon Mar 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/pages/product/apis-integrations/rest-api/reference.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ Trigger pre-aggregation build jobs or retrieve statuses of such jobs.
| `selector.datasources` | Array of data source names which have pre-aggregations defined | ❌ |
| `selector.cubes` | Array of cube names which contain pre-aggregations | ❌ |
| `selector.preAggregations` | Array of pre-aggregation names | ❌ |
| `selector.dateRange` | Date Range tuple ['range-date-start', 'range-date-end'] | ❌ |

To trigger pre-aggregation builds, send a `POST` request with a payload
including `post` as the `action` and `selector` properties. The response will
Expand Down Expand Up @@ -340,6 +341,27 @@ curl \
https://localhost:4000/cubejs-api/v1/pre-aggregations/jobs
```

Example request triggering builds of the `main` pre-aggregation defined in the
`orders` cube within date range with some security context data
and an `America/Los_Angeles` timezone:

```bash{outputLines: 2-13}
curl \
-d '{
"action": "post",
"selector": {
"contexts": [{ "securityContext": { "tenantId": "tenant1" } }],
"timezones": ["America/Los_Angeles"],
"preAggregations": ["orders.main"],
"dateRange": ["2020-01-01", "2020-02-01"]
}
}' \
-H "Authorization: EXAMPLE-API-TOKEN" \
-H "Content-Type: application/json" \
-X POST \
https://localhost:4000/cubejs-api/v1/pre-aggregations/jobs
```

Example response:

```json
Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-api-gateway/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
"jest": {
"testEnvironment": "node",
"collectCoverage": false,
"coverageReporters": ["text", "html"],
"coverageDirectory": "coverage/",
"collectCoverageFrom": [
"dist/src/**/*.js",
Expand Down
17 changes: 10 additions & 7 deletions packages/cubejs-api-gateway/src/dateParser.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,19 @@ export function dateParser(dateString, timezone, now = new Date()) {
moment.tz(timezone).endOf('day').add(1, 'day')
];
} else if (dateString.match(/^from (.*) to (.*)$/)) {
// eslint-disable-next-line no-unused-vars,@typescript-eslint/no-unused-vars
const [all, from, to] = dateString.match(/^from (.*) to (.*)$/);
let [, from, to] = dateString.match(/^from(.{0,50})to(.{0,50})$/);
from = from.trim();
to = to.trim();

const current = moment(now).tz(timezone);
const fromResults = parse(from, new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS)));
const toResults = parse(to, new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS)));
const fromResults = parse(from.trim(), new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS)));
const toResults = parse(to.trim(), new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS)));

if (!Array.isArray(fromResults) || !fromResults.length) {
throw new UserError(`Can't parse date: '${from}'`);
}

if (!Array.isArray(fromResults) || !fromResults.length) {
if (!Array.isArray(toResults) || !toResults.length) {
throw new UserError(`Can't parse date: '${to}'`);
}

Expand All @@ -88,8 +89,10 @@ export function dateParser(dateString, timezone, now = new Date()) {

momentRange = [momentRange[0].startOf(exactGranularity), momentRange[1].endOf(exactGranularity)];
} else {
const results = parse(dateString, new Date(moment().tz(timezone).format(moment.HTML5_FMT.DATETIME_LOCAL_MS)));
if (!results || !results.length) {
const current = moment(now).tz(timezone);
const results = parse(dateString, new Date(current.format(moment.HTML5_FMT.DATETIME_LOCAL_MS)));

if (!results?.length) {
throw new UserError(`Can't parse date: '${dateString}'`);
}

Expand Down
116 changes: 52 additions & 64 deletions packages/cubejs-api-gateway/src/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import structuredClone from '@ungap/structured-clone';
import {
getEnv,
getRealType,
parseLocalDate,
QueryAlias,
} from '@cubejs-backend/shared';
import {
Expand Down Expand Up @@ -81,7 +82,9 @@ import {
normalizeQuery,
normalizeQueryCancelPreAggregations,
normalizeQueryPreAggregationPreview,
normalizeQueryPreAggregations, remapToQueryAdapterFormat,
normalizeQueryPreAggregations,
preAggsJobsRequestSchema,
remapToQueryAdapterFormat,
} from './query';
import { cachedHandler } from './cached-handler';
import { createJWKsFetcher } from './jwk';
Expand Down Expand Up @@ -768,8 +771,8 @@ class ApiGateway {
preAggregations: [{ id: preAggregationId }]
}
);
const { partitions } = (preAggregationPartitions && preAggregationPartitions[0] || {});
const preAggregationPartition = partitions && partitions.find(p => p?.tableName === versionEntry.table_name);
const { partitions } = (preAggregationPartitions?.[0] || {});
const preAggregationPartition = partitions?.find(p => p?.tableName === versionEntry.table_name);

res({
preview: preAggregationPartition && await orchestratorApi.getPreAggregationPreview(
Expand Down Expand Up @@ -843,7 +846,6 @@ class ApiGateway {
* ]
* }
* ```
* TODO (buntarb): selector object validator.
*/
private async preAggregationsJobs(req: Request, res: ExpressResponse) {
const response = this.resToResultFn(res);
Expand All @@ -853,49 +855,26 @@ class ApiGateway {
let result;
try {
await this.assertApiScope('jobs', req?.context?.securityContext);

if (!query || Object.keys(query).length === 0) {
throw new UserError('No job description provided');
}

const { error } = preAggsJobsRequestSchema.validate(query);
if (error) {
throw new UserError(`Invalid Job query format: ${error.message || error.toString()}`);
}

switch (query.action) {
case 'post':
if (
!(<PreAggsSelector>query.selector).timezones ||
(<PreAggsSelector>query.selector).timezones.length === 0
) {
throw new UserError(
'A user\'s selector must contain at least one time zone.'
);
}
if (
!(<PreAggsSelector>query.selector).contexts ||
(
<{securityContext: any}[]>(
<PreAggsSelector>query.selector
).contexts
).length === 0
) {
throw new UserError(
'A user\'s selector must contain at least one context element.'
);
} else {
let e = false;
(<{securityContext: any}[]>(
<PreAggsSelector>query.selector
).contexts).forEach((c) => {
if (!c.securityContext) e = true;
});
if (e) {
throw new UserError(
'Every context element must contain the ' +
'\'securityContext\' property.'
);
}
}
result = await this.preAggregationsJobsPOST(
context,
<PreAggsSelector>query.selector
);
if (result.length === 0) {
throw new UserError(
'A user\'s selector doesn\'t match any of the ' +
'pre-aggregations described by the Cube schemas.'
'pre-aggregations defined in the data model.'
);
}
break;
Expand Down Expand Up @@ -926,30 +905,38 @@ class ApiGateway {
selector: PreAggsSelector,
): Promise<string[]> {
let jobs: string[] = [];
if (!selector.contexts?.length) {
jobs = await this.postPreAggregationsBuildJobs(
context,
selector,
);
} else {
const promise = Promise.all(
selector.contexts.map(async (config) => {
const ctx = <RequestContext>{
...context,
...config,
};
const _jobs = await this.postPreAggregationsBuildJobs(
ctx,
selector,
);
return _jobs;
})
);
const resolve = await promise;
resolve.forEach((_jobs) => {
jobs = jobs.concat(_jobs);
});

// There might be a few contexts but dateRange if present is still the same
// so let's normalize it only once.
// It's expected that selector.dateRange is provided in local time (without timezone)
// At the same time it is ok to get timestamps with `Z` (in UTC).
if (selector.dateRange) {
const start = parseLocalDate([{ val: selector.dateRange[0] }], 'UTC');
const end = parseLocalDate([{ val: selector.dateRange[1] }], 'UTC');
if (!start || !end) {
throw new UserError(`Cannot parse selector date range ${selector.dateRange}`);
}
selector.dateRange = [start, end];
}

const promise = Promise.all(
selector.contexts.map(async (config) => {
const ctx = <RequestContext>{
...context,
...config,
};
const _jobs = await this.postPreAggregationsBuildJobs(
ctx,
selector,
);
return _jobs;
})
);
const resolve = await promise;
resolve.forEach((_jobs) => {
jobs = jobs.concat(_jobs);
});

return jobs;
}

Expand All @@ -961,7 +948,7 @@ class ApiGateway {
selector: PreAggsSelector
): Promise<string[]> {
const compiler = await this.getCompilerApi(context);
const { timezones } = selector;
const { timezones, dateRange } = selector;
const preaggs = await compiler.preAggregations({
dataSources: selector.dataSources,
cubes: selector.cubes,
Expand All @@ -977,12 +964,13 @@ class ApiGateway {
{
metadata: undefined,
timezones,
dateRange,
preAggregations: preaggs.map(p => ({
id: p.id,
cacheOnly: undefined, // boolean
cacheOnly: false,
partitions: undefined, // string[]
})),
forceBuildPreAggregations: undefined,
forceBuildPreAggregations: false,
throwErrors: false,
}
);
Expand Down
39 changes: 35 additions & 4 deletions packages/cubejs-api-gateway/src/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import { getEnv } from '@cubejs-backend/shared';
import { UserError } from './UserError';
import { dateParser } from './dateParser';
import { QueryType } from './types/enums';
import { PreAggsJobsRequest } from "./types/request";

const getQueryGranularity = (queries) => R.pipe(
R.map(({ timeDimensions }) => timeDimensions[0] && timeDimensions[0].granularity || null),
R.map(({ timeDimensions }) => timeDimensions[0]?.granularity),
R.filter(Boolean),
R.uniq
)(queries);
Expand Down Expand Up @@ -145,6 +146,36 @@ const normalizeQueryOrder = order => {
return result;
};

export const preAggsJobsRequestSchema = Joi.object({
action: Joi.string().valid('post', 'get').required(),
selector: Joi.when('action', {
is: 'post',
then: Joi.object({
contexts: Joi.array().items(
Joi.object({
securityContext: Joi.required(),
})
).min(1).required(),
timezones: Joi.array().items(Joi.string()).min(1).required(),
dataSources: Joi.array().items(Joi.string()),
cubes: Joi.array().items(Joi.string()),
preAggregations: Joi.array().items(Joi.string()),
dateRange: Joi.array().length(2).items(Joi.string()),
}).optional(),
otherwise: Joi.forbidden(),
}),
tokens: Joi.when('action', {
is: 'get',
then: Joi.array().items(Joi.string()).min(1).required(),
otherwise: Joi.forbidden(),
}),
resType: Joi.when('action', {
is: 'get',
then: Joi.string().valid('object').optional(),
otherwise: Joi.forbidden(),
}),
});

const DateRegex = /^\d\d\d\d-\d\d-\d\d$/;

const normalizeQueryFilters = (filter) => (
Expand Down Expand Up @@ -196,9 +227,9 @@ const normalizeQuery = (query, persistent) => {
if (error) {
throw new UserError(`Invalid query format: ${error.message || error.toString()}`);
}
const validQuery = query.measures && query.measures.length ||
query.dimensions && query.dimensions.length ||
query.timeDimensions && query.timeDimensions.filter(td => !!td.granularity).length;
const validQuery = query.measures?.length ||
query.dimensions?.length ||
query.timeDimensions?.filter(td => !!td.granularity).length;
if (!validQuery) {
throw new UserError(
'Query should contain either measures, dimensions or timeDimensions with granularities in order to be valid'
Expand Down
7 changes: 4 additions & 3 deletions packages/cubejs-api-gateway/src/types/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type ResponseResultFn =
(
message: (Record<string, any> | Record<string, any>[]) | DataResult | ErrorResponse,
extra?: { status: number }
) => void;
) => void | Promise<void>;

/**
* Base HTTP request parameters map data type.
Expand Down Expand Up @@ -148,11 +148,12 @@ type SqlApiRequest = BaseRequest & {
* Pre-aggregations selector object.
*/
type PreAggsSelector = {
contexts?: {securityContext: any}[],
contexts: {securityContext: any}[],
timezones: string[],
dataSources?: string[],
cubes?: string[],
preAggregations?: string[],
dateRange?: [string, string], // We expect only single date Range for rebuilding
};

/**
Expand All @@ -177,7 +178,7 @@ type PreAggJob = {
* The `/cubejs-system/v1/pre-aggregations/jobs` endpoint object type.
*/
type PreAggsJobsRequest = {
action: 'post' | 'get' | 'delete',
action: 'post' | 'get',
selector?: PreAggsSelector,
tokens?: string[]
resType?: 'object' | 'array'
Expand Down
24 changes: 24 additions & 0 deletions packages/cubejs-api-gateway/test/dateParser.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,28 @@ describe('dateParser', () => {

Date.now.mockRestore();
});

test('throws error on from invalid date to date', () => {
expect(() => dateParser('from invalid to 2020-02-02', 'UTC')).toThrow(
'Can\'t parse date: \'invalid\''
);
});

test('throws error on from date to invalid date', () => {
expect(() => dateParser('from 2020-02-02 to invalid', 'UTC')).toThrow(
'Can\'t parse date: \'invalid\''
);
});

test('from 12AM till now by hour', () => {
Date.now = jest.fn().mockReturnValue(new Date(2021, 2, 5, 13, 0, 0, 0));
expect(dateParser('2 weeks ago by hour', 'UTC', new Date(Date.UTC(2021, 2, 5, 13, 0, 0, 0)))).toStrictEqual(
[
'2021-02-19T13:00:00.000',
'2021-02-19T13:59:59.999'
]
);

Date.now.mockRestore();
});
});
Loading
Loading