Skip to content

Commit 38418f3

Browse files
committed
manifest: remove L0Sublevels from Version
The `L0Sublevels` field of `Version` is very peculiar: it is only usable for the most recent version. This commit removes this field and switches all usages to `L0Organizer`. `L0Sublevels` is now unexported and is only an implementation detail of `L0Organizer`. In time we will rewrite the functionality of `L0Sublevels` into `L0Organizer` directly.
1 parent 02cd5b3 commit 38418f3

25 files changed

+349
-317
lines changed

compaction.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ func newCompaction(
393393
c.grandparents = c.version.Overlaps(c.outputLevel.level+1, c.userKeyBounds())
394394
}
395395
c.delElision, c.rangeKeyElision = compact.SetupTombstoneElision(
396-
c.cmp, c.version, c.outputLevel.level, base.UserKeyBoundsFromInternal(c.smallest, c.largest),
396+
c.cmp, c.version, pc.l0Organizer, c.outputLevel.level, base.UserKeyBoundsFromInternal(c.smallest, c.largest),
397397
)
398398
c.kind = pc.kind
399399

@@ -530,7 +530,12 @@ func adjustGrandparentOverlapBytesForFlush(c *compaction, flushingBytes uint64)
530530
}
531531

532532
func newFlush(
533-
opts *Options, cur *version, baseLevel int, flushing flushableList, beganAt time.Time,
533+
opts *Options,
534+
cur *version,
535+
l0Organizer *manifest.L0Organizer,
536+
baseLevel int,
537+
flushing flushableList,
538+
beganAt time.Time,
534539
) (*compaction, error) {
535540
c := &compaction{
536541
kind: compactionKindFlush,
@@ -571,7 +576,7 @@ func newFlush(
571576
}
572577
}
573578

574-
c.l0Limits = cur.L0Sublevels.FlushSplitKeys()
579+
c.l0Limits = l0Organizer.FlushSplitKeys()
575580

576581
smallestSet, largestSet := false, false
577582
updatePointBounds := func(iter internalIterator) {
@@ -1039,7 +1044,7 @@ func (d *DB) addInProgressCompaction(c *compaction) {
10391044
if isIntraL0 {
10401045
l0Inputs = append(l0Inputs, c.outputLevel.files)
10411046
}
1042-
if err := c.version.L0Sublevels.UpdateStateForStartedCompaction(l0Inputs, isBase); err != nil {
1047+
if err := d.mu.versions.l0Organizer.UpdateStateForStartedCompaction(l0Inputs, isBase); err != nil {
10431048
d.opts.Logger.Fatalf("could not update state for compaction: %s", err)
10441049
}
10451050
}
@@ -1096,7 +1101,7 @@ func (d *DB) clearCompactingState(c *compaction, rollback bool) {
10961101
// may be able to pick a better compaction (though when this compaction
10971102
// succeeded we've also cleared the cache in logAndApply).
10981103
defer d.mu.versions.logUnlockAndInvalidatePickedCompactionCache()
1099-
d.mu.versions.currentVersion().L0Sublevels.InitCompactingFileInfo(l0InProgress)
1104+
d.mu.versions.l0Organizer.InitCompactingFileInfo(l0InProgress)
11001105
}()
11011106
}
11021107

@@ -1460,7 +1465,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
14601465
}
14611466
}
14621467

