Skip to content

Commit 7c06a36

Browse files
committed
db: add option for CompactionGarbageFractionForMaxConcurrency
This increases the compaction concurrency up to the maximum concurrency based on the fraction of garbage. The default is 0.4, i.e., if 40% of the DB is garbage, it will allow up to MaxConcurrentCompactions. The assumption is that the compensated level sizes used in scoring have captured this garbage state, so the levels responsible for the garbage have scores > 1.0 and will eventually be picked for compaction.

Informs #4602
1 parent a909ca0 commit 7c06a36

File tree

13 files changed

+242
-86
lines changed

13 files changed

+242
-86
lines changed

cmd/pebble/db.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ func newPebbleDB(dir string) DB {
8080
// writing), columnar blocks are only written if explicitly opted into.
8181
opts.Experimental.EnableColumnarBlocks = func() bool { return true }
8282

83+
// Running the tool should not start compactions due to garbage.
84+
opts.Experimental.CompactionGarbageFractionForMaxConcurrency = func() float64 {
85+
return -1.0
86+
}
8387
for i := 0; i < len(opts.Levels); i++ {
8488
l := &opts.Levels[i]
8589
l.BlockSize = 32 << 10 // 32 KB

compaction_picker.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,7 @@ type compactionPickerByScore struct {
706706
// levelMaxBytes holds the dynamically adjusted max bytes setting for each
707707
// level.
708708
levelMaxBytes [numLevels]int64
709+
dbSizeBytes uint64
709710
}
710711

711712
var _ compactionPicker = &compactionPickerByScore{}
@@ -817,7 +818,10 @@ func (p *compactionPickerByScore) initLevelMaxBytes(inProgressCompactions []comp
817818
p.levelMaxBytes[level] = math.MaxInt64
818819
}
819820

820-
if dbSize == 0 {
821+
dbSizeBelowL0 := dbSize
822+
dbSize += p.vers.Levels[0].Size()
823+
p.dbSizeBytes = dbSize
824+
if dbSizeBelowL0 == 0 {
821825
// No levels for L1 and up contain any data. Target L0 compactions for the
822826
// last level or to the level to which there is an ongoing L0 compaction.
823827
p.baseLevel = numLevels - 1
@@ -827,7 +831,6 @@ func (p *compactionPickerByScore) initLevelMaxBytes(inProgressCompactions []comp
827831
return
828832
}
829833

830-
dbSize += p.vers.Levels[0].Size()
831834
bottomLevelSize := dbSize - dbSize/uint64(p.opts.Experimental.LevelMultiplier)
832835

833836
curLevelSize := bottomLevelSize
@@ -1252,7 +1255,19 @@ func (p *compactionPickerByScore) getCompactionConcurrency() int {
12521255
compactionDebt := p.estimatedCompactionDebt(0)
12531256
compactionDebtCompactions = int(compactionDebt/p.opts.Experimental.CompactionDebtConcurrency) + 1
12541257
}
1255-
return max(min(maxConcurrentCompactions, max(l0ReadAmpCompactions, compactionDebtCompactions)), 1)
1258+
compactableGarbageCompactions := 0
1259+
garbageFractionLimit := p.opts.Experimental.CompactionGarbageFractionForMaxConcurrency()
1260+
if garbageFractionLimit > 0 && p.dbSizeBytes > 0 {
1261+
compactableGarbageBytes :=
1262+
*pointDeletionsBytesEstimateAnnotator.MultiLevelAnnotation(p.vers.Levels[:]) +
1263+
*rangeDeletionsBytesEstimateAnnotator.MultiLevelAnnotation(p.vers.Levels[:])
1264+
garbageFraction := float64(compactableGarbageBytes) / float64(p.dbSizeBytes)
1265+
compactableGarbageCompactions =
1266+
int((garbageFraction / garbageFractionLimit) * float64(maxConcurrentCompactions))
1267+
}
1268+
concurrencyBasedOnSignals :=
1269+
max(l0ReadAmpCompactions, compactionDebtCompactions, compactableGarbageCompactions)
1270+
return min(maxConcurrentCompactions, max(concurrencyBasedOnSignals, 1))
12561271
}
12571272

12581273
// TODO(sumeer): remove unless someone actually finds this useful.

compaction_picker_test.go

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,7 @@ func loadVersion(
3434
opts := &Options{}
3535
opts.testingRandomized(t)
3636
opts.EnsureDefaults()
37-
38-
if len(d.CmdArgs) != 1 {
39-
return nil, nil, nil, fmt.Sprintf("%s expects 1 argument", d.Cmd)
40-
}
41-
var err error
42-
opts.LBaseMaxBytes, err = strconv.ParseInt(d.CmdArgs[0].Key, 10, 64)
43-
if err != nil {
44-
return nil, nil, nil, err.Error()
45-
}
46-
37+
d.ScanArgs(t, "l-base-max-bytes", &opts.LBaseMaxBytes)
4738
var files [numLevels][]*tableMetadata
4839
if len(d.Input) > 0 {
4940
// Parse each line as
@@ -190,15 +181,16 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
190181
f.CompactionState = manifest.CompactionStateNotCompacting
191182
}
192183
}
184+
l0Organizer.ResetForTesting(vers)
193185
}
194186

195-
pickAuto := func(env compactionEnv, pickerByScore *compactionPickerByScore) *pickedCompaction {
187+
pickAuto := func(env compactionEnv, pickerByScore *compactionPickerByScore) (pc *pickedCompaction, allowedCompactions int) {
196188
inProgressCompactions := len(env.inProgressCompactions)
197-
allowedCompactions := pickerByScore.getCompactionConcurrency()
189+
allowedCompactions = pickerByScore.getCompactionConcurrency()
198190
if inProgressCompactions >= allowedCompactions {
199-
return nil
191+
return nil, allowedCompactions
200192
}
201-
return pickerByScore.pickAutoScore(env)
193+
return pickerByScore.pickAutoScore(env), allowedCompactions
202194
}
203195

204196
datadriven.RunTest(t, "testdata/compaction_picker_target_level",
@@ -211,18 +203,26 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
211203
// and optionally additional compensation to be added during
212204
// compensated file size calculations. Eg:
213205
//
214-
// init <LBaseMaxBytes>
206+
// init l-base-max-bytes=<LBaseMaxBytes>
215207
// <level>: <size> [compensation]
216208
// <level>: <size> [compensation]
217209
var errMsg string
218210
vers, l0Organizer, opts, errMsg = loadVersion(t, d)
211+
// This test limits the count based on the L0 read amp, compaction
212+
// debt, and deleted garbage, so we would like to return math.MaxInt
213+
// for most test cases. But we don't since it is also used in
214+
// expandedCompactionByteSizeLimit, and causes the expanded bytes to
215+
// reduce. The test cases never pick more than 4 compactions, so we
216+
// use 4 as the default.
217+
maxConcurrentCompactions := 4
218+
if d.HasArg("max-concurrent-compactions") {
219+
d.ScanArgs(t, "max-concurrent-compactions", &maxConcurrentCompactions)
220+
}
219221
opts.MaxConcurrentCompactions = func() int {
220-
// This test only limits the count based on the L0 read amp and
221-
// compaction debt, so we would like to return math.MaxInt. But we
222-
// don't since it is also used in expandedCompactionByteSizeLimit,
223-
// and causes the expanded bytes to reduce. The test cases never
224-
// pick more than 4 compactions, so we use 4.
225-
return 4
222+
return maxConcurrentCompactions
223+
}
224+
if d.HasArg("compaction-debt-concurrency") {
225+
d.ScanArgs(t, "compaction-debt-concurrency", &opts.Experimental.CompactionDebtConcurrency)
226226
}
227227
if errMsg != "" {
228228
return errMsg
@@ -246,13 +246,20 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
246246
case "queue":
247247
var b strings.Builder
248248
var inProgress []compactionInfo
249+
printConcurrency := false
250+
if d.HasArg("print-compaction-concurrency") {
251+
printConcurrency = true
252+
}
249253
for {
250254
env := compactionEnv{
251255
diskAvailBytes: math.MaxUint64,
252256
earliestUnflushedSeqNum: base.SeqNumMax,
253257
inProgressCompactions: inProgress,
254258
}
255-
pc := pickAuto(env, pickerByScore)
259+
pc, concurrency := pickAuto(env, pickerByScore)
260+
if printConcurrency {
261+
fmt.Fprintf(&b, "compaction concurrency: %d\n", concurrency)
262+
}
256263
if pc == nil {
257264
break
258265
}
@@ -268,6 +275,10 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
268275
// because the test isn't marking files as Compacting.
269276
break
270277
}
278+
if pc.startLevel != nil && pc.startLevel.level == 0 {
279+
require.NoError(t, l0Organizer.UpdateStateForStartedCompaction(
280+
[]manifest.LevelSlice{pc.startLevel.files}, true /* isBase */))
281+
}
271282
for _, cl := range pc.inputs {
272283
for f := range cl.files.All() {
273284
f.CompactionState = manifest.CompactionStateCompacting
@@ -336,7 +347,7 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
336347

337348
var b strings.Builder
338349
fmt.Fprintf(&b, "Initial state before pick:\n%s", runVersionFileSizes(vers))
339-
pc := pickAuto(compactionEnv{
350+
pc, _ := pickAuto(compactionEnv{
340351
earliestUnflushedSeqNum: base.SeqNumMax,
341352
inProgressCompactions: inProgress,
342353
}, pickerByScore)

metamorphic/options.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,11 @@ func RandomOptions(
690690
opts.MaxConcurrentCompactions = func() int {
691691
return maxConcurrentCompactions
692692
}
693+
// [-0.2, 0.4], in steps of 0.2.
694+
garbageFrac := float64(rng.IntN(5))/5.0 - 0.2
695+
opts.Experimental.CompactionGarbageFractionForMaxConcurrency = func() float64 {
696+
return garbageFrac
697+
}
693698
maxConcurrentDownloads := rng.IntN(3) + 1 // 1-3
694699
opts.MaxConcurrentDownloads = func() int {
695700
return maxConcurrentDownloads

metamorphic/options_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func TestOptionsRoundtrip(t *testing.T) {
7373
"EventListener:",
7474
"MaxConcurrentCompactions:",
7575
"MaxConcurrentDownloads:",
76+
"Experimental.CompactionGarbageFractionForMaxConcurrency:",
7677
"Experimental.DisableIngestAsFlushable:",
7778
"Experimental.EnableColumnarBlocks:",
7879
"Experimental.EnableValueBlocks:",
@@ -117,6 +118,12 @@ func TestOptionsRoundtrip(t *testing.T) {
117118
if o.Opts.Experimental.IngestSplit != nil && o.Opts.Experimental.IngestSplit() {
118119
require.Equal(t, o.Opts.Experimental.IngestSplit(), parsed.Opts.Experimental.IngestSplit())
119120
}
121+
require.Equal(t, o.Opts.Experimental.CompactionGarbageFractionForMaxConcurrency == nil,
122+
parsed.Opts.Experimental.CompactionGarbageFractionForMaxConcurrency == nil)
123+
if o.Opts.Experimental.CompactionGarbageFractionForMaxConcurrency != nil {
124+
require.InDelta(t, o.Opts.Experimental.CompactionGarbageFractionForMaxConcurrency(),
125+
parsed.Opts.Experimental.CompactionGarbageFractionForMaxConcurrency(), 1e-5)
126+
}
120127
require.Equal(t, o.Opts.MaxConcurrentCompactions(), parsed.Opts.MaxConcurrentCompactions())
121128
require.Equal(t, o.Opts.MaxConcurrentDownloads(), parsed.Opts.MaxConcurrentDownloads())
122129
require.Equal(t, len(o.Opts.BlockPropertyCollectors), len(parsed.Opts.BlockPropertyCollectors))

options.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,13 @@ type Options struct {
571571
// concurrency slots as determined by the two options is chosen.
572572
CompactionDebtConcurrency uint64
573573

574+
// CompactionGarbageFractionForMaxConcurrency is the fraction of garbage
575+
// due to DELs and RANGEDELs that causes MaxConcurrentCompactions to be
576+
// allowed. Concurrent compactions are allowed in a linear manner upto
577+
// this limit being reached. A value <= 0.0 disables adding concurrency
578+
// due to garbage.
579+
CompactionGarbageFractionForMaxConcurrency func() float64
580+
574581
// IngestSplit, if it returns true, allows for ingest-time splitting of
575582
// existing sstables into two virtual sstables to allow ingestion sstables to
576583
// slot into a lower level than they otherwise would have.
@@ -1208,6 +1215,11 @@ func (o *Options) EnsureDefaults() {
12081215
if o.Experimental.CompactionDebtConcurrency <= 0 {
12091216
o.Experimental.CompactionDebtConcurrency = 1 << 30 // 1 GB
12101217
}
1218+
if o.Experimental.CompactionGarbageFractionForMaxConcurrency == nil {
1219+
// When 40% of the DB is garbage, the compaction concurrency is at the
1220+
// maximum permitted.
1221+
o.Experimental.CompactionGarbageFractionForMaxConcurrency = func() float64 { return 0.4 }
1222+
}
12111223
if o.KeySchema == "" && len(o.KeySchemas) == 0 {
12121224
ks := colblk.DefaultKeySchema(o.Comparer, 16 /* bundleSize */)
12131225
o.KeySchema = ks.Name
@@ -1434,6 +1446,8 @@ func (o *Options) String() string {
14341446
fmt.Fprintf(&buf, " cache_size=%d\n", cacheSize)
14351447
fmt.Fprintf(&buf, " cleaner=%s\n", o.Cleaner)
14361448
fmt.Fprintf(&buf, " compaction_debt_concurrency=%d\n", o.Experimental.CompactionDebtConcurrency)
1449+
fmt.Fprintf(&buf, " compaction_garbage_fraction_for_max_concurrency=%.2f\n",
1450+
o.Experimental.CompactionGarbageFractionForMaxConcurrency())
14371451
fmt.Fprintf(&buf, " comparer=%s\n", o.Comparer.Name)
14381452
fmt.Fprintf(&buf, " disable_wal=%t\n", o.DisableWAL)
14391453
if o.Experimental.DisableIngestAsFlushable != nil && o.Experimental.DisableIngestAsFlushable() {
@@ -1698,6 +1712,13 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error {
16981712
}
16991713
case "compaction_debt_concurrency":
17001714
o.Experimental.CompactionDebtConcurrency, err = strconv.ParseUint(value, 10, 64)
1715+
case "compaction_garbage_fraction_for_max_concurrency":
1716+
var frac float64
1717+
frac, err = strconv.ParseFloat(value, 64)
1718+
if err == nil {
1719+
o.Experimental.CompactionGarbageFractionForMaxConcurrency =
1720+
func() float64 { return frac }
1721+
}
17011722
case "delete_range_flush_delay":
17021723
// NB: This is a deprecated serialization of the
17031724
// `flush_delay_delete_range`.

options_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ func TestDefaultOptionsString(t *testing.T) {
8484
cache_size=8388608
8585
cleaner=delete
8686
compaction_debt_concurrency=1073741824
87+
compaction_garbage_fraction_for_max_concurrency=0.40
8788
comparer=leveldb.BytewiseComparator
8889
disable_wal=false
8990
enable_columnar_blocks=true

replay/testdata/replay

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ tree
1111
614 000007.sst
1212
0 LOCK
1313
133 MANIFEST-000001
14-
1524 OPTIONS-000003
14+
1579 OPTIONS-000003
1515
0 marker.format-version.000001.013
1616
0 marker.manifest.000001.MANIFEST-000001
1717
simple/
@@ -21,7 +21,7 @@ tree
2121
25 000004.log
2222
586 000005.sst
2323
85 MANIFEST-000001
24-
1524 OPTIONS-000003
24+
1579 OPTIONS-000003
2525
0 marker.format-version.000001.013
2626
0 marker.manifest.000001.MANIFEST-000001
2727

@@ -36,6 +36,7 @@ cat build/OPTIONS-000003
3636
cache_size=8388608
3737
cleaner=replay.WorkloadCollector("delete")
3838
compaction_debt_concurrency=1073741824
39+
compaction_garbage_fraction_for_max_concurrency=0.40
3940
comparer=pebble.internal.testkeys
4041
disable_wal=false
4142
enable_columnar_blocks=true

replay/testdata/replay_paced

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ tree
1414
0 LOCK
1515
133 MANIFEST-000001
1616
205 MANIFEST-000010
17-
1524 OPTIONS-000003
17+
1579 OPTIONS-000003
1818
0 marker.format-version.000001.013
1919
0 marker.manifest.000002.MANIFEST-000010
2020
high_read_amp/
@@ -26,7 +26,7 @@ tree
2626
39 000008.log
2727
560 000009.sst
2828
157 MANIFEST-000010
29-
1524 OPTIONS-000003
29+
1579 OPTIONS-000003
3030
0 marker.format-version.000001.013
3131
0 marker.manifest.000001.MANIFEST-000010
3232

0 commit comments

Comments (0)