Skip to content

Commit

Permalink
feat: Mixed rolling window and regular measure queries from rollup su…
Browse files Browse the repository at this point in the history
…pport (#3326)

* feat: Mixed rolling window and regular measure queries from rollup support

* chore: Mixed rolling window and regular measure queries from rollup support -- revert runningTotal support as it can't be coerced to rollingWindow

* chore: Measure mixing cases fixing

* chore: Add offset support

* chore(cubestore): Week interval support
  • Loading branch information
paveltiunov committed Aug 31, 2021
1 parent c6e908e commit 3147e33
Show file tree
Hide file tree
Showing 11 changed files with 385 additions and 91 deletions.
23 changes: 23 additions & 0 deletions packages/cubejs-backend-shared/src/time.ts
Expand Up @@ -42,6 +42,10 @@ export const FROM_PARTITION_RANGE = '__FROM_PARTITION_RANGE';

export const TO_PARTITION_RANGE = '__TO_PARTITION_RANGE';

export const BUILD_RANGE_START_LOCAL = '__BUILD_RANGE_START_LOCAL';

export const BUILD_RANGE_END_LOCAL = '__BUILD_RANGE_END_LOCAL';

export const inDbTimeZone = (timezone: string, timestampFormat: string, timestamp: string): string => {
if (timestamp.length === 23) {
const zone = moment.tz.zone(timezone);
Expand All @@ -60,6 +64,25 @@ export const inDbTimeZone = (timezone: string, timestampFormat: string, timestam
return moment.tz(timestamp, timezone).utc().format(timestampFormat);
};

export const utcToLocalTimeZone = (timezone: string, timestampFormat: string, timestamp: string): string => {
if (timestamp.length === 23) {
const zone = moment.tz.zone(timezone);
if (!zone) {
throw new Error(`Unknown timezone: ${timezone}`);
}
const parsedTime = Date.parse(`${timestamp}Z`);
// TODO parsedTime might be incorrect offset for conversion
const offset = zone.utcOffset(parsedTime);
const inDbTimeZoneDate = new Date(parsedTime - offset * 60 * 1000);
if (timestampFormat === 'YYYY-MM-DD[T]HH:mm:ss.SSS[Z]' || timestampFormat === 'YYYY-MM-DDTHH:mm:ss.SSSZ') {
return inDbTimeZoneDate.toJSON();
} else if (timestampFormat === 'YYYY-MM-DDTHH:mm:ss.SSS') {
return inDbTimeZoneDate.toJSON().replace('Z', '');
}
}
return moment.tz(timestamp, 'UTC').tz(timezone).format(timestampFormat);
};

export const extractDate = (data: any): string => {
data = JSON.parse(JSON.stringify(data));
return moment.tz(data[0] && data[0][Object.keys(data[0])[0]], 'UTC').utc().format(moment.HTML5_FMT.DATETIME_LOCAL_MS);
Expand Down
157 changes: 156 additions & 1 deletion packages/cubejs-cubestore-driver/src/CubeStoreQuery.ts
@@ -1,5 +1,5 @@
import moment from 'moment-timezone';
import { BaseFilter, BaseQuery, UserError } from '@cubejs-backend/schema-compiler';
import { BaseFilter, BaseQuery, UserError, BaseMeasure } from '@cubejs-backend/schema-compiler';

const GRANULARITY_TO_INTERVAL: Record<string, string> = {
day: 'day',
Expand All @@ -18,6 +18,12 @@ class CubeStoreFilter extends BaseFilter {
}
}

type RollingWindow = {
trailing?: string | 'unbounded';
leading?: string | 'unbounded';
offset?: 'start' | 'end';
};

export class CubeStoreQuery extends BaseQuery {
public newFilter(filter) {
return new CubeStoreFilter(this, filter);
Expand Down Expand Up @@ -90,4 +96,153 @@ export class CubeStoreQuery extends BaseQuery {
// TODO: We should throw an error, but this gets called even when only `hllMerge` result is used.
return `approx_distinct_is_unsupported_in_cubestore(${sql}))`;
}

public regularAndTimeSeriesRollupQuery(
regularMeasures: BaseMeasure[],
multipliedMeasures: BaseMeasure[],
cumulativeMeasures: Array<[boolean, BaseMeasure]>,
preAggregationForQuery: any
) {
if (!cumulativeMeasures.length) {
return super.regularAndTimeSeriesRollupQuery(regularMeasures, multipliedMeasures, cumulativeMeasures, preAggregationForQuery);
}
const cumulativeMeasuresWithoutMultiplied = cumulativeMeasures.map(([multiplied, measure]) => measure);
const allMeasures = regularMeasures.concat(multipliedMeasures).concat(
cumulativeMeasuresWithoutMultiplied
);
const timeDimension = this.timeDimensions.find(d => d.granularity);
const baseQueryAlias = this.cubeAlias('base');
const maxRollingWindow = cumulativeMeasuresWithoutMultiplied.reduce((a, b) => this.maxRollingWindow(a, b.rollingWindowDefinition()), <RollingWindow><unknown>null);
const commonDateCondition =
this.rollingWindowDateJoinCondition(maxRollingWindow.trailing, maxRollingWindow.leading, maxRollingWindow.offset);
const filters = this.segments.concat(this.filters).concat(
timeDimension?.dateRange && this.dateFromStartToEndConditionSql(commonDateCondition, true, true) || []
);
const rollupGranularity = this.preAggregations?.castGranularity(preAggregationForQuery.preAggregation.granularity) || 'day';
const granularityOverride = timeDimension &&
cumulativeMeasuresWithoutMultiplied.reduce((a, b) => this.minGranularity(a, b.windowGranularity()), timeDimension.granularity) || rollupGranularity;
return this.evaluateSymbolSqlWithContext(
() => this.overTimeSeriesSelectRollup(
cumulativeMeasuresWithoutMultiplied,
regularMeasures.concat(multipliedMeasures),
this.evaluateSymbolSqlWithContext(() => this.preAggregations?.rollupPreAggregation(preAggregationForQuery, allMeasures, false, filters), {
granularityOverride
}),
baseQueryAlias,
timeDimension,
preAggregationForQuery
),
{
wrapQuery: true,
wrappedGranularity: timeDimension?.granularity || rollupGranularity,
rollupGranularity: granularityOverride
}
);
}

public overTimeSeriesSelectRollup(cumulativeMeasures, otherMeasures, baseQuery, baseQueryAlias, timeDimension, preAggregationForQuery) {
const cumulativeDimensions = this.dimensions.map(s => s.cumulativeSelectColumns()).filter(c => !!c).join(', ');
const partitionByClause = this.dimensions.length ? `PARTITION BY ${cumulativeDimensions}` : '';
const groupByDimensionClause = otherMeasures.length && timeDimension ? ` GROUP BY DIMENSION ${timeDimension.dimensionSql()}` : '';
const rollingWindowOrGroupByClause = timeDimension ?
` ROLLING_WINDOW DIMENSION ${timeDimension.aliasName()}${partitionByClause}${groupByDimensionClause} FROM ${this.timeGroupedColumn(timeDimension.granularity, timeDimension.localDateTimeFromOrBuildRangeParam())} TO ${this.timeGroupedColumn(timeDimension.granularity, timeDimension.localDateTimeToOrBuildRangeParam())} EVERY INTERVAL '1 ${timeDimension.granularity}'` :
this.groupByClause();
const forSelect = this.overTimeSeriesForSelectRollup(cumulativeMeasures, otherMeasures, timeDimension, preAggregationForQuery);
return `SELECT ${forSelect} FROM (${baseQuery}) ${baseQueryAlias}${rollingWindowOrGroupByClause}`;
}

public toInterval(interval) {
if (interval === 'unbounded') {
return 'UNBOUNDED';
} else {
return `INTERVAL '${interval}'`;
}
}

public maxRollingWindow(a: RollingWindow, b: RollingWindow): RollingWindow {
if (!a) {
return b;
}
if (!b) {
return a;
}
let trailing;
if (a.trailing === 'unbounded' || b.trailing === 'unbounded') {
trailing = 'unbounded';
} else if (!a.trailing) {
trailing = b.trailing;
} else if (!b.trailing) {
trailing = a.trailing;
} else {
trailing = this.parseSecondDuration(a.trailing) > this.parseSecondDuration(b.trailing) ? a.trailing : b.trailing;
}

let leading;
if (a.leading === 'unbounded' || b.leading === 'unbounded') {
leading = 'unbounded';
} else if (!a.leading) {
leading = b.leading;
} else if (!b.leading) {
leading = a.leading;
} else {
leading = this.parseSecondDuration(a.leading) > this.parseSecondDuration(b.leading) ? a.leading : b.leading;
}

if ((a.offset || 'end') !== (b.offset || 'end')) {
// TODO introduce virtual 'both' offset and return it if max receives 'start' and 'end'
throw new Error('Mixed offset rolling window querying is not supported');
}

return {
trailing,
leading,
offset: a.offset
};
}

public overTimeSeriesForSelectRollup(cumulativeMeasures, otherMeasures, timeDimension, preAggregationForQuery) {
const rollupMeasures = this.preAggregations?.rollupMeasures(preAggregationForQuery);
const renderedReference = rollupMeasures.map(measure => {
const m = this.newMeasure(measure);
const renderSql = () => {
if (timeDimension && m.isCumulative()) {
const measureSql = m.cumulativeMeasureSql();
const rollingWindow = m.rollingWindowDefinition();
const preceding = rollingWindow.trailing ? `${this.toInterval(rollingWindow.trailing)} PRECEDING` : '';
const following = rollingWindow.leading ? `${this.toInterval(rollingWindow.leading)} FOLLOWING` : '';
const offset = ` OFFSET ${rollingWindow.offset || 'end'}`;
return `ROLLING(${measureSql} ${preceding && following ? 'RANGE BETWEEN ' : 'RANGE '}${preceding}${preceding && following ? ' ' : ''}${following}${offset})`;
} else {
const conditionFn = m.isCumulative() ? this.dateFromStartToEndConditionSql(m.dateJoinCondition(), true, true)[0] : timeDimension;
return this.evaluateSymbolSqlWithContext(
() => {
const aliasName = m.aliasName();
return this.aggregateOnGroupedColumn(
m.measureDefinition(),
aliasName,
true,
m.measure
);
},
{
cumulativeMeasureFilters: { [m.measure]: conditionFn }
}
);
}
};

return {
[measure]: renderSql()
};
}).reduce((a, b) => ({ ...a, ...b }), {});
return this.evaluateSymbolSqlWithContext(
() => this.dimensions.concat(this.timeDimensions.filter(d => d.granularity)).map(s => s.cumulativeSelectColumns()).concat(
this.measures.map(m => m.selectColumns())
).filter(c => !!c)
.join(', '),
{
renderedReference
}
);
}
}
Expand Up @@ -8,6 +8,9 @@ import {
inDbTimeZone,
timeSeries,
TO_PARTITION_RANGE,
BUILD_RANGE_START_LOCAL,
BUILD_RANGE_END_LOCAL,
utcToLocalTimeZone
} from '@cubejs-backend/shared';

import { cancelCombinator, SaveCancelFn } from '../driver/utils';
Expand Down Expand Up @@ -105,6 +108,7 @@ type PreAggregationDescription = {
matchedTimeDimensionDateRange: QueryDateRange;
partitionGranularity: string;
preAggregationStartEndQueries: [QueryWithParams, QueryWithParams];
timestampFormat: string;
};

const tablesToVersionEntries = (schema, tables: TableCacheEntry[]): VersionEntry[] => R.sortBy(
Expand Down Expand Up @@ -1057,6 +1061,24 @@ export class PreAggregationPartitionRangeLoader {
return this.preAggregation.priority != null ? this.preAggregation.priority : defaultValue;
}

public async replaceQueryBuildRangeParams(queryValues: string[]): Promise<string[] | null> {
if (queryValues?.find(p => p === BUILD_RANGE_START_LOCAL || p === BUILD_RANGE_END_LOCAL)) {
const [buildRangeStart, buildRangeEnd] = await this.loadBuildRange();
return queryValues?.map(
param => {
if (param === BUILD_RANGE_START_LOCAL) {
return utcToLocalTimeZone(this.preAggregation.timezone, this.preAggregation.timestampFormat, buildRangeStart);
} else if (param === BUILD_RANGE_END_LOCAL) {
return utcToLocalTimeZone(this.preAggregation.timezone, this.preAggregation.timestampFormat, buildRangeEnd);
} else {
return param;
}
},
);
}
return null;
}

private replacePartitionSqlAndParams(
query: QueryWithParams,
dateRange: QueryDateRange,
Expand Down Expand Up @@ -1162,54 +1184,43 @@ export class PreAggregationPartitionRangeLoader {
}

private async partitionRanges() {
const { preAggregationStartEndQueries } = this.preAggregation;
const [startDate, endDate] = await Promise.all(
preAggregationStartEndQueries.map(
async rangeQuery => PreAggregationPartitionRangeLoader.extractDate(await this.loadRangeQuery(rangeQuery)),
),
);
const buildRange = await this.loadBuildRange();
let dateRange = PreAggregationPartitionRangeLoader.intersectDateRanges(
[startDate, endDate],
buildRange,
this.preAggregation.matchedTimeDimensionDateRange,
);
if (!dateRange) {
// If there's no date range intersection between query data range and pre-aggregation build range
// use last partition so outer query can receive expected table structure.
dateRange = [endDate, endDate];
dateRange = [buildRange[1], buildRange[1]];
}
let timeSeriesRanges = PreAggregationPartitionRangeLoader.timeSeries(
return PreAggregationPartitionRangeLoader.timeSeries(
this.preAggregation.partitionGranularity,
dateRange,
);
}

public async loadBuildRange(): Promise<QueryDateRange> {
const { preAggregationStartEndQueries } = this.preAggregation;
const [startDate, endDate] = await Promise.all(
preAggregationStartEndQueries.map(
async rangeQuery => PreAggregationPartitionRangeLoader.extractDate(await this.loadRangeQuery(rangeQuery)),
),
);
const wholeSeriesRanges = PreAggregationPartitionRangeLoader.timeSeries(
this.preAggregation.partitionGranularity,
[startDate, endDate],
);
const [preciseStartDate, preciseEndDate] = await Promise.all(
const [rangeStart, rangeEnd] = await Promise.all(
preAggregationStartEndQueries.map(
async (rangeQuery, i) => PreAggregationPartitionRangeLoader.extractDate(
await this.loadRangeQuery(
rangeQuery, i === 0 ? wholeSeriesRanges[0] : wholeSeriesRanges[wholeSeriesRanges.length - 1]
)
rangeQuery, i === 0 ? wholeSeriesRanges[0] : wholeSeriesRanges[wholeSeriesRanges.length - 1],
),
),
),
);
if (preciseStartDate !== startDate || preciseEndDate !== endDate) {
dateRange = PreAggregationPartitionRangeLoader.intersectDateRanges(
[preciseStartDate, preciseEndDate],
this.preAggregation.matchedTimeDimensionDateRange,
);
if (!dateRange) {
// If there's no date range intersection between query data range and pre-aggregation build range
// use last partition so outer query can receive expected table structure.
dateRange = [preciseEndDate, preciseEndDate];
}
timeSeriesRanges = PreAggregationPartitionRangeLoader.timeSeries(
this.preAggregation.partitionGranularity,
dateRange,
);
}
return timeSeriesRanges;
return [rangeStart, rangeEnd];
}

private static checkDataRangeType(range: QueryDateRange) {
Expand Down Expand Up @@ -1358,7 +1369,9 @@ export class PreAggregations {
return loadCacheByDataSource[dataSource];
};

return preAggregations.map(p => (preAggregationsTablesToTempTables) => {
let queryParamsReplacement = null;

const preAggregationsTablesToTempTablesPromise = preAggregations.map((p, i) => (preAggregationsTablesToTempTables) => {
const loader = new PreAggregationPartitionRangeLoader(
this.redisPrefix,
() => this.driverFactory(p.dataSource || 'default'),
Expand All @@ -1385,11 +1398,20 @@ export class PreAggregations {
};
await this.addTableUsed(usedPreAggregation.targetTableName);

if (i === preAggregations.length - 1 && queryBody.values) {
queryParamsReplacement = await loader.replaceQueryBuildRangeParams(queryBody.values);
}

return [p.tableName, usedPreAggregation];
});

return preAggregationPromise().then(res => preAggregationsTablesToTempTables.concat([res]));
}).reduce((promise, fn) => promise.then(fn), Promise.resolve([]));

return preAggregationsTablesToTempTablesPromise.then(preAggregationsTablesToTempTables => ({
preAggregationsTablesToTempTables,
values: queryParamsReplacement
}));
}

public async expandPartitionsInPreAggregations(queryBody) {
Expand Down
Expand Up @@ -94,7 +94,14 @@ export class QueryOrchestrator {
}

public async fetchQuery(queryBody: any): Promise<any> {
const preAggregationsTablesToTempTables = await this.preAggregations.loadAllPreAggregationsIfNeeded(queryBody);
const { preAggregationsTablesToTempTables, values } = await this.preAggregations.loadAllPreAggregationsIfNeeded(queryBody);

if (values) {
queryBody = {
...queryBody,
values
};
}

const usedPreAggregations = R.fromPairs(preAggregationsTablesToTempTables);
if (this.rollupOnlyMode && Object.keys(usedPreAggregations).length === 0) {
Expand Down

0 comments on commit 3147e33

Please sign in to comment.