Skip to content

Commit ee64cc6

Browse files
committed
db: organize compaction kind-specific fields
Organize the fields specific to delete-only compactions and flushes into their own struct fields.
1 parent 6cbf981 commit ee64cc6

File tree

3 files changed

+69
-58
lines changed

3 files changed

+69
-58
lines changed

compaction.go

Lines changed: 66 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,8 @@ type compaction struct {
191191
// to cancel, such as if a conflicting excise operation raced it to manifest
192192
// application. Only holders of the manifest lock will write to this atomic.
193193
cancel atomic.Bool
194-
194+
// kind indicates the kind of compaction. Different compaction kinds have
195+
// different semantics and mechanics. Some may have additional fields.
195196
kind compactionKind
196197
// isDownload is true if this compaction was started as part of a Download
197198
// operation. In this case kind is compactionKindCopy or
@@ -253,8 +254,6 @@ type compaction struct {
253254
// single output table with the tables in the grandparent level.
254255
maxOverlapBytes uint64
255256

256-
// flushing contains the flushables (aka memtables) that are being flushed.
257-
flushing flushableList
258257
// bytesWritten contains the number of bytes that have been written to outputs.
259258
bytesWritten atomic.Int64
260259

@@ -271,10 +270,6 @@ type compaction struct {
271270
// in the grandparent when this compaction finishes will be the same.
272271
grandparents manifest.LevelSlice
273272

274-
// Boundaries at which flushes to L0 should be split. Determined by
275-
// L0Sublevels. If nil, flushes aren't split.
276-
l0Limits [][]byte
277-
278273
delElision compact.TombstoneElision
279274
rangeKeyElision compact.TombstoneElision
280275

@@ -284,14 +279,31 @@ type compaction struct {
284279
// lower level in the LSM during runCompaction.
285280
allowedZeroSeqNum bool
286281

287-
// deletionHints are set if this is a compactionKindDeleteOnly. Used to figure
288-
// out whether an input must be deleted in its entirety, or excised into
289-
// virtual sstables.
290-
deletionHints []deleteCompactionHint
291-
292-
// exciseEnabled is set to true if this is a compactionKindDeleteOnly and
293-
// this compaction is allowed to excise files.
294-
exciseEnabled bool
282+
// deleteOnly contains information specific to compactions with kind
283+
// compactionKindDeleteOnly. A delete-only compaction is a special
284+
// compaction that does not merge or write sstables. Instead, it only
285+
// performs deletions either through removing whole sstables from the LSM or
286+
// virtualizing them into virtual sstables.
287+
deleteOnly struct {
288+
// hints are collected by the table stats collector and describe range
289+
// deletions and the files containing keys deleted by them.
290+
hints []deleteCompactionHint
291+
// exciseEnabled is set to true if this compaction is allowed to excise
292+
// files. If false, the compaction will only remove whole sstables that
293+
// are wholly contained within the bounds of range deletions.
294+
exciseEnabled bool
295+
}
296+
// flush contains information specific to flushes (compactionKindFlush and
297+
// compactionKindIngestedFlushable). A flush is modeled by a compaction
298+
// because it has similar mechanics to a default compaction.
299+
flush struct {
300+
// flushables contains the flushables (aka memtables, large batches,
301+
// flushable ingestions, etc) that are being flushed.
302+
flushables flushableList
303+
// Boundaries at which sstables flushed to L0 should be split.
304+
// Determined by L0Sublevels. If nil, ignored.
305+
l0Limits [][]byte
306+
}
295307

296308
metrics levelMetricsDelta
297309

@@ -526,16 +538,16 @@ func newDeleteOnlyCompaction(
526538
exciseEnabled bool,
527539
) *compaction {
528540
c := &compaction{
529-
kind: compactionKindDeleteOnly,
530-
comparer: opts.Comparer,
531-
logger: opts.Logger,
532-
version: cur,
533-
beganAt: beganAt,
534-
inputs: inputs,
535-
deletionHints: hints,
536-
exciseEnabled: exciseEnabled,
537-
grantHandle: noopGrantHandle{},
538-
}
541+
kind: compactionKindDeleteOnly,
542+
comparer: opts.Comparer,
543+
logger: opts.Logger,
544+
version: cur,
545+
beganAt: beganAt,
546+
inputs: inputs,
547+
grantHandle: noopGrantHandle{},
548+
}
549+
c.deleteOnly.hints = hints
550+
c.deleteOnly.exciseEnabled = exciseEnabled
539551
// Acquire a reference to the version to ensure that files and in-memory
540552
// version state necessary for reading files remain available. Ignoring
541553
// excises, this isn't strictly necessary for reading the sstables that are
@@ -657,10 +669,11 @@ func newFlush(
657669
getValueSeparation: getValueSeparation,
658670
maxOutputFileSize: math.MaxUint64,
659671
maxOverlapBytes: math.MaxUint64,
660-
flushing: flushing,
661672
grantHandle: noopGrantHandle{},
662673
tableFormat: tableFormat,
663674
}
675+
c.flush.flushables = flushing
676+
c.flush.l0Limits = l0Organizer.FlushSplitKeys()
664677
c.startLevel = &c.inputs[0]
665678
c.outputLevel = &c.inputs[1]
666679
if len(flushing) > 0 {
@@ -670,6 +683,14 @@ func newFlush(
670683
}
671684
c.kind = compactionKindIngestedFlushable
672685
return c, nil
686+
} else {
687+
// Make sure there's no ingestedFlushable after the first flushable
688+
// in the list.
689+
for _, f := range c.flush.flushables[1:] {
690+
if _, ok := f.flushable.(*ingestedFlushable); ok {
691+
panic("pebble: flushables shouldn't contain ingestedFlushable")
692+
}
693+
}
673694
}
674695
}
675696

@@ -683,16 +704,6 @@ func newFlush(
683704
c.getValueSeparation = neverSeparateValues
684705
}
685706

686-
// Make sure there's no ingestedFlushable after the first flushable in the
687-
// list.
688-
for _, f := range flushing {
689-
if _, ok := f.flushable.(*ingestedFlushable); ok {
690-
panic("pebble: flushing shouldn't contain ingestedFlushable flushable")
691-
}
692-
}
693-
694-
c.l0Limits = l0Organizer.FlushSplitKeys()
695-
696707
cmp := c.comparer.Compare
697708
updatePointBounds := func(iter internalIterator) {
698709
if kv := iter.First(); kv != nil {
@@ -797,7 +808,7 @@ func (c *compaction) allowZeroSeqNum() bool {
797808
// code doesn't know that L0 contains files and zeroing of seqnums should
798809
// be disabled. That is fixable, but it seems safer to just match the
799810
// RocksDB behavior for now.
800-
return len(c.flushing) == 0 && c.delElision.ElidesEverything() && c.rangeKeyElision.ElidesEverything()
811+
return len(c.flush.flushables) == 0 && c.delElision.ElidesEverything() && c.rangeKeyElision.ElidesEverything()
801812
}
802813

803814
// newInputIters returns an iterator over all the input tables in a compaction.
@@ -811,7 +822,7 @@ func (c *compaction) newInputIters(
811822
cmp := c.comparer.Compare
812823

813824
// Validate the ordering of compaction input files for defense in depth.
814-
if len(c.flushing) == 0 {
825+
if len(c.flush.flushables) == 0 {
815826
if c.startLevel.level >= 0 {
816827
err := manifest.CheckOrdering(c.comparer, manifest.Level(c.startLevel.level),
817828
c.startLevel.files.Iter())
@@ -856,7 +867,7 @@ func (c *compaction) newInputIters(
856867
// numInputLevels is an approximation of the number of iterator levels. Due
857868
// to idiosyncrasies in iterator construction, we may (rarely) exceed this
858869
// initial capacity.
859-
numInputLevels := max(len(c.flushing), len(c.inputs))
870+
numInputLevels := max(len(c.flush.flushables), len(c.inputs))
860871
iters := make([]internalIterator, 0, numInputLevels)
861872
rangeDelIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
862873
rangeKeyIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
@@ -883,11 +894,10 @@ func (c *compaction) newInputIters(
883894
// Populate iters, rangeDelIters and rangeKeyIters with the appropriate
884895
// constituent iterators. This depends on whether this is a flush or a
885896
// compaction.
886-
if len(c.flushing) != 0 {
897+
if len(c.flush.flushables) != 0 {
887898
// If flushing, we need to build the input iterators over the memtables
888-
// stored in c.flushing.
889-
for i := range c.flushing {
890-
f := c.flushing[i]
899+
// stored in c.flush.flushables.
900+
for _, f := range c.flush.flushables {
891901
iters = append(iters, f.newFlushIter(nil))
892902
rangeDelIter := f.newRangeDelIter(nil)
893903
if rangeDelIter != nil {
@@ -1082,7 +1092,7 @@ func (c *compaction) newRangeDelIter(
10821092
}
10831093

10841094
func (c *compaction) String() string {
1085-
if len(c.flushing) != 0 {
1095+
if len(c.flush.flushables) != 0 {
10861096
return "flush\n"
10871097
}
10881098

@@ -1358,7 +1368,7 @@ func (d *DB) flush() {
13581368
// were ingested as flushables. Both DB.mu and the manifest lock must be held
13591369
// while runIngestFlush is called.
13601370
func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
1361-
if len(c.flushing) != 1 {
1371+
if len(c.flush.flushables) != 1 {
13621372
panic("pebble: ingestedFlushable must be flushed one at a time.")
13631373
}
13641374

@@ -1369,7 +1379,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
13691379
baseLevel := d.mu.versions.picker.getBaseLevel()
13701380
ve := &manifest.VersionEdit{}
13711381
var ingestSplitFiles []ingestSplitFile
1372-
ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)
1382+
ingestFlushable := c.flush.flushables[0].flushable.(*ingestedFlushable)
13731383

13741384
updateLevelMetricsOnExcise := func(m *manifest.TableMetadata, level int, added []manifest.NewTableEntry) {
13751385
levelMetrics := c.metrics[level]
@@ -1633,7 +1643,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
16331643
} else {
16341644
// c.kind == compactionKindIngestedFlushable && we could have deleted files due
16351645
// to ingest-time splits or excises.
1636-
ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)
1646+
ingestFlushable := c.flush.flushables[0].flushable.(*ingestedFlushable)
16371647
exciseBounds := ingestFlushable.exciseSpan.UserKeyBounds()
16381648
for c2 := range d.mu.compact.inProgress {
16391649
// Check if this compaction overlaps with the excise span. Note that just
@@ -2413,7 +2423,7 @@ func checkDeleteCompactionHints(
24132423

24142424
func (d *DB) compactionPprofLabels(c *compaction) pprof.LabelSet {
24152425
activity := "compact"
2416-
if len(c.flushing) != 0 {
2426+
if len(c.flush.flushables) != 0 {
24172427
activity = "flush"
24182428
}
24192429
level := "L?"
@@ -3007,13 +3017,14 @@ func fragmentDeleteCompactionHints(
30073017
func (d *DB) runDeleteOnlyCompaction(
30083018
jobID JobID, c *compaction, snapshots compact.Snapshots,
30093019
) (ve *manifest.VersionEdit, stats compact.Stats, retErr error) {
3010-
fragments := fragmentDeleteCompactionHints(d.cmp, c.deletionHints)
3020+
fragments := fragmentDeleteCompactionHints(d.cmp, c.deleteOnly.hints)
30113021
ve = &manifest.VersionEdit{
30123022
DeletedTables: map[manifest.DeletedTableEntry]*manifest.TableMetadata{},
30133023
}
30143024
for _, cl := range c.inputs {
30153025
levelMetrics := &LevelMetrics{}
3016-
if err := d.runDeleteOnlyCompactionForLevel(cl, levelMetrics, ve, snapshots, fragments, c.exciseEnabled); err != nil {
3026+
err := d.runDeleteOnlyCompactionForLevel(cl, levelMetrics, ve, snapshots, fragments, c.deleteOnly.exciseEnabled)
3027+
if err != nil {
30173028
return nil, stats, err
30183029
}
30193030
c.metrics[cl.level] = levelMetrics
@@ -3245,7 +3256,7 @@ func (d *DB) compactAndWrite(
32453256

32463257
runnerCfg := compact.RunnerConfig{
32473258
CompactionBounds: c.bounds,
3248-
L0SplitKeys: c.l0Limits,
3259+
L0SplitKeys: c.flush.l0Limits,
32493260
Grandparents: c.grandparents,
32503261
MaxGrandparentOverlapBytes: c.maxOverlapBytes,
32513262
TargetOutputFileSize: c.maxOutputFileSize,
@@ -3329,7 +3340,7 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*manifest.VersionEd
33293340
TableBytesRead: c.outputLevel.files.TableSizeSum(),
33303341
BlobBytesCompacted: result.Stats.CumulativeBlobFileSize,
33313342
}
3332-
if c.flushing != nil {
3343+
if c.flush.flushables != nil {
33333344
outputMetrics.BlobBytesFlushed = result.Stats.CumulativeBlobFileSize
33343345
}
33353346
if len(c.extraLevels) > 0 {
@@ -3338,7 +3349,7 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*manifest.VersionEd
33383349
outputMetrics.TableBytesRead += outputMetrics.TableBytesIn
33393350

33403351
c.metrics[c.outputLevel.level] = outputMetrics
3341-
if len(c.flushing) == 0 && c.metrics[c.startLevel.level] == nil {
3352+
if len(c.flush.flushables) == 0 && c.metrics[c.startLevel.level] == nil {
33423353
c.metrics[c.startLevel.level] = &LevelMetrics{}
33433354
}
33443355
if len(c.extraLevels) > 0 {
@@ -3369,7 +3380,7 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*manifest.VersionEd
33693380
BlobReferences: t.BlobReferences,
33703381
BlobReferenceDepth: t.BlobReferenceDepth,
33713382
}
3372-
if c.flushing == nil {
3383+
if c.flush.flushables == nil {
33733384
// Set the file's LargestSeqNumAbsolute to be the maximum value of any
33743385
// of the compaction's input sstables.
33753386
// TODO(jackson): This could be narrowed to be the maximum of input
@@ -3408,7 +3419,7 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*manifest.VersionEd
34083419
}
34093420

34103421
// Update metrics.
3411-
if c.flushing == nil {
3422+
if c.flush.flushables == nil {
34123423
outputMetrics.TablesCompacted++
34133424
outputMetrics.TableBytesCompacted += fileMeta.Size
34143425
} else {

compaction_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2101,7 +2101,7 @@ func TestCompactionAllowZeroSeqNum(t *testing.T) {
21012101
var buf bytes.Buffer
21022102
for _, line := range crstrings.Lines(td.Input) {
21032103
parts := strings.Fields(line)
2104-
c.flushing = nil
2104+
c.flush.flushables = nil
21052105
c.startLevel.level = -1
21062106

21072107
var startFiles, outputFiles []*manifest.TableMetadata
@@ -2110,7 +2110,7 @@ func TestCompactionAllowZeroSeqNum(t *testing.T) {
21102110
case len(parts) == 1 && parts[0] == "flush":
21112111
c.outputLevel.level = 0
21122112
d.mu.Lock()
2113-
c.flushing = d.mu.mem.queue
2113+
c.flush.flushables = d.mu.mem.queue
21142114
d.mu.Unlock()
21152115

21162116
default:

db.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2862,7 +2862,7 @@ func (d *DB) getEarliestUnflushedSeqNumLocked() base.SeqNum {
28622862

28632863
func (d *DB) getInProgressCompactionInfoLocked(finishing *compaction) (rv []compactionInfo) {
28642864
for c := range d.mu.compact.inProgress {
2865-
if len(c.flushing) == 0 && (finishing == nil || c != finishing) {
2865+
if len(c.flush.flushables) == 0 && (finishing == nil || c != finishing) {
28662866
info := compactionInfo{
28672867
versionEditApplied: c.versionEditApplied,
28682868
inputs: c.inputs,

0 commit comments

Comments
 (0)