Commit dfa7783

db: add blob file rewrite compaction
Introduce a new compaction kind: blob-file rewrite compactions. These compactions pick a physical blob file with unreferenced values, rewrite it, and output a new physical blob file.

This commit adds the compaction and compaction-picking mechanics, but not the sophisticated rewrite logic itself; for now, rewrite compactions simply copy the blob file verbatim.

Informs #112.
1 parent f715521

23 files changed, +679 -105 lines

blob_rewrite.go

Lines changed: 356 additions & 58 deletions
Large diffs are not rendered by default.
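Although the blob_rewrite.go diff is not rendered, the commit message pins down its current behavior: the rewriter produces a byte-for-byte copy of the input blob file under a new physical file. As a rough standalone sketch of that verbatim copy (illustrative only: the function name and the plain-file I/O are assumptions; Pebble's rewriter works against its objstorage/VFS abstractions, not os files):

package main

import (
	"io"
	"os"
)

// copyBlobFileVerbatim mimics, in spirit, what this commit's rewrite
// compactions do for now: copy a blob file's bytes unchanged to a new
// physical file.
func copyBlobFileVerbatim(srcPath, dstPath string) error {
	src, err := os.Open(srcPath)
	if err != nil {
		return err
	}
	defer src.Close()

	dst, err := os.Create(dstPath)
	if err != nil {
		return err
	}
	defer dst.Close()

	if _, err := io.Copy(dst, src); err != nil {
		return err
	}
	// Persist the copy before the new file is considered durable.
	return dst.Sync()
}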

blob_rewrite_test.go

Lines changed: 2 additions & 1 deletion
@@ -194,14 +194,15 @@ func TestBlobRewrite(t *testing.T) {
 	blobWriter := blob.NewFileWriter(fn, outputWritable, blob.FileWriterOptions{})
 	rewriter := newBlobFileRewriter(mockFC, block.ReadEnv{}, blobWriter, sstables, inputBlob)
-	err = rewriter.Rewrite()
+	stats, err := rewriter.Rewrite(context.Background())
 	if err != nil {
 		fmt.Fprintf(&buf, "rewrite error: %v\n", err)
 	} else {
 		fmt.Fprintf(&buf, "Successfully rewrote blob file %s to %s\n",
 			targetBlobFileStr, fn.String())
 		fmt.Fprintf(&buf, "Input SSTables: %v\n", sstableFileNums)
 		fmt.Fprintf(&buf, "SSTables with blob references: %d\n", len(sstables))
+		fmt.Fprintln(&buf, stats)
 	}
 
 	return buf.String()

compaction.go

Lines changed: 9 additions & 2 deletions
@@ -151,6 +151,7 @@ const (
 	compactionKindTombstoneDensity
 	compactionKindRewrite
 	compactionKindIngestedFlushable
+	compactionKindBlobFileRewrite
 )
 
 func (k compactionKind) String() string {
@@ -175,6 +176,8 @@ func (k compactionKind) String() string {
 		return "ingested-flushable"
 	case compactionKindCopy:
 		return "copy"
+	case compactionKindBlobFileRewrite:
+		return "blob-file-rewrite"
 	}
 	return "?"
 }
@@ -410,8 +413,9 @@ func (c *tableCompaction) IsFlush() bool { return len(c.flu
 func (c *tableCompaction) Info() compactionInfo {
 	info := compactionInfo{
 		versionEditApplied: c.versionEditApplied,
+		kind:               c.kind,
 		inputs:             c.inputs,
-		bounds:             c.bounds,
+		bounds:             &c.bounds,
 		outputLevel:        -1,
 	}
 	if c.outputLevel != nil {
@@ -1774,7 +1778,8 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
 		// doing a [c,d) excise at the same time as this compaction, we will have
 		// to error out the whole compaction as we can't guarantee it hasn't/won't
 		// write a file overlapping with the excise span.
-		if c2.Bounds().Overlaps(d.cmp, &exciseBounds) {
+		bounds := c2.Bounds()
+		if bounds != nil && bounds.Overlaps(d.cmp, &exciseBounds) {
 			c2.Cancel()
 		}
 	}
@@ -3623,6 +3628,8 @@ func getDiskWriteCategoryForCompaction(opts *Options, kind compactionKind) vfs.D
 		return "sql-row-spill"
 	} else if kind == compactionKindFlush {
 		return "pebble-memtable-flush"
+	} else if kind == compactionKindBlobFileRewrite {
+		return "pebble-blob-file-rewrite"
 	} else {
 		return "pebble-compaction"
 	}
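A contract change worth calling out: Info() now hands out bounds as a pointer (&c.bounds), and for blob-file rewrites, which touch no sstables, a compaction's Bounds() can return nil. Every overlap check in this commit therefore gains a nil guard. A minimal standalone sketch of the pattern (the keyBounds type is an illustrative stand-in for base.UserKeyBounds, not Pebble's type):

package main

import "bytes"

// keyBounds is an illustrative stand-in for base.UserKeyBounds.
type keyBounds struct{ start, end []byte }

func (b *keyBounds) overlaps(o *keyBounds) bool {
	// Simplified: treats both ends as inclusive.
	return bytes.Compare(b.start, o.end) <= 0 && bytes.Compare(o.start, b.end) <= 0
}

// cancelIfOverlapping mirrors the guard added in flush1 and ingestApply: a
// nil bounds result means the compaction has no user-key span (e.g. a blob
// file rewrite) and can never overlap an excise.
func cancelIfOverlapping(b, excise *keyBounds, cancel func()) {
	if b != nil && b.overlaps(excise) {
		cancel()
	}
}

func main() {
	cancelIfOverlapping(nil, &keyBounds{[]byte("c"), []byte("d")}, func() {
		panic("unreachable: nil bounds never overlap")
	})
}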

compaction_picker.go

Lines changed: 58 additions & 3 deletions
@@ -93,9 +93,13 @@ type compactionInfo struct {
 	// been committed. The compaction may still be in-progress deleting newly
 	// obsolete files.
 	versionEditApplied bool
-	inputs             []compactionLevel
-	outputLevel        int
-	bounds             base.UserKeyBounds
+	// kind indicates the kind of compaction.
+	kind        compactionKind
+	inputs      []compactionLevel
+	outputLevel int
+	// bounds may be nil if the compaction does not involve sstables
+	// (specifically, a blob file rewrite).
+	bounds *base.UserKeyBounds
 }
 
 func (info compactionInfo) String() string {
@@ -1448,6 +1452,12 @@ func (p *compactionPickerByScore) pickAutoNonScore(env compactionEnv) (pc picked
 		return pc
 	}
 
+	// Check for blob file rewrites. These are low-priority compactions because
+	// they don't help us keep up with writes, just reclaim disk space.
+	if pc := p.pickBlobFileRewriteCompaction(env); pc != nil {
+		return pc
+	}
+
 	if pc := p.pickReadTriggeredCompaction(env); pc != nil {
 		return pc
 	}
@@ -1662,6 +1672,51 @@ func (p *compactionPickerByScore) pickRewriteCompaction(
 	return nil
 }
 
+// pickBlobFileRewriteCompaction looks for compactions of blob files that
+// can be rewritten to reclaim disk space.
+func (p *compactionPickerByScore) pickBlobFileRewriteCompaction(
+	env compactionEnv,
+) (pc *pickedBlobFileCompaction) {
+	aggregateStats, heuristicStats := p.latestVersionState.blobFiles.Stats()
+	if heuristicStats.CountFilesEligible == 0 && heuristicStats.CountFilesTooRecent == 0 {
+		// No blob files with any garbage to rewrite.
+		return nil
+	}
+	policy := p.opts.Experimental.ValueSeparationPolicy()
+	if policy.TargetGarbageRatio >= 1.0 {
+		// Blob file rewrite compactions are disabled.
+		return nil
+	}
+	garbagePct := float64(aggregateStats.ValueSize-aggregateStats.ReferencedValueSize) /
+		float64(aggregateStats.ValueSize)
+	if garbagePct <= policy.TargetGarbageRatio {
+		// Not enough garbage to warrant a rewrite compaction.
+		return nil
+	}
+
+	// Check if there is an ongoing blob file rewrite compaction. If there is,
+	// don't schedule a new one.
+	for _, c := range env.inProgressCompactions {
+		if c.kind == compactionKindBlobFileRewrite {
+			return nil
+		}
+	}
+
+	candidate, ok := p.latestVersionState.blobFiles.ReplacementCandidate()
+	if !ok {
+		// None meet the heuristic.
+		return nil
+	}
+	// Add a reference to the version. The compaction will release the
+	// reference when it completes.
+	p.vers.Ref()
+	return &pickedBlobFileCompaction{
+		vers:              p.vers,
+		file:              candidate,
+		referencingTables: p.latestVersionState.blobFiles.ReferencingTables(candidate.FileID),
+	}
+}
+
 // pickTombstoneDensityCompaction looks for a compaction that eliminates
 // regions of extremely high point tombstone density. For each level, it picks
 // a file where the ratio of tombstone-dense blocks is at least
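To make the picking heuristic concrete, here is a worked example of the garbage-ratio gate with made-up numbers (a standalone sketch; the stats values and the policy setting are hypothetical, with names mirroring the fields used above):

package main

import "fmt"

func main() {
	// Hypothetical aggregate blob file stats: 100 MiB of values written, of
	// which live sstables still reference 60 MiB, leaving 40% garbage.
	var valueSize, referencedValueSize uint64 = 100 << 20, 60 << 20
	garbagePct := float64(valueSize-referencedValueSize) / float64(valueSize)

	// With a TargetGarbageRatio of 0.25, garbage (0.40) exceeds the target,
	// so the picker proceeds to look for a replacement candidate. A ratio of
	// 1.0 or more can never be exceeded, which is how rewrites are disabled.
	const targetGarbageRatio = 0.25
	fmt.Printf("garbage=%.2f eligible=%t\n", garbagePct, garbagePct > targetGarbageRatio)
	// Output: garbage=0.40 eligible=true
}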

compaction_picker_test.go

Lines changed: 17 additions & 3 deletions
@@ -267,11 +267,13 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
 			break
 		}
 		fmt.Fprintf(&b, "L%d->L%d: %.1f\n", pc.startLevel.level, pc.outputLevel.level, pc.score)
+		bounds := pc.bounds.Clone()
 		inProgress = append(inProgress, compactionInfo{
 			inputs:      pc.inputs,
 			outputLevel: pc.outputLevel.level,
-			bounds:      pc.bounds.Clone(),
+			bounds:      &bounds,
 		})
+
 		if pc.outputLevel.level == 0 {
 			// Once we pick one L0->L0 compaction, we'll keep on doing so
 			// because the test isn't marking files as Compacting.
@@ -520,7 +522,13 @@ func TestCompactionPickerL0(t *testing.T) {
 				return fmt.Sprintf("cannot find compaction file %s", tableNum)
 			}
 			compactFile.CompactionState = manifest.CompactionStateCompacting
-			info.bounds = info.bounds.Union(DefaultComparer.Compare, compactFile.UserKeyBounds())
+			var bounds base.UserKeyBounds
+			if info.bounds != nil {
+				bounds = info.bounds.Union(DefaultComparer.Compare, compactFile.UserKeyBounds())
+			} else {
+				bounds = compactFile.UserKeyBounds()
+			}
+			info.bounds = &bounds
 			compactionFiles[level] = append(compactionFiles[level], compactFile)
 		}
 	}
@@ -745,7 +753,13 @@ func TestCompactionPickerConcurrency(t *testing.T) {
 				return fmt.Sprintf("cannot find compaction file %s", tableNum)
 			}
 			compactFile.CompactionState = manifest.CompactionStateCompacting
-			info.bounds = info.bounds.Union(DefaultComparer.Compare, compactFile.UserKeyBounds())
+			var bounds base.UserKeyBounds
+			if info.bounds != nil {
+				bounds = info.bounds.Union(DefaultComparer.Compare, compactFile.UserKeyBounds())
+			} else {
+				bounds = compactFile.UserKeyBounds()
+			}
+			info.bounds = &bounds
 			compactionFiles[level] = append(compactionFiles[level], compactFile)
 		}
 	}

compaction_scheduler.go

Lines changed: 4 additions & 2 deletions
@@ -142,10 +142,12 @@ func init() {
 		compactionOptionalAndPriority{optional: true, priority: 60}
 	scheduledCompactionMap[compactionKindElisionOnly] =
 		compactionOptionalAndPriority{optional: true, priority: 50}
-	scheduledCompactionMap[compactionKindRead] =
+	scheduledCompactionMap[compactionKindBlobFileRewrite] =
 		compactionOptionalAndPriority{optional: true, priority: 40}
-	scheduledCompactionMap[compactionKindRewrite] =
+	scheduledCompactionMap[compactionKindRead] =
 		compactionOptionalAndPriority{optional: true, priority: 30}
+	scheduledCompactionMap[compactionKindRewrite] =
+		compactionOptionalAndPriority{optional: true, priority: 20}
 }
 
 // noopGrantHandle is used in cases that don't interact with a CompactionScheduler.
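Net effect of the reordering, for the kinds visible in this hunk: blob-file rewrites slot in at priority 40, read-triggered compactions drop from 40 to 30, and rewrite compactions drop from 30 to 20, consistent with the picker's comment that blob-file rewrites only reclaim space. Summarized below (illustrative only, not code from the commit; string keys mirror compactionKind.String(), and larger numbers appear to rank higher given the file's descending assignment order):

package main

import "fmt"

// optionalCompactionPriority is an illustrative summary of the post-change
// priorities of the optional compaction kinds visible in this hunk.
var optionalCompactionPriority = map[string]int{
	"elision-only":      50,
	"blob-file-rewrite": 40, // new in this commit
	"read":              30, // previously 40
	"rewrite":           20, // previously 30
}

func main() {
	fmt.Println(optionalCompactionPriority)
}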

data_test.go

Lines changed: 10 additions & 2 deletions
@@ -1781,8 +1781,8 @@ func parseDBOptionsArgs(opts *Options, args []datadriven.CmdArg) error {
 				opts.TargetFileSizes[i] = opts.TargetFileSizes[i-1] * 2
 			}
 		case "value-separation":
-			if len(cmdArg.Vals) != 3 {
-				return errors.New("value-separation-policy expects 3 arguments: (enabled, minimum-size, max-blob-reference-depth)")
+			if len(cmdArg.Vals) != 5 {
+				return errors.New("value-separation expects 5 arguments: (enabled, minimum-size, max-blob-reference-depth, rewrite-minimum-age, target-garbage-ratio)")
 			}
 			var policy ValueSeparationPolicy
 			var err error
@@ -1798,6 +1798,14 @@ func parseDBOptionsArgs(opts *Options, args []datadriven.CmdArg) error {
 			if err != nil {
 				return err
 			}
+			policy.RewriteMinimumAge, err = time.ParseDuration(cmdArg.Vals[3])
+			if err != nil {
+				return err
+			}
+			policy.TargetGarbageRatio, err = strconv.ParseFloat(cmdArg.Vals[4], 64)
+			if err != nil {
+				return err
+			}
 			opts.Experimental.ValueSeparationPolicy = func() ValueSeparationPolicy {
 				return policy
 			}
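With the two new fields, a datadriven test would now supply all five values, the fourth parsed by time.ParseDuration and the fifth by strconv.ParseFloat. For example (the values here are hypothetical):

value-separation=(true, 1024, 10, 10m, 0.05)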

db.go

Lines changed: 2 additions & 2 deletions
@@ -2860,7 +2860,7 @@ func (d *DB) getEarliestUnflushedSeqNumLocked() base.SeqNum {
 	return seqNum
 }
 
-func (d *DB) getInProgressCompactionInfoLocked(finishing *tableCompaction) (rv []compactionInfo) {
+func (d *DB) getInProgressCompactionInfoLocked(finishing compaction) (rv []compactionInfo) {
 	for c := range d.mu.compact.inProgress {
 		if !c.IsFlush() && (finishing == nil || c != finishing) {
 			rv = append(rv, c.Info())
@@ -2888,7 +2888,7 @@ func inProgressL0Compactions(inProgress []compactionInfo) []manifest.L0Compactio
 			continue
 		}
 		compactions = append(compactions, manifest.L0Compaction{
-			Bounds:    info.bounds,
+			Bounds:    *info.bounds,
 			IsIntraL0: info.outputLevel == 0,
 		})
 	}

event.go

Lines changed: 86 additions & 0 deletions
@@ -139,6 +139,66 @@ func (i BlobFileDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
 	w.Printf("[JOB %d] blob file deleted %s", redact.Safe(i.JobID), i.FileNum)
 }
 
+// BlobFileRewriteInfo contains the info for a blob file rewrite event.
+type BlobFileRewriteInfo struct {
+	// JobID is the ID of the job.
+	JobID int
+	// Input describes the input blob file for the rewrite.
+	Input BlobFileInfo
+	// Output describes the output blob file generated by the rewrite. The
+	// output info is empty for the rewrite begin event.
+	Output BlobFileInfo
+	// Duration is the time spent compacting, including reading and writing
+	// files.
+	Duration time.Duration
+	// TotalDuration is the total wall-time duration of the compaction,
+	// including applying the compaction to the database. TotalDuration is
+	// always ≥ Duration.
+	TotalDuration time.Duration
+	Done          bool
+	// Err is set only if Done is true. If non-nil, it indicates that the
+	// compaction failed. Note that Err can be ErrCancelledCompaction, which
+	// can happen during normal operation.
+	Err error
+}
+
+func (i BlobFileRewriteInfo) String() string {
+	return redact.StringWithoutMarkers(i)
+}
+
+// SafeFormat implements redact.SafeFormatter.
+func (i BlobFileRewriteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
+	if i.Err != nil {
+		w.Printf("[JOB %d] blob file (%s, %s) rewrite error: %s",
+			redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum, i.Err)
+		return
+	}
+
+	if !i.Done {
+		w.Printf("[JOB %d] rewriting blob file %s (physical file %s)",
+			redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum)
+		return
+	}
+	w.Printf("[JOB %d] rewrote blob file (%s, %s) -> (%s, %s), in %.1fs (%.1fs total)",
+		redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum,
+		i.Output.BlobFileID, i.Output.DiskFileNum,
+		redact.Safe(i.Duration.Seconds()),
+		redact.Safe(i.TotalDuration.Seconds()))
+}
+
+// BlobFileInfo describes a blob file.
+type BlobFileInfo struct {
+	// BlobFileID is the logical ID of the blob file.
+	BlobFileID base.BlobFileID
+	// DiskFileNum is the file number of the blob file on disk.
+	DiskFileNum base.DiskFileNum
+	// Size is the physical size of the file in bytes.
+	Size uint64
+	// ValueSize is the pre-compressed size of the values in the blob file in
+	// bytes.
+	ValueSize uint64
+}
+
 // CompactionInfo contains the info for a compaction event.
 type CompactionInfo struct {
 	// JobID is the ID of the compaction job.
@@ -802,6 +862,12 @@ type EventListener struct {
 	// BlobFileDeleted is invoked after a blob file has been deleted.
 	BlobFileDeleted func(BlobFileDeleteInfo)
 
+	// BlobFileRewriteBegin is invoked when a blob file rewrite compaction begins.
+	BlobFileRewriteBegin func(BlobFileRewriteInfo)
+
+	// BlobFileRewriteEnd is invoked when a blob file rewrite compaction ends.
+	BlobFileRewriteEnd func(BlobFileRewriteInfo)
+
 	// DataCorruption is invoked when an on-disk corruption is detected. It should
 	// not block, as it is called synchronously in read paths.
 	DataCorruption func(DataCorruptionInfo)
@@ -907,6 +973,12 @@ func (l *EventListener) EnsureDefaults(logger Logger) {
 	if l.BlobFileDeleted == nil {
 		l.BlobFileDeleted = func(info BlobFileDeleteInfo) {}
 	}
+	if l.BlobFileRewriteBegin == nil {
+		l.BlobFileRewriteBegin = func(info BlobFileRewriteInfo) {}
+	}
+	if l.BlobFileRewriteEnd == nil {
+		l.BlobFileRewriteEnd = func(info BlobFileRewriteInfo) {}
+	}
 	if l.DataCorruption == nil {
 		if logger != nil {
 			l.DataCorruption = func(info DataCorruptionInfo) {
@@ -998,6 +1070,12 @@ func MakeLoggingEventListener(logger Logger) EventListener {
 		BlobFileDeleted: func(info BlobFileDeleteInfo) {
 			logger.Infof("%s", info)
 		},
+		BlobFileRewriteBegin: func(info BlobFileRewriteInfo) {
+			logger.Infof("%s", info)
+		},
+		BlobFileRewriteEnd: func(info BlobFileRewriteInfo) {
+			logger.Infof("%s", info)
+		},
 		DataCorruption: func(info DataCorruptionInfo) {
 			logger.Errorf("%s", info)
 		},
@@ -1084,6 +1162,14 @@ func TeeEventListener(a, b EventListener) EventListener {
 			a.BlobFileDeleted(info)
 			b.BlobFileDeleted(info)
 		},
+		BlobFileRewriteBegin: func(info BlobFileRewriteInfo) {
+			a.BlobFileRewriteBegin(info)
+			b.BlobFileRewriteBegin(info)
+		},
+		BlobFileRewriteEnd: func(info BlobFileRewriteInfo) {
+			a.BlobFileRewriteEnd(info)
+			b.BlobFileRewriteEnd(info)
+		},
 		DataCorruption: func(info DataCorruptionInfo) {
 			a.DataCorruption(info)
 			b.DataCorruption(info)
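A hedged usage sketch of the new hooks: installing a listener that logs rewrite activity. The field names follow the diff above; the wiring via Options.EventListener follows Pebble's usual pattern but is illustrative here, not taken from the commit:

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

func main() {
	el := pebble.EventListener{
		BlobFileRewriteBegin: func(info pebble.BlobFileRewriteInfo) {
			log.Printf("blob rewrite begin: %s", info)
		},
		BlobFileRewriteEnd: func(info pebble.BlobFileRewriteInfo) {
			log.Printf("blob rewrite end: %s", info)
		},
	}
	// Fill in no-op defaults for every hook we didn't set; per the diff, a
	// nil logger is tolerated.
	el.EnsureDefaults(nil)

	opts := &pebble.Options{EventListener: &el}
	_ = opts // in real use: pebble.Open("demo-db", opts)
}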

ingest.go

Lines changed: 4 additions & 2 deletions
@@ -1034,7 +1034,8 @@ func ingestTargetLevel(
 		if tblCompaction.outputLevel == nil || level != tblCompaction.outputLevel.level {
 			continue
 		}
-		if metaBounds.Overlaps(cmp, tblCompaction.Bounds()) {
+		bounds := tblCompaction.Bounds()
+		if bounds != nil && metaBounds.Overlaps(cmp, bounds) {
 			overlaps = true
 			break
 		}
@@ -2115,7 +2116,8 @@ func (d *DB) ingestApply(
 		// doing a [c,d) excise at the same time as this compaction, we will have
 		// to error out the whole compaction as we can't guarantee it hasn't/won't
 		// write a file overlapping with the excise span.
-		if c.Bounds().Overlaps(d.cmp, &exciseBounds) {
+		bounds := c.Bounds()
+		if bounds != nil && bounds.Overlaps(d.cmp, &exciseBounds) {
 			c.Cancel()
 		}
 		// Check if this compaction's inputs have been replaced due to an
