Skip to content

Commit

Permalink
fix: Optimize compute time during pre-aggregations load checks for qu…
Browse files Browse the repository at this point in the history
…eries with many partitions
  • Loading branch information
paveltiunov committed Feb 26, 2021
1 parent d1767bb commit b9b590b
Showing 1 changed file with 48 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ type PreAggregationLoadCacheOptions = {
dataSource: string
};

type VersionEntriesObj = {
versionEntries: VersionEntry[],
byStructure: { [key: string]: VersionEntry },
byContent: { [key: string]: VersionEntry },
byTableName: { [key: string]: VersionEntry },
};

class PreAggregationLoadCache {
private redisPrefix: string;

Expand Down Expand Up @@ -180,22 +187,44 @@ class PreAggregationLoadCache {
return this.tables;
}

protected async getVersionEntries(preAggregation) {
protected async getVersionEntries(preAggregation): Promise<VersionEntriesObj> {
if (!this.versionEntries) {
this.versionEntries = tablesToVersionEntries(
const entries = tablesToVersionEntries(
preAggregation.preAggregationsSchema,
await this.getTablesQuery(preAggregation)
);
// eslint-disable-next-line
const [active, toProcess, queries] = await this.fetchQueryStageState();
const targetTableNamesInQueue = (Object.keys(queries))
// eslint-disable-next-line no-use-before-define
.map(q => PreAggregations.targetTableName(queries[q].query.newVersionEntry));

const versionEntries = entries.filter(
// eslint-disable-next-line no-use-before-define
e => targetTableNamesInQueue.indexOf(PreAggregations.targetTableName(e)) === -1
);

const byContent: { [key: string]: VersionEntry } = {};
const byStructure: { [key: string]: VersionEntry } = {};
const byTableName: { [key: string]: VersionEntry } = {};

versionEntries.forEach(e => {
const contentKey = `${e.table_name}_${e.content_version}`;
if (!byContent[contentKey]) {
byContent[contentKey] = e;
}
const structureKey = `${e.table_name}_${e.structure_version}`;
if (!byStructure[structureKey]) {
byStructure[structureKey] = e;
}
if (!byTableName[e.table_name]) {
byTableName[e.table_name] = e;
}
});

this.versionEntries = { versionEntries, byContent, byStructure, byTableName };
}
// eslint-disable-next-line
const [active, toProcess, queries] = await this.fetchQueryStageState();
const targetTableNamesInQueue = (Object.keys(queries))
// eslint-disable-next-line no-use-before-define
.map(q => PreAggregations.targetTableName(queries[q].query.newVersionEntry));
return this.versionEntries.filter(
// eslint-disable-next-line no-use-before-define
e => targetTableNamesInQueue.indexOf(PreAggregations.targetTableName(e)) === -1
);
return this.versionEntries;
}

protected async keyQueryResult(keyQuery, waitForRenew, priority, renewalThreshold) {
Expand Down Expand Up @@ -321,16 +350,14 @@ class PreAggregationLoader {
const structureVersion = this.structureVersion();

const getVersionsStarted = new Date();
const versionEntries = await this.loadCache.getVersionEntries(this.preAggregation);
const { byStructure } = await this.loadCache.getVersionEntries(this.preAggregation);
this.logger('Load PreAggregations Tables', {
preAggregation: this.preAggregation,
requestId: this.requestId,
duration: (new Date().getTime() - getVersionsStarted.getTime())
});

const versionEntryByStructureVersion = versionEntries.find(
v => v.table_name === this.preAggregation.tableName && v.structure_version === structureVersion
);
const versionEntryByStructureVersion = byStructure[`${this.preAggregation.tableName}_${structureVersion}`];
if (this.externalRefresh) {
if (!versionEntryByStructureVersion) {
throw new Error('One or more pre-aggregation tables could not be found to satisfy that query');
Expand Down Expand Up @@ -375,9 +402,7 @@ class PreAggregationLoader {

const versionEntries = await this.loadCache.getVersionEntries(this.preAggregation);

const getVersionEntryByContentVersion = (entries) => entries.find(
v => v.table_name === this.preAggregation.tableName && v.content_version === contentVersion
);
const getVersionEntryByContentVersion = ({ byContent }: VersionEntriesObj) => byContent[`${this.preAggregation.tableName}_${contentVersion}`];

const versionEntryByContentVersion = getVersionEntryByContentVersion(versionEntries);
if (versionEntryByContentVersion) {
Expand All @@ -390,26 +415,22 @@ class PreAggregationLoader {
// eslint-disable-next-line no-use-before-define
await this.loadCache.getQueryStage(PreAggregations.preAggregationQueryCacheKey(this.preAggregation))
) {
const versionEntryByStructureVersion = versionEntries.find(
v => v.table_name === this.preAggregation.tableName && v.structure_version === structureVersion
);
const versionEntryByStructureVersion = versionEntries.byStructure[`${this.preAggregation.tableName}_${structureVersion}`];
if (versionEntryByStructureVersion) {
return this.targetTableName(versionEntryByStructureVersion);
}
}

if (!versionEntries.length) {
if (!versionEntries.versionEntries.length) {
const client = this.preAggregation.external ?
await this.externalDriverFactory() :
await this.driverFactory();
await client.createSchemaIfNotExists(this.preAggregation.preAggregationsSchema);
}
// ensure we find appropriate structure version before invalidating anything
const versionEntry = versionEntries.find(
v => v.table_name === this.preAggregation.tableName && v.structure_version === structureVersion
) || versionEntries.find(
e => e.table_name === this.preAggregation.tableName
);
const versionEntry =
versionEntries.byStructure[`${this.preAggregation.tableName}_${structureVersion}`] ||
versionEntries.byTableName[this.preAggregation.tableName];

const newVersionEntry = {
table_name: this.preAggregation.tableName,
Expand Down

0 comments on commit b9b590b

Please sign in to comment.