@@ -15,7 +15,6 @@ import (
15
15
"unsafe"
16
16
17
17
"github.com/cespare/xxhash/v2"
18
- "github.com/cockroachdb/crlib/crtime"
19
18
"github.com/cockroachdb/crlib/fifo"
20
19
"github.com/cockroachdb/errors"
21
20
"github.com/cockroachdb/pebble/internal/base"
@@ -28,6 +27,7 @@ import (
28
27
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
29
28
"github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
30
29
"github.com/cockroachdb/pebble/sstable/block/blockkind"
30
+ "github.com/cockroachdb/redact"
31
31
)
32
32
33
33
// Kind is a convenience alias.
@@ -371,18 +371,23 @@ func (r *Reader) Read(
371
371
return CacheBufferHandle (cv ), nil
372
372
}
373
373
}
374
- value , err := r .doRead (ctx , env , readHandle , bh , kind , initBlockMetadataFn )
374
+ value , err := r .doRead (ctx , env , readHandle , bh , kind , 0 , initBlockMetadataFn )
375
375
if err != nil {
376
376
return BufferHandle {}, env .maybeReportCorruption (err )
377
377
}
378
378
return value .MakeHandle (), nil
379
379
}
380
380
381
- cv , crh , errorDuration , hit , err := r .opts .CacheOpts .CacheHandle .GetWithReadHandle (
381
+ cv , crh , errorDuration , waitDuration , hit , err := r .opts .CacheOpts .CacheHandle .GetWithReadHandle (
382
382
ctx , r .opts .CacheOpts .FileNum , bh .Offset )
383
- if errorDuration > 5 * time .Millisecond && r .opts .LoggerAndTracer .IsTracingEnabled (ctx ) {
383
+ const slowDur = 5 * time .Millisecond
384
+ if waitDuration > slowDur && r .opts .LoggerAndTracer .IsTracingEnabled (ctx ) {
384
385
r .opts .LoggerAndTracer .Eventf (
385
- ctx , "waited for turn when %s time wasted by failed reads" , errorDuration .String ())
386
+ ctx , "waited for reading turn for %v" , waitDuration )
387
+ }
388
+ if errorDuration > slowDur && r .opts .LoggerAndTracer .IsTracingEnabled (ctx ) {
389
+ r .opts .LoggerAndTracer .Eventf (
390
+ ctx , "failed reads by others wasted %v" , errorDuration )
386
391
}
387
392
// TODO(sumeer): consider tracing when waited longer than some duration
388
393
// for turn to do the read.
@@ -400,11 +405,15 @@ func (r *Reader) Read(
400
405
}
401
406
if hit {
402
407
recordCacheHit (ctx , env , readHandle , bh , kind )
408
+ } else {
409
+ // The block was not in the cache, and someone else read it, but this
410
+ // caller had to wait for that read. So account for it in the stats.
411
+ env .BlockRead (kind , bh .Length , waitDuration )
403
412
}
404
413
return CacheBufferHandle (cv ), nil
405
414
}
406
415
407
- value , err := r .doRead (ctx , env , readHandle , bh , kind , initBlockMetadataFn )
416
+ value , err := r .doRead (ctx , env , readHandle , bh , kind , waitDuration , initBlockMetadataFn )
408
417
if err != nil {
409
418
crh .SetReadError (err )
410
419
return BufferHandle {}, env .maybeReportCorruption (err )
@@ -423,9 +432,6 @@ func recordCacheHit(
423
432
env .BlockServedFromCache (kind , bh .Length )
424
433
}
425
434
426
- // TODO(sumeer): should the threshold be configurable.
427
- const slowReadTracingThreshold = 5 * time .Millisecond
428
-
429
435
// doRead is a helper for Read that does the read, checksum check,
430
436
// decompression, and returns either a Value or an error.
431
437
func (r * Reader ) doRead (
@@ -434,6 +440,7 @@ func (r *Reader) doRead(
434
440
readHandle objstorage.ReadHandle ,
435
441
bh Handle ,
436
442
kind Kind ,
443
+ waitBeforeReadDuration time.Duration ,
437
444
initBlockMetadataFn func (* Metadata , []byte ) error ,
438
445
) (Value , error ) {
439
446
ctx = objiotracing .WithBlockKind (ctx , kind )
@@ -447,21 +454,28 @@ func (r *Reader) doRead(
447
454
}
448
455
449
456
compressed := Alloc (int (bh .Length + TrailerLen ), env .BufferPool )
450
- readStopwatch := makeStopwatch ()
457
+ readStopwatch := base . MakeStopwatch ()
451
458
var err error
452
459
if readHandle != nil {
453
460
err = readHandle .ReadAt (ctx , compressed .BlockData (), int64 (bh .Offset ))
454
461
} else {
455
462
err = r .readable .ReadAt (ctx , compressed .BlockData (), int64 (bh .Offset ))
456
463
}
457
- readDuration := readStopwatch .stop ()
464
+ readDuration := readStopwatch .Stop ()
458
465
// Call IsTracingEnabled to avoid the allocations of boxing integers into an
459
466
// interface{}, unless necessary.
460
- if readDuration >= slowReadTracingThreshold && r .opts .LoggerAndTracer .IsTracingEnabled (ctx ) {
467
+ if (readDuration + waitBeforeReadDuration ) >= base .SlowReadTracingThreshold &&
468
+ r .opts .LoggerAndTracer .IsTracingEnabled (ctx ) {
461
469
_ , file1 , line1 , _ := runtime .Caller (1 )
462
470
_ , file2 , line2 , _ := runtime .Caller (2 )
463
- r .opts .LoggerAndTracer .Eventf (ctx , "reading block of %d bytes took %s (fileNum=%s; %s/%s:%d -> %s/%s:%d)" ,
464
- int (bh .Length + TrailerLen ), readDuration .String (),
471
+ var waitDurStr string
472
+ if waitBeforeReadDuration > 0 {
473
+ waitDurStr = fmt .Sprintf ("+ %v wait " , waitBeforeReadDuration )
474
+ }
475
+ r .opts .LoggerAndTracer .Eventf (
476
+ ctx , "reading block kind %s of %d bytes took %v %s(fileNum=%s; %s/%s:%d -> %s/%s:%d)" ,
477
+ redact .SafeString (kind .String ()),
478
+ int (bh .Length + TrailerLen ), readDuration , redact .SafeString (waitDurStr ),
465
479
r .opts .CacheOpts .FileNum ,
466
480
filepath .Base (filepath .Dir (file2 )), filepath .Base (file2 ), line2 ,
467
481
filepath .Base (filepath .Dir (file1 )), filepath .Base (file1 ), line1 )
@@ -470,7 +484,7 @@ func (r *Reader) doRead(
470
484
compressed .Release ()
471
485
return Value {}, err
472
486
}
473
- env .BlockRead (kind , bh .Length , readDuration )
487
+ env .BlockRead (kind , bh .Length , readDuration + waitBeforeReadDuration )
474
488
if err = ValidateChecksum (r .checksumType , compressed .BlockData (), bh ); err != nil {
475
489
compressed .Release ()
476
490
err = errors .Wrapf (err , "pebble: file %s" , r .opts .CacheOpts .FileNum )
@@ -557,51 +571,22 @@ func ReadRaw(
557
571
return nil , base .CorruptionErrorf ("pebble: invalid file %s (file size is too small)" , errors .Safe (fileNum ))
558
572
}
559
573
560
- readStopwatch := makeStopwatch ()
574
+ readStopwatch := base . MakeStopwatch ()
561
575
var err error
562
576
if readHandle != nil {
563
577
err = readHandle .ReadAt (ctx , buf , off )
564
578
} else {
565
579
err = f .ReadAt (ctx , buf , off )
566
580
}
567
- readDuration := readStopwatch .stop ()
581
+ readDuration := readStopwatch .Stop ()
568
582
// Call IsTracingEnabled to avoid the allocations of boxing integers into an
569
583
// interface{}, unless necessary.
570
- if readDuration >= slowReadTracingThreshold && logger .IsTracingEnabled (ctx ) {
571
- logger .Eventf (ctx , "reading footer of %d bytes took %s " ,
572
- len (buf ), readDuration . String () )
584
+ if readDuration >= base . SlowReadTracingThreshold && logger .IsTracingEnabled (ctx ) {
585
+ logger .Eventf (ctx , "reading footer of %d bytes took %v " ,
586
+ len (buf ), readDuration )
573
587
}
574
588
if err != nil {
575
589
return nil , errors .Wrap (err , "pebble: invalid file (could not read footer)" )
576
590
}
577
591
return buf , nil
578
592
}
579
-
580
- // DeterministicReadBlockDurationForTesting is for tests that want a
581
- // deterministic value of the time to read a block (that is not in the cache).
582
- // The return value is a function that must be called before the test exits.
583
- func DeterministicReadBlockDurationForTesting () func () {
584
- drbdForTesting := deterministicReadBlockDurationForTesting
585
- deterministicReadBlockDurationForTesting = true
586
- return func () {
587
- deterministicReadBlockDurationForTesting = drbdForTesting
588
- }
589
- }
590
-
591
- var deterministicReadBlockDurationForTesting = false
592
-
593
- type deterministicStopwatchForTesting struct {
594
- startTime crtime.Mono
595
- }
596
-
597
- func makeStopwatch () deterministicStopwatchForTesting {
598
- return deterministicStopwatchForTesting {startTime : crtime .NowMono ()}
599
- }
600
-
601
- func (w deterministicStopwatchForTesting ) stop () time.Duration {
602
- dur := w .startTime .Elapsed ()
603
- if deterministicReadBlockDurationForTesting {
604
- dur = slowReadTracingThreshold
605
- }
606
- return dur
607
- }
0 commit comments