@@ -281,15 +281,19 @@ class PreAggregationLoader {
281
281
if ( versionEntry . structure_version !== newVersionEntry . structure_version ) {
282
282
this . logger ( 'Invalidating pre-aggregation structure' , {
283
283
preAggregation : this . preAggregation ,
284
- requestId : this . requestId
284
+ requestId : this . requestId ,
285
+ queryKey : this . preAggregationQueryKey ( invalidationKeys ) ,
286
+ newVersionEntry
285
287
} ) ;
286
288
await this . executeInQueue ( invalidationKeys , this . priority ( 10 ) , newVersionEntry ) ;
287
289
return mostRecentTargetTableName ( ) ;
288
290
} else if ( versionEntry . content_version !== newVersionEntry . content_version ) {
289
291
if ( this . waitForRenew ) {
290
292
this . logger ( 'Waiting for pre-aggregation renew' , {
291
293
preAggregation : this . preAggregation ,
292
- requestId : this . requestId
294
+ requestId : this . requestId ,
295
+ queryKey : this . preAggregationQueryKey ( invalidationKeys ) ,
296
+ newVersionEntry
293
297
} ) ;
294
298
await this . executeInQueue ( invalidationKeys , this . priority ( 0 ) , newVersionEntry ) ;
295
299
return mostRecentTargetTableName ( ) ;
@@ -300,7 +304,9 @@ class PreAggregationLoader {
300
304
} else {
301
305
this . logger ( 'Creating pre-aggregation from scratch' , {
302
306
preAggregation : this . preAggregation ,
303
- requestId : this . requestId
307
+ requestId : this . requestId ,
308
+ queryKey : this . preAggregationQueryKey ( invalidationKeys ) ,
309
+ newVersionEntry
304
310
} ) ;
305
311
await this . executeInQueue ( invalidationKeys , this . priority ( 10 ) , newVersionEntry ) ;
306
312
return mostRecentTargetTableName ( ) ;
@@ -345,7 +351,9 @@ class PreAggregationLoader {
345
351
scheduleRefresh ( invalidationKeys , newVersionEntry ) {
346
352
this . logger ( 'Refreshing pre-aggregation content' , {
347
353
preAggregation : this . preAggregation ,
348
- requestId : this . requestId
354
+ requestId : this . requestId ,
355
+ queryKey : this . preAggregationQueryKey ( invalidationKeys ) ,
356
+ newVersionEntry
349
357
} ) ;
350
358
this . executeInQueue ( invalidationKeys , this . priority ( 0 ) , newVersionEntry )
351
359
. catch ( e => {
@@ -360,24 +368,29 @@ class PreAggregationLoader {
360
368
executeInQueue ( invalidationKeys , priority , newVersionEntry ) {
361
369
return this . preAggregations . getQueue ( ) . executeInQueue (
362
370
'query' ,
363
- [ this . preAggregation . loadSql , invalidationKeys ] ,
371
+ this . preAggregationQueryKey ( invalidationKeys ) ,
364
372
{
365
373
preAggregation : this . preAggregation ,
366
374
preAggregationsTablesToTempTables : this . preAggregationsTablesToTempTables ,
367
375
newVersionEntry,
368
- requestId : this . requestId
376
+ requestId : this . requestId ,
377
+ invalidationKeys
369
378
} ,
370
379
priority ,
371
380
// eslint-disable-next-line no-use-before-define
372
381
{ stageQueryKey : PreAggregations . preAggregationQueryCacheKey ( this . preAggregation ) , requestId : this . requestId }
373
382
) ;
374
383
}
375
384
385
+ preAggregationQueryKey ( invalidationKeys ) {
386
+ return [ this . preAggregation . loadSql , invalidationKeys ] ;
387
+ }
388
+
376
389
targetTableName ( versionEntry ) {
377
390
return `${ versionEntry . table_name } _${ versionEntry . content_version } _${ versionEntry . structure_version } _${ versionEntry . last_updated_at } ` ;
378
391
}
379
392
380
- refresh ( newVersionEntry ) {
393
+ refresh ( newVersionEntry , invalidationKeys ) {
381
394
return ( client ) => {
382
395
let refreshStrategy = this . refreshImplStoreInSourceStrategy ;
383
396
if ( this . preAggregation . external ) {
@@ -388,12 +401,23 @@ class PreAggregationLoader {
388
401
this . refreshImplStreamExternalStrategy : this . refreshImplTempTableExternalStrategy ;
389
402
}
390
403
return cancelCombinator (
391
- saveCancelFn => refreshStrategy . bind ( this ) ( client , newVersionEntry , saveCancelFn )
404
+ saveCancelFn => refreshStrategy . bind ( this ) ( client , newVersionEntry , saveCancelFn , invalidationKeys )
392
405
) ;
393
406
} ;
394
407
}
395
408
396
- async refreshImplStoreInSourceStrategy ( client , newVersionEntry , saveCancelFn ) {
409
+ logExecutingSql ( invalidationKeys , query , params , targetTableName , newVersionEntry ) {
410
+ this . logger ( 'Executing Load Pre Aggregation SQL' , {
411
+ queryKey : this . preAggregationQueryKey ( invalidationKeys ) ,
412
+ query,
413
+ values : params ,
414
+ targetTableName,
415
+ requestId : this . requestId ,
416
+ newVersionEntry,
417
+ } ) ;
418
+ }
419
+
420
+ async refreshImplStoreInSourceStrategy ( client , newVersionEntry , saveCancelFn , invalidationKeys ) {
397
421
const [ loadSql , params ] =
398
422
Array . isArray ( this . preAggregation . loadSql ) ? this . preAggregation . loadSql : [ this . preAggregation . loadSql , [ ] ] ;
399
423
const targetTableName = this . targetTableName ( newVersionEntry ) ;
@@ -402,13 +426,7 @@ class PreAggregationLoader {
402
426
this . preAggregation . tableName ,
403
427
targetTableName
404
428
) ;
405
- this . logger ( 'Executing Load Pre Aggregation SQL' , {
406
- queryKey : this . preAggregation . loadSql ,
407
- query,
408
- values : params ,
409
- targetTableName,
410
- requestId : this . requestId
411
- } ) ;
429
+ this . logExecutingSql ( invalidationKeys , query , params , targetTableName , newVersionEntry ) ;
412
430
await saveCancelFn ( client . loadPreAggregationIntoTable (
413
431
targetTableName ,
414
432
query ,
@@ -420,7 +438,7 @@ class PreAggregationLoader {
420
438
await this . loadCache . reset ( this . preAggregation ) ;
421
439
}
422
440
423
- async refreshImplTempTableExternalStrategy ( client , newVersionEntry , saveCancelFn ) {
441
+ async refreshImplTempTableExternalStrategy ( client , newVersionEntry , saveCancelFn , invalidationKeys ) {
424
442
const [ loadSql , params ] =
425
443
Array . isArray ( this . preAggregation . loadSql ) ? this . preAggregation . loadSql : [ this . preAggregation . loadSql , [ ] ] ;
426
444
await client . createSchemaIfNotExists ( this . preAggregation . preAggregationsSchema ) ;
@@ -430,13 +448,7 @@ class PreAggregationLoader {
430
448
this . preAggregation . tableName ,
431
449
targetTableName
432
450
) ;
433
- this . logger ( 'Executing Load Pre Aggregation SQL' , {
434
- queryKey : this . preAggregation . loadSql ,
435
- query,
436
- values : params ,
437
- targetTableName,
438
- requestId : this . requestId
439
- } ) ;
451
+ this . logExecutingSql ( invalidationKeys , query , params , targetTableName , newVersionEntry ) ;
440
452
await saveCancelFn ( client . loadPreAggregationIntoTable (
441
453
targetTableName ,
442
454
query ,
@@ -448,13 +460,14 @@ class PreAggregationLoader {
448
460
await this . dropOrphanedTables ( client , targetTableName , saveCancelFn ) ;
449
461
}
450
462
451
- async refreshImplStreamExternalStrategy ( client , newVersionEntry , saveCancelFn ) {
463
+ async refreshImplStreamExternalStrategy ( client , newVersionEntry , saveCancelFn , invalidationKeys ) {
452
464
const [ sql , params ] =
453
465
Array . isArray ( this . preAggregation . sql ) ? this . preAggregation . sql : [ this . preAggregation . sql , [ ] ] ;
454
466
if ( ! client . downloadQueryResults ) {
455
467
throw new Error ( `Can't load external pre-aggregation: source driver doesn't support downloadQueryResults()` ) ;
456
468
}
457
469
470
+ this . logExecutingSql ( invalidationKeys , sql , params , this . targetTableName ( newVersionEntry ) , newVersionEntry ) ;
458
471
this . logger ( 'Downloading external pre-aggregation via query' , {
459
472
preAggregation : this . preAggregation ,
460
473
requestId : this . requestId
@@ -614,7 +627,7 @@ class PreAggregations {
614
627
if ( ! this . queue ) {
615
628
this . queue = QueryCache . createQueue ( `SQL_PRE_AGGREGATIONS_${ this . redisPrefix } ` , this . driverFactory , ( client , q ) => {
616
629
const {
617
- preAggregation, preAggregationsTablesToTempTables, newVersionEntry, requestId
630
+ preAggregation, preAggregationsTablesToTempTables, newVersionEntry, requestId, invalidationKeys
618
631
} = q ;
619
632
const loader = new PreAggregationLoader (
620
633
this . redisPrefix ,
@@ -627,7 +640,7 @@ class PreAggregations {
627
640
new PreAggregationLoadCache ( this . redisPrefix , this . driverFactory , this . queryCache , this , { requestId } ) ,
628
641
{ requestId }
629
642
) ;
630
- return loader . refresh ( newVersionEntry ) ( client ) ;
643
+ return loader . refresh ( newVersionEntry , invalidationKeys ) ( client ) ;
631
644
} , {
632
645
concurrency : 1 ,
633
646
logger : this . logger ,
0 commit comments