Skip to content

Commit

Permalink
fix: Non default data source cache key and table schema queries are f…
Browse files Browse the repository at this point in the history
…orwarded to the default data source
  • Loading branch information
paveltiunov committed Jan 22, 2021
1 parent 31e20f2 commit 2f7c672
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 7 deletions.
Expand Up @@ -114,9 +114,9 @@ class PreAggregationLoadCache {

constructor(redisPrefix, clientFactory: DriverFactory, queryCache, preAggregations, options: {
requestId?: string,
dataSource?: string
dataSource: string
}) {
options = options || {};
options = options || { dataSource: 'default' };
this.redisPrefix = `${redisPrefix}_${options.dataSource}`;
this.dataSource = options.dataSource;
this.driverFactory = clientFactory;
Expand Down Expand Up @@ -203,7 +203,8 @@ class PreAggregationLoadCache {
renewalKey: keyQuery,
waitForRenew,
priority,
requestId: this.requestId
requestId: this.requestId,
dataSource: this.dataSource
}
);
}
Expand Down Expand Up @@ -819,7 +820,8 @@ export class PreAggregations {
if (!loadCacheByDataSource[dataSource]) {
loadCacheByDataSource[dataSource] =
new PreAggregationLoadCache(this.redisPrefix, () => this.driverFactory(dataSource || 'default'), this.queryCache, this, {
requestId: queryBody.requestId
requestId: queryBody.requestId,
dataSource
});
}
return loadCacheByDataSource[dataSource];
Expand Down Expand Up @@ -865,7 +867,7 @@ export class PreAggregations {
() => this.driverFactory(dataSource),
this.queryCache,
this,
{ requestId }
{ requestId, dataSource }
),
{ requestId, externalRefresh: this.externalRefresh }
);
Expand Down Expand Up @@ -898,7 +900,7 @@ export class PreAggregations {
} = q;
const loadCache = new PreAggregationLoadCache(
this.redisPrefix, () => this.driverFactory(dataSource), this.queryCache, this,
{ requestId }
{ requestId, dataSource }
);
return loadCache.fetchTables(preAggregation);
}, {
Expand Down
Expand Up @@ -96,6 +96,7 @@ export class QueryCache {
this.startRenewCycle(query, values, cacheKeyQueries, expireSecs, cacheKey, renewalThreshold, {
external: queryBody.external,
requestId: queryBody.requestId,
dataSource: queryBody.dataSource,
refreshKeyRenewalThresholds
});

Expand All @@ -121,6 +122,7 @@ export class QueryCache {
this.startRenewCycle(query, values, cacheKeyQueries, expireSecs, cacheKey, renewalThreshold, {
external: queryBody.external,
requestId: queryBody.requestId,
dataSource: queryBody.dataSource,
refreshKeyRenewalThresholds
});
}
Expand Down Expand Up @@ -269,7 +271,13 @@ export class QueryCache {
return queue;
}

public startRenewCycle(query, values, cacheKeyQueries, expireSecs, cacheKey, renewalThreshold, options) {
public startRenewCycle(query, values, cacheKeyQueries, expireSecs, cacheKey, renewalThreshold, options: {
requestId?: string,
skipRefreshKeyWaitForRenew?: boolean,
external?: boolean,
refreshKeyRenewalThresholds?: Array<number>
dataSource: string
}) {
this.renewQuery(
query, values, cacheKeyQueries, expireSecs, cacheKey, renewalThreshold, options
).catch(e => {
Expand Down
Expand Up @@ -246,6 +246,49 @@ describe('QueryOrchestrator', () => {
expect(externalMockDriver.executedQueries.join(',')).toMatch(/SELECT \* FROM stb_pre_aggregations\.orders.*, stb_pre_aggregations\.customers.*/);
});

test('non default data source pre-aggregation', async () => {
const query = {
query: 'SELECT * FROM stb_pre_aggregations.orders, stb_pre_aggregations.customers',
values: [],
cacheKeyQueries: {
renewalThreshold: 21600,
queries: [['SELECT date_trunc(\'hour\', (NOW()::timestamptz AT TIME ZONE \'UTC\')) as current_hour', []]]
},
preAggregations: [{
preAggregationsSchema: 'stb_pre_aggregations',
tableName: 'stb_pre_aggregations.orders',
loadSql: ['CREATE TABLE stb_pre_aggregations.orders', []],
invalidateKeyQueries: [['SELECT date_trunc(\'hour\', (NOW()::timestamptz AT TIME ZONE \'UTC\')) as current_hour', []]],
dataSource: 'foo'
}],
renewQuery: true,
requestId: 'non default data source pre-aggregation',
dataSource: 'foo',
};
const result = await queryOrchestrator.fetchQuery(query);
console.log(result.data[0]);
expect(fooMockDriver.executedQueries.join(',')).toMatch(/CREATE TABLE stb_pre_aggregations.orders/);
expect(mockDriver.executedQueries.length).toEqual(0);
});

test('non default data source query', async () => {
const query = {
query: 'SELECT * FROM orders',
values: [],
cacheKeyQueries: {
renewalThreshold: 21600,
queries: [['SELECT date_trunc(\'hour\', (NOW()::timestamptz AT TIME ZONE \'UTC\')) as current_hour', []]]
},
renewQuery: true,
requestId: 'non default data source query',
dataSource: 'foo',
};
const result = await queryOrchestrator.fetchQuery(query);
console.log(result.data[0]);
expect(fooMockDriver.executedQueries.join(',')).toMatch(/orders/);
expect(mockDriver.executedQueries.length).toEqual(0);
});

test('silent truncate', async () => {
const query = {
query: 'SELECT "orders__created_at_week" "orders__created_at_week", sum("orders__count") "orders__count" FROM (SELECT * FROM stb_pre_aggregations.orders_number_and_count_and_very_very_very_very_very_very_long20191101) as partition_union WHERE ("orders__created_at_week" >= ($1::timestamptz::timestamptz AT TIME ZONE \'UTC\') AND "orders__created_at_week" <= ($2::timestamptz::timestamptz AT TIME ZONE \'UTC\')) GROUP BY 1 ORDER BY 1 ASC LIMIT 10000',
Expand Down

0 comments on commit 2f7c672

Please sign in to comment.