
Commit 342d2e8

db: add high-priority blob file rewrite compactions
This commit introduces a new trigger for blob-file rewrite compactions.
Previously, blob-file rewrite compactions were triggered only when:

a) no default score-based compactions were available to run,
b) no tombstone-density or elision-only compactions were available to run,
c) total unreferenced blob values exceeded the 'garbage ratio' threshold, and
d) a blob file met the criteria for rewrite (including the minimum age).

We've observed that score-based compactions can starve blob-file rewrite
compactions, leaving blob-file-induced space amplification unbounded.

This commit renames the old 'garbage ratio' threshold to
GarbageRatioLowPriority and introduces a new GarbageRatioHighPriority
setting. When the ratio of unreferenced values reaches the high-priority
setting, Pebble will schedule up to 1 blob-file rewrite compaction at a
time as long as eligible blob files exist. To avoid reducing the
concurrency available for default compactions, compaction concurrency is
temporarily inflated for the duration of the blob-file rewrite compaction.

Informs #112.
1 parent fff7d3a commit 342d2e8

19 files changed: +386 -93 lines
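
The relationship between the two thresholds can be sketched in isolation. The following is a minimal, hypothetical Go sketch (blobStats, policy, and rewritePriority are illustrative stand-ins, not Pebble's actual types) mirroring the garbage-ratio computation that appears in compaction_picker.go below:

package main

import "fmt"

// blobStats is a stand-in for Pebble's aggregate blob-file statistics;
// the two fields mirror those used in the picker's garbage computation.
type blobStats struct {
    ValueSize           uint64 // total bytes of values stored in blob files
    ReferencedValueSize uint64 // bytes still referenced by live sstables
}

// policy is a stand-in for the relevant ValueSeparationPolicy fields.
type policy struct {
    GarbageRatioLowPriority  float64 // e.g. 0.10; >= 1.0 disables
    GarbageRatioHighPriority float64 // e.g. 0.30; >= 1.0 disables
}

// rewritePriority mirrors the two-threshold heuristic: below the low
// threshold no rewrite is picked; between the thresholds a rewrite runs
// only when no default compaction is available; above the high threshold
// a rewrite is picked even ahead of score-based compactions.
func rewritePriority(s blobStats, p policy) string {
    if s.ValueSize == 0 {
        return "none"
    }
    garbage := float64(s.ValueSize-s.ReferencedValueSize) / float64(s.ValueSize)
    switch {
    case p.GarbageRatioHighPriority < 1.0 && garbage > p.GarbageRatioHighPriority:
        return "high"
    case p.GarbageRatioLowPriority < 1.0 && garbage > p.GarbageRatioLowPriority:
        return "low"
    default:
        return "none"
    }
}

func main() {
    p := policy{GarbageRatioLowPriority: 0.10, GarbageRatioHighPriority: 0.30}
    s := blobStats{ValueSize: 100 << 20, ReferencedValueSize: 60 << 20} // 40% garbage
    fmt.Println(rewritePriority(s, p)) // high
}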

blob_rewrite.go

Lines changed: 30 additions & 0 deletions

@@ -7,6 +7,7 @@ package pebble
 import (
     "container/heap"
     "context"
+    "fmt"
     "iter"
     "runtime/pprof"
     "slices"
@@ -28,6 +29,12 @@ import (
 // A pickedBlobFileCompaction is a blob file rewrite compaction that has been
 // picked by the compaction picker.
 type pickedBlobFileCompaction struct {
+    // highPriority is set to true if the compaction was picked because the
+    // ValueSeparationPolicy.GarbageRatioHighPriority heuristic was
+    // triggered. In this case, the resulting compaction will be permitted to
+    // use a burst compaction concurrency slot to avoid starving default
+    // compactions.
+    highPriority bool
     vers              *manifest.Version
     file              manifest.BlobFileMetadata
     referencingTables []*manifest.TableMetadata
@@ -64,6 +71,7 @@ func (c *pickedBlobFileCompaction) ConstructCompaction(
             PreferSharedStorage: false,
             WriteCategory:       getDiskWriteCategoryForCompaction(d.opts, compactionKindBlobFileRewrite),
         },
+        highPriority: c.highPriority,
     }
 }

@@ -98,13 +106,31 @@ type blobFileRewriteCompaction struct {
     objCreateOpts         objstorage.CreateOptions
     internalIteratorStats base.InternalIteratorStats
     bytesWritten          atomic.Int64 // Total bytes written to the new blob file.
+    // highPriority is set to true if the compaction was picked because the
+    // ValueSeparationPolicy.GarbageRatioHighPriority heuristic was
+    // triggered. In this case, the resulting compaction will be permitted to
+    // use a burst compaction concurrency slot to avoid starving default
+    // compactions. This field is set when the compaction is created.
+    highPriority bool
+}
+
+func (c *blobFileRewriteCompaction) String() string {
+    s := fmt.Sprintf("blob file (ID: %s) %s (%s) being rewritten",
+        c.input.FileID, c.input.Physical.FileNum, humanizeBytes(uint64(c.input.Physical.Size)))
+    if c.highPriority {
+        s += " (high priority)"
+    }
+    return s
 }

 // Assert that *blobFileRewriteCompaction implements the Compaction interface.
 var _ compaction = (*blobFileRewriteCompaction)(nil)

 func (c *blobFileRewriteCompaction) AddInProgressLocked(d *DB) {
     d.mu.compact.inProgress[c] = struct{}{}
+    if c.highPriority {
+        d.mu.compact.burstConcurrency.Add(1)
+    }
     // TODO(jackson): Currently the compaction picker iterates through all
     // ongoing compactions in order to limit the number of concurrent blob
     // rewrite compactions to 1.
@@ -249,6 +275,10 @@ func (c *blobFileRewriteCompaction) Info() compactionInfo {
     }
 }

+func (c *blobFileRewriteCompaction) UsesBurstConcurrency() bool {
+    return c.highPriority
+}
+
 func (c *blobFileRewriteCompaction) RecordError(*problemspans.ByLevel, error) {
     // TODO(jackson): Track problematic blob files and avoid re-picking the same
     // blob file compaction.
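
The burst concurrency slot acquired in AddInProgressLocked above is released on the completion paths in compaction.go (see that file's diff below), with an assertion against double release. A minimal standalone sketch of the pairing, assuming a bare atomic counter (names other than burstConcurrency are illustrative):

package main

import (
    "fmt"
    "sync/atomic"
)

// burstConcurrency models d.mu.compact.burstConcurrency: a count of
// temporary extra compaction slots granted to in-flight high-priority
// blob-file rewrites.
var burstConcurrency atomic.Int64

// start mirrors AddInProgressLocked: a high-priority compaction takes a
// burst slot when it is registered as in progress.
func start(highPriority bool) {
    if highPriority {
        burstConcurrency.Add(1)
    }
}

// finish mirrors the release in compact/flush1: the slot is returned
// exactly once; a negative count indicates a bookkeeping bug.
func finish(highPriority bool) {
    if highPriority {
        if v := burstConcurrency.Add(-1); v < 0 {
            panic(fmt.Sprintf("burst concurrency underflow: %d", v))
        }
    }
}

func main() {
    start(true)
    fmt.Println(burstConcurrency.Load()) // 1: one extra slot while running
    finish(true)
    fmt.Println(burstConcurrency.Load()) // 0: slot returned on completion
}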

cmd/pebble/db.go

Lines changed: 6 additions & 5 deletions

@@ -81,11 +81,12 @@ func newPebbleDB(dir string) DB {
     // value separation.
     opts.Experimental.ValueSeparationPolicy = func() pebble.ValueSeparationPolicy {
         return pebble.ValueSeparationPolicy{
-            Enabled:               true,
-            MinimumSize:           512,
-            MaxBlobReferenceDepth: 10,
-            RewriteMinimumAge:     5 * time.Minute,
-            TargetGarbageRatio:    0.1,
+            Enabled:                  true,
+            MinimumSize:              512,
+            MaxBlobReferenceDepth:    10,
+            RewriteMinimumAge:        5 * time.Minute,
+            GarbageRatioLowPriority:  0.10, // 10% garbage
+            GarbageRatioHighPriority: 0.30, // 30% garbage
         }
     }

compaction.go

Lines changed: 36 additions & 11 deletions

@@ -207,6 +207,7 @@ type compaction interface {
     PprofLabels(UserKeyCategories) pprof.LabelSet
     RecordError(*problemspans.ByLevel, error)
     Tables() iter.Seq2[int, *manifest.TableMetadata]
+    UsesBurstConcurrency() bool
     VersionEditApplied() bool
 }

@@ -343,6 +344,9 @@ var _ compaction = (*tableCompaction)(nil)

 func (c *tableCompaction) AddInProgressLocked(d *DB) {
     d.mu.compact.inProgress[c] = struct{}{}
+    if c.UsesBurstConcurrency() {
+        d.mu.compact.burstConcurrency.Add(1)
+    }
     var isBase, isIntraL0 bool
     for _, cl := range c.inputs {
         for f := range cl.files.All() {
@@ -457,6 +461,8 @@ func (c *tableCompaction) Tables() iter.Seq2[int, *manifest.TableMetadata] {
     }
 }

+func (c *tableCompaction) UsesBurstConcurrency() bool { return false }
+
 func (c *tableCompaction) VersionEditApplied() bool { return c.versionEditApplied }

 // compactionMetrics contains metrics surrounding a compaction.
@@ -1267,10 +1273,9 @@ func (c *tableCompaction) String() string {
     }

     var buf bytes.Buffer
-    for level := c.startLevel.level; level <= c.outputLevel.level; level++ {
-        i := level - c.startLevel.level
-        fmt.Fprintf(&buf, "%d:", level)
-        for f := range c.inputs[i].files.All() {
+    for _, l := range c.inputs {
+        fmt.Fprintf(&buf, "%d:", l.level)
+        for f := range l.files.All() {
             fmt.Fprintf(&buf, " %s:%s-%s", f.TableNum, f.Smallest(), f.Largest())
         }
         fmt.Fprintf(&buf, "\n")
@@ -1832,6 +1837,11 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
     }

     d.clearCompactingState(c, err != nil)
+    if c.UsesBurstConcurrency() {
+        if v := d.mu.compact.burstConcurrency.Add(-1); v < 0 {
+            panic(errors.AssertionFailedf("burst concurrency underflow: %d", v))
+        }
+    }
     delete(d.mu.compact.inProgress, c)
     d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.metrics.bytesWritten.Load(), err)

@@ -2052,8 +2062,16 @@ func (d *DB) makeCompactionEnvLocked() *compactionEnv {

 // pickAnyCompaction tries to pick a manual or automatic compaction.
 func (d *DB) pickAnyCompaction(env compactionEnv) (pc pickedCompaction) {
-    // Pick a score-based compaction first, since a misshapen LSM is bad.
     if !d.opts.DisableAutomaticCompactions {
+        // Pick a score-based compaction first, since a misshapen LSM is bad.
+        // We allow an exception for a high-priority disk-space reclamation
+        // compaction. Future work will explore balancing the various competing
+        // compaction priorities more judiciously. For now, we're relying on the
+        // configured heuristic to be set carefully so that we don't starve
+        // score-based compactions.
+        if pc := d.mu.versions.picker.pickHighPrioritySpaceCompaction(env); pc != nil {
+            return pc
+        }
         if pc = d.mu.versions.picker.pickAutoScore(env); pc != nil {
             return pc
         }
@@ -2160,13 +2178,15 @@ func (d *DB) GetWaitingCompaction() (bool, WaitingCompaction) {
 // CompactionScheduler).
 func (d *DB) GetAllowedWithoutPermission() int {
     allowedBasedOnBacklog := int(d.mu.versions.curCompactionConcurrency.Load())
-    allowedBasedOnManual := 0
-    manualBacklog := int(d.mu.compact.manualLen.Load())
-    if manualBacklog > 0 {
-        _, maxAllowed := d.opts.CompactionConcurrencyRange()
-        allowedBasedOnManual = min(maxAllowed, manualBacklog+allowedBasedOnBacklog)
+    allowedBasedOnManual := int(d.mu.compact.manualLen.Load())
+    allowedBasedOnSpaceHeuristic := int(d.mu.compact.burstConcurrency.Load())
+
+    v := allowedBasedOnBacklog + allowedBasedOnManual + allowedBasedOnSpaceHeuristic
+    if v == allowedBasedOnBacklog {
+        return v
     }
-    return max(allowedBasedOnBacklog, allowedBasedOnManual)
+    _, maxAllowed := d.opts.CompactionConcurrencyRange()
+    return min(v, maxAllowed)
 }

 // tryScheduleDownloadCompactions tries to start download compactions.
@@ -2588,6 +2608,11 @@ func (d *DB) compact(c compaction, errChannel chan error) {
     } else {
         d.mu.compact.compactingCount--
     }
+    if c.UsesBurstConcurrency() {
+        if v := d.mu.compact.burstConcurrency.Add(-1); v < 0 {
+            panic(errors.AssertionFailedf("burst concurrency underflow: %d", v))
+        }
+    }
     delete(d.mu.compact.inProgress, c)
     // Add this compaction's duration to the cumulative duration. NB: This
     // must be atomic with the above removal of c from
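
The new GetAllowedWithoutPermission arithmetic above can be read as: take the backlog-based allowance, inflate it by the manual backlog and any burst slots, and cap the inflated value at the configured maximum. A hypothetical standalone sketch of just that arithmetic (allowedWithoutPermission and its parameters are illustrative names):

package main

import "fmt"

// allowedWithoutPermission mirrors the new computation: backlog-based
// allowance, plus manual backlog, plus burst slots, capped at the
// configured maximum (maxAllowed stands in for the upper bound from
// opts.CompactionConcurrencyRange()).
func allowedWithoutPermission(backlog, manual, burst, maxAllowed int) int {
    v := backlog + manual + burst
    if v == backlog {
        // No manual or burst demand: the backlog-based allowance applies
        // as-is, without consulting the configured maximum.
        return v
    }
    return min(v, maxAllowed)
}

func main() {
    fmt.Println(allowedWithoutPermission(2, 0, 0, 4)) // 2: backlog only
    fmt.Println(allowedWithoutPermission(2, 0, 1, 4)) // 3: one burst slot added
    fmt.Println(allowedWithoutPermission(3, 2, 1, 4)) // 4: capped at maxAllowed
}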

compaction_picker.go

Lines changed: 59 additions & 7 deletions

@@ -60,6 +60,7 @@ type compactionPicker interface {
     getMetrics([]compactionInfo) compactionPickerMetrics
     getBaseLevel() int
     estimatedCompactionDebt() uint64
+    pickHighPrioritySpaceCompaction(env compactionEnv) pickedCompaction
     pickAutoScore(env compactionEnv) (pc pickedCompaction)
     pickAutoNonScore(env compactionEnv) (pc pickedCompaction)
     forceBaseLevel1()
@@ -1406,6 +1407,22 @@ func (p *compactionPickerByScore) logCompactionForTesting(
         pc.startLevel.level, pc.outputLevel.level, buf.String())
 }

+// pickHighPrioritySpaceCompaction checks for a high-priority space reclamation
+// compaction. Under some circumstances, we want to pursue a compaction for the
+// purpose of reclaiming disk space even when there are eligible default
+// compactions.
+func (p *compactionPickerByScore) pickHighPrioritySpaceCompaction(
+    env compactionEnv,
+) pickedCompaction {
+    if pc := p.pickBlobFileRewriteCompactionHighPriority(env); pc != nil {
+        return pc
+    }
+    // NB: We can't just return the above result because the above func returns
+    // a *pickedBlobFileCompaction, not a pickedCompaction. We need to return an
+    // untyped nil.
+    return nil
+}
+
 // pickAutoScore picks the best score-based compaction, if any.
 //
 // On each call, pickAutoScore computes per-level size adjustments based on
@@ -1482,7 +1499,7 @@ func (p *compactionPickerByScore) pickAutoNonScore(env compactionEnv) (pc picked

     // Check for blob file rewrites. These are low-priority compactions because
     // they don't help us keep up with writes, just reclaim disk space.
-    if pc := p.pickBlobFileRewriteCompaction(env); pc != nil {
+    if pc := p.pickBlobFileRewriteCompactionLowPriority(env); pc != nil {
         return pc
     }

@@ -1675,9 +1692,40 @@ func (p *compactionPickerByScore) pickRewriteCompaction(
     return nil
 }

-// pickBlobFileRewriteCompaction looks for compactions of blob files that
-// can be rewritten to reclaim disk space.
-func (p *compactionPickerByScore) pickBlobFileRewriteCompaction(
+// pickBlobFileRewriteCompactionHighPriority picks a compaction that rewrites a
+// blob file to reclaim disk space if the heuristics for high-priority blob file
+// rewrites are met.
+func (p *compactionPickerByScore) pickBlobFileRewriteCompactionHighPriority(
+    env compactionEnv,
+) (pc *pickedBlobFileCompaction) {
+    policy := p.opts.Experimental.ValueSeparationPolicy()
+    if policy.GarbageRatioHighPriority >= 1.0 {
+        // High-priority blob file rewrite compactions are disabled.
+        return nil
+    }
+    aggregateStats, heuristicStats := p.latestVersionState.blobFiles.Stats()
+    if heuristicStats.CountFilesEligible == 0 && heuristicStats.CountFilesTooRecent == 0 {
+        // No blob files with any garbage to rewrite.
+        return nil
+    }
+
+    garbagePct := float64(aggregateStats.ValueSize-aggregateStats.ReferencedValueSize) /
+        float64(aggregateStats.ValueSize)
+    if garbagePct <= policy.GarbageRatioHighPriority {
+        // Not enough garbage to warrant a rewrite compaction.
+        return nil
+    }
+    pc = p.pickBlobFileRewriteCandidate(env)
+    if pc != nil {
+        pc.highPriority = true
+    }
+    return pc
+}
+
+// pickBlobFileRewriteCompactionLowPriority picks a compaction that rewrites a
+// blob file to reclaim disk space if the heuristics for low-priority blob file
+// rewrites are met.
+func (p *compactionPickerByScore) pickBlobFileRewriteCompactionLowPriority(
     env compactionEnv,
 ) (pc *pickedBlobFileCompaction) {
     aggregateStats, heuristicStats := p.latestVersionState.blobFiles.Stats()
@@ -1686,7 +1734,7 @@ func (p *compactionPickerByScore) pickBlobFileRewriteCompaction(
         return nil
     }
     policy := p.opts.Experimental.ValueSeparationPolicy()
-    if policy.TargetGarbageRatio >= 1.0 {
+    if policy.GarbageRatioLowPriority >= 1.0 {
         // Blob file rewrite compactions are disabled.
         return nil
     }
@@ -1696,19 +1744,23 @@ func (p *compactionPickerByScore) pickBlobFileRewriteCompaction(
     // are actually any candidates with garbage to reclaim.
     garbagePct := float64(aggregateStats.ValueSize-aggregateStats.ReferencedValueSize) /
         float64(aggregateStats.ValueSize)
-    if garbagePct <= policy.TargetGarbageRatio {
+    if garbagePct <= policy.GarbageRatioLowPriority {
         // Not enough garbage to warrant a rewrite compaction.
         return nil
     }
+    return p.pickBlobFileRewriteCandidate(env)
+}

+func (p *compactionPickerByScore) pickBlobFileRewriteCandidate(
+    env compactionEnv,
+) (pc *pickedBlobFileCompaction) {
     // Check if there is an ongoing blob file rewrite compaction. If there is,
     // don't schedule a new one.
     for _, c := range env.inProgressCompactions {
         if c.kind == compactionKindBlobFileRewrite {
             return nil
         }
     }
-
     candidate, ok := p.latestVersionState.blobFiles.ReplacementCandidate()
     if !ok {
         // None meet the heuristic.
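
The NB comment in pickHighPrioritySpaceCompaction above refers to Go's typed-nil pitfall: returning a nil *pickedBlobFileCompaction directly through the pickedCompaction interface would yield a non-nil interface value. A minimal illustration with hypothetical types:

package main

import "fmt"

// picked and blobPick are hypothetical stand-ins for the pickedCompaction
// interface and the *pickedBlobFileCompaction concrete type.
type picked interface{ name() string }

type blobPick struct{}

func (*blobPick) name() string { return "blob file rewrite" }

// pickConcrete returns a typed nil: nothing was picked.
func pickConcrete() *blobPick { return nil }

// pickWrong returns the concrete result directly: the interface now wraps
// a nil *blobPick, so a caller's pc != nil check would wrongly pass.
func pickWrong() picked { return pickConcrete() }

// pickRight matches the diff: check the concrete pointer, then return an
// untyped nil so the interface value itself is nil.
func pickRight() picked {
    if pc := pickConcrete(); pc != nil {
        return pc
    }
    return nil
}

func main() {
    fmt.Println(pickWrong() == nil) // false: looks like a real compaction
    fmt.Println(pickRight() == nil) // true
}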

compaction_picker_test.go

Lines changed: 36 additions & 1 deletion

@@ -6,8 +6,11 @@ package pebble

 import (
     "bytes"
+    "cmp"
     "fmt"
+    "maps"
     "math"
+    "slices"
     "sort"
     "strconv"
     "strings"
@@ -1486,6 +1489,17 @@ func TestCompactionPickerScores(t *testing.T) {
     var buf bytes.Buffer
     datadriven.RunTest(t, "testdata/compaction_picker_scores", func(t *testing.T, td *datadriven.TestData) string {
         switch td.Cmd {
+        case "batch":
+            b := d.NewBatch()
+            err := runBatchDefineCmd(td, b)
+            if err != nil {
+                return err.Error()
+            }
+            if err = b.Commit(Sync); err != nil {
+                return err.Error()
+            }
+            return ""
+
         case "define":
             require.NoError(t, closeAllSnapshots(d))
             require.NoError(t, d.Close())
@@ -1518,6 +1532,12 @@ func TestCompactionPickerScores(t *testing.T) {
             d.mu.Unlock()
             return ""

+        case "flush":
+            if err := d.Flush(); err != nil {
+                return err.Error()
+            }
+            return runLSMCmd(td, d)
+
         case "resume-cleaning":
             cleaner.resume()
             return ""
@@ -1542,8 +1562,15 @@ func TestCompactionPickerScores(t *testing.T) {
             d.mu.Lock()
             d.opts.DisableAutomaticCompactions = false
             d.maybeScheduleCompaction()
+            if v := d.mu.compact.burstConcurrency.Load(); v > 0 {
+                fmt.Fprintf(&buf, "%d burst concurrency active\n", v)
+            }
             fmt.Fprintf(&buf, "%d compactions in progress:", d.mu.compact.compactingCount)
-            for c := range d.mu.compact.inProgress {
+
+            runningCompactions := slices.SortedFunc(maps.Keys(d.mu.compact.inProgress), func(a, b compaction) int {
+                return cmp.Compare(a.Info().String(), b.Info().String())
+            })
+            for _, c := range runningCompactions {
                 fmt.Fprintf(&buf, "\n%s", c)
             }
             d.opts.DisableAutomaticCompactions = true
@@ -1596,6 +1623,14 @@ func TestCompactionPickerScores(t *testing.T) {
             tw.Flush()
             return buf.String()

+        case "wait-compactions":
+            d.mu.Lock()
+            for d.mu.compact.compactingCount > 0 {
+                d.mu.compact.cond.Wait()
+            }
+            d.mu.Unlock()
+            return ""
+
         case "wait-pending-table-stats":
             return runWaitForTableStatsCmd(td, d)
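
The sorting of in-progress compactions above makes the test output deterministic, since Go randomizes map iteration order. The same slices.SortedFunc/maps.Keys pattern in isolation (the map contents here are made up):

package main

import (
    "cmp"
    "fmt"
    "maps"
    "slices"
)

func main() {
    // Map iteration order is randomized in Go, so printing directly from
    // the map would make test output flaky. Collect and sort the keys
    // first, using the Go 1.23 iterator helpers used in the diff.
    inProgress := map[string]struct{}{"L0->L2": {}, "blob rewrite": {}, "L3->L4": {}}
    sorted := slices.SortedFunc(maps.Keys(inProgress), func(a, b string) int {
        return cmp.Compare(a, b)
    })
    for _, c := range sorted {
        fmt.Println(c) // L0->L2, L3->L4, blob rewrite: stable across runs
    }
}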
