Skip to content

Commit 16f3b57

Browse files
committed
db: change MaxConcurrentCompactions() to return a range
`MaxConcurrentCompactions()` returns an upper limit on the compaction concurrency. Within this upper limit, Pebble decides what the concurrency limit is (depending on L0 amplification and compaction debt). There are cases where we want more concurrency for other reasons (like space amp). Having a knob can be useful to tweak things in a production environment. This change makes `MaxConcurrentCompactions()` return a "baseline limit" and an "upper limit". The baseline limit is the "starting" value for the max concurrency, which Pebble can increase dynamically up to the upper limit. Prior to this change, the (implicit) baseline limit was always 1.
1 parent b751e87 commit 16f3b57

20 files changed

+154
-66
lines changed

cmd/pebble/db.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ func newPebbleDB(dir string) DB {
7272
Merger: &pebble.Merger{
7373
Name: "cockroach_merge_operator",
7474
},
75-
MaxConcurrentCompactions: func() int {
76-
return 3
75+
MaxConcurrentCompactions: func() (int, int) {
76+
return 1, 3
7777
},
7878
}
7979
// In FormatColumnarBlocks (the value of FormatNewest at the time of

cmd/pebble/replay_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,15 @@ func TestParseOptionsStr(t *testing.T) {
2121
testCases := []testCase{
2222
{
2323
c: replayConfig{optionsString: `[Options] max_concurrent_compactions=9`},
24-
options: &pebble.Options{MaxConcurrentCompactions: func() int { return 9 }},
24+
options: &pebble.Options{MaxConcurrentCompactions: func() (int, int) { return 1, 9 }},
25+
},
26+
{
27+
c: replayConfig{optionsString: `[Options] baseline_concurrent_compactions=4`},
28+
options: &pebble.Options{MaxConcurrentCompactions: func() (int, int) { return 4, 4 }},
29+
},
30+
{
31+
c: replayConfig{optionsString: `[Options] baseline_concurrent_compactions=4 max_concurrent_compactions=9`},
32+
options: &pebble.Options{MaxConcurrentCompactions: func() (int, int) { return 4, 9 }},
2533
},
2634
{
2735
c: replayConfig{optionsString: `[Options] bytes_per_sync=90000`},

compaction.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ func expandedCompactionByteSizeLimit(opts *Options, level int, availBytes uint64
5858
//
5959
// NB: this heuristic is an approximation since we may run more compactions
6060
// than MaxConcurrentCompactions.
61-
diskMax := (availBytes / 2) / uint64(opts.MaxConcurrentCompactions())
61+
_, upperConcurrencyLimit := opts.MaxConcurrentCompactions()
62+
diskMax := (availBytes / 2) / uint64(upperConcurrencyLimit)
6263
if v > diskMax {
6364
v = diskMax
6465
}
@@ -1918,7 +1919,7 @@ func (d *DB) GetAllowedWithoutPermission() int {
19181919
allowedBasedOnManual := 0
19191920
manualBacklog := int(d.mu.compact.manualLen.Load())
19201921
if manualBacklog > 0 {
1921-
maxAllowed := d.opts.MaxConcurrentCompactions()
1922+
_, maxAllowed := d.opts.MaxConcurrentCompactions()
19221923
allowedBasedOnManual = min(maxAllowed, manualBacklog+allowedBasedOnBacklog)
19231924
}
19241925
return max(allowedBasedOnBacklog, allowedBasedOnManual)
@@ -1982,10 +1983,12 @@ func (d *DB) pickManualCompaction(env compactionEnv) (pc *pickedCompaction) {
19821983
// Returns true iff a compaction was started.
19831984
func (d *DB) tryScheduleDeleteOnlyCompaction() bool {
19841985
if d.opts.private.disableDeleteOnlyCompactions || d.opts.DisableAutomaticCompactions ||
1985-
d.mu.compact.compactingCount >= d.opts.MaxConcurrentCompactions() ||
19861986
len(d.mu.compact.deletionHints) == 0 {
19871987
return false
19881988
}
1989+
if _, upperLimit := d.opts.MaxConcurrentCompactions(); d.mu.compact.compactingCount >= upperLimit {
1990+
return false
1991+
}
19891992
v := d.mu.versions.currentVersion()
19901993
snapshots := d.mu.snapshots.toSlice()
19911994
// We need to save the value of exciseEnabled in the compaction itself, as

compaction_picker.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1218,7 +1218,7 @@ func responsibleForGarbageBytes(
12181218
}
12191219

12201220
func (p *compactionPickerByScore) getCompactionConcurrency() int {
1221-
maxConcurrentCompactions := p.opts.MaxConcurrentCompactions()
1221+
baselineLimit, upperLimit := p.opts.MaxConcurrentCompactions()
12221222
// Compaction concurrency is controlled by L0 read-amp. We allow one
12231223
// additional compaction per L0CompactionConcurrency sublevels, as well as
12241224
// one additional compaction per CompactionDebtConcurrency bytes of
@@ -1235,24 +1235,26 @@ func (p *compactionPickerByScore) getCompactionConcurrency() int {
12351235
// Rearranging,
12361236
// n <= l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency.
12371237
// So we can run up to
1238-
// l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency + 1 compactions
1239-
l0ReadAmpCompactions := 1
1238+
// l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency extra compactions.
1239+
l0ReadAmpCompactions := 0
12401240
if p.opts.Experimental.L0CompactionConcurrency > 0 {
12411241
l0ReadAmp := p.l0Organizer.MaxDepthAfterOngoingCompactions()
1242-
l0ReadAmpCompactions = (l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency) + 1
1242+
l0ReadAmpCompactions = (l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency)
12431243
}
12441244
// compactionDebt >= ccSignal2 then can run another compaction, where
12451245
// ccSignal2 = uint64(n) * p.opts.Experimental.CompactionDebtConcurrency
12461246
// Rearranging,
12471247
// n <= compactionDebt / p.opts.Experimental.CompactionDebtConcurrency
12481248
// So we can run up to
1249-
// compactionDebt / p.opts.Experimental.CompactionDebtConcurrency + 1 compactions.
1250-
compactionDebtCompactions := 1
1249+
// compactionDebt / p.opts.Experimental.CompactionDebtConcurrency extra
1250+
// compactions.
1251+
compactionDebtCompactions := 0
12511252
if p.opts.Experimental.CompactionDebtConcurrency > 0 {
12521253
compactionDebt := p.estimatedCompactionDebt(0)
1253-
compactionDebtCompactions = int(compactionDebt/p.opts.Experimental.CompactionDebtConcurrency) + 1
1254+
compactionDebtCompactions = int(compactionDebt / p.opts.Experimental.CompactionDebtConcurrency)
12541255
}
1255-
return max(min(maxConcurrentCompactions, max(l0ReadAmpCompactions, compactionDebtCompactions)), 1)
1256+
extraCompactions := max(l0ReadAmpCompactions, compactionDebtCompactions)
1257+
return min(baselineLimit+extraCompactions, upperLimit)
12561258
}
12571259

12581260
// TODO(sumeer): remove unless someone actually finds this useful.

compaction_picker_test.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -216,13 +216,13 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
216216
// <level>: <size> [compensation]
217217
var errMsg string
218218
vers, l0Organizer, opts, errMsg = loadVersion(t, d)
219-
opts.MaxConcurrentCompactions = func() int {
219+
opts.MaxConcurrentCompactions = func() (int, int) {
220220
// This test only limits the count based on the L0 read amp and
221221
// compaction debt, so we would like to return math.MaxInt. But we
222222
// don't since it is also used in expandedCompactionByteSizeLimit,
223223
// and causes the expanded bytes to reduce. The test cases never
224224
// pick more than 4 compactions, so we use 4.
225-
return 4
225+
return 1, 4
226226
}
227227
if errMsg != "" {
228228
return errMsg
@@ -636,7 +636,8 @@ func TestCompactionPickerL0(t *testing.T) {
636636
func TestCompactionPickerConcurrency(t *testing.T) {
637637
opts := DefaultOptions()
638638
opts.Experimental.L0CompactionConcurrency = 1
639-
opts.MaxConcurrentCompactions = func() int { return 4 }
639+
baselineConcurrencyLimit, upperConcurrencyLimit := 1, 4
640+
opts.MaxConcurrentCompactions = func() (int, int) { return baselineConcurrencyLimit, upperConcurrencyLimit }
640641

641642
parseMeta := func(s string) (*tableMetadata, error) {
642643
parts := strings.Split(s, ":")
@@ -804,6 +805,8 @@ func TestCompactionPickerConcurrency(t *testing.T) {
804805
td.MaybeScanArgs(t, "l0_compaction_threshold", &opts.L0CompactionThreshold)
805806
td.MaybeScanArgs(t, "l0_compaction_concurrency", &opts.Experimental.L0CompactionConcurrency)
806807
td.MaybeScanArgs(t, "compaction_debt_concurrency", &opts.Experimental.CompactionDebtConcurrency)
808+
td.MaybeScanArgs(t, "upper_concurrency_limit", &upperConcurrencyLimit)
809+
td.MaybeScanArgs(t, "baseline_concurrency_limit", &baselineConcurrencyLimit)
807810

808811
env := compactionEnv{
809812
earliestUnflushedSeqNum: math.MaxUint64,
@@ -831,8 +834,10 @@ func TestCompactionPickerConcurrency(t *testing.T) {
831834
fmt.Fprintf(&result, "nil")
832835
}
833836
return result.String()
837+
838+
default:
839+
return fmt.Sprintf("unrecognized command: %s", td.Cmd)
834840
}
835-
return fmt.Sprintf("unrecognized command: %s", td.Cmd)
836841
})
837842
}
838843

compaction_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1157,10 +1157,12 @@ func TestCompaction(t *testing.T) {
11571157
return ""
11581158

11591159
case "set-concurrent-compactions":
1160-
var concurrentCompactions int
1161-
td.ScanArgs(t, "num", &concurrentCompactions)
1162-
d.opts.MaxConcurrentCompactions = func() int {
1163-
return concurrentCompactions
1160+
var upperLimit int
1161+
baselineLimit := 1
1162+
td.ScanArgs(t, "max", &upperLimit)
1163+
td.MaybeScanArgs(t, "baseline", &baselineLimit)
1164+
d.opts.MaxConcurrentCompactions = func() (int, int) {
1165+
return baselineLimit, upperLimit
11641166
}
11651167
return ""
11661168

db_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1187,8 +1187,8 @@ func TestDBConcurrentCompactClose(t *testing.T) {
11871187
for i := 0; i < 100; i++ {
11881188
opts := &Options{
11891189
FS: mem,
1190-
MaxConcurrentCompactions: func() int {
1191-
return 2
1190+
MaxConcurrentCompactions: func() (int, int) {
1191+
return 1, 2
11921192
},
11931193
}
11941194
d, err := Open("", testingRandomized(t, opts))
@@ -1522,8 +1522,8 @@ func TestMemtableIngestInversion(t *testing.T) {
15221522
MemTableStopWritesThreshold: 1000,
15231523
L0StopWritesThreshold: 1000,
15241524
L0CompactionThreshold: 2,
1525-
MaxConcurrentCompactions: func() int {
1526-
return 1000
1525+
MaxConcurrentCompactions: func() (int, int) {
1526+
return 1, 1000
15271527
},
15281528
}
15291529

error_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ func TestDBCompactionCrash(t *testing.T) {
460460
FS: fs,
461461
Logger: testLogger{t: t},
462462
MemTableSize: 128 << 10,
463-
MaxConcurrentCompactions: func() int { return maxConcurrentCompactions },
463+
MaxConcurrentCompactions: func() (int, int) { return 1, maxConcurrentCompactions },
464464
LBaseMaxBytes: 64 << 10,
465465
L0CompactionThreshold: 2,
466466
L0CompactionFileThreshold: 2,

iterator_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2133,7 +2133,7 @@ func TestRangeKeyMaskingRandomized(t *testing.T) {
21332133
FS: vfs.NewCrashableMem(),
21342134
Comparer: testkeys.Comparer,
21352135
FormatMajorVersion: FormatNewest,
2136-
MaxConcurrentCompactions: func() int { return maxProcs/2 + 1 },
2136+
MaxConcurrentCompactions: func() (int, int) { return 1, maxProcs/2 + 1 },
21372137
BlockPropertyCollectors: []func() BlockPropertyCollector{
21382138
sstable.NewTestKeysBlockPropertyCollector,
21392139
},
@@ -2146,7 +2146,7 @@ func TestRangeKeyMaskingRandomized(t *testing.T) {
21462146
FS: vfs.NewCrashableMem(),
21472147
Comparer: testkeys.Comparer,
21482148
FormatMajorVersion: FormatNewest,
2149-
MaxConcurrentCompactions: func() int { return maxProcs/2 + 1 },
2149+
MaxConcurrentCompactions: func() (int, int) { return 1, maxProcs/2 + 1 },
21502150
BlockPropertyCollectors: []func() BlockPropertyCollector{
21512151
sstable.NewTestKeysBlockPropertyCollector,
21522152
},
@@ -2308,7 +2308,7 @@ func BenchmarkIterator_RangeKeyMasking(b *testing.B) {
23082308
FS: mem,
23092309
Comparer: testkeys.Comparer,
23102310
FormatMajorVersion: FormatNewest,
2311-
MaxConcurrentCompactions: func() int { return maxProcs/2 + 1 },
2311+
MaxConcurrentCompactions: func() (int, int) { return 1, maxProcs/2 + 1 },
23122312
BlockPropertyCollectors: []func() BlockPropertyCollector{
23132313
sstable.NewTestKeysBlockPropertyCollector,
23142314
},

metamorphic/options.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -687,8 +687,12 @@ func RandomOptions(
687687
}
688688
opts.LBaseMaxBytes = 1 << uint(rng.IntN(30)) // 1B - 1GB
689689
maxConcurrentCompactions := rng.IntN(3) + 1 // 1-3
690-
opts.MaxConcurrentCompactions = func() int {
691-
return maxConcurrentCompactions
690+
baselineConcurrentCompactions := 1
691+
if rng.IntN(4) == 0 {
692+
baselineConcurrentCompactions = rng.IntN(maxConcurrentCompactions) + 1
693+
}
694+
opts.MaxConcurrentCompactions = func() (int, int) {
695+
return baselineConcurrentCompactions, maxConcurrentCompactions
692696
}
693697
maxConcurrentDownloads := rng.IntN(3) + 1 // 1-3
694698
opts.MaxConcurrentDownloads = func() int {

0 commit comments

Comments
 (0)