Skip to content

Commit 1520f91

Browse files
committed
db: track duration, bytes read for block reads in TableIngestInfo
This patch tracks the duration of block reads, along with the number of uncached block bytes read, in `TableIngestInfo`. This gives us more observability into slow ingests. Fixes: #4970
1 parent 503a214 commit 1520f91

14 files changed

+100
-54
lines changed

blob_rewrite.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func (c *blobFileRewriteCompaction) Execute(jobID JobID, d *DB) error {
175175
info.Output.DiskFileNum = ve.NewBlobFiles[0].Physical.FileNum
176176
info.Output.Size = ve.NewBlobFiles[0].Physical.Size
177177
info.Output.ValueSize = ve.NewBlobFiles[0].Physical.ValueSize
178-
err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
178+
_, err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
179179
// It's possible that concurrent compactions removed references to
180180
// the blob file while the blob file rewrite compaction was running.
181181
// Now that we have the manifest lock, check if the blob file is

compaction.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1739,7 +1739,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
17391739
ve, stats, compactionErr = d.runCompaction(jobID, c)
17401740
}
17411741

1742-
err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
1742+
_, err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
17431743
err := compactionErr
17441744
if c.kind == compactionKindIngestedFlushable {
17451745
ve, err = d.runIngestFlush(c)
@@ -2697,7 +2697,7 @@ func (d *DB) compact1(jobID JobID, c *tableCompaction) (err error) {
26972697
info.Duration = d.timeNow().Sub(startTime)
26982698
if err == nil {
26992699
validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
2700-
err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
2700+
_, err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
27012701
// Check if this compaction had a conflicting operation (eg. a d.excise())
27022702
// that necessitates it restarting from scratch. Note that since we hold
27032703
// the manifest lock, we don't expect this bool to change its value

data_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1186,7 +1186,7 @@ func runDBDefineCmdReuseFS(td *datadriven.TestData, opts *Options) (*DB, error)
11861186
}
11871187

11881188
jobID := d.newJobIDLocked()
1189-
err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
1189+
_, err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
11901190
return versionUpdate{
11911191
VE: ve,
11921192
JobID: jobID,

event.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,12 @@ type TableIngestInfo struct {
541541
WaitFlushDuration time.Duration
542542
// ManifestUpdateDuration is the time spent updating the manifest.
543543
ManifestUpdateDuration time.Duration
544+
// BlockReadDuration is the total time spent reading blocks for the ingested
545+
// sstable.
546+
BlockReadDuration time.Duration
547+
// BlockReadBytes is the total number of bytes from blocks read for the
548+
// ingested sstable. This does not include bytes read from the block cache.
549+
BlockReadBytes uint64
544550
}
545551

546552
func (i TableIngestInfo) String() string {
@@ -573,7 +579,9 @@ func (i TableIngestInfo) SafeFormat(w redact.SafePrinter, _ rune) {
573579
w.Printf(" %s%s (%s)", redact.Safe(levelStr), t.FileNum,
574580
redact.Safe(humanize.Bytes.Uint64(t.Size)))
575581
}
576-
w.Printf("; manifest update took %.1fs", redact.Safe(i.ManifestUpdateDuration.Seconds()))
582+
w.Printf("; manifest update took %.1fs; block reads took %.1fs with %s block bytes read",
583+
redact.Safe(i.ManifestUpdateDuration.Seconds()), redact.Safe(i.BlockReadDuration.Seconds()),
584+
redact.Safe(humanize.Bytes.Uint64(i.BlockReadBytes)))
577585
}
578586

579587
// TableStatsInfo contains the info for a table stats loaded event.

event_listener_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ func TestEventListener(t *testing.T) {
6060
// Make deterministic.
6161
info.WaitFlushDuration = 200 * time.Millisecond
6262
info.ManifestUpdateDuration = 100 * time.Millisecond
63+
info.BlockReadDuration = 300 * time.Millisecond
64+
info.BlockReadBytes = 7894
6365
tableIngested(info)
6466
}
6567
opts := &Options{

file_cache_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ func TestVirtualReadsWiring(t *testing.T) {
406406
}
407407

408408
applyVE := func(ve *manifest.VersionEdit) error {
409-
err := d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
409+
_, err := d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
410410
return versionUpdate{
411411
VE: ve,
412412
JobID: d.newJobIDLocked(),

format_major_version.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -612,7 +612,7 @@ func (d *DB) markFilesLocked(findFn findFilesFunc) error {
612612

613613
// Lock the manifest for a coherent view of the LSM. The database lock has
614614
// been re-acquired by the defer within the above anonymous function.
615-
return d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
615+
_, err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
616616
vers := d.mu.versions.currentVersion()
617617
for l, filesToMark := range files {
618618
if len(filesToMark) == 0 {
@@ -653,4 +653,5 @@ func (d *DB) markFilesLocked(findFn findFilesFunc) error {
653653
InProgressCompactionsFn: func() []compactionInfo { return d.getInProgressCompactionInfoLocked(nil) },
654654
}, nil
655655
})
656+
return err
656657
}

ingest.go

Lines changed: 62 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -259,44 +259,49 @@ func ingestLoad1(
259259
cacheHandle *cache.Handle,
260260
tableNum base.TableNum,
261261
rangeKeyValidator rangeKeyIngestValidator,
262-
) (meta *manifest.TableMetadata, lastRangeKey keyspan.Span, err error) {
262+
) (
263+
meta *manifest.TableMetadata,
264+
lastRangeKey keyspan.Span,
265+
blockReadStats base.BlockReadStats,
266+
err error,
267+
) {
263268
o := opts.MakeReaderOptions()
264269
o.CacheOpts = sstableinternal.CacheOptions{
265270
CacheHandle: cacheHandle,
266271
FileNum: base.PhysicalTableDiskFileNum(tableNum),
267272
}
268273
r, err := sstable.NewReader(ctx, readable, o)
269274
if err != nil {
270-
return nil, keyspan.Span{}, errors.CombineErrors(err, readable.Close())
275+
return nil, keyspan.Span{}, base.BlockReadStats{}, errors.CombineErrors(err, readable.Close())
271276
}
272277
defer func() { _ = r.Close() }()
273278

274279
// Avoid ingesting tables with format versions this DB doesn't support.
275280
tf, err := r.TableFormat()
276281
if err != nil {
277-
return nil, keyspan.Span{}, err
282+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
278283
}
279284
if tf < fmv.MinTableFormat() || tf > fmv.MaxTableFormat() {
280-
return nil, keyspan.Span{}, errors.Newf(
285+
return nil, keyspan.Span{}, base.BlockReadStats{}, errors.Newf(
281286
"pebble: table format %s is not within range supported at DB format major version %d, (%s,%s)",
282287
tf, fmv, fmv.MinTableFormat(), fmv.MaxTableFormat(),
283288
)
284289
}
285290

286291
if r.Attributes.Has(sstable.AttributeBlobValues) {
287-
return nil, keyspan.Span{}, errors.Newf(
292+
return nil, keyspan.Span{}, base.BlockReadStats{}, errors.Newf(
288293
"pebble: ingesting tables with blob references is not supported")
289294
}
290295

291296
props, err := r.ReadPropertiesBlock(ctx, nil /* buffer pool */)
292297
if err != nil {
293-
return nil, keyspan.Span{}, err
298+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
294299
}
295300

296301
// If this is a columnar block, read key schema name from properties block.
297302
if tf.BlockColumnar() {
298303
if _, ok := opts.KeySchemas[props.KeySchemaName]; !ok {
299-
return nil, keyspan.Span{}, errors.Newf(
304+
return nil, keyspan.Span{}, base.BlockReadStats{}, errors.Newf(
300305
"pebble: table uses key schema %q unknown to the database",
301306
props.KeySchemaName)
302307
}
@@ -319,55 +324,71 @@ func ingestLoad1(
319324
// calculating stats before we can remove the original link.
320325
maybeSetStatsFromProperties(meta.PhysicalMeta(), &props, opts.Logger)
321326

327+
var iterStats base.InternalIteratorStats
328+
env := sstable.ReadEnv{
329+
Block: block.ReadEnv{
330+
Stats: &iterStats,
331+
},
332+
}
322333
{
323-
iter, err := r.NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */, sstable.AssertNoBlobHandles)
334+
iterOpts := sstable.IterOptions{
335+
Lower: nil,
336+
Upper: nil,
337+
Transforms: sstable.NoTransforms,
338+
Filterer: nil,
339+
FilterBlockSizeLimit: sstable.AlwaysUseFilterBlock,
340+
Env: env,
341+
ReaderProvider: sstable.MakeTrivialReaderProvider(r),
342+
BlobContext: sstable.AssertNoBlobHandles,
343+
}
344+
iter, err := r.NewPointIter(ctx, iterOpts)
324345
if err != nil {
325-
return nil, keyspan.Span{}, err
346+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
326347
}
327348
defer func() { _ = iter.Close() }()
328349
var smallest InternalKey
329350
if kv := iter.First(); kv != nil {
330351
if err := ingestValidateKey(opts, &kv.K); err != nil {
331-
return nil, keyspan.Span{}, err
352+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
332353
}
333354
smallest = kv.K.Clone()
334355
}
335356
if err := iter.Error(); err != nil {
336-
return nil, keyspan.Span{}, err
357+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
337358
}
338359
if kv := iter.Last(); kv != nil {
339360
if err := ingestValidateKey(opts, &kv.K); err != nil {
340-
return nil, keyspan.Span{}, err
361+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
341362
}
342363
meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, kv.K.Clone())
343364
}
344365
if err := iter.Error(); err != nil {
345-
return nil, keyspan.Span{}, err
366+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
346367
}
347368
}
348369

349-
iter, err := r.NewRawRangeDelIter(ctx, sstable.NoFragmentTransforms, sstable.NoReadEnv)
370+
iter, err := r.NewRawRangeDelIter(ctx, sstable.NoFragmentTransforms, env)
350371
if err != nil {
351-
return nil, keyspan.Span{}, err
372+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
352373
}
353374
if iter != nil {
354375
defer iter.Close()
355376
var smallest InternalKey
356377
if s, err := iter.First(); err != nil {
357-
return nil, keyspan.Span{}, err
378+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
358379
} else if s != nil {
359380
key := s.SmallestKey()
360381
if err := ingestValidateKey(opts, &key); err != nil {
361-
return nil, keyspan.Span{}, err
382+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
362383
}
363384
smallest = key.Clone()
364385
}
365386
if s, err := iter.Last(); err != nil {
366-
return nil, keyspan.Span{}, err
387+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
367388
} else if s != nil {
368389
k := s.SmallestKey()
369390
if err := ingestValidateKey(opts, &k); err != nil {
370-
return nil, keyspan.Span{}, err
391+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
371392
}
372393
largest := s.LargestKey().Clone()
373394
meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, largest)
@@ -376,34 +397,34 @@ func ingestLoad1(
376397

377398
// Update the range-key bounds for the table.
378399
{
379-
iter, err := r.NewRawRangeKeyIter(ctx, sstable.NoFragmentTransforms, sstable.NoReadEnv)
400+
iter, err := r.NewRawRangeKeyIter(ctx, sstable.NoFragmentTransforms, env)
380401
if err != nil {
381-
return nil, keyspan.Span{}, err
402+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
382403
}
383404
if iter != nil {
384405
defer iter.Close()
385406
var smallest InternalKey
386407
if s, err := iter.First(); err != nil {
387-
return nil, keyspan.Span{}, err
408+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
388409
} else if s != nil {
389410
key := s.SmallestKey()
390411
if err := ingestValidateKey(opts, &key); err != nil {
391-
return nil, keyspan.Span{}, err
412+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
392413
}
393414
smallest = key.Clone()
394415
// Range keys need some additional validation as we need to ensure they
395416
// defragment cleanly with the lastRangeKey from the previous file.
396417
if err := rangeKeyValidator.Validate(s); err != nil {
397-
return nil, keyspan.Span{}, err
418+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
398419
}
399420
}
400421
lastRangeKey = keyspan.Span{}
401422
if s, err := iter.Last(); err != nil {
402-
return nil, keyspan.Span{}, err
423+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
403424
} else if s != nil {
404425
k := s.SmallestKey()
405426
if err := ingestValidateKey(opts, &k); err != nil {
406-
return nil, keyspan.Span{}, err
427+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
407428
}
408429
// As range keys are fragmented, the end key of the last range key in
409430
// the table provides the upper bound for the table.
@@ -413,27 +434,27 @@ func ingestLoad1(
413434
} else {
414435
// s == nil.
415436
if err := rangeKeyValidator.Validate(nil /* nextFileSmallestKey */); err != nil {
416-
return nil, keyspan.Span{}, err
437+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
417438
}
418439
}
419440
} else {
420441
if err := rangeKeyValidator.Validate(nil /* nextFileSmallestKey */); err != nil {
421-
return nil, keyspan.Span{}, err
442+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
422443
}
423444
lastRangeKey = keyspan.Span{}
424445
}
425446
}
426447

427448
if !meta.HasPointKeys && !meta.HasRangeKeys {
428-
return nil, keyspan.Span{}, nil
449+
return nil, keyspan.Span{}, base.BlockReadStats{}, nil
429450
}
430451

431452
// Sanity check that the various bounds on the file were set consistently.
432453
if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
433-
return nil, keyspan.Span{}, err
454+
return nil, keyspan.Span{}, base.BlockReadStats{}, err
434455
}
435456

436-
return meta, lastRangeKey, nil
457+
return meta, lastRangeKey, iterStats.TotalBlockReads(), nil
437458
}
438459

439460
type ingestLoadResult struct {
@@ -442,6 +463,7 @@ type ingestLoadResult struct {
442463
external []ingestExternalMeta
443464

444465
externalFilesHaveLevel bool
466+
blockReadStats base.BlockReadStats
445467
}
446468

447469
type ingestLocalMeta struct {
@@ -485,6 +507,7 @@ func ingestLoad(
485507
var result ingestLoadResult
486508
result.local = make([]ingestLocalMeta, 0, len(paths))
487509
var lastRangeKey keyspan.Span
510+
var blockReadStats base.BlockReadStats
488511
// NB: we disable range key boundary assertions if we have shared or external files
489512
// present in this ingestion. This is because a suffixed range key in a local file
490513
// can possibly defragment with a suffixed range key in a shared or external file.
@@ -508,7 +531,7 @@ func ingestLoad(
508531
if !shouldDisableRangeKeyChecks {
509532
rangeKeyValidator = validateSuffixedBoundaries(opts.Comparer, lastRangeKey)
510533
}
511-
m, lastRangeKey, err = ingestLoad1(ctx, opts, fmv, readable, cacheHandle, localFileNums[i], rangeKeyValidator)
534+
m, lastRangeKey, blockReadStats, err = ingestLoad1(ctx, opts, fmv, readable, cacheHandle, localFileNums[i], rangeKeyValidator)
512535
if err != nil {
513536
return ingestLoadResult{}, err
514537
}
@@ -517,6 +540,7 @@ func ingestLoad(
517540
TableMetadata: m,
518541
path: paths[i],
519542
})
543+
result.blockReadStats = blockReadStats
520544
}
521545
}
522546

@@ -1455,7 +1479,8 @@ func (d *DB) ingest(ctx context.Context, args ingestArgs) (IngestOperationStats,
14551479

14561480
// Load the metadata for all the files being ingested. This step detects
14571481
// and elides empty sstables.
1458-
loadResult, err := ingestLoad(ctx, d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheHandle, pendingOutputs)
1482+
loadResult, err := ingestLoad(ctx, d.opts, d.FormatMajorVersion(), paths, shared, external,
1483+
d.cacheHandle, pendingOutputs)
14591484
if err != nil {
14601485
return IngestOperationStats{}, err
14611486
}
@@ -1734,6 +1759,8 @@ func (d *DB) ingest(ctx context.Context, args ingestArgs) (IngestOperationStats,
17341759
flushable: asFlushable,
17351760
WaitFlushDuration: waitFlushDuration,
17361761
ManifestUpdateDuration: manifestUpdateDuration,
1762+
BlockReadDuration: loadResult.blockReadStats.BlockReadDuration,
1763+
BlockReadBytes: loadResult.blockReadStats.BlockBytes - loadResult.blockReadStats.BlockBytesInCache,
17371764
}
17381765
if len(loadResult.local) > 0 {
17391766
info.GlobalSeqNum = loadResult.local[0].SmallestSeqNum
@@ -1924,12 +1951,11 @@ func (d *DB) ingestApply(
19241951
}
19251952
var metrics levelMetricsDelta
19261953

1927-
manifestUpdateStart := crtime.NowMono()
19281954
// Determine the target level inside UpdateVersionLocked. This prevents two
19291955
// concurrent ingestion jobs from using the same version to determine the
19301956
// target level, and also provides serialization with concurrent compaction
19311957
// and flush jobs.
1932-
err := d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
1958+
manifestUpdateDuration, err := d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
19331959
if mut != nil {
19341960
// Unref the mutable memtable to allow its flush to proceed. Now that we've
19351961
// acquired the manifest lock, we can be certain that if the mutable
@@ -2174,8 +2200,6 @@ func (d *DB) ingestApply(
21742200
if err != nil {
21752201
return nil, 0, err
21762202
}
2177-
manifestUpdateDuration := manifestUpdateStart.Elapsed()
2178-
21792203
// Check for any EventuallyFileOnlySnapshots that could be watching for
21802204
// an excise on this span. There should be none as the
21812205
// computePossibleOverlaps steps should have forced these EFOS to transition
@@ -2220,6 +2244,7 @@ func (d *DB) ingestApply(
22202244
}
22212245
}
22222246
d.maybeValidateSSTablesLocked(toValidate)
2247+
22232248
return ve, manifestUpdateDuration, nil
22242249
}
22252250

0 commit comments

Comments
 (0)