Skip to content

Commit 56cf871

Browse files
committed
pebble: add virtual sst rewrite compaction
This commit adds a new low-priority compaction to rewrite virtual tables. We select the backing table with the least referenced data, and materialize one of its virtual tables into a physical table, allowing us to delete the backing sooner and correct overestimations of blob value liveness. Fixes: #4915
1 parent 518b3cb commit 56cf871

20 files changed

+427
-96
lines changed

compaction.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ const (
155155
compactionKindRewrite
156156
compactionKindIngestedFlushable
157157
compactionKindBlobFileRewrite
158+
compactionKindVirtualRewrite
158159
)
159160

160161
func (k compactionKind) String() string {
@@ -181,6 +182,8 @@ func (k compactionKind) String() string {
181182
return "copy"
182183
case compactionKindBlobFileRewrite:
183184
return "blob-file-rewrite"
185+
case compactionKindVirtualRewrite:
186+
return "virtual-sst-rewrite"
184187
}
185188
return "?"
186189
}
@@ -3281,7 +3284,17 @@ func (d *DB) runCompaction(
32813284
case compactionKindIngestedFlushable:
32823285
panic("pebble: runCompaction cannot handle compactionKindIngestedFlushable.")
32833286
}
3287+
return d.runDefaultTableCompaction(jobID, c)
3288+
}
32843289

