@@ -165,6 +165,7 @@ class PreAggregationLoader {
165
165
this . loadCache = loadCache ;
166
166
this . waitForRenew = options . waitForRenew ;
167
167
this . externalDriverFactory = preAggregations . externalDriverFactory ;
168
+ this . requestId = options . requestId ;
168
169
}
169
170
170
171
async loadPreAggregation ( ) {
@@ -179,7 +180,11 @@ class PreAggregationLoader {
179
180
if ( versionEntryByStructureVersion ) {
180
181
this . loadPreAggregationWithKeys ( ) . catch ( e => {
181
182
if ( ! ( e instanceof ContinueWaitError ) ) {
182
- this . logger ( 'Error loading pre-aggregation' , { error : ( e . stack || e ) , preAggregation : this . preAggregation } ) ;
183
+ this . logger ( 'Error loading pre-aggregation' , {
184
+ error : ( e . stack || e ) ,
185
+ preAggregation : this . preAggregation ,
186
+ requestId : this . requestId
187
+ } ) ;
183
188
}
184
189
} ) ;
185
190
return this . targetTableName ( versionEntryByStructureVersion ) ;
@@ -248,12 +253,18 @@ class PreAggregationLoader {
248
253
249
254
if ( versionEntry ) {
250
255
if ( versionEntry . structure_version !== newVersionEntry . structure_version ) {
251
- this . logger ( 'Invalidating pre-aggregation structure' , { preAggregation : this . preAggregation } ) ;
256
+ this . logger ( 'Invalidating pre-aggregation structure' , {
257
+ preAggregation : this . preAggregation ,
258
+ requestId : this . requestId
259
+ } ) ;
252
260
await this . executeInQueue ( invalidationKeys , 10 , newVersionEntry ) ;
253
261
return mostRecentTargetTableName ( ) ;
254
262
} else if ( versionEntry . content_version !== newVersionEntry . content_version ) {
255
263
if ( this . waitForRenew ) {
256
- this . logger ( 'Waiting for pre-aggregation renew' , { preAggregation : this . preAggregation } ) ;
264
+ this . logger ( 'Waiting for pre-aggregation renew' , {
265
+ preAggregation : this . preAggregation ,
266
+ requestId : this . requestId
267
+ } ) ;
257
268
await this . executeInQueue ( invalidationKeys , 0 , newVersionEntry ) ;
258
269
return mostRecentTargetTableName ( ) ;
259
270
} else {
@@ -268,7 +279,10 @@ class PreAggregationLoader {
268
279
}
269
280
}
270
281
} else {
271
- this . logger ( 'Creating pre-aggregation from scratch' , { preAggregation : this . preAggregation } ) ;
282
+ this . logger ( 'Creating pre-aggregation from scratch' , {
283
+ preAggregation : this . preAggregation ,
284
+ requestId : this . requestId
285
+ } ) ;
272
286
await this . executeInQueue ( invalidationKeys , 10 , newVersionEntry ) ;
273
287
return mostRecentTargetTableName ( ) ;
274
288
}
@@ -283,7 +297,10 @@ class PreAggregationLoader {
283
297
}
284
298
285
299
scheduleRefresh ( invalidationKeys , newVersionEntry ) {
286
- this . logger ( 'Refreshing pre-aggregation content' , { preAggregation : this . preAggregation } ) ;
300
+ this . logger ( 'Refreshing pre-aggregation content' , {
301
+ preAggregation : this . preAggregation ,
302
+ requestId : this . requestId
303
+ } ) ;
287
304
this . executeInQueue ( invalidationKeys , 0 , newVersionEntry )
288
305
. then ( ( ) => {
289
306
delete this . preAggregations . refreshErrors [ newVersionEntry . table_name ] ;
@@ -297,9 +314,11 @@ class PreAggregationLoader {
297
314
this . preAggregations . refreshErrors [ newVersionEntry . table_name ] [ newVersionEntry . content_version ] . error = e ;
298
315
this . preAggregations . refreshErrors [ newVersionEntry . table_name ] [ newVersionEntry . content_version ] . counter += 1 ;
299
316
}
300
- this . logger ( 'Error refreshing pre-aggregation' , { error : ( e . stack || e ) , preAggregation : this . preAggregation } )
317
+ this . logger ( 'Error refreshing pre-aggregation' , {
318
+ error : ( e . stack || e ) , preAggregation : this . preAggregation , requestId : this . requestId
319
+ } ) ;
301
320
}
302
- } )
321
+ } ) ;
303
322
}
304
323
305
324
executeInQueue ( invalidationKeys , priority , newVersionEntry ) {
@@ -309,7 +328,8 @@ class PreAggregationLoader {
309
328
{
310
329
preAggregation : this . preAggregation ,
311
330
preAggregationsTablesToTempTables : this . preAggregationsTablesToTempTables ,
312
- newVersionEntry
331
+ newVersionEntry,
332
+ requestId : this . requestId
313
333
} ,
314
334
priority ,
315
335
{ stageQueryKey : PreAggregations . preAggregationQueryCacheKey ( this . preAggregation ) }
@@ -360,14 +380,20 @@ class PreAggregationLoader {
360
380
throw new Error ( `Can't load external pre-aggregation: source driver doesn't support downloadTable()` ) ;
361
381
}
362
382
const table = this . targetTableName ( newVersionEntry ) ;
363
- this . logger ( 'Downloading external pre-aggregation' , { preAggregation : this . preAggregation } ) ;
383
+ this . logger ( 'Downloading external pre-aggregation' , {
384
+ preAggregation : this . preAggregation ,
385
+ requestId : this . requestId
386
+ } ) ;
364
387
const tableData = await client . downloadTable ( table ) ;
365
388
const columns = await client . tableColumnTypes ( table ) ;
366
389
const externalDriver = await this . externalDriverFactory ( ) ;
367
390
if ( ! externalDriver . uploadTable ) {
368
391
throw new Error ( `Can't load external pre-aggregation: destination driver doesn't support uploadTable()` ) ;
369
392
}
370
- this . logger ( 'Uploading external pre-aggregation' , { preAggregation : this . preAggregation } ) ;
393
+ this . logger ( 'Uploading external pre-aggregation' , {
394
+ preAggregation : this . preAggregation ,
395
+ requestId : this . requestId
396
+ } ) ;
371
397
await externalDriver . uploadTable ( table , columns , tableData ) ;
372
398
await this . loadCache . reset ( this . preAggregation ) ;
373
399
await this . dropOrphanedTables ( externalDriver , table ) ;
@@ -389,7 +415,10 @@ class PreAggregationLoader {
389
415
const toDrop = actualTables
390
416
. map ( t => `${ this . preAggregation . preAggregationsSchema } .${ t . table_name || t . TABLE_NAME } ` )
391
417
. filter ( t => tablesToSave . indexOf ( t ) === - 1 ) ;
392
- this . logger ( 'Dropping orphaned tables' , { tablesToDrop : JSON . stringify ( toDrop ) } ) ;
418
+ this . logger ( 'Dropping orphaned tables' , {
419
+ tablesToDrop : JSON . stringify ( toDrop ) ,
420
+ requestId : this . requestId
421
+ } ) ;
393
422
await Promise . all ( toDrop . map ( table => client . dropTable ( table ) ) ) ;
394
423
}
395
424
}
@@ -434,7 +463,7 @@ class PreAggregations {
434
463
p ,
435
464
preAggregationsTablesToTempTables ,
436
465
loadCache ,
437
- { waitForRenew : queryBody . renewQuery }
466
+ { waitForRenew : queryBody . renewQuery , requestId : queryBody . requestId }
438
467
) ;
439
468
const preAggregationPromise = ( ) => loader . loadPreAggregation ( ) . then ( async targetTableName => {
440
469
const usedPreAggregation = typeof targetTableName === 'string' ? { targetTableName } : targetTableName ;
@@ -448,7 +477,9 @@ class PreAggregations {
448
477
getQueue ( ) {
449
478
if ( ! this . queue ) {
450
479
this . queue = QueryCache . createQueue ( `SQL_PRE_AGGREGATIONS_${ this . redisPrefix } ` , this . driverFactory , ( client , q ) => {
451
- const { preAggregation, preAggregationsTablesToTempTables, newVersionEntry } = q ;
480
+ const {
481
+ preAggregation, preAggregationsTablesToTempTables, newVersionEntry, requestId
482
+ } = q ;
452
483
const loader = new PreAggregationLoader (
453
484
this . redisPrefix ,
454
485
this . driverFactory ,
@@ -457,7 +488,8 @@ class PreAggregations {
457
488
this ,
458
489
preAggregation ,
459
490
preAggregationsTablesToTempTables ,
460
- new PreAggregationLoadCache ( this . redisPrefix , this . driverFactory , this . queryCache , this )
491
+ new PreAggregationLoadCache ( this . redisPrefix , this . driverFactory , this . queryCache , this ) ,
492
+ { requestId }
461
493
) ;
462
494
return loader . refresh ( newVersionEntry ) ( client ) ;
463
495
} , {
0 commit comments