@@ -58,16 +58,14 @@ class PreAggregationLoadCache {
58
58
async tablesFromCache ( preAggregation , forceRenew ) {
59
59
let tables = forceRenew ? null : await this . cacheDriver . get ( this . tablesRedisKey ( preAggregation ) ) ;
60
60
if ( ! tables ) {
61
- if ( this . fetchTablesPromise ) {
62
- tables = await this . fetchTablesPromise ;
63
- } else {
64
- this . fetchTablesPromise = this . fetchTables ( preAggregation ) ;
65
- try {
66
- tables = await this . fetchTablesPromise ;
67
- } finally {
68
- this . fetchTablesPromise = null ;
69
- }
70
- }
61
+ tables = await this . preAggregations . getLoadCacheQueue ( ) . executeInQueue (
62
+ 'query' ,
63
+ preAggregation . preAggregationsSchema ,
64
+ {
65
+ preAggregation
66
+ } ,
67
+ 0
68
+ ) ;
71
69
}
72
70
return tables ;
73
71
}
@@ -596,6 +594,25 @@ class PreAggregations {
596
594
return this . queue ;
597
595
}
598
596
597
+ getLoadCacheQueue ( ) {
598
+ if ( ! this . loadCacheQueue ) {
599
+ this . loadCacheQueue = QueryCache . createQueue ( `SQL_PRE_AGGREGATIONS_CACHE_${ this . redisPrefix } ` , this . driverFactory , ( client , q ) => {
600
+ const {
601
+ preAggregation
602
+ } = q ;
603
+ const loadCache = new PreAggregationLoadCache ( this . redisPrefix , this . driverFactory , this . queryCache , this ) ;
604
+ return loadCache . fetchTables ( preAggregation ) ;
605
+ } , {
606
+ concurrency : 4 ,
607
+ logger : this . logger ,
608
+ cacheAndQueueDriver : this . options . cacheAndQueueDriver ,
609
+ redisPool : this . options . redisPool ,
610
+ ...this . options . loadCacheQueueOptions
611
+ } ) ;
612
+ }
613
+ return this . loadCacheQueue ;
614
+ }
615
+
599
616
static preAggregationQueryCacheKey ( preAggregation ) {
600
617
return preAggregation . tableName ;
601
618
}
0 commit comments