Skip to content

Commit

Permalink
feat: introducing query persistent flag (#5744)
Browse files Browse the repository at this point in the history
  • Loading branch information
buntarb committed Dec 8, 2022
1 parent f1e25bb commit 699e772
Show file tree
Hide file tree
Showing 8 changed files with 488 additions and 187 deletions.
5 changes: 4 additions & 1 deletion packages/cubejs-api-gateway/src/gateway.ts
Expand Up @@ -1257,6 +1257,7 @@ class ApiGateway {
context: RequestContext,
normalizedQuery: NormalizedQuery,
sqlQuery: any,
apiType: string,
) {
const queries = [{
...sqlQuery,
Expand All @@ -1265,7 +1266,8 @@ class ApiGateway {
continueWait: true,
renewQuery: normalizedQuery.renewQuery,
requestId: context.requestId,
context
context,
persistent: apiType === 'sql',
}];
if (normalizedQuery.total) {
const normalizedTotal = structuredClone(normalizedQuery);
Expand Down Expand Up @@ -1420,6 +1422,7 @@ class ApiGateway {
context,
normalizedQuery,
sqlQueries[index],
apiType,
);

return this.getResultInternal(
Expand Down
187 changes: 112 additions & 75 deletions packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts
Expand Up @@ -20,7 +20,14 @@ import { cancelCombinator, SaveCancelFn, DriverInterface, BaseDriver,
StreamOptions,
UnloadOptions,
DriverCapabilities } from '@cubejs-backend/base-driver';
import { Query, QueryCache, QueryTuple, QueryWithParams } from './QueryCache';
import {
Query,
QueryCache,
QueryTuple,
QueryWithParams,
QueryBody,
PreAggTableToTempTable,
} from './QueryCache';
import { ContinueWaitError } from './ContinueWaitError';
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
import { QueryQueue } from './QueryQueue';
Expand Down Expand Up @@ -72,7 +79,9 @@ function nowTimestamp(client: DriverInterface) {
}

// Returns the oldest timestamp, if any.
export function getLastUpdatedAtTimestamp(timestamps: (number | undefined)[]): number | undefined {
export function getLastUpdatedAtTimestamp(
timestamps: (number | undefined)[]
): number | undefined {
timestamps = timestamps.filter(t => t !== undefined);
if (timestamps.length === 0) {
return undefined;
Expand All @@ -94,18 +103,12 @@ function getStructureVersion(preAggregation) {
}

type VersionEntry = {
// eslint-disable-next-line camelcase
table_name: string,
// eslint-disable-next-line camelcase
content_version: string,
// eslint-disable-next-line camelcase
structure_version: string,
// eslint-disable-next-line camelcase
last_updated_at: number,
// eslint-disable-next-line camelcase
build_range_end?: string,
// eslint-disable-next-line camelcase
naming_version?: number
'table_name': string,
'content_version': string,
'structure_version': string,
'last_updated_at': number,
'build_range_end'?: string,
'naming_version'?: number
};

type IndexesSql = { sql: [string, unknown[]], indexName: string }[];
Expand Down Expand Up @@ -402,8 +405,8 @@ class PreAggregationLoadCache {
if (!this.queryResults[this.queryCache.queryRedisKey([query, values])]) {
this.queryResults[this.queryCache.queryRedisKey([query, values])] = await this.queryCache.cacheQueryResult(
query,
values,
[query, values],
<string[]>values,
[query, <string[]>values],
60 * 60,
{
renewalThreshold: this.queryCache.options.refreshKeyRenewalThreshold
Expand Down Expand Up @@ -885,11 +888,15 @@ export class PreAggregationLoader {
const [loadSql, params] =
Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []];
const targetTableName = this.targetTableName(newVersionEntry);
const query = QueryCache.replacePreAggregationTableNames(loadSql, this.preAggregationsTablesToTempTables)
.replace(
this.preAggregation.tableName,
targetTableName
);
const query = (
<string>QueryCache.replacePreAggregationTableNames(
loadSql,
this.preAggregationsTablesToTempTables,
)
).replace(
this.preAggregation.tableName,
targetTableName
);
const queryOptions = this.queryOptions(invalidationKeys, query, params, targetTableName, newVersionEntry);
this.logExecutingSql(queryOptions);

Expand Down Expand Up @@ -1023,11 +1030,15 @@ export class PreAggregationLoader {
const [loadSql, params] =
Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []];

const query = QueryCache.replacePreAggregationTableNames(loadSql, this.preAggregationsTablesToTempTables)
.replace(
this.preAggregation.tableName,
targetTableName
);
const query = (
<string>QueryCache.replacePreAggregationTableNames(
loadSql,
this.preAggregationsTablesToTempTables,
)
).replace(
this.preAggregation.tableName,
targetTableName
);
const queryOptions = this.queryOptions(invalidationKeys, query, params, targetTableName, newVersionEntry);
this.logExecutingSql(queryOptions);
await saveCancelFn(client.loadPreAggregationIntoTable(
Expand Down Expand Up @@ -1415,8 +1426,12 @@ export class PreAggregationPartitionRangeLoader {

return this.queryCache.cacheQueryResult(
query,
values,
QueryCache.queryCacheKey({ query, values, invalidate }),
<string[]>values,
QueryCache.queryCacheKey({
query,
values: (<string[]>values),
invalidate,
}),
24 * 60 * 60,
{
renewalThreshold: this.queryCache.options.refreshKeyRenewalThreshold
Expand Down Expand Up @@ -1607,10 +1622,10 @@ export class PreAggregationPartitionRangeLoader {
});
const { data } = await this.queryCache.renewQuery(
query,
values,
<string[]>values,
cacheKeyQueries,
60 * 60,
[query, values],
[query, <string[]>values],
undefined,
{
requestId: this.requestId,
Expand Down Expand Up @@ -1789,6 +1804,12 @@ type PreAggregationsOptions = {
skipExternalCacheAndQueue?: boolean;
};

type PreAggregationQueryBody = QueryBody & {
preAggregationsLoadCacheByDataSource?: {
[key: string]: PreAggregationLoadCache,
};
};

export class PreAggregations {
public options: PreAggregationsOptions;

Expand Down Expand Up @@ -1906,12 +1927,20 @@ export class PreAggregations {
return [true, status];
}

public loadAllPreAggregationsIfNeeded(queryBody) {
public loadAllPreAggregationsIfNeeded(
queryBody: PreAggregationQueryBody,
): Promise<{
preAggregationsTablesToTempTables: PreAggTableToTempTable[],
values: null | string[],
}> {
const preAggregations = queryBody.preAggregations || [];

const loadCacheByDataSource = queryBody.preAggregationsLoadCacheByDataSource || {};

const getLoadCacheByDataSource = (dataSource = 'default', preAggregationSchema) => {
const getLoadCacheByDataSource = (
dataSource = 'default',
preAggregationSchema: string,
) => {
if (!loadCacheByDataSource[`${dataSource}_${preAggregationSchema}`]) {
loadCacheByDataSource[`${dataSource}_${preAggregationSchema}`] =
new PreAggregationLoadCache(
Expand All @@ -1931,54 +1960,56 @@ export class PreAggregations {
}
);
}

return loadCacheByDataSource[`${dataSource}_${preAggregationSchema}`];
};

let queryParamsReplacement = null;

const preAggregationsTablesToTempTablesPromise = preAggregations.map((p: PreAggregationDescription, i) => (preAggregationsTablesToTempTables) => {
const loader = new PreAggregationPartitionRangeLoader(
this.redisPrefix,
() => this.driverFactory(p.dataSource || 'default'),
this.logger,
this.queryCache,
this,
p,
preAggregationsTablesToTempTables,
getLoadCacheByDataSource(p.dataSource, p.preAggregationsSchema),
{
maxPartitions: this.options.maxPartitions,
maxSourceRowLimit: this.options.maxSourceRowLimit,
isJob: queryBody.isJob,
waitForRenew: queryBody.renewQuery,
// TODO workaround to avoid continuous waiting on building pre-aggregation dependencies
forceBuild: i === preAggregations.length - 1 ? queryBody.forceBuildPreAggregations : false,
requestId: queryBody.requestId,
metadata: queryBody.metadata,
orphanedTimeout: queryBody.orphanedTimeout,
lambdaQuery: (queryBody.lambdaQueries ?? {})[p.preAggregationId],
externalRefresh: this.externalRefresh
},
);
const preAggregationsTablesToTempTablesPromise =
preAggregations.map((p: PreAggregationDescription, i) => (preAggregationsTablesToTempTables) => {
const loader = new PreAggregationPartitionRangeLoader(
this.redisPrefix,
() => this.driverFactory(p.dataSource || 'default'),
this.logger,
this.queryCache,
this,
p,
preAggregationsTablesToTempTables,
getLoadCacheByDataSource(p.dataSource, p.preAggregationsSchema),
{
maxPartitions: this.options.maxPartitions,
maxSourceRowLimit: this.options.maxSourceRowLimit,
isJob: queryBody.isJob,
waitForRenew: queryBody.renewQuery,
// TODO workaround to avoid continuous waiting on building pre-aggregation dependencies
forceBuild: i === preAggregations.length - 1 ? queryBody.forceBuildPreAggregations : false,
requestId: queryBody.requestId,
metadata: queryBody.metadata,
orphanedTimeout: queryBody.orphanedTimeout,
lambdaQuery: (queryBody.lambdaQueries ?? {})[p.preAggregationId],
externalRefresh: this.externalRefresh
},
);

const preAggregationPromise = async () => {
const loadResult = await loader.loadPreAggregations();
const usedPreAggregation = {
...loadResult,
type: p.type,
};
await this.addTableUsed(usedPreAggregation.targetTableName);
const preAggregationPromise = async () => {
const loadResult = await loader.loadPreAggregations();
const usedPreAggregation = {
...loadResult,
type: p.type,
};
await this.addTableUsed(usedPreAggregation.targetTableName);

if (i === preAggregations.length - 1 && queryBody.values) {
queryParamsReplacement = await loader.replaceQueryBuildRangeParams(queryBody.values);
}
if (i === preAggregations.length - 1 && queryBody.values) {
queryParamsReplacement = await loader.replaceQueryBuildRangeParams(
<string[]>queryBody.values,
);
}

return [p.tableName, usedPreAggregation];
};
return [p.tableName, usedPreAggregation];
};

return preAggregationPromise().then(res => preAggregationsTablesToTempTables.concat([res]));
}).reduce((promise, fn) => promise.then(fn), Promise.resolve([]));
return preAggregationPromise().then(res => preAggregationsTablesToTempTables.concat([res]));
}).reduce((promise, fn) => promise.then(fn), Promise.resolve([]));

return preAggregationsTablesToTempTablesPromise.then(preAggregationsTablesToTempTables => ({
preAggregationsTablesToTempTables,
Expand Down Expand Up @@ -2037,7 +2068,7 @@ export class PreAggregations {
this,
{
requestId: queryBody.requestId,
dataSource
dataSource,
}
);
}
Expand Down Expand Up @@ -2100,7 +2131,10 @@ export class PreAggregations {
() => this.driverFactory(dataSource),
this.queryCache,
this,
{ requestId, dataSource }
{
requestId,
dataSource,
},
),
{ requestId, externalRefresh: this.externalRefresh, buildRangeEnd }
);
Expand Down Expand Up @@ -2144,7 +2178,10 @@ export class PreAggregations {
() => this.driverFactory(dataSource),
this.queryCache,
this,
{ requestId, dataSource }
{
requestId,
dataSource,
}
);
return loadCache.fetchTables(preAggregation);
},
Expand Down Expand Up @@ -2205,7 +2242,7 @@ export class PreAggregations {
this,
{
requestId,
dataSource
dataSource,
}
);
}
Expand Down

0 comments on commit 699e772

Please sign in to comment.