Skip to content

Commit e9bf6e3

Browse files
committed
manifest: replace marked for compaction annotator with a set
The marked-for-compaction annotator is an overcomplicated way to implement an ordered set. It does not fit neatly in the annotator pattern - unlike all other annotators, it can become invalidated. It also requires separate logic to maintain a count of marked files. We remove this annotator and replace it with an ordered set (using the google btree) stored in the Version. This is a lot cleaner and also addresses a limitation of the annotator: we were only able to obtain one candidate (and thus only one rewrite compaction at any time).
1 parent bc795a5 commit e9bf6e3

12 files changed

+177
-138
lines changed

compaction_picker.go

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1524,7 +1524,7 @@ func (p *compactionPickerByScore) pickAutoNonScore(env compactionEnv) (pc picked
15241524
// MarkedForCompaction field is persisted in the manifest. That's okay. We
15251525
// previously would've ignored the designation, whereas now we'll re-compact
15261526
// the file in place.
1527-
if p.vers.Stats.MarkedForCompaction > 0 {
1527+
if p.vers.MarkedForCompaction.Count() > 0 {
15281528
if pc := p.pickRewriteCompaction(env); pc != nil {
15291529
return pc
15301530
}
@@ -1597,19 +1597,6 @@ var elisionOnlyAnnotator = manifest.NewTableAnnotator[manifest.TableMetadata](ma
15971597
},
15981598
})
15991599

1600-
// markedForCompactionAnnotator is a manifest.TableAnnotator that annotates B-Tree
1601-
// nodes with the *fileMetadata of a file that is marked for compaction
1602-
// within the subtree. If multiple files meet the criteria, it chooses
1603-
// whichever file has the lowest LargestSeqNum.
1604-
var markedForCompactionAnnotator = manifest.NewTableAnnotator[manifest.TableMetadata](manifest.PickFileAggregator{
1605-
Filter: func(f *manifest.TableMetadata) (eligible bool, cacheOK bool) {
1606-
return f.MarkedForCompaction, true
1607-
},
1608-
Compare: func(f1 *manifest.TableMetadata, f2 *manifest.TableMetadata) bool {
1609-
return f1.LargestSeqNum < f2.LargestSeqNum
1610-
},
1611-
})
1612-
16131600
// pickedCompactionFromCandidateFile creates a pickedCompaction from a *fileMetadata
16141601
// with various checks to ensure that the file still exists in the expected level
16151602
// and isn't already being compacted.
@@ -1680,17 +1667,8 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction(
16801667
func (p *compactionPickerByScore) pickRewriteCompaction(
16811668
env compactionEnv,
16821669
) (pc *pickedTableCompaction) {
1683-
if p.vers.Stats.MarkedForCompaction == 0 {
1684-
return nil
1685-
}
1686-
for l := numLevels - 1; l >= 0; l-- {
1687-
candidate := markedForCompactionAnnotator.LevelAnnotation(p.vers.Levels[l])
1688-
if candidate == nil {
1689-
// Try the next level.
1690-
continue
1691-
}
1692-
pc := p.pickedCompactionFromCandidateFile(candidate, env, l, l, compactionKindRewrite)
1693-
if pc != nil {
1670+
for candidate, level := range p.vers.MarkedForCompaction.Ascending() {
1671+
if pc := p.pickedCompactionFromCandidateFile(candidate, env, level, level, compactionKindRewrite); pc != nil {
16941672
return pc
16951673
}
16961674
}

compaction_picker_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -690,9 +690,9 @@ func TestCompactionPickerL0(t *testing.T) {
690690
if f.TableNum != base.TableNum(tableNum) {
691691
continue
692692
}
693+
// This code should be identical to the one in DB.markFilesForCompactionLocked().
693694
f.MarkedForCompaction = true
694-
picker.vers.Stats.MarkedForCompaction++
695-
markedForCompactionAnnotator.InvalidateLevelAnnotation(picker.vers.Levels[l])
695+
picker.vers.MarkedForCompaction.Insert(f, l)
696696
return fmt.Sprintf("marked L%d.%s", l, f.TableNum)
697697
}
698698
}

compaction_test.go

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2789,23 +2789,19 @@ func TestMarkedForCompaction(t *testing.T) {
27892789
},
27902790
Logger: testutils.Logger{T: t},
27912791
}
2792+
opts.Experimental.CompactionScheduler = func() CompactionScheduler {
2793+
return NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest()
2794+
}
27922795
opts.WithFSDefaults()
27932796

2794-
reset := func() {
2795-
if d != nil {
2796-
require.NoError(t, d.Close())
2797-
}
2798-
mem = vfs.NewMem()
2799-
require.NoError(t, mem.MkdirAll("ext", 0755))
2800-
2801-
var err error
2802-
d, err = Open("", opts)
2803-
require.NoError(t, err)
2804-
}
28052797
datadriven.RunTest(t, "testdata/marked_for_compaction", func(t *testing.T, td *datadriven.TestData) string {
2798+
buf.Reset()
28062799
switch td.Cmd {
2807-
case "reset":
2808-
reset()
2800+
case "reopen":
2801+
require.NoError(t, d.Close())
2802+
var err error
2803+
d, err = Open("", opts)
2804+
require.NoError(t, err)
28092805
return ""
28102806

28112807
case "define":
@@ -2829,23 +2825,31 @@ func TestMarkedForCompaction(t *testing.T) {
28292825
return s
28302826

28312827
case "mark-for-compaction":
2832-
d.mu.Lock()
2833-
defer d.mu.Unlock()
2834-
vers := d.mu.versions.currentVersion()
28352828
var tableNum uint64
28362829
td.ScanArgs(t, "file", &tableNum)
2837-
for l, lm := range vers.Levels {
2838-
for f := range lm.All() {
2839-
if f.TableNum != base.TableNum(tableNum) {
2840-
continue
2830+
2831+
findFn := func(v *manifest.Version) (found bool, files [numLevels][]*manifest.TableMetadata, _ error) {
2832+
for l, lm := range v.Levels {
2833+
for f := range lm.All() {
2834+
if f.TableNum == base.TableNum(tableNum) {
2835+
files[l] = append(files[l], f)
2836+
fmt.Fprintf(&buf, "marked L%d.%s", l, f.TableNum)
2837+
return true, files, nil
2838+
}
28412839
}
2842-
f.MarkedForCompaction = true
2843-
vers.Stats.MarkedForCompaction++
2844-
markedForCompactionAnnotator.InvalidateLevelAnnotation(vers.Levels[l])
2845-
return fmt.Sprintf("marked L%d.%s", l, f.TableNum)
28462840
}
2841+
return false, files, nil
2842+
}
2843+
d.mu.Lock()
2844+
defer d.mu.Unlock()
2845+
if err := d.markFilesForCompactionLocked(findFn); err != nil {
2846+
td.Fatalf(t, "markFilesLocked: %s", err)
28472847
}
2848-
return "not-found"
2848+
2849+
if buf.Len() == 0 {
2850+
return "not-found"
2851+
}
2852+
return buf.String()
28492853

28502854
case "maybe-compact":
28512855
d.mu.Lock()
@@ -2854,11 +2858,11 @@ func TestMarkedForCompaction(t *testing.T) {
28542858
d.maybeScheduleCompaction()
28552859
for d.mu.compact.compactingCount > 0 {
28562860
d.mu.compact.cond.Wait()
2861+
d.maybeScheduleCompaction()
28572862
}
28582863

28592864
fmt.Fprintln(&buf, d.mu.versions.currentVersion().DebugString())
28602865
s := strings.TrimSpace(buf.String())
2861-
buf.Reset()
28622866
opts.DisableAutomaticCompactions = true
28632867
return s
28642868

db.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2061,7 +2061,7 @@ func (d *DB) Metrics() *Metrics {
20612061
metrics.Compact.InProgressBytes = d.mu.versions.atomicInProgressBytes.Load()
20622062
// TODO(radu): split this to separate the download compactions.
20632063
metrics.Compact.NumInProgress = int64(d.mu.compact.compactingCount + d.mu.compact.downloadingCount)
2064-
metrics.Compact.MarkedFiles = vers.Stats.MarkedForCompaction
2064+
metrics.Compact.MarkedFiles = vers.MarkedForCompaction.Count()
20652065
metrics.Compact.Duration = d.mu.compact.duration
20662066
for c := range d.mu.compact.inProgress {
20672067
if !c.IsFlush() {

format_major_version.go

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ func (d *DB) writeFormatVersionMarker(formatVers FormatMajorVersion) error {
536536
// waiting for compactions to complete (or for slots to free up).
537537
func (d *DB) compactMarkedFilesLocked() error {
538538
curr := d.mu.versions.currentVersion()
539-
if curr.Stats.MarkedForCompaction == 0 {
539+
if curr.MarkedForCompaction.Count() == 0 {
540540
return nil
541541
}
542542
// Attempt to schedule a compaction to rewrite a file marked for compaction.
@@ -552,7 +552,7 @@ func (d *DB) compactMarkedFilesLocked() error {
552552
// compaction. Or compaction of the file might have already been in
553553
// progress. In any scenario, wait until there's some change in the
554554
// state of active compactions.
555-
for curr.Stats.MarkedForCompaction > 0 {
555+
for curr.MarkedForCompaction.Count() > 0 {
556556
// Before waiting, check that the database hasn't been closed. Trying to
557557
// schedule the compaction may have dropped d.mu while waiting for a
558558
// manifest write to complete. In that dropped interim, the database may
@@ -568,7 +568,7 @@ func (d *DB) compactMarkedFilesLocked() error {
568568

569569
// Only wait on compactions if there are files still marked for compaction.
570570
// NB: Waiting on this condition variable drops d.mu while blocked.
571-
if curr.Stats.MarkedForCompaction > 0 {
571+
if curr.MarkedForCompaction.Count() > 0 {
572572
// NB: we cannot assert that d.mu.compact.compactingCount > 0, since
573573
// with a CompactionScheduler a DB may not have even one ongoing
574574
// compaction (if other competing activities are being preferred by the
@@ -586,13 +586,9 @@ func (d *DB) compactMarkedFilesLocked() error {
586586
// level.
587587
type findFilesFunc func(v *manifest.Version) (found bool, files [numLevels][]*manifest.TableMetadata, _ error)
588588

589-
// This method is not used currently, but it will be useful the next time we need
590-
// to mark files for compaction.
591-
var _ = (*DB)(nil).markFilesLocked
592-
593-
// markFilesLocked durably marks the files that match the given findFilesFunc for
589+
// markFilesForCompactionLocked durably marks the files that match the given findFilesFunc for
594590
// compaction.
595-
func (d *DB) markFilesLocked(findFn findFilesFunc) error {
591+
func (d *DB) markFilesForCompactionLocked(findFn findFilesFunc) error {
596592
jobID := d.newJobIDLocked()
597593

598594
// Acquire a read state to have a view of the LSM and a guarantee that none
@@ -634,32 +630,22 @@ func (d *DB) markFilesLocked(findFn findFilesFunc) error {
634630
// been re-acquired by the defer within the above anonymous function.
635631
_, err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
636632
vers := d.mu.versions.currentVersion()
637-
for l, filesToMark := range files {
638-
if len(filesToMark) == 0 {
639-
continue
640-
}
633+
for level, filesToMark := range files {
641634
for _, f := range filesToMark {
642635
// Ignore files to be marked that have already been compacted or marked.
643636
if f.CompactionState == manifest.CompactionStateCompacted ||
644637
f.MarkedForCompaction {
645638
continue
646639
}
647640
// Else, mark the file for compaction in this version.
648-
vers.Stats.MarkedForCompaction++
649641
f.MarkedForCompaction = true
642+
// We are modifying the current version in-place (so that the updated
643+
// set is reflected in the "base" version in the new manifest). This is
644+
// ok because we are holding the DB lock and all code that uses the
645+
// MarkedForCompaction set runs under the DB lock.
646+
// TODO(radu): find a less sketchy way to do this.
647+
vers.MarkedForCompaction.Insert(f, level)
650648
}
651-
// The compaction picker uses the markedForCompactionAnnotator to
652-
// quickly find files marked for compaction, or to quickly determine
653-
// that there are no such files marked for compaction within a level.
654-
// A b-tree node may be annotated with an annotation recording that
655-
// there are no files marked for compaction within the node's subtree,
656-
// based on the assumption that it's static.
657-
//
658-
// Since we're marking files for compaction, these b-tree nodes'
659-
// annotations will be out of date. Clear the compaction-picking
660-
// annotation, so that it's recomputed the next time the compaction
661-
// picker looks for a file marked for compaction.
662-
markedForCompactionAnnotator.InvalidateLevelAnnotation(vers.Levels[l])
663649
}
664650
// The 'marked-for-compaction' bit is persisted in the MANIFEST file
665651
// metadata. We've already modified the in-memory table metadata, but the

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ require (
1414
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06
1515
github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9
1616
github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e
17+
github.com/google/btree v1.1.3
1718
github.com/guptarohit/asciigraph v0.5.5
1819
github.com/klauspost/compress v1.17.11
1920
github.com/kr/pretty v0.3.1

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ github.com/gonum/internal v0.0.0-20181124074243-f884aa714029/go.mod h1:Pu4dmpkhS
108108
github.com/gonum/lapack v0.0.0-20181123203213-e4cdc5a0bff9/go.mod h1:XA3DeT6rxh2EAE789SSiSJNqxPaC0aE9J8NTOI0Jo/A=
109109
github.com/gonum/matrix v0.0.0-20181209220409-c518dec07be9/go.mod h1:0EXg4mc1CNP0HCqCz+K4ts155PXIlUywf0wqN+GfPZw=
110110
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
111+
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
112+
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
111113
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
112114
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
113115
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=

internal/manifest/annotator.go

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -258,18 +258,6 @@ func (a *TableAnnotator[T]) accumulateRangeAnnotation(
258258
return dst
259259
}
260260

261-
// InvalidateAnnotation removes any existing cached annotations from this
262-
// annotator from a node's subtree.
263-
func (a *TableAnnotator[T]) invalidateNodeAnnotation(n *node[*TableMetadata]) {
264-
annot := a.findAnnotation(n)
265-
annot.valid.Store(false)
266-
if !n.isLeaf() {
267-
for i := int16(0); i <= n.count; i++ {
268-
a.invalidateNodeAnnotation(n.child(i))
269-
}
270-
}
271-
}
272-
273261
// LevelAnnotation calculates the annotation defined by this TableAnnotator for all
274262
// files in the given LevelMetadata. A pointer to the TableAnnotator is used as the
275263
// key for pre-calculated values, so the same TableAnnotator must be used to avoid
@@ -337,20 +325,6 @@ func (a *TableAnnotator[T]) VersionRangeAnnotation(v *Version, bounds base.UserK
337325
return dst
338326
}
339327

340-
// InvalidateLevelAnnotation clears any cached annotations defined by TableAnnotator.
341-
// A pointer to the TableAnnotator is used as the key for pre-calculated values, so
342-
// the same TableAnnotator must be used to clear the appropriate cached annotation.
343-
// Calls to InvalidateLevelAnnotation are *not* concurrent-safe with any other
344-
// calls to TableAnnotator methods for the same TableAnnotator (concurrent calls from
345-
// other annotators are fine). Any calls to this function must have some
346-
// externally-guaranteed mutual exclusion.
347-
func (a *TableAnnotator[T]) InvalidateLevelAnnotation(lm LevelMetadata) {
348-
if lm.Empty() {
349-
return
350-
}
351-
a.invalidateNodeAnnotation(lm.tree.root)
352-
}
353-
354328
// Annotation calculates the annotation defined by this BlobFileAnnotator for
355329
// all blob files in the given set.
356330
// A pointer to the TableAnnotator is used as the key for pre-calculated values,
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2+
// of this source code is governed by a BSD-style license that can be found in
3+
// the LICENSE file.
4+
5+
package manifest
6+
7+
import (
8+
"iter"
9+
10+
"github.com/cockroachdb/errors"
11+
"github.com/cockroachdb/pebble/internal/invariants"
12+
googlebtree "github.com/google/btree"
13+
)
14+
15+
// MarkedForCompactionSet implements an ordered set of tables that are marked
16+
// for compaction. They are ordered by decreasing LSM level, and within each
17+
// level by increasing LargestSeqNum.
// Tables with equal level and LargestSeqNum are ordered by increasing TableNum.
18+
//
19+
// Files can be marked for compaction when upgrading a DB to a specific version.
//
// The zero value is an empty set that is ready to use.
20+
type MarkedForCompactionSet struct {
21+
// tree is nil until the first Insert; a nil tree represents the empty set.
tree *googlebtree.BTreeG[tableAndLevel]
22+
}
23+
24+
// tableAndLevel is the element type stored in the MarkedForCompactionSet
// b-tree: a table's metadata paired with the level supplied by the caller of
// Insert/Delete.
type tableAndLevel struct {
25+
// meta is the table's metadata; its LargestSeqNum and TableNum are part of
// the ordering key (see markedForCompactionLessFn).
meta *TableMetadata
26+
// level is the primary ordering key.
level int
27+
}
28+
29+
// markedForCompactionLessFn is the ordering used by the MarkedForCompactionSet
// b-tree. It reports whether a sorts before b: first by decreasing level, then
// by increasing LargestSeqNum, and finally by increasing TableNum as a
// tie-breaker.
//
// Both a.meta and b.meta are dereferenced, so they must be non-nil.
func markedForCompactionLessFn(a, b tableAndLevel) bool {
30+
if a.level != b.level {
31+
// Higher-numbered levels sort first.
return a.level > b.level
32+
}
33+
if a.meta.LargestSeqNum != b.meta.LargestSeqNum {
34+
return a.meta.LargestSeqNum < b.meta.LargestSeqNum
35+
}
36+
return a.meta.TableNum < b.meta.TableNum
37+
}
38+
39+
// Count returns the number of tables in the set. A set whose tree has not been
// allocated yet (nothing was ever inserted) has a count of 0.
40+
func (s *MarkedForCompactionSet) Count() int {
41+
if s.tree == nil {
42+
return 0
43+
}
44+
return s.tree.Len()
45+
}
46+
47+
// Insert adds a table to the set. The table must not be in the set already.
//
// The underlying tree is allocated on first use. The "not already present"
// requirement is only asserted (via panic) when invariants.Enabled is true;
// otherwise a duplicate insertion is not reported.
48+
func (s *MarkedForCompactionSet) Insert(meta *TableMetadata, level int) {
49+
if s.tree == nil {
50+
// Lazily allocate so that the zero-value set needs no constructor.
s.tree = googlebtree.NewG[tableAndLevel](8, markedForCompactionLessFn)
51+
}
52+
_, existed := s.tree.ReplaceOrInsert(tableAndLevel{meta: meta, level: level})
53+
if invariants.Enabled && existed {
54+
panic(errors.AssertionFailedf("table %s already in MarkedForCompaction", meta.TableNum))
55+
}
56+
}
57+
58+
// Delete removes a table from the set. The table must be in the set.
//
// The entry is located using the set's ordering key (level, LargestSeqNum,
// TableNum), so level must match the level the table was inserted with. The
// "must be present" requirement is only asserted (via panic) when
// invariants.Enabled is true; otherwise a missing table, or a set that was
// never inserted into, does not cause a panic.
59+
func (s *MarkedForCompactionSet) Delete(meta *TableMetadata, level int) {
60+
ok := false
61+
if s.tree != nil {
62+
_, ok = s.tree.Delete(tableAndLevel{meta: meta, level: level})
63+
}
64+
if invariants.Enabled && !ok {
65+
panic(errors.AssertionFailedf("table %s not in MarkedForCompaction", meta.TableNum))
66+
}
67+
}
68+
69+
// Clone the set. The resulting set can be independently modified.
//
// Cloning a set whose tree was never allocated returns a zero-value (empty)
// set; otherwise the copy is produced by the b-tree's own Clone method.
70+
func (s *MarkedForCompactionSet) Clone() MarkedForCompactionSet {
71+
if s.tree == nil {
72+
return MarkedForCompactionSet{}
73+
}
74+
return MarkedForCompactionSet{tree: s.tree.Clone()}
75+
}
76+
77+
// Ascending returns an iterator which produces the tables marked for compaction
78+
// (along with the level), ordered by decreasing level and within each level by
79+
// increasing LargestSeqNum.
// Tables with equal level and LargestSeqNum are produced in increasing
// TableNum order. An empty set produces nothing.
80+
func (s *MarkedForCompactionSet) Ascending() iter.Seq2[*TableMetadata, int] {
81+
return func(yield func(*TableMetadata, int) bool) {
82+
if s.tree == nil {
83+
return
84+
}
85+
s.tree.Ascend(func(t tableAndLevel) bool {
86+
// Forward yield's result to the tree's Ascend callback so that a false
// return from the consumer (e.g. breaking out of a range loop) is passed
// on to the traversal.
return yield(t.meta, t.level)
87+
})
88+
}
89+
}

0 commit comments

Comments
 (0)