Skip to content

Commit

Permalink
feat: Manual build pre-aggregations, getter for queue state, debug API (
Browse files Browse the repository at this point in the history
  • Loading branch information
RusovDmitriy committed Jul 13, 2021
1 parent d7014a2 commit 692372e
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 7 deletions.
52 changes: 52 additions & 0 deletions packages/cubejs-api-gateway/src/gateway.ts
Expand Up @@ -389,6 +389,21 @@ export class ApiGateway {
res: this.resToResultFn(res)
});
}));

app.post('/cubejs-system/v1/pre-aggregations/build', jsonParser, systemMiddlewares, (async (req, res) => {
await this.buildPreAggregations({
query: req.body.query,
context: req.context,
res: this.resToResultFn(res)
});
}));

app.post('/cubejs-system/v1/pre-aggregations/queue', jsonParser, systemMiddlewares, (async (req, res) => {
await this.getPreAggregationsInQueue({
context: req.context,
res: this.resToResultFn(res)
});
}));
}

app.get('/readyz', guestMiddlewares, cachedHandler(this.readiness));
Expand Down Expand Up @@ -540,6 +555,43 @@ export class ApiGateway {
}
}

public async buildPreAggregations(
{ query, context, res }: { query: any, context: RequestContext, res: ResponseResultFn }
) {
const requestStarted = new Date();
try {
query = normalizeQueryPreAggregations(this.parseQueryParam(query));
const result = await this.refreshScheduler()
.buildPreAggregations(
context,
this.getCompilerApi(context),
query
);

res({ result });
} catch (e) {
this.handleError({
e, context, res, requestStarted
});
}
}

public async getPreAggregationsInQueue(
{ context, res }: { context: RequestContext, res: ResponseResultFn }
) {
const requestStarted = new Date();
try {
const orchestratorApi = this.getAdapterApi(context);
res({
result: await orchestratorApi.getPreAggregationQueueStates()
});
} catch (e) {
this.handleError({
e, context, res, requestStarted
});
}
}

protected async getNormalizedQueries(query, context: RequestContext): Promise<any> {
query = this.parseQueryParam(query);
let queryType = QUERY_TYPE.REGULAR_QUERY;
Expand Down
3 changes: 2 additions & 1 deletion packages/cubejs-api-gateway/src/query.js
Expand Up @@ -219,7 +219,8 @@ const queryPreAggregationsSchema = Joi.object().keys({
timezones: Joi.array().items(Joi.string()),
preAggregations: Joi.array().items(Joi.object().keys({
id: Joi.string().required(),
refreshRange: Joi.array().items(Joi.string()).length(2)
refreshRange: Joi.array().items(Joi.string()).length(2),
partitions: Joi.array().items(Joi.string())
}))
});

Expand Down
Expand Up @@ -330,6 +330,8 @@ class PreAggregationLoader {

private waitForRenew: boolean;

private forceBuild: boolean;

private externalDriverFactory: DriverFactory;

private requestId: string;
Expand All @@ -355,6 +357,7 @@ class PreAggregationLoader {
this.preAggregationsTablesToTempTables = preAggregationsTablesToTempTables;
this.loadCache = loadCache;
this.waitForRenew = options.waitForRenew;
this.forceBuild = options.forceBuild;
this.externalDriverFactory = preAggregations.externalDriverFactory;
this.requestId = options.requestId;
this.structureVersionPersistTime = preAggregations.structureVersionPersistTime;
Expand Down Expand Up @@ -436,7 +439,7 @@ class PreAggregationLoader {
const getVersionEntryByContentVersion = ({ byContent }: VersionEntriesObj) => byContent[`${this.preAggregation.tableName}_${contentVersion}`];

const versionEntryByContentVersion = getVersionEntryByContentVersion(versionEntries);
if (versionEntryByContentVersion) {
if (versionEntryByContentVersion && !this.forceBuild) {
return this.targetTableName(versionEntryByContentVersion);
}

Expand Down Expand Up @@ -482,6 +485,17 @@ class PreAggregationLoader {
return this.targetTableName(lastVersion);
};

if (this.forceBuild) {
this.logger('Force build pre-aggregation', {
preAggregation: this.preAggregation,
requestId: this.requestId,
queryKey: this.preAggregationQueryKey(invalidationKeys),
newVersionEntry
});
await this.executeInQueue(invalidationKeys, this.priority(10), newVersionEntry);
return mostRecentTargetTableName();
}

if (versionEntry) {
if (versionEntry.structure_version !== newVersionEntry.structure_version) {
this.logger('Invalidating pre-aggregation structure', {
Expand Down Expand Up @@ -565,7 +579,8 @@ class PreAggregationLoader {
preAggregationsTablesToTempTables: this.preAggregationsTablesToTempTables,
newVersionEntry,
requestId: this.requestId,
invalidationKeys
invalidationKeys,
forceBuild: this.forceBuild
},
priority,
// eslint-disable-next-line no-use-before-define
Expand Down Expand Up @@ -1012,6 +1027,7 @@ export class PreAggregations {
getLoadCacheByDataSource(p.dataSource),
{
waitForRenew: queryBody.renewQuery,
forceBuild: queryBody.forceBuildPreAggregations,
requestId: queryBody.requestId,
externalRefresh: this.externalRefresh
}
Expand Down Expand Up @@ -1150,4 +1166,9 @@ export class PreAggregations {
);
return data.filter(res => res);
}

public async getQueueState(dataSource = undefined) {
const queries = await this.getQueue(dataSource).getQueries();
return queries;
}
}
Expand Up @@ -211,4 +211,8 @@ export class QueryOrchestrator {

return data || [];
}

public async getPreAggregationQueueStates() {
return this.preAggregations.getQueueState();
}
}
Expand Up @@ -62,7 +62,8 @@ export class QueryQueue {
if (!(priority >= -10000 && priority <= 10000)) {
throw new Error('Priority should be between -10000 and 10000');
}
let result = await redisClient.getResult(queryKey);
let result = !query.forceBuild && await redisClient.getResult(queryKey);

if (result) {
return this.parseResult(result);
}
Expand Down Expand Up @@ -151,6 +152,50 @@ export class QueryQueue {
return this.reconcilePromise;
}

async getQueries() {
const redisClient = await this.queueDriver.createConnection();
try {
const [stalledQueries, orphanedQueries, activeQueries, toProcessQueries] = await Promise.all([
redisClient.getStalledQueries(),
redisClient.getOrphanedQueries(),
redisClient.getActiveQueries(),
redisClient.getToProcessQueries()
]);

const mapWithDefinition = (arr) => Promise.all(arr.map(async queryKey => ({
...(await redisClient.getQueryDef(queryKey)),
queryKey
})));

const [stalled, orphaned, active, toProcess] = await Promise.all(
[stalledQueries, orphanedQueries, activeQueries, toProcessQueries].map(arr => mapWithDefinition(arr))
);

const result = {
orphaned,
stalled,
active,
toProcess
};

return Object.values(Object.keys(result).reduce((obj, status) => {
result[status].forEach(query => {
if (!obj[query.queryKey]) {
obj[query.queryKey] = {
...query,
status: []
};
}

obj[query.queryKey].status.push(status);
});
return obj;
}, {}));
} finally {
this.queueDriver.release(redisClient);
}
}

async reconcileQueueImpl() {
const redisClient = await this.queueDriver.createConnection();
try {
Expand Down
5 changes: 5 additions & 0 deletions packages/cubejs-server-core/src/core/OrchestratorApi.ts
Expand Up @@ -179,4 +179,9 @@ export class OrchestratorApi {
public getPreAggregationPreview(context: RequestContext, preAggregation, versionEntry) {
return this.orchestrator.getPreAggregationPreview(context.requestId, preAggregation, versionEntry);
}

public async getPreAggregationQueueStates() {
const result = await this.orchestrator.getPreAggregationQueueStates();
return result;
}
}
47 changes: 44 additions & 3 deletions packages/cubejs-server-core/src/core/RefreshScheduler.ts
Expand Up @@ -28,7 +28,8 @@ type PreAggregationsQueryingOptions = {
timezones: string[],
preAggregations: {
id: string,
refreshRange?: [string, string]
refreshRange?: [string, string],
partitions?: string[]
}[]
};

Expand Down Expand Up @@ -246,7 +247,7 @@ export class RefreshScheduler {

return Promise.all(preAggregations.map(async preAggregation => {
const { timezones } = queryingOptions;
const { refreshRange } = preAggregationsQueringOptions[preAggregation.id] || {};
const { refreshRange, partitions: partitionsFilter } = preAggregationsQueringOptions[preAggregation.id] || {};

const queriesForPreAggregation = preAggregation && (await Promise.all(
timezones.map(
Expand Down Expand Up @@ -283,7 +284,9 @@ export class RefreshScheduler {
return {
timezones,
preAggregation,
partitions
partitions: partitions.filter(p => !partitionsFilter ||
!partitionsFilter.length ||
partitionsFilter.includes(p.sql?.tableName))
};
}));
}
Expand Down Expand Up @@ -422,4 +425,42 @@ export class RefreshScheduler {
}
}));
}

public async buildPreAggregations(
context: RequestContext,
compilerApi: CompilerApi,
queryingOptions: PreAggregationsQueryingOptions
) {
const orchestratorApi = this.serverCore.getOrchestratorApi(context);
const preAggregations = await this.preAggregationPartitions(context, compilerApi, queryingOptions);
const preAggregationsLoadCacheByDataSource = {};

Promise.all(preAggregations.map(async (p: any) => {
const { partitions } = p;
return Promise.all(partitions.map(async query => {
const sqlQuery = await compilerApi.getSql(query);

await orchestratorApi.executeQuery({
...sqlQuery,
continueWait: true,
renewQuery: true,
forceBuildPreAggregations: true,
requestId: context.requestId,
timezone: query.timezone,
scheduledRefresh: false,
preAggregationsLoadCacheByDataSource
});
}));
})).catch(e => {
if (e.error !== 'Continue wait') {
this.serverCore.logger('Manual Build Pre-aggregations Error', {
error: e.error || e.stack || e.toString(),
securityContext: context.securityContext,
requestId: context.requestId
});
}
});

return true;
}
}

0 comments on commit 692372e

Please sign in to comment.