3290+
func (d *DB) runDefaultTableCompaction(
3291+
jobID JobID, c *tableCompaction,
3292+
) (
3293+
ve *manifest.VersionEdit,
3294+
stats compact.Stats,
3295+
outputBlobs []compact.OutputBlob,
3296+
retErr error,
3297+
) {
32853298
snapshots := d.mu.snapshots.toSlice()
32863299

32873300
// Release the d.mu lock while doing I/O.

compaction_picker.go

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1497,6 +1497,12 @@ func (p *compactionPickerByScore) pickAutoNonScore(env compactionEnv) (pc picked
14971497
return pc
14981498
}
14991499

1500+
// Check for virtual SST rewrites. These compactions materialize virtual tables
1501+
// to reclaim space in backing files with low utilization.
1502+
if pc := p.pickVirtualRewriteCompaction(env); pc != nil {
1503+
return pc
1504+
}
1505+
15001506
// Check for blob file rewrites. These are low-priority compactions because
15011507
// they don't help us keep up with writes, just reclaim disk space.
15021508
if pc := p.pickBlobFileRewriteCompactionLowPriority(env); pc != nil {
@@ -1629,8 +1635,10 @@ func (p *compactionPickerByScore) pickedCompactionFromCandidateFile(
16291635
}
16301636

16311637
var inputs manifest.LevelSlice
1632-
if startLevel == 0 {
1638+
if startLevel == 0 && outputLevel > 0 {
16331639
// Overlapping L0 files must also be compacted alongside the candidate.
1640+
// Some compactions attempt to rewrite a file in place (e.g. virtual rewrite)
1641+
// so we only do this for L0->Lbase compactions.
16341642
inputs = p.vers.Overlaps(0, candidate.UserKeyBounds())
16351643
} else {
16361644
inputs = p.vers.Levels[startLevel].Find(p.opts.Comparer.Compare, candidate)
@@ -1692,6 +1700,39 @@ func (p *compactionPickerByScore) pickRewriteCompaction(
16921700
return nil
16931701
}
16941702

1703+
// pickVirtualRewriteCompaction picks a compaction that rewrites a single virtual sstable
1704+
// of the current replacement-candidate backing within its own level, or returns nil.
1705+
func (p *compactionPickerByScore) pickVirtualRewriteCompaction(
1706+
env compactionEnv,
1707+
) *pickedTableCompaction {
1708+
1709+
for _, c := range env.inProgressCompactions {
1710+
// Allow only one virtual rewrite compaction at a time.
1711+
if c.kind == compactionKindVirtualRewrite {
1712+
return nil
1713+
}
1714+
}
1715+
1716+
// We'll pick one virtual table at a time to materialize. This works with our
1717+
// compaction system, which currently doesn't support outputting to multiple levels
1718+
// or selecting files that aren't contiguous in a level. Successfully materializing
1719+
// one of the backing's virtual tables will also make the backing more likely to be
1720+
// picked again, since the space amp will increase.
1721+
_, vtablesByLevel := p.latestVersionState.virtualBackings.ReplacementCandidate()
1722+
for level, tables := range vtablesByLevel {
1723+
for _, vt := range tables {
1724+
if vt.IsCompacting() {
1725+
continue
1726+
}
1727+
if pc := p.pickedCompactionFromCandidateFile(vt, env, level, level, compactionKindVirtualRewrite); pc != nil {
1728+
return pc
1729+
}
1730+
}
1731+
}
1732+
1733+
return nil
1734+
}
1735+
16951736
// pickBlobFileRewriteCompactionHighPriority picks a compaction that rewrites a
16961737
// blob file to reclaim disk space if the heuristics for high-priority blob file
16971738
// rewrites are met.

compaction_picker_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,9 +1336,10 @@ func TestCompactionPickerPickFile(t *testing.T) {
13361336
}
13371337

13381338
opts := &Options{
1339-
Comparer: testkeys.Comparer,
1340-
FormatMajorVersion: FormatNewest,
1341-
Logger: testutils.Logger{T: t},
1339+
Comparer: testkeys.Comparer,
1340+
FormatMajorVersion: FormatNewest,
1341+
Logger: testutils.Logger{T: t},
1342+
DisableAutomaticCompactions: true,
13421343
}
13431344
opts.Experimental.CompactionScheduler = func() CompactionScheduler {
13441345
return NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest()

compaction_scheduler.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,12 +142,14 @@ func init() {
142142
compactionOptionalAndPriority{optional: true, priority: 60}
143143
scheduledCompactionMap[compactionKindElisionOnly] =
144144
compactionOptionalAndPriority{optional: true, priority: 50}
145-
scheduledCompactionMap[compactionKindBlobFileRewrite] =
145+
scheduledCompactionMap[compactionKindVirtualRewrite] =
146146
compactionOptionalAndPriority{optional: true, priority: 40}
147-
scheduledCompactionMap[compactionKindRead] =
147+
scheduledCompactionMap[compactionKindBlobFileRewrite] =
148148
compactionOptionalAndPriority{optional: true, priority: 30}
149-
scheduledCompactionMap[compactionKindRewrite] =
149+
scheduledCompactionMap[compactionKindRead] =
150150
compactionOptionalAndPriority{optional: true, priority: 20}
151+
scheduledCompactionMap[compactionKindRewrite] =
152+
compactionOptionalAndPriority{optional: true, priority: 10}
151153
}
152154

153155
// noopGrantHandle is used in cases that don't interact with a CompactionScheduler.

compaction_test.go

Lines changed: 70 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1150,6 +1150,37 @@ func runCompactionTest(
11501150
}
11511151
return describeLSM(d, verbose)
11521152

1153+
case "run-virtual-rewrite-compaction":
1154+
err := func() error {
1155+
d.mu.Lock()
1156+
defer d.mu.Unlock()
1157+
d.mu.versions.logLock()
1158+
env := d.makeCompactionEnvLocked()
1159+
require.NotNil(t, env)
1160+
picker := d.mu.versions.picker.(*compactionPickerByScore)
1161+
pc := picker.pickVirtualRewriteCompaction(*env)
1162+
if pc == nil {
1163+
d.mu.versions.logUnlock()
1164+
return errors.New("no virtual rewrite compaction")
1165+
}
1166+
d.mu.versions.logUnlock()
1167+
d.runPickedCompaction(pc, noopGrantHandle{})
1168+
for d.mu.compact.compactingCount > 0 {
1169+
d.mu.compact.cond.Wait()
1170+
}
1171+
return nil
1172+
}()
1173+
if err != nil {
1174+
return err.Error()
1175+
}
1176+
return describeLSM(d, verbose)
1177+
1178+
case "virtual-backings":
1179+
d.mu.Lock()
1180+
s := d.mu.versions.latest.virtualBackings.String()
1181+
d.mu.Unlock()
1182+
return s
1183+
11531184
case "validate-blob-reference-index-block":
11541185
var inputTables []*manifest.TableMetadata
11551186
for _, line := range crstrings.Lines(td.Input) {
@@ -1550,6 +1581,11 @@ func TestCompaction(t *testing.T) {
15501581
verbose: true,
15511582
cmp: DefaultComparer,
15521583
},
1584+
"virtual_rewrite": {
1585+
minVersion: FormatNewest,
1586+
maxVersion: FormatNewest,
1587+
verbose: true,
1588+
},
15531589
}
15541590
datadriven.Walk(t, "testdata/compaction", func(t *testing.T, path string) {
15551591
filename := filepath.Base(path)
@@ -1608,24 +1644,21 @@ func TestCompactionDeleteOnlyHints(t *testing.T) {
16081644
}
16091645
}()
16101646

1611-
var compactInfo *CompactionInfo // protected by d.mu
1647+
var compactInfo []CompactionInfo
16121648
reset := func() (*Options, error) {
16131649
if d != nil {
1614-
compactInfo = nil
16151650
if err := closeAllSnapshots(d); err != nil {
16161651
return nil, err
16171652
}
16181653
if err := d.Close(); err != nil {
16191654
return nil, err
16201655
}
16211656
}
1657+
compactInfo = nil
16221658
el := TeeEventListener(
16231659
EventListener{
16241660
CompactionEnd: func(info CompactionInfo) {
1625-
if compactInfo != nil {
1626-
return
1627-
}
1628-
compactInfo = &info
1661+
compactInfo = append(compactInfo, info)
16291662
},
16301663
},
16311664
MakeLoggingEventListener(testutils.Logger{T: t}),
@@ -1653,21 +1686,29 @@ func TestCompactionDeleteOnlyHints(t *testing.T) {
16531686
return opts, nil
16541687
}
16551688

1689+
compactInfo = nil
16561690
compactionString := func() string {
16571691
for d.mu.compact.compactingCount > 0 {
16581692
d.mu.compact.cond.Wait()
16591693
}
1694+
slices.SortFunc(compactInfo, func(a, b CompactionInfo) int {
1695+
return cmp.Compare(a.String(), b.String())
1696+
})
16601697

1661-
s := "(none)"
1662-
if compactInfo != nil {
1698+
var b strings.Builder
1699+
if len(compactInfo) == 0 {
1700+
return "(none)\n"
1701+
}
1702+
1703+
for _, c := range compactInfo {
16631704
// Fix the job ID and durations for determinism.
1664-
compactInfo.JobID = 100
1665-
compactInfo.Duration = time.Second
1666-
compactInfo.TotalDuration = 2 * time.Second
1667-
s = compactInfo.String()
1668-
compactInfo = nil
1705+
c.JobID = 100
1706+
c.Duration = time.Second
1707+
c.TotalDuration = 2 * time.Second
1708+
b.WriteString(fmt.Sprintf("%s\n", c.String()))
16691709
}
1670-
return s
1710+
compactInfo = nil
1711+
return b.String()
16711712
}
16721713

16731714
var err error
@@ -1768,12 +1809,14 @@ func TestCompactionDeleteOnlyHints(t *testing.T) {
17681809

17691810
// NB: collectTableStats attempts to acquire the lock. Temporarily
17701811
// unlock here to avoid a deadlock.
1771-
d.mu.Unlock()
1772-
if didRun := d.collectTableStats(); !didRun {
1773-
// If a job was already running, wait for the results.
1774-
d.waitTableStats()
1775-
}
1776-
d.mu.Lock()
1812+
func() {
1813+
d.mu.Unlock()
1814+
defer d.mu.Lock()
1815+
if didRun := d.collectTableStats(); !didRun {
1816+
// If a job was already running, wait for the results.
1817+
d.waitTableStats()
1818+
}
1819+
}()
17771820

17781821
hints := d.mu.compact.deletionHints
17791822
if len(hints) == 0 {
@@ -1914,6 +1957,13 @@ func TestCompactionTombstones(t *testing.T) {
19141957
for i := range c.Output.Tables {
19151958
c.Output.Tables[i].FileNum = 0
19161959
}
1960+
if c.Reason == "virtual-sst-rewrite" {
1961+
for i := range c.Input {
1962+
for j := range c.Input[i].Tables {
1963+
c.Input[i].Tables[j].FileNum = 0
1964+
}
1965+
}
1966+
}
19171967
}
19181968
}
19191969
// Sort for determinism. We use the negative value of cmp.Compare to sort

db_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1421,6 +1421,9 @@ func TestVirtualSSTables(t *testing.T) {
14211421
d, err := Open("", testingRandomized(t, &Options{
14221422
FS: vfs.NewMem(),
14231423
FormatMajorVersion: FormatTableFormatV6,
1424+
// Disable automatic compactions, as virtual ssts may
1425+
// be compacted away too quickly.
1426+
DisableAutomaticCompactions: true,
14241427
}))
14251428
require.NoError(t, err)
14261429
defer func() {

internal/manifest/virtual_backings.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,9 @@ func (bv *VirtualBackings) ReplacementCandidate() (*TableBacking, [NumLevels][]*
310310
}
311311
v := bv.rewriteCandidates.items[0]
312312
var tables [NumLevels][]*TableMetadata
313-
for _, tl := range v.virtualTables {
313+
tableNums := slices.Sorted(maps.Keys(v.virtualTables))
314+
for _, t := range tableNums {
315+
tl := v.virtualTables[t]
314316
tables[tl.level] = append(tables[tl.level], tl.meta)
315317
}
316318
return v.backing, tables
@@ -330,7 +332,7 @@ func (bv *VirtualBackings) String() string {
330332
fmt.Fprintf(&buf, " %s: size=%d refBlobValueSize=%d useCount=%d protectionCount=%d virtualizedSize=%d",
331333
n, v.backing.Size, v.backing.ReferencedBlobValueSizeTotal, len(v.virtualTables), v.protectionCount, v.virtualizedSize)
332334
if !v.isLocal {
333-
fmt.Fprintf(&buf, " (external)")
335+
fmt.Fprintf(&buf, " (remote)")
334336
}
335337
tableNums := slices.Sorted(maps.Keys(v.virtualTables))
336338
fmt.Fprintf(&buf, " tables: %v\n", tableNums)

metrics.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ type Metrics struct {
229229
RewriteCount int64
230230
MultiLevelCount int64
231231
BlobFileRewriteCount int64
232+
VirtualRewriteCount int64
232233
// An estimate of the number of bytes that need to be compacted for the LSM
233234
// to reach a stable state.
234235
EstimatedDebt uint64
@@ -733,6 +734,7 @@ var (
733734
table.Int64("copy", 5, table.AlignRight, func(m *Metrics) int64 { return m.Compact.CopyCount }),
734735
table.Int64("multi", 6, table.AlignRight, func(m *Metrics) int64 { return m.Compact.MultiLevelCount }),
735736
table.Int64("blob", 5, table.AlignRight, func(m *Metrics) int64 { return m.Compact.BlobFileRewriteCount }),
737+
table.Int64("virtual", 7, table.AlignRight, func(m *Metrics) int64 { return m.Compact.VirtualRewriteCount }),
736738
)
737739
commitPipelineInfoTableTopHeader = `COMMIT PIPELINE`
738740
commitPipelineInfoTableSubHeader = ` wals | memtables | ingestions`

metrics_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func exampleMetrics() Metrics {
5353
m.Compact.RewriteCount = 17
5454
m.Compact.MultiLevelCount = 18
5555
m.Compact.BlobFileRewriteCount = 19
56+
m.Compact.VirtualRewriteCount = 20
5657
m.Compact.EstimatedDebt = 6 * GB
5758
m.Compact.InProgressBytes = 1 * MB
5859
m.Compact.NumInProgress = 2

testdata/compaction/l0_to_lbase_compaction

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ level | score ff cff | tables size | top in read | tables blob | ta
4949
------+-------------------+--------------+-------------------+--------------+---------------------
5050
total | - - - | 3 6MB | 0B 0B 0B | 0B 0B | 3 10MB 0B
5151

52-
kind | default delete elision move read tomb rewrite copy multi blob
53-
count | 0 0 0 3 0 0 0 0 0 0
52+
kind | default delete elision move read tomb rewrite copy multi blob virtual
53+
count | 0 0 0 3 0 0 0 0 0 0 0
5454

5555
COMMIT PIPELINE
5656
wals | memtables | ingestions

0 commit comments

Comments
 (0)