Skip to content

Commit 032f305

Browse files
committed
db: clean up level metric updates
Replace the ad-hoc `map[int]*LevelMetrics` that is passed to `logAndApply` with a `levelMetricsDelta` type which makes it more clear what it is and how it is used.
1 parent 0d101e8 commit 032f305

File tree

6 files changed

+44
-39
lines changed

6 files changed

+44
-39
lines changed

compaction.go

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ type compaction struct {
287287
// this compaction is allowed to excise files.
288288
exciseEnabled bool
289289

290-
metrics map[int]*LevelMetrics
290+
metrics levelMetricsDelta
291291

292292
pickerMetrics compactionPickerMetrics
293293

@@ -1262,8 +1262,6 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
12621262
panic("pebble: ingestedFlushable must be flushed one at a time.")
12631263
}
12641264

1265-
// Construct the VersionEdit, levelMetrics etc.
1266-
c.metrics = make(map[int]*LevelMetrics, numLevels)
12671265
// Finding the target level for ingestion must use the latest version
12681266
// after the logLock has been acquired.
12691267
c.version = d.mu.versions.currentVersion()
@@ -1526,16 +1524,16 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
15261524
// oldest unflushed memtable.
15271525
ve.MinUnflushedLogNum = minUnflushedLogNum
15281526
if c.kind != compactionKindIngestedFlushable {
1529-
metrics := c.metrics[0]
1527+
l0Metrics := c.metrics[0]
15301528
if d.opts.DisableWAL {
15311529
// If the WAL is disabled, every flushable has a zero [logSize],
15321530
// resulting in zero bytes in. Instead, use the number of bytes we
15331531
// flushed as the BytesIn. This ensures we get a reasonable w-amp
15341532
// calculation even when the WAL is disabled.
1535-
metrics.BytesIn = metrics.BytesFlushed
1533+
l0Metrics.BytesIn = l0Metrics.BytesFlushed
15361534
} else {
15371535
for i := 0; i < n; i++ {
1538-
metrics.BytesIn += d.mu.mem.queue[i].logSize
1536+
l0Metrics.BytesIn += d.mu.mem.queue[i].logSize
15391537
}
15401538
}
15411539
} else {
@@ -1572,7 +1570,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
15721570
}
15731571
}
15741572
}
1575-
err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false, /* forceRotation */
1573+
err = d.mu.versions.logAndApply(jobID, ve, &c.metrics, false, /* forceRotation */
15761574
func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) })
15771575
if err != nil {
15781576
info.Err = err
@@ -1605,8 +1603,10 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
16051603
if ingest {
16061604
d.mu.versions.metrics.Flush.AsIngestCount++
16071605
for _, l := range c.metrics {
1608-
d.mu.versions.metrics.Flush.AsIngestBytes += l.BytesIngested
1609-
d.mu.versions.metrics.Flush.AsIngestTableCount += l.TablesIngested
1606+
if l != nil {
1607+
d.mu.versions.metrics.Flush.AsIngestBytes += l.BytesIngested
1608+
d.mu.versions.metrics.Flush.AsIngestTableCount += l.TablesIngested
1609+
}
16101610
}
16111611
}
16121612
d.maybeTransitionSnapshotsToFileOnlyLocked()
@@ -2467,7 +2467,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
24672467
d.mu.versions.logUnlockAndInvalidatePickedCompactionCache()
24682468
return err
24692469
}
2470-
return d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo {
2470+
return d.mu.versions.logAndApply(jobID, ve, &c.metrics, false /* forceRotation */, func() []compactionInfo {
24712471
return d.getInProgressCompactionInfoLocked(c)
24722472
})
24732473
}()
@@ -2663,11 +2663,10 @@ func (d *DB) runCopyCompaction(
26632663
if errors.Is(err, sstable.ErrEmptySpan) {
26642664
// The virtual table was empty. Just remove the backing file.
26652665
// Note that deleteOnExit is true so we will delete the created object.
2666-
c.metrics = map[int]*LevelMetrics{
2667-
c.outputLevel.level: {
2668-
BytesIn: inputMeta.Size,
2669-
},
2666+
c.metrics[c.outputLevel.level] = &LevelMetrics{
2667+
BytesIn: inputMeta.Size,
26702668
}
2669+
26712670
return ve, compact.Stats{}, nil
26722671
}
26732672
return nil, compact.Stats{}, err
@@ -2690,12 +2689,10 @@ func (d *DB) runCopyCompaction(
26902689
if newMeta.Virtual {
26912690
ve.CreatedBackingTables = []*fileBacking{newMeta.FileBacking}
26922691
}
2693-
c.metrics = map[int]*LevelMetrics{
2694-
c.outputLevel.level: {
2695-
BytesIn: inputMeta.Size,
2696-
BytesCompacted: newMeta.Size,
2697-
TablesCompacted: 1,
2698-
},
2692+
c.metrics[c.outputLevel.level] = &LevelMetrics{
2693+
BytesIn: inputMeta.Size,
2694+
BytesCompacted: newMeta.Size,
2695+
TablesCompacted: 1,
26992696
}
27002697

27012698
if err := d.objProvider.Sync(); err != nil {
@@ -2869,7 +2866,6 @@ func fragmentDeleteCompactionHints(
28692866
func (d *DB) runDeleteOnlyCompaction(
28702867
jobID JobID, c *compaction, snapshots compact.Snapshots,
28712868
) (ve *versionEdit, stats compact.Stats, retErr error) {
2872-
c.metrics = make(map[int]*LevelMetrics, len(c.inputs))
28732869
fragments := fragmentDeleteCompactionHints(d.cmp, c.deletionHints)
28742870
ve = &versionEdit{
28752871
DeletedTables: map[deletedFileEntry]*tableMetadata{},
@@ -2919,11 +2915,9 @@ func (d *DB) runMoveCompaction(
29192915
if c.cancel.Load() {
29202916
return ve, stats, ErrCancelledCompaction
29212917
}
2922-
c.metrics = map[int]*LevelMetrics{
2923-
c.outputLevel.level: {
2924-
BytesMoved: meta.Size,
2925-
TablesMoved: 1,
2926-
},
2918+
c.metrics[c.outputLevel.level] = &LevelMetrics{
2919+
BytesMoved: meta.Size,
2920+
TablesMoved: 1,
29272921
}
29282922
ve = &versionEdit{
29292923
DeletedTables: map[deletedFileEntry]*tableMetadata{
@@ -3177,9 +3171,7 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error
31773171
}
31783172
outputMetrics.BytesRead += outputMetrics.BytesIn
31793173

3180-
c.metrics = map[int]*LevelMetrics{
3181-
c.outputLevel.level: outputMetrics,
3182-
}
3174+
c.metrics[c.outputLevel.level] = outputMetrics
31833175
if len(c.flushing) == 0 && c.metrics[c.startLevel.level] == nil {
31843176
c.metrics[c.startLevel.level] = &LevelMetrics{}
31853177
}

file_cache_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ func TestVirtualReadsWiring(t *testing.T) {
393393
d.checkVirtualBounds(v2)
394394

395395
// Write the version edit.
396-
fileMetrics := func(ve *versionEdit) map[int]*LevelMetrics {
396+
fileMetrics := func(ve *versionEdit) *levelMetricsDelta {
397397
metrics := newFileMetrics(ve.NewTables)
398398
for de, f := range ve.DeletedTables {
399399
lm := metrics[de.Level]

format_major_version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,7 @@ func (d *DB) markFilesLocked(findFn findFilesFunc) error {
606606
return d.mu.versions.logAndApply(
607607
jobID,
608608
&manifest.VersionEdit{},
609-
map[int]*LevelMetrics{},
609+
nil, /* metrics */
610610
true, /* forceRotation */
611611
func() []compactionInfo { return d.getInProgressCompactionInfoLocked(nil) })
612612
}

ingest.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1914,7 +1914,7 @@ func (d *DB) ingestApply(
19141914
if exciseSpan.Valid() || (d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit()) {
19151915
ve.DeletedTables = map[manifest.DeletedTableEntry]*manifest.TableMetadata{}
19161916
}
1917-
metrics := make(map[int]*LevelMetrics)
1917+
var metrics levelMetricsDelta
19181918

19191919
// Lock the manifest for writing before we use the current version to
19201920
// determine the target level. This prevents two concurrent ingestion jobs
@@ -2150,7 +2150,7 @@ func (d *DB) ingestApply(
21502150
}
21512151
}
21522152

2153-
if err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo {
2153+
if err := d.mu.versions.logAndApply(jobID, ve, &metrics, false /* forceRotation */, func() []compactionInfo {
21542154
return d.getInProgressCompactionInfoLocked(nil)
21552155
}); err != nil {
21562156
// Note: any error during logAndApply is fatal; this won't be reachable in production.

metrics.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/cockroachdb/pebble/internal/base"
1313
"github.com/cockroachdb/pebble/internal/cache"
1414
"github.com/cockroachdb/pebble/internal/humanize"
15+
"github.com/cockroachdb/pebble/internal/manifest"
1516
"github.com/cockroachdb/pebble/internal/manual"
1617
"github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
1718
"github.com/cockroachdb/pebble/record"
@@ -718,3 +719,15 @@ func (m *Metrics) StringForTests() string {
718719
mCopy.manualMemory = manual.Metrics{}
719720
return redact.StringWithoutMarkers(&mCopy)
720721
}
722+
723+
// levelMetricsDelta accumulates incremental ("delta") level metric updates
724+
// (e.g. from compactions or flushes).
725+
type levelMetricsDelta [manifest.NumLevels]*LevelMetrics
726+
727+
func (m *Metrics) updateLevelMetrics(updates levelMetricsDelta) {
728+
for i, u := range updates {
729+
if u != nil {
730+
m.Levels[i].Add(u)
731+
}
732+
}
733+
}

version_set.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ func (vs *versionSet) logUnlockAndInvalidatePickedCompactionCache() {
444444
func (vs *versionSet) logAndApply(
445445
jobID JobID,
446446
ve *versionEdit,
447-
metrics map[int]*LevelMetrics,
447+
metrics *levelMetricsDelta,
448448
forceRotation bool,
449449
inProgressCompactions func() []compactionInfo,
450450
) error {
@@ -687,8 +687,8 @@ func (vs *versionSet) logAndApply(
687687
vs.manifestFileNum = newManifestFileNum
688688
}
689689

690-
for level, update := range metrics {
691-
vs.metrics.Levels[level].Add(update)
690+
if metrics != nil {
691+
vs.metrics.updateLevelMetrics(*metrics)
692692
}
693693
for i := range vs.metrics.Levels {
694694
l := &vs.metrics.Levels[i]
@@ -1131,8 +1131,8 @@ func findCurrentManifest(
11311131
return marker, manifestNum, true, nil
11321132
}
11331133

1134-
func newFileMetrics(newFiles []manifest.NewTableEntry) map[int]*LevelMetrics {
1135-
m := map[int]*LevelMetrics{}
1134+
func newFileMetrics(newFiles []manifest.NewTableEntry) *levelMetricsDelta {
1135+
var m levelMetricsDelta
11361136
for _, nf := range newFiles {
11371137
lm := m[nf.Level]
11381138
if lm == nil {
@@ -1142,5 +1142,5 @@ func newFileMetrics(newFiles []manifest.NewTableEntry) map[int]*LevelMetrics {
11421142
lm.NumFiles++
11431143
lm.Size += int64(nf.Meta.Size)
11441144
}
1145-
return m
1145+
return &m
11461146
}

0 commit comments

Comments
 (0)