11import { TableStructure } from '@cubejs-backend/base-driver' ;
2+ import { asyncDebounce } from '@cubejs-backend/shared' ;
23import { DriverFactory } from './DriverFactory' ;
34import { QueryCache , QueryWithParams } from './QueryCache' ;
45import {
@@ -16,16 +17,6 @@ type PreAggregationLoadCacheOptions = {
1617 tablePrefixes ?: string [ ] ,
1718} ;
1819
19- function createDeferred ( ) {
20- let resolve ;
21- let reject ;
22- const promise = new Promise ( ( res , rej ) => {
23- resolve = res ;
24- reject = rej ;
25- } ) ;
26- return { promise, resolve, reject } ;
27- }
28-
2920export class PreAggregationLoadCache {
3021 private readonly driverFactory : DriverFactory ;
3122
@@ -35,8 +26,6 @@ export class PreAggregationLoadCache {
3526
3627 private readonly queryResults : any ;
3728
38- private queryResultRequests : { [ redisKey : string ] : { resolve : CallableFunction , reject : CallableFunction } [ ] } = { } ;
39-
4029 private readonly externalDriverFactory : any ;
4130
4231 private readonly requestId : any ;
@@ -56,6 +45,8 @@ export class PreAggregationLoadCache {
5645
5746 private readonly tablePrefixes : string [ ] | null ;
5847
48+ private readonly cacheQueryResultDebounced : Function ;
49+
5950 public constructor (
6051 clientFactory : DriverFactory ,
6152 queryCache ,
@@ -73,6 +64,7 @@ export class PreAggregationLoadCache {
7364 this . versionEntries = { } ;
7465 this . tables = { } ;
7566 this . tableColumnTypes = { } ;
67+ this . cacheQueryResultDebounced = asyncDebounce ( this . queryCache . cacheQueryResult . bind ( this . queryCache ) ) ;
7668 }
7769
7870 protected async tablesFromCache ( preAggregation , forceRenew : boolean = false ) {
@@ -209,53 +201,26 @@ export class PreAggregationLoadCache {
209201 return this . queryResults [ queryKey ] ;
210202 }
211203
212- // There is ongoing request
213- if ( this . queryResultRequests [ queryKey ] ) {
214- const { promise, resolve, reject } = createDeferred ( ) ;
215- this . queryResultRequests [ queryKey ] . push ( { resolve, reject } ) ;
216-
217- return promise ;
218- }
219-
220- // Making query for a first time
221- this . queryResultRequests [ queryKey ] = [ ] ;
222204
223- try {
224- this . queryResults [ queryKey ] = await this . queryCache . cacheQueryResult (
225- query ,
226- values ,
227- [ query , values ] ,
228- 60 * 60 ,
229- {
230- renewalThreshold : this . queryCache . options . refreshKeyRenewalThreshold
231- || queryOptions ?. renewalThreshold || 2 * 60 ,
232- renewalKey : [ query , values ] ,
233- waitForRenew,
234- priority,
235- requestId : this . requestId ,
236- dataSource : this . dataSource ,
237- useInMemory : true ,
238- external : queryOptions ?. external
239- }
240- ) ;
241-
242- let r = ( this . queryResultRequests [ queryKey ] || [ ] ) . pop ( ) ;
243- while ( r ) {
244- r . resolve ( this . queryResults [ queryKey ] ) ;
245- r = this . queryResultRequests [ queryKey ] . pop ( ) ;
205+ this . queryResults [ queryKey ] = await this . cacheQueryResultDebounced (
206+ query ,
207+ values ,
208+ [ query , values ] ,
209+ 60 * 60 ,
210+ {
211+ renewalThreshold : this . queryCache . options . refreshKeyRenewalThreshold
212+ || queryOptions ?. renewalThreshold || 2 * 60 ,
213+ renewalKey : [ query , values ] ,
214+ waitForRenew,
215+ priority,
216+ requestId : this . requestId ,
217+ dataSource : this . dataSource ,
218+ useInMemory : true ,
219+ external : queryOptions ?. external
246220 }
221+ ) ;
247222
248- return this . queryResults [ queryKey ] ;
249- } catch ( err ) {
250- let r = ( this . queryResultRequests [ queryKey ] || [ ] ) . pop ( ) ;
251- while ( r ) {
252- r . reject ( err ) ;
253- r = this . queryResultRequests [ queryKey ] . pop ( ) ;
254- }
255- throw err ;
256- } finally {
257- this . queryResultRequests [ queryKey ] = null ;
258- }
223+ return this . queryResults [ queryKey ] ;
259224 }
260225
261226 public hasKeyQueryResult ( keyQuery ) {
0 commit comments