Skip to content

Commit 8b880ed

Browse files
committed
db: add compaction interface
In preparation for the introduction of blob file rewrite compactions, rename the existing compaction struct to tableCompaction and add a compaction interface. This change is largely mechanical. The interface is not as simple as it could be and should be improved, but this is deferred for now.
1 parent aa5a248 commit 8b880ed

11 files changed

+234
-192
lines changed

compaction.go

Lines changed: 195 additions & 134 deletions
Large diffs are not rendered by default.

compaction_picker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ type pickedCompaction interface {
7373
// compaction is not a result of a manual compaction.
7474
ManualID() uint64
7575
// ConstructCompaction creates a compaction from the picked compaction.
76-
ConstructCompaction(*DB, CompactionGrantHandle) *compaction
76+
ConstructCompaction(*DB, CompactionGrantHandle) compaction
7777
// WaitingCompaction returns a WaitingCompaction description of this
7878
// compaction for consumption by the compaction scheduler.
7979
WaitingCompaction() WaitingCompaction
@@ -236,7 +236,7 @@ func (pc *pickedTableCompaction) Score() float64 { return pc.score }
236236
// pickedTableCompaction.
237237
func (pc *pickedTableCompaction) ConstructCompaction(
238238
d *DB, grantHandle CompactionGrantHandle,
239-
) *compaction {
239+
) compaction {
240240
return newCompaction(
241241
pc,
242242
d.opts,

compaction_picker_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1654,7 +1654,7 @@ func TestCompactionPickerScores(t *testing.T) {
16541654
d.mu.Lock()
16551655
wait := len(d.mu.compact.inProgress) > 0
16561656
for c := range d.mu.compact.inProgress {
1657-
wait = wait && !c.versionEditApplied
1657+
wait = wait && !c.VersionEditApplied()
16581658
}
16591659
d.mu.Unlock()
16601660
if !wait {

compaction_scheduler.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"sync"
99
"time"
1010

11-
"github.com/cockroachdb/errors"
1211
"github.com/cockroachdb/pebble/internal/base"
1312
)
1413

@@ -149,17 +148,6 @@ func init() {
149148
compactionOptionalAndPriority{optional: true, priority: 30}
150149
}
151150

152-
func makeWaitingCompaction(manual bool, kind compactionKind, score float64) WaitingCompaction {
153-
if manual {
154-
return WaitingCompaction{Priority: manualCompactionPriority, Score: score}
155-
}
156-
entry, ok := scheduledCompactionMap[kind]
157-
if !ok {
158-
panic(errors.AssertionFailedf("unexpected compactionKind %s", kind))
159-
}
160-
return WaitingCompaction{Optional: entry.optional, Priority: entry.priority, Score: score}
161-
}
162-
163151
// noopGrantHandle is used in cases that don't interact with a CompactionScheduler.
164152
type noopGrantHandle struct{}
165153

compaction_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -915,8 +915,8 @@ func TestCompaction(t *testing.T) {
915915
}
916916

917917
// d.mu must be held when calling.
918-
createOngoingCompaction := func(start, end []byte, startLevel, outputLevel int) (ongoingCompaction *compaction) {
919-
ongoingCompaction = &compaction{
918+
createOngoingCompaction := func(start, end []byte, startLevel, outputLevel int) (ongoingCompaction *tableCompaction) {
919+
ongoingCompaction = &tableCompaction{
920920
inputs: []compactionLevel{{level: startLevel}, {level: outputLevel}},
921921
bounds: base.UserKeyBoundsInclusive(start, end),
922922
}
@@ -937,7 +937,7 @@ func TestCompaction(t *testing.T) {
937937
}
938938

939939
// d.mu must be held when calling.
940-
deleteOngoingCompaction := func(ongoingCompaction *compaction) {
940+
deleteOngoingCompaction := func(ongoingCompaction *tableCompaction) {
941941
for _, cl := range ongoingCompaction.inputs {
942942
for f := range cl.files.All() {
943943
f.CompactionState = manifest.CompactionStateNotCompacting
@@ -949,7 +949,7 @@ func TestCompaction(t *testing.T) {
949949

950950
runTest := func(t *testing.T, testData string, minVersion, maxVersion FormatMajorVersion, verbose bool) {
951951
reset(minVersion, maxVersion)
952-
var ongoingCompaction *compaction
952+
var ongoingCompaction *tableCompaction
953953
datadriven.RunTest(t, testData, func(t *testing.T, td *datadriven.TestData) string {
954954
switch td.Cmd {
955955
case "reset":
@@ -2091,7 +2091,7 @@ func TestCompactionAllowZeroSeqNum(t *testing.T) {
20912091

20922092
case "allow-zero-seqnum":
20932093
d.mu.Lock()
2094-
c := &compaction{
2094+
c := &tableCompaction{
20952095
comparer: d.opts.Comparer,
20962096
version: d.mu.versions.currentVersion(),
20972097
inputs: []compactionLevel{{}, {}},
@@ -2173,7 +2173,7 @@ func TestCompactionErrorOnUserKeyOverlap(t *testing.T) {
21732173
func(t *testing.T, d *datadriven.TestData) string {
21742174
switch d.Cmd {
21752175
case "error-on-user-key-overlap":
2176-
c := &compaction{comparer: DefaultComparer}
2176+
c := &tableCompaction{comparer: DefaultComparer}
21772177
var files []manifest.NewTableEntry
21782178
tableNum := base.TableNum(1)
21792179

@@ -2300,7 +2300,7 @@ func TestCompactionCheckOrdering(t *testing.T) {
23002300
func(t *testing.T, d *datadriven.TestData) string {
23012301
switch d.Cmd {
23022302
case "check-ordering":
2303-
c := &compaction{
2303+
c := &tableCompaction{
23042304
comparer: DefaultComparer,
23052305
logger: panicLogger{},
23062306
inputs: []compactionLevel{{level: -1}, {level: -1}},
@@ -2552,7 +2552,7 @@ func TestAdjustGrandparentOverlapBytesForFlush(t *testing.T) {
25522552
}
25532553
for _, tc := range testCases {
25542554
t.Run("", func(t *testing.T) {
2555-
c := compaction{
2555+
c := tableCompaction{
25562556
grandparents: ls,
25572557
maxOverlapBytes: maxOverlapBytes,
25582558
maxOutputFileSize: maxOutputFileSize,

data_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -948,7 +948,7 @@ func runDBDefineCmdReuseFS(td *datadriven.TestData, opts *Options) (*DB, error)
948948
if err != nil {
949949
return err
950950
}
951-
c.getValueSeparation = func(JobID, *compaction, sstable.TableFormat) compact.ValueSeparation {
951+
c.getValueSeparation = func(JobID, *tableCompaction, sstable.TableFormat) compact.ValueSeparation {
952952
return valueSeparator
953953
}
954954
// NB: define allows the test to exactly specify which keys go
@@ -1018,12 +1018,12 @@ func runDBDefineCmdReuseFS(td *datadriven.TestData, opts *Options) (*DB, error)
10181018
}
10191019

10201020
// Example, compact: a-c.
1021-
parseCompaction := func(outputLevel int, s string) (*compaction, error) {
1021+
parseCompaction := func(outputLevel int, s string) (*tableCompaction, error) {
10221022
m, err := parseMeta(s)
10231023
if err != nil {
10241024
return nil, err
10251025
}
1026-
c := &compaction{
1026+
c := &tableCompaction{
10271027
inputs: []compactionLevel{{}, {level: outputLevel}},
10281028
bounds: m.UserKeyBounds(),
10291029
}

db.go

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ type DB struct {
433433
// sublevels' state. Some of the compactions contained within this
434434
// map may have already committed an edit to the version but are
435435
// lingering performing cleanup, like deleting obsolete files.
436-
inProgress map[*compaction]struct{}
436+
inProgress map[compaction]struct{}
437437

438438
// rescheduleReadCompaction indicates to an iterator that a read compaction
439439
// should be scheduled.
@@ -2080,8 +2080,8 @@ func (d *DB) Metrics() *Metrics {
20802080
metrics.Compact.MarkedFiles = vers.Stats.MarkedForCompaction
20812081
metrics.Compact.Duration = d.mu.compact.duration
20822082
for c := range d.mu.compact.inProgress {
2083-
if c.kind != compactionKindFlush && c.kind != compactionKindIngestedFlushable {
2084-
metrics.Compact.Duration += d.timeNow().Sub(c.metrics.beganAt)
2083+
if !c.IsFlush() {
2084+
metrics.Compact.Duration += d.timeNow().Sub(c.BeganAt())
20852085
}
20862086
}
20872087
metrics.Compact.NumProblemSpans = d.problemSpans.Len()
@@ -2860,19 +2860,10 @@ func (d *DB) getEarliestUnflushedSeqNumLocked() base.SeqNum {
28602860
return seqNum
28612861
}
28622862

2863-
func (d *DB) getInProgressCompactionInfoLocked(finishing *compaction) (rv []compactionInfo) {
2863+
func (d *DB) getInProgressCompactionInfoLocked(finishing *tableCompaction) (rv []compactionInfo) {
28642864
for c := range d.mu.compact.inProgress {
2865-
if len(c.flush.flushables) == 0 && (finishing == nil || c != finishing) {
2866-
info := compactionInfo{
2867-
versionEditApplied: c.versionEditApplied,
2868-
inputs: c.inputs,
2869-
bounds: c.bounds,
2870-
outputLevel: -1,
2871-
}
2872-
if c.outputLevel != nil {
2873-
info.outputLevel = c.outputLevel.level
2874-
}
2875-
rv = append(rv, info)
2865+
if !c.IsFlush() && (finishing == nil || c != finishing) {
2866+
rv = append(rv, c.Info())
28762867
}
28772868
}
28782869
return

download.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ func (d *DB) tryLaunchDownloadForFile(
449449
c := newCompaction(pc, d.opts, d.timeNow(), d.objProvider, noopGrantHandle{}, d.TableFormat(), d.determineCompactionValueSeparation)
450450
c.isDownload = true
451451
d.mu.compact.downloadingCount++
452-
d.addInProgressCompaction(c)
452+
c.AddInProgressLocked(d)
453453
go d.compact(c, doneCh)
454454
return doneCh, true
455455
}

ingest.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -912,7 +912,7 @@ func ingestTargetLevel(
912912
cmp base.Compare,
913913
lsmOverlap overlap.WithLSM,
914914
baseLevel int,
915-
compactions map[*compaction]struct{},
915+
compactions map[compaction]struct{},
916916
meta *manifest.TableMetadata,
917917
suggestSplit bool,
918918
) (targetLevel int, splitFile *manifest.TableMetadata, err error) {
@@ -1027,10 +1027,14 @@ func ingestTargetLevel(
10271027
// unless necessary.
10281028
overlaps := false
10291029
for c := range compactions {
1030-
if c.outputLevel == nil || level != c.outputLevel.level {
1030+
tblCompaction, ok := c.(*tableCompaction)
1031+
if !ok {
1032+
continue
1033+
}
1034+
if tblCompaction.outputLevel == nil || level != tblCompaction.outputLevel.level {
10311035
continue
10321036
}
1033-
if metaBounds.Overlaps(cmp, &c.bounds) {
1037+
if metaBounds.Overlaps(cmp, tblCompaction.Bounds()) {
10341038
overlaps = true
10351039
break
10361040
}
@@ -2101,7 +2105,7 @@ func (d *DB) ingestApply(
21012105
}
21022106
if len(filesToSplit) > 0 || exciseSpan.Valid() {
21032107
for c := range d.mu.compact.inProgress {
2104-
if c.versionEditApplied {
2108+
if c.VersionEditApplied() {
21052109
continue
21062110
}
21072111
// Check if this compaction overlaps with the excise span. Note that just
@@ -2111,8 +2115,8 @@ func (d *DB) ingestApply(
21112115
// doing a [c,d) excise at the same time as this compaction, we will have
21122116
// to error out the whole compaction as we can't guarantee it hasn't/won't
21132117
// write a file overlapping with the excise span.
2114-
if c.bounds.Overlaps(d.cmp, &exciseBounds) {
2115-
c.cancel.Store(true)
2118+
if c.Bounds().Overlaps(d.cmp, &exciseBounds) {
2119+
c.Cancel()
21162120
}
21172121
// Check if this compaction's inputs have been replaced due to an
21182122
// ingest-time split. In that case, cancel the compaction as a newly picked
@@ -2121,12 +2125,10 @@ func (d *DB) ingestApply(
21212125
// file that was ingest-split as an input, even if it started before this
21222126
// ingestion.
21232127
if checkCompactions {
2124-
for i := range c.inputs {
2125-
for f := range c.inputs[i].files.All() {
2126-
if _, ok := replacedTables[f.TableNum]; ok {
2127-
c.cancel.Store(true)
2128-
break
2129-
}
2128+
for _, table := range c.Tables() {
2129+
if _, ok := replacedTables[table.TableNum]; ok {
2130+
c.Cancel()
2131+
break
21302132
}
21312133
}
21322134
}

open.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
274274
d.mu.mem.nextSize = initialMemTableSize
275275
}
276276
d.mu.compact.cond.L = &d.mu.Mutex
277-
d.mu.compact.inProgress = make(map[*compaction]struct{})
277+
d.mu.compact.inProgress = make(map[compaction]struct{})
278278
d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
279279
d.mu.snapshots.init()
280280
// logSeqNum is the next sequence number that will be assigned.

0 commit comments

Comments
 (0)