1463-
c, err := newFlush(d.opts, d.mu.versions.currentVersion(),
1468+
c, err := newFlush(d.opts, d.mu.versions.currentVersion(), d.mu.versions.l0Organizer,
14641469
d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], d.timeNow())
14651470
if err != nil {
14661471
return 0, err
@@ -1923,7 +1928,7 @@ func (d *DB) tryScheduleDownloadCompactions(env compactionEnv, maxConcurrentDown
19231928
break
19241929
}
19251930
download := d.mu.compact.downloads[i]
1926-
switch d.tryLaunchDownloadCompaction(download, vers, env, maxConcurrentDownloads) {
1931+
switch d.tryLaunchDownloadCompaction(download, vers, d.mu.versions.l0Organizer, env, maxConcurrentDownloads) {
19271932
case launchedCompaction:
19281933
started = true
19291934
continue
@@ -1942,7 +1947,7 @@ func (d *DB) pickManualCompaction(env compactionEnv) (pc *pickedCompaction) {
19421947
v := d.mu.versions.currentVersion()
19431948
for len(d.mu.compact.manual) > 0 {
19441949
manual := d.mu.compact.manual[0]
1945-
pc, retryLater := newPickedManualCompaction(v, d.opts, env, d.mu.versions.picker.getBaseLevel(), manual)
1950+
pc, retryLater := newPickedManualCompaction(v, d.mu.versions.l0Organizer, d.opts, env, d.mu.versions.picker.getBaseLevel(), manual)
19461951
if pc != nil {
19471952
return pc
19481953
}

compaction_picker.go

Lines changed: 54 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ type pickedCompaction struct {
228228
smallest InternalKey
229229
largest InternalKey
230230
version *version
231+
l0Organizer *manifest.L0Organizer
231232
pickerMetrics compactionPickerMetrics
232233
}
233234

@@ -247,7 +248,10 @@ func defaultOutputLevel(startLevel, baseLevel int) int {
247248
}
248249

249250
func newPickedCompaction(
250-
opts *Options, cur *version, startLevel, outputLevel, baseLevel int,
251+
opts *Options,
252+
cur *version,
253+
l0Organizer *manifest.L0Organizer,
254+
startLevel, outputLevel, baseLevel int,
251255
) *pickedCompaction {
252256
if startLevel > 0 && startLevel < baseLevel {
253257
panic(fmt.Sprintf("invalid compaction: start level %d should not be empty (base level %d)",
@@ -258,6 +262,7 @@ func newPickedCompaction(
258262
pc := &pickedCompaction{
259263
cmp: opts.Comparer.Compare,
260264
version: cur,
265+
l0Organizer: l0Organizer,
261266
baseLevel: baseLevel,
262267
inputs: []compactionLevel{{level: startLevel}, {level: outputLevel}},
263268
maxOutputFileSize: uint64(opts.Level(adjustedLevel).TargetFileSize),
@@ -284,14 +289,19 @@ func adjustedOutputLevel(outputLevel int, baseLevel int) int {
284289
}
285290

286291
func newPickedCompactionFromL0(
287-
lcf *manifest.L0CompactionFiles, opts *Options, vers *version, baseLevel int, isBase bool,
292+
lcf *manifest.L0CompactionFiles,
293+
opts *Options,
294+
vers *version,
295+
l0Organizer *manifest.L0Organizer,
296+
baseLevel int,
297+
isBase bool,
288298
) *pickedCompaction {
289299
outputLevel := baseLevel
290300
if !isBase {
291301
outputLevel = 0 // Intra L0
292302
}
293303

294-
pc := newPickedCompaction(opts, vers, 0, outputLevel, baseLevel)
304+
pc := newPickedCompaction(opts, vers, l0Organizer, 0, outputLevel, baseLevel)
295305
pc.lcf = lcf
296306
pc.outputLevel.level = outputLevel
297307

@@ -349,8 +359,9 @@ func (pc *pickedCompaction) clone() *pickedCompaction {
349359
pickerMetrics: pc.pickerMetrics,
350360

351361
// Both copies see the same manifest, therefore, it's ok for them to se
352-
// share the same pc. version.
353-
version: pc.version,
362+
// share the same pc.version and pc.l0Organizer.
363+
version: pc.version,
364+
l0Organizer: pc.l0Organizer,
354365
}
355366

356367
newPC.inputs = make([]compactionLevel, len(pc.inputs))
@@ -475,7 +486,7 @@ func (pc *pickedCompaction) setupInputs(
475486
})
476487
}
477488
oldLcf := pc.lcf.Clone()
478-
if pc.version.L0Sublevels.ExtendL0ForBaseCompactionTo(smallestBaseKey, largestBaseKey, pc.lcf) {
489+
if pc.l0Organizer.ExtendL0ForBaseCompactionTo(smallestBaseKey, largestBaseKey, pc.lcf) {
479490
var newStartLevelFiles []*tableMetadata
480491
iter := pc.version.Levels[0].Iter()
481492
var sizeSum uint64
@@ -578,13 +589,15 @@ func anyTablesCompacting(inputs manifest.LevelSlice) bool {
578589
// installed).
579590
func newCompactionPickerByScore(
580591
v *version,
592+
l0Organizer *manifest.L0Organizer,
581593
virtualBackings *manifest.VirtualBackings,
582594
opts *Options,
583595
inProgressCompactions []compactionInfo,
584596
) *compactionPickerByScore {
585597
p := &compactionPickerByScore{
586598
opts: opts,
587599
vers: v,
600+
l0Organizer: l0Organizer,
588601
virtualBackings: virtualBackings,
589602
}
590603
p.initLevelMaxBytes(inProgressCompactions)
@@ -661,6 +674,7 @@ func totalCompensatedSize(iter iter.Seq[*manifest.TableMetadata]) uint64 {
661674
type compactionPickerByScore struct {
662675
opts *Options
663676
vers *version
677+
l0Organizer *manifest.L0Organizer
664678
virtualBackings *manifest.VirtualBackings
665679
// The level to target for L0 compactions. Levels L1 to baseLevel must be
666680
// empty.
@@ -890,7 +904,7 @@ func (p *compactionPickerByScore) calculateLevelScores(
890904
scores[i].level = i
891905
scores[i].outputLevel = i + 1
892906
}
893-
l0UncompensatedScore := calculateL0UncompensatedScore(p.vers, p.opts, inProgressCompactions)
907+
l0UncompensatedScore := calculateL0UncompensatedScore(p.vers, p.l0Organizer, p.opts, inProgressCompactions)
894908
scores[0] = candidateLevelInfo{
895909
outputLevel: p.baseLevel,
896910
uncompensatedScore: l0UncompensatedScore,
@@ -967,11 +981,14 @@ func (p *compactionPickerByScore) calculateLevelScores(
967981
// L0 may overlap one another, so a different set of heuristics that take into
968982
// account read amplification apply.
969983
func calculateL0UncompensatedScore(
970-
vers *version, opts *Options, inProgressCompactions []compactionInfo,
984+
vers *version,
985+
l0Organizer *manifest.L0Organizer,
986+
opts *Options,
987+
inProgressCompactions []compactionInfo,
971988
) float64 {
972989
// Use the sublevel count to calculate the score. The base vs intra-L0
973990
// compaction determination happens in pickAuto, not here.
974-
score := float64(2*vers.L0Sublevels.MaxDepthAfterOngoingCompactions()) /
991+
score := float64(2*l0Organizer.MaxDepthAfterOngoingCompactions()) /
975992
float64(opts.L0CompactionThreshold)
976993

977994
// Also calculate a score based on the file count but use it only if it
@@ -1175,7 +1192,7 @@ func (p *compactionPickerByScore) getCompactionConcurrency() int {
11751192
// l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency + 1 compactions
11761193
l0ReadAmpCompactions := 1
11771194
if p.opts.Experimental.L0CompactionConcurrency > 0 {
1178-
l0ReadAmp := p.vers.L0Sublevels.MaxDepthAfterOngoingCompactions()
1195+
l0ReadAmp := p.l0Organizer.MaxDepthAfterOngoingCompactions()
11791196
l0ReadAmpCompactions = (l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency) + 1
11801197
}
11811198
// compactionDebt >= ccSignal2 then can run another compaction, where
@@ -1270,7 +1287,7 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
12701287
}
12711288

12721289
if info.level == 0 {
1273-
pc = pickL0(env, p.opts, p.vers, p.baseLevel)
1290+
pc = pickL0(env, p.opts, p.vers, p.l0Organizer, p.baseLevel)
12741291
// Fail-safe to protect against compacting the same sstable
12751292
// concurrently.
12761293
if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
@@ -1292,7 +1309,7 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
12921309
continue
12931310
}
12941311

1295-
pc := pickAutoLPositive(env, p.opts, p.vers, *info, p.baseLevel)
1312+
pc := pickAutoLPositive(env, p.opts, p.vers, p.l0Organizer, *info, p.baseLevel)
12961313
// Fail-safe to protect against compacting the same sstable concurrently.
12971314
if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
12981315
p.addScoresToPickedCompactionMetrics(pc, scores)
@@ -1469,7 +1486,7 @@ func (p *compactionPickerByScore) pickedCompactionFromCandidateFile(
14691486
}
14701487
}
14711488

1472-
pc := newPickedCompaction(p.opts, p.vers, startLevel, outputLevel, p.baseLevel)
1489+
pc := newPickedCompaction(p.opts, p.vers, p.l0Organizer, startLevel, outputLevel, p.baseLevel)
14731490
pc.kind = kind
14741491
pc.startLevel.files = inputs
14751492
pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.All())
@@ -1588,13 +1605,18 @@ func (p *compactionPickerByScore) pickTombstoneDensityCompaction(
15881605
// file in a positive-numbered level. This function must not be used for
15891606
// L0.
15901607
func pickAutoLPositive(
1591-
env compactionEnv, opts *Options, vers *version, cInfo candidateLevelInfo, baseLevel int,
1608+
env compactionEnv,
1609+
opts *Options,
1610+
vers *version,
1611+
l0Organizer *manifest.L0Organizer,
1612+
cInfo candidateLevelInfo,
1613+
baseLevel int,
15921614
) (pc *pickedCompaction) {
15931615
if cInfo.level == 0 {
15941616
panic("pebble: pickAutoLPositive called for L0")
15951617
}
15961618

1597-
pc = newPickedCompaction(opts, vers, cInfo.level, defaultOutputLevel(cInfo.level, baseLevel), baseLevel)
1619+
pc = newPickedCompaction(opts, vers, l0Organizer, cInfo.level, defaultOutputLevel(cInfo.level, baseLevel), baseLevel)
15981620
if pc.outputLevel.level != cInfo.outputLevel {
15991621
panic("pebble: compaction picked unexpected output level")
16001622
}
@@ -1741,17 +1763,19 @@ func (wa WriteAmpHeuristic) String() string {
17411763

17421764
// Helper method to pick compactions originating from L0. Uses information about
17431765
// sublevels to generate a compaction.
1744-
func pickL0(env compactionEnv, opts *Options, vers *version, baseLevel int) *pickedCompaction {
1766+
func pickL0(
1767+
env compactionEnv, opts *Options, vers *version, l0Organizer *manifest.L0Organizer, baseLevel int,
1768+
) *pickedCompaction {
17451769
// It is important to pass information about Lbase files to L0Sublevels
17461770
// so it can pick a compaction that does not conflict with an Lbase => Lbase+1
17471771
// compaction. Without this, we observed reduced concurrency of L0=>Lbase
17481772
// compactions, and increasing read amplification in L0.
17491773
//
17501774
// TODO(bilal) Remove the minCompactionDepth parameter once fixing it at 1
17511775
// has been shown to not cause a performance regression.
1752-
lcf := vers.L0Sublevels.PickBaseCompaction(opts.Logger, 1, vers.Levels[baseLevel].Slice())
1776+
lcf := l0Organizer.PickBaseCompaction(opts.Logger, 1, vers.Levels[baseLevel].Slice())
17531777
if lcf != nil {
1754-
pc := newPickedCompactionFromL0(lcf, opts, vers, baseLevel, true)
1778+
pc := newPickedCompactionFromL0(lcf, opts, vers, l0Organizer, baseLevel, true)
17551779
if pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
17561780
if pc.startLevel.files.Empty() {
17571781
opts.Logger.Errorf("%v", base.AssertionFailedf("empty compaction chosen"))
@@ -1766,9 +1790,9 @@ func pickL0(env compactionEnv, opts *Options, vers *version, baseLevel int) *pic
17661790
// compaction. Note that we pass in L0CompactionThreshold here as opposed to
17671791
// 1, since choosing a single sublevel intra-L0 compaction is
17681792
// counterproductive.
1769-
lcf = vers.L0Sublevels.PickIntraL0Compaction(env.earliestUnflushedSeqNum, minIntraL0Count)
1793+
lcf = l0Organizer.PickIntraL0Compaction(env.earliestUnflushedSeqNum, minIntraL0Count)
17701794
if lcf != nil {
1771-
pc := newPickedCompactionFromL0(lcf, opts, vers, 0, false)
1795+
pc := newPickedCompactionFromL0(lcf, opts, vers, l0Organizer, 0, false)
17721796
if pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
17731797
if pc.startLevel.files.Empty() {
17741798
opts.Logger.Fatalf("empty compaction chosen")
@@ -1787,7 +1811,12 @@ func pickL0(env compactionEnv, opts *Options, vers *version, baseLevel int) *pic
17871811
}
17881812

17891813
func newPickedManualCompaction(
1790-
vers *version, opts *Options, env compactionEnv, baseLevel int, manual *manualCompaction,
1814+
vers *version,
1815+
l0Organizer *manifest.L0Organizer,
1816+
opts *Options,
1817+
env compactionEnv,
1818+
baseLevel int,
1819+
manual *manualCompaction,
17911820
) (pc *pickedCompaction, retryLater bool) {
17921821
outputLevel := manual.level + 1
17931822
if manual.level == 0 {
@@ -1809,7 +1838,7 @@ func newPickedManualCompaction(
18091838
if conflictsWithInProgress(manual, outputLevel, env.inProgressCompactions, opts.Comparer.Compare) {
18101839
return nil, true
18111840
}
1812-
pc = newPickedCompaction(opts, vers, manual.level, defaultOutputLevel(manual.level, baseLevel), baseLevel)
1841+
pc = newPickedCompaction(opts, vers, l0Organizer, manual.level, defaultOutputLevel(manual.level, baseLevel), baseLevel)
18131842
pc.manualID = manual.id
18141843
manual.outputLevel = pc.outputLevel.level
18151844
pc.startLevel.files = vers.Overlaps(manual.level, base.UserKeyBoundsInclusive(manual.start, manual.end))
@@ -1844,6 +1873,7 @@ func newPickedManualCompaction(
18441873
// the backing file or a rewrite compaction.
18451874
func pickDownloadCompaction(
18461875
vers *version,
1876+
l0Organizer *manifest.L0Organizer,
18471877
opts *Options,
18481878
env compactionEnv,
18491879
baseLevel int,
@@ -1858,7 +1888,7 @@ func pickDownloadCompaction(
18581888
if kind != compactionKindCopy && kind != compactionKindRewrite {
18591889
panic("invalid download/rewrite compaction kind")
18601890
}
1861-
pc = newPickedCompaction(opts, vers, level, level, baseLevel)
1891+
pc = newPickedCompaction(opts, vers, l0Organizer, level, level, baseLevel)
18621892
pc.kind = kind
18631893
pc.startLevel.files = manifest.NewLevelSliceKeySorted(opts.Comparer.Compare, []*tableMetadata{file})
18641894
if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
@@ -1909,7 +1939,7 @@ func pickReadTriggeredCompactionHelper(
19091939
return nil
19101940
}
19111941

1912-
pc = newPickedCompaction(p.opts, p.vers, rc.level, defaultOutputLevel(rc.level, p.baseLevel), p.baseLevel)
1942+
pc = newPickedCompaction(p.opts, p.vers, p.l0Organizer, rc.level, defaultOutputLevel(rc.level, p.baseLevel), p.baseLevel)
19131943

19141944
pc.startLevel.files = overlapSlice
19151945
if !pc.setupInputs(p.opts, env.diskAvailBytes, pc.startLevel) {

0 commit comments

Comments
 (0)