
Commit 520d3f3

db: consolidate compaction metrics in a struct
Consolidate the various fields of the compaction struct related to metrics collection under a new compactionMetrics struct.
1 parent ee64cc6 commit 520d3f3
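
In short, the metrics state that previously lived as loose fields on the compaction struct now lives under a single c.metrics value of type compactionMetrics. A summary sketch assembled from the diff below (the renames are taken from the diff; the example call site mirrors runMoveCompaction):

    // c.beganAt                     -> c.metrics.beganAt
    // c.bytesWritten                -> c.metrics.bytesWritten
    // c.stats                       -> c.metrics.internalIterStats
    // c.metrics (levelMetricsDelta) -> c.metrics.perLevel
    // c.pickerMetrics               -> c.metrics.picker
    //
    // Per-level entries are now allocated lazily via the new
    // levelMetricsDelta.level helper instead of by hand:
    outputMetrics := c.metrics.perLevel.level(c.outputLevel.level)
    outputMetrics.TableBytesMoved = meta.Size
    outputMetrics.TablesMoved = 1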

File tree: compaction.go, db.go, metrics.go

3 files changed: +82 -62 lines changed

compaction.go

Lines changed: 74 additions & 61 deletions
@@ -202,8 +202,6 @@ type compaction struct {
     comparer *base.Comparer
     logger Logger
     version *manifest.Version
-    stats base.InternalIteratorStats
-    beganAt time.Time
     // versionEditApplied is set to true when a compaction has completed and the
     // resulting version has been installed (if successful), but the compaction
     // goroutine is still cleaning up (eg, deleting obsolete files).
@@ -254,9 +252,6 @@ type compaction struct {
     // single output table with the tables in the grandparent level.
     maxOverlapBytes uint64

-    // bytesWritten contains the number of bytes that have been written to outputs.
-    bytesWritten atomic.Int64
-
     // The boundaries of the input data.
     bounds base.UserKeyBounds

@@ -305,16 +300,35 @@ type compaction struct {
         l0Limits [][]byte
     }

-    metrics levelMetricsDelta
-
-    pickerMetrics pickedCompactionMetrics
+    // metrics encapsulates various metrics collected during a compaction.
+    metrics compactionMetrics

     grantHandle CompactionGrantHandle

     tableFormat sstable.TableFormat
     objCreateOpts objstorage.CreateOptions
 }

+// compactionMetrics contains metrics surrounding a compaction.
+type compactionMetrics struct {
+    // beganAt is the time when the compaction began.
+    beganAt time.Time
+    // bytesWritten contains the number of bytes that have been written to
+    // outputs. It's updated whenever the compaction outputs'
+    // objstorage.Writables receive new writes. See newCompactionOutputObj.
+    bytesWritten atomic.Int64
+    // internalIterStats contains statistics from the internal iterators used by
+    // the compaction.
+    //
+    // TODO(jackson): Use these to power the compaction BytesRead metric.
+    internalIterStats base.InternalIteratorStats
+    // perLevel contains metrics for each level involved in the compaction.
+    perLevel levelMetricsDelta
+    // picker contains metrics from the compaction picker when the compaction
+    // was picked.
+    picker pickedCompactionMetrics
+}
+
 // inputLargestSeqNumAbsolute returns the maximum LargestSeqNumAbsolute of any
 // input sstables.
 func (c *compaction) inputLargestSeqNumAbsolute() base.SeqNum {
@@ -360,11 +374,11 @@ func (c *compaction) makeInfo(jobID JobID) CompactionInfo {
         info.Output.Level = numLevels - 1
     }

-    for i, score := range c.pickerMetrics.scores {
+    for i, score := range c.metrics.picker.scores {
         info.Input[i].Score = score
     }
-    info.SingleLevelOverlappingRatio = c.pickerMetrics.singleLevelOverlappingRatio
-    info.MultiLevelOverlappingRatio = c.pickerMetrics.multiLevelOverlappingRatio
+    info.SingleLevelOverlappingRatio = c.metrics.picker.singleLevelOverlappingRatio
+    info.MultiLevelOverlappingRatio = c.metrics.picker.multiLevelOverlappingRatio
     if len(info.Input) > 2 {
         info.Annotations = append(info.Annotations, "multilevel")
     }
@@ -393,13 +407,15 @@ func newCompaction(
         bounds: pc.bounds,
         logger: opts.Logger,
         version: pc.version,
-        beganAt: beganAt,
         getValueSeparation: getValueSeparation,
         maxOutputFileSize: pc.maxOutputFileSize,
         maxOverlapBytes: pc.maxOverlapBytes,
-        pickerMetrics: pc.pickerMetrics,
-        grantHandle: grantHandle,
-        tableFormat: tableFormat,
+        metrics: compactionMetrics{
+            beganAt: beganAt,
+            picker: pc.pickerMetrics,
+        },
+        grantHandle: grantHandle,
+        tableFormat: tableFormat,
     }
     // Acquire a reference to the version to ensure that files and in-memory
     // version state necessary for reading files remain available. Ignoring
@@ -542,9 +558,11 @@ func newDeleteOnlyCompaction(
         comparer: opts.Comparer,
         logger: opts.Logger,
         version: cur,
-        beganAt: beganAt,
         inputs: inputs,
         grantHandle: noopGrantHandle{},
+        metrics: compactionMetrics{
+            beganAt: beganAt,
+        },
     }
     c.deleteOnly.hints = hints
     c.deleteOnly.exciseEnabled = exciseEnabled
@@ -664,13 +682,15 @@ func newFlush(
         comparer: opts.Comparer,
         logger: opts.Logger,
         version: cur,
-        beganAt: beganAt,
         inputs: []compactionLevel{{level: -1}, {level: 0}},
         getValueSeparation: getValueSeparation,
         maxOutputFileSize: math.MaxUint64,
         maxOverlapBytes: math.MaxUint64,
         grantHandle: noopGrantHandle{},
         tableFormat: tableFormat,
+        metrics: compactionMetrics{
+            beganAt: beganAt,
+        },
     }
     c.flush.flushables = flushing
     c.flush.l0Limits = l0Organizer.FlushSplitKeys()
@@ -1030,7 +1050,7 @@ func (c *compaction) newInputIters(
     // iter.
     pointIter = iters[0]
     if len(iters) > 1 {
-        pointIter = newMergingIter(c.logger, &c.stats, cmp, nil, iters...)
+        pointIter = newMergingIter(c.logger, &c.metrics.internalIterStats, cmp, nil, iters...)
     }

     // In normal operation, levelIter iterates over the point operations in a
@@ -1382,10 +1402,10 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
     ingestFlushable := c.flush.flushables[0].flushable.(*ingestedFlushable)

     updateLevelMetricsOnExcise := func(m *manifest.TableMetadata, level int, added []manifest.NewTableEntry) {
-        levelMetrics := c.metrics[level]
+        levelMetrics := c.metrics.perLevel[level]
         if levelMetrics == nil {
             levelMetrics = &LevelMetrics{}
-            c.metrics[level] = levelMetrics
+            c.metrics.perLevel[level] = levelMetrics
         }
         levelMetrics.TablesCount--
         levelMetrics.TablesSize -= int64(m.Size)
@@ -1448,11 +1468,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
                 level: level,
             })
         }
-        levelMetrics := c.metrics[level]
-        if levelMetrics == nil {
-            levelMetrics = &LevelMetrics{}
-            c.metrics[level] = levelMetrics
-        }
+        levelMetrics := c.metrics.perLevel.level(level)
         levelMetrics.TableBytesIngested += file.Size
         levelMetrics.TablesIngested++
     }
@@ -1628,7 +1644,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
     // oldest unflushed memtable.
     ve.MinUnflushedLogNum = minUnflushedLogNum
     if c.kind != compactionKindIngestedFlushable {
-        l0Metrics := c.metrics[0]
+        l0Metrics := c.metrics.perLevel.level(0)
         if d.opts.DisableWAL {
             // If the WAL is disabled, every flushable has a zero [logSize],
             // resulting in zero bytes in. Instead, use the number of bytes we
@@ -1677,7 +1693,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
         return versionUpdate{
             VE: ve,
             JobID: jobID,
-            Metrics: c.metrics,
+            Metrics: c.metrics.perLevel,
             InProgressCompactionsFn: func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) },
         }, nil
     })
@@ -1692,7 +1708,8 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {

     d.clearCompactingState(c, err != nil)
     delete(d.mu.compact.inProgress, c)
-    d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics, c.bytesWritten.Load(), err)
+    d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.metrics.picker,
+        c.metrics.bytesWritten.Load(), err)

     var flushed flushableList
     if err == nil {
@@ -1702,7 +1719,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
         d.updateTableStatsLocked(ve.NewTables)
         if ingest {
             d.mu.versions.metrics.Flush.AsIngestCount++
-            for _, l := range c.metrics {
+            for _, l := range c.metrics.perLevel {
                 if l != nil {
                     d.mu.versions.metrics.Flush.AsIngestBytes += l.TableBytesIngested
                     d.mu.versions.metrics.Flush.AsIngestTableCount += l.TablesIngested
@@ -2476,7 +2493,7 @@ func (d *DB) compact(c *compaction, errChannel chan error) {
         // must be atomic with the above removal of c from
         // d.mu.compact.InProgress to ensure Metrics.Compact.Duration does not
         // miss or double count a completing compaction's duration.
-        d.mu.compact.duration += d.timeNow().Sub(c.beganAt)
+        d.mu.compact.duration += d.timeNow().Sub(c.metrics.beganAt)
     }()
     // Done must not be called while holding any lock that needs to be
     // acquired by Schedule. Also, it must be called after new Version has
@@ -2625,7 +2642,7 @@ func (d *DB) compact1(jobID JobID, c *compaction) (err error) {
         return versionUpdate{
             VE: ve,
             JobID: jobID,
-            Metrics: c.metrics,
+            Metrics: c.metrics.perLevel,
             InProgressCompactionsFn: func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) },
         }, nil
     })
@@ -2646,10 +2663,11 @@ func (d *DB) compact1(jobID JobID, c *compaction) (err error) {
     // NB: clearing compacting state must occur before updating the read state;
     // L0Sublevels initialization depends on it.
     d.clearCompactingState(c, err != nil)
-    d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics, c.bytesWritten.Load(), err)
-    d.mu.versions.incrementCompactionBytes(-c.bytesWritten.Load())
+    d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.metrics.picker,
+        c.metrics.bytesWritten.Load(), err)
+    d.mu.versions.incrementCompactionBytes(-c.metrics.bytesWritten.Load())

-    info.TotalDuration = d.timeNow().Sub(c.beganAt)
+    info.TotalDuration = d.timeNow().Sub(c.metrics.beganAt)
     d.opts.EventListener.CompactionEnd(info)

     // Update the read state before deleting obsolete files because the
@@ -2813,9 +2831,8 @@ func (d *DB) runCopyCompaction(
     if errors.Is(err, sstable.ErrEmptySpan) {
         // The virtual table was empty. Just remove the backing file.
         // Note that deleteOnExit is true so we will delete the created object.
-        c.metrics[c.outputLevel.level] = &LevelMetrics{
-            TableBytesIn: inputMeta.Size,
-        }
+        outputMetrics := c.metrics.perLevel.level(c.outputLevel.level)
+        outputMetrics.TableBytesIn = inputMeta.Size

         return ve, compact.Stats{}, nil
     }
@@ -2839,11 +2856,10 @@ func (d *DB) runCopyCompaction(
     if newMeta.Virtual {
         ve.CreatedBackingTables = []*manifest.TableBacking{newMeta.TableBacking}
     }
-    c.metrics[c.outputLevel.level] = &LevelMetrics{
-        TableBytesIn: inputMeta.Size,
-        TableBytesCompacted: newMeta.Size,
-        TablesCompacted: 1,
-    }
+    outputMetrics := c.metrics.perLevel.level(c.outputLevel.level)
+    outputMetrics.TableBytesIn = inputMeta.Size
+    outputMetrics.TableBytesCompacted = newMeta.Size
+    outputMetrics.TablesCompacted = 1

     if err := d.objProvider.Sync(); err != nil {
         return nil, compact.Stats{}, err
@@ -3022,12 +3038,11 @@ func (d *DB) runDeleteOnlyCompaction(
         DeletedTables: map[manifest.DeletedTableEntry]*manifest.TableMetadata{},
     }
     for _, cl := range c.inputs {
-        levelMetrics := &LevelMetrics{}
+        levelMetrics := c.metrics.perLevel.level(cl.level)
         err := d.runDeleteOnlyCompactionForLevel(cl, levelMetrics, ve, snapshots, fragments, c.deleteOnly.exciseEnabled)
         if err != nil {
             return nil, stats, err
         }
-        c.metrics[cl.level] = levelMetrics
     }
     // Remove any files that were added and deleted in the same versionEdit.
     ve.NewTables = slices.DeleteFunc(ve.NewTables, func(e manifest.NewTableEntry) bool {
@@ -3067,10 +3082,9 @@ func (d *DB) runMoveCompaction(
     if c.cancel.Load() {
         return ve, stats, ErrCancelledCompaction
     }
-    c.metrics[c.outputLevel.level] = &LevelMetrics{
-        TableBytesMoved: meta.Size,
-        TablesMoved: 1,
-    }
+    outputMetrics := c.metrics.perLevel.level(c.outputLevel.level)
+    outputMetrics.TableBytesMoved = meta.Size
+    outputMetrics.TablesMoved = 1
     ve = &manifest.VersionEdit{
         DeletedTables: map[manifest.DeletedTableEntry]*manifest.TableMetadata{
             {Level: c.startLevel.level, FileNum: meta.TableNum}: meta,
@@ -3200,7 +3214,7 @@ func (d *DB) compactAndWrite(
     defer c.bufferPool.Release()
     blockReadEnv := block.ReadEnv{
         BufferPool: &c.bufferPool,
-        Stats: &c.stats,
+        Stats: &c.metrics.internalIterStats,
         IterStats: d.fileCache.SSTStatsCollector().Accumulator(
             uint64(uintptr(unsafe.Pointer(c))),
             categoryCompaction,
@@ -3333,13 +3347,13 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*manifest.VersionEd
     }

     startLevelBytes := c.startLevel.files.TableSizeSum()
-    outputMetrics := &LevelMetrics{
-        TableBytesIn: startLevelBytes,
-        // TODO(jackson): This BytesRead value does not include any blob files
-        // written. It either should, or we should add a separate metric.
-        TableBytesRead: c.outputLevel.files.TableSizeSum(),
-        BlobBytesCompacted: result.Stats.CumulativeBlobFileSize,
-    }
+
+    outputMetrics := c.metrics.perLevel.level(c.outputLevel.level)
+    outputMetrics.TableBytesIn = startLevelBytes
+    // TODO(jackson): This BytesRead value does not include any blob files
+    // written. It either should, or we should add a separate metric.
+    outputMetrics.TableBytesRead = c.outputLevel.files.TableSizeSum()
+    outputMetrics.BlobBytesCompacted = result.Stats.CumulativeBlobFileSize
     if c.flush.flushables != nil {
         outputMetrics.BlobBytesFlushed = result.Stats.CumulativeBlobFileSize
     }
@@ -3348,12 +3362,11 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*manifest.VersionEd
     }
     outputMetrics.TableBytesRead += outputMetrics.TableBytesIn

-    c.metrics[c.outputLevel.level] = outputMetrics
-    if len(c.flush.flushables) == 0 && c.metrics[c.startLevel.level] == nil {
-        c.metrics[c.startLevel.level] = &LevelMetrics{}
+    if len(c.flush.flushables) == 0 {
+        c.metrics.perLevel.level(c.startLevel.level)
     }
     if len(c.extraLevels) > 0 {
-        c.metrics[c.extraLevels[0].level] = &LevelMetrics{}
+        c.metrics.perLevel.level(c.extraLevels[0].level)
         outputMetrics.MultiLevel.TableBytesInTop = startLevelBytes
         outputMetrics.MultiLevel.TableBytesIn = outputMetrics.TableBytesIn
         outputMetrics.MultiLevel.TableBytesRead = outputMetrics.TableBytesRead
@@ -3515,7 +3528,7 @@ func (d *DB) newCompactionOutputObj(
         writable = &compactionWritable{
             Writable: writable,
             versions: d.mu.versions,
-            written: &c.bytesWritten,
+            written: &c.metrics.bytesWritten,
         }
     }
     return writable, objMeta, nil

db.go

Lines changed: 1 addition & 1 deletion
@@ -2081,7 +2081,7 @@ func (d *DB) Metrics() *Metrics {
     metrics.Compact.Duration = d.mu.compact.duration
     for c := range d.mu.compact.inProgress {
        if c.kind != compactionKindFlush && c.kind != compactionKindIngestedFlushable {
-            metrics.Compact.Duration += d.timeNow().Sub(c.beganAt)
+            metrics.Compact.Duration += d.timeNow().Sub(c.metrics.beganAt)
        }
    }
    metrics.Compact.NumProblemSpans = d.problemSpans.Len()

metrics.go

Lines changed: 7 additions & 0 deletions
@@ -910,6 +910,13 @@ func (m *Metrics) StringForTests() string {
 // (e.g. from compactions or flushes).
 type levelMetricsDelta [manifest.NumLevels]*LevelMetrics

+func (m *levelMetricsDelta) level(level int) *LevelMetrics {
+    if m[level] == nil {
+        m[level] = &LevelMetrics{}
+    }
+    return m[level]
+}
+
 func (m *Metrics) updateLevelMetrics(updates levelMetricsDelta) {
     for i, u := range updates {
         if u != nil {
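
The new level helper lazily allocates a level's LevelMetrics entry, letting call sites drop the nil-check-and-assign pattern previously repeated across compaction.go. A minimal usage sketch (the delta variable and values are illustrative; the field names come from LevelMetrics as used in the diff above):

    var delta levelMetricsDelta
    delta.level(0).TablesCompacted++            // first use allocates the L0 entry
    delta.level(0).TableBytesCompacted += 1024  // later uses reuse the same entry
    // Levels that were never touched stay nil, so updateLevelMetrics skips them.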
