Skip to content

Commit 4703883

Browse files
committed
db: add output blobs, pct likely mvcc garbage to flush and compact events
Fixes: #5273
1 parent e26ec98 commit 4703883

File tree

8 files changed

+179
-51
lines changed

8 files changed

+179
-51
lines changed

compaction.go

Lines changed: 53 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1739,14 +1739,15 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
17391739

17401740
var ve *manifest.VersionEdit
17411741
var stats compact.Stats
1742+
var outputBlobs []compact.OutputBlob
17421743
// To determine the target level of the files in the ingestedFlushable, we
17431744
// need to acquire the logLock, and not release it for that duration. Since
17441745
// UpdateVersionLocked acquires it anyway, we create the VersionEdit for
17451746
// ingestedFlushable outside runCompaction. For all other flush cases, we
17461747
// construct the VersionEdit inside runCompaction.
17471748
var compactionErr error
17481749
if c.kind != compactionKindIngestedFlushable {
1749-
ve, stats, compactionErr = d.runCompaction(jobID, c)
1750+
ve, stats, outputBlobs, compactionErr = d.runCompaction(jobID, c)
17501751
}
17511752

17521753
_, err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
@@ -1762,13 +1763,24 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
17621763
validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
17631764
for i := range ve.NewTables {
17641765
e := &ve.NewTables[i]
1765-
info.Output = append(info.Output, e.Meta.TableInfo())
1766+
info.OutputTables = append(info.OutputTables, e.Meta.TableInfo())
17661767
// Ingested tables are not necessarily flushed to L0. Record the level of
17671768
// each ingested file explicitly.
17681769
if ingest {
17691770
info.IngestLevels = append(info.IngestLevels, e.Level)
17701771
}
17711772
}
1773+
for i := range outputBlobs {
1774+
b := &outputBlobs[i]
1775+
info.OutputBlobs = append(info.OutputBlobs, BlobFileInfo{
1776+
BlobFileID: base.BlobFileID(b.Metadata.FileNum),
1777+
DiskFileNum: b.ObjMeta.DiskFileNum,
1778+
Size: b.Metadata.Size,
1779+
ValueSize: b.Stats.UncompressedValueBytes,
1780+
MVCCGarbageSize: b.Stats.MVCCGarbageBytes,
1781+
})
1782+
1783+
}
17721784

17731785
// The flush succeeded or it produced an empty sstable. In either case we
17741786
// want to bump the minimum unflushed log number to the log number of the
@@ -2721,7 +2733,7 @@ func (d *DB) compact1(jobID JobID, c *tableCompaction) (err error) {
27212733
d.opts.EventListener.CompactionBegin(info)
27222734
startTime := d.timeNow()
27232735

2724-
ve, stats, err := d.runCompaction(jobID, c)
2736+
ve, stats, outputBlobs, err := d.runCompaction(jobID, c)
27252737

27262738
info.Annotations = append(info.Annotations, c.annotations...)
27272739
info.Duration = d.timeNow().Sub(startTime)
@@ -2760,6 +2772,16 @@ func (d *DB) compact1(jobID JobID, c *tableCompaction) (err error) {
27602772
e := &ve.NewTables[i]
27612773
info.Output.Tables = append(info.Output.Tables, e.Meta.TableInfo())
27622774
}
2775+
for i := range outputBlobs {
2776+
b := &outputBlobs[i]
2777+
info.Output.Blobs = append(info.Output.Blobs, BlobFileInfo{
2778+
BlobFileID: base.BlobFileID(b.Metadata.FileNum),
2779+
DiskFileNum: b.ObjMeta.DiskFileNum,
2780+
Size: b.Metadata.Size,
2781+
ValueSize: b.Stats.UncompressedValueBytes,
2782+
MVCCGarbageSize: b.Stats.MVCCGarbageBytes,
2783+
})
2784+
}
27632785
d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys
27642786
d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize
27652787
d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels
@@ -2798,17 +2820,17 @@ func (d *DB) compact1(jobID JobID, c *tableCompaction) (err error) {
27982820
// doing IO.
27992821
func (d *DB) runCopyCompaction(
28002822
jobID JobID, c *tableCompaction,
2801-
) (ve *manifest.VersionEdit, stats compact.Stats, _ error) {
2823+
) (ve *manifest.VersionEdit, stats compact.Stats, blobs []compact.OutputBlob, _ error) {
28022824
if c.cancel.Load() {
2803-
return nil, compact.Stats{}, ErrCancelledCompaction
2825+
return nil, compact.Stats{}, blobs, ErrCancelledCompaction
28042826
}
28052827
iter := c.startLevel.files.Iter()
28062828
inputMeta := iter.First()
28072829
if iter.Next() != nil {
2808-
return nil, compact.Stats{}, base.AssertionFailedf("got more than one file for a move compaction")
2830+
return nil, compact.Stats{}, []compact.OutputBlob{}, base.AssertionFailedf("got more than one file for a move compaction")
28092831
}
28102832
if inputMeta.BlobReferenceDepth > 0 || len(inputMeta.BlobReferences) > 0 {
2811-
return nil, compact.Stats{}, base.AssertionFailedf(
2833+
return nil, compact.Stats{}, []compact.OutputBlob{}, base.AssertionFailedf(
28122834
"copy compaction for %s with blob references (depth=%d, refs=%d)",
28132835
inputMeta.TableNum, inputMeta.BlobReferenceDepth, len(inputMeta.BlobReferences),
28142836
)
@@ -2821,11 +2843,11 @@ func (d *DB) runCopyCompaction(
28212843

28222844
objMeta, err := d.objProvider.Lookup(base.FileTypeTable, inputMeta.TableBacking.DiskFileNum)
28232845
if err != nil {
2824-
return nil, compact.Stats{}, err
2846+
return nil, compact.Stats{}, []compact.OutputBlob{}, err
28252847
}
28262848
// This code does not support copying a shared table (which should never be necessary).
28272849
if objMeta.IsShared() {
2828-
return nil, compact.Stats{}, base.AssertionFailedf("copy compaction of shared table")
2850+
return nil, compact.Stats{}, []compact.OutputBlob{}, base.AssertionFailedf("copy compaction of shared table")
28292851
}
28302852

28312853
// We are in the relatively more complex case where we need to copy this
@@ -2885,7 +2907,7 @@ func (d *DB) runCopyCompaction(
28852907
ctx, base.FileTypeTable, inputMeta.TableBacking.DiskFileNum, objstorage.OpenOptions{},
28862908
)
28872909
if err != nil {
2888-
return nil, compact.Stats{}, err
2910+
return nil, compact.Stats{}, []compact.OutputBlob{}, err
28892911
}
28902912
defer func() {
28912913
if src != nil {
@@ -2900,7 +2922,7 @@ func (d *DB) runCopyCompaction(
29002922
},
29012923
)
29022924
if err != nil {
2903-
return nil, compact.Stats{}, err
2925+
return nil, compact.Stats{}, []compact.OutputBlob{}, err
29042926
}
29052927
deleteOnExit = true
29062928

@@ -2940,9 +2962,9 @@ func (d *DB) runCopyCompaction(
29402962
outputMetrics := c.metrics.perLevel.level(c.outputLevel.level)
29412963
outputMetrics.TableBytesIn = inputMeta.Size
29422964

2943-
return ve, compact.Stats{}, nil
2965+
return ve, compact.Stats{}, []compact.OutputBlob{}, nil
29442966
}
2945-
return nil, compact.Stats{}, err
2967+
return nil, compact.Stats{}, []compact.OutputBlob{}, err
29462968
}
29472969
newMeta.TableBacking.Size = wrote
29482970
newMeta.Size = wrote
@@ -2951,7 +2973,7 @@ func (d *DB) runCopyCompaction(
29512973
d.objProvider.Path(objMeta), base.FileTypeTable, newMeta.TableBacking.DiskFileNum,
29522974
objstorage.CreateOptions{PreferSharedStorage: true})
29532975
if err != nil {
2954-
return nil, compact.Stats{}, err
2976+
return nil, compact.Stats{}, []compact.OutputBlob{}, err
29552977
}
29562978
deleteOnExit = true
29572979
}
@@ -2968,10 +2990,10 @@ func (d *DB) runCopyCompaction(
29682990
outputMetrics.TablesCompacted = 1
29692991

29702992
if err := d.objProvider.Sync(); err != nil {
2971-
return nil, compact.Stats{}, err
2993+
return nil, compact.Stats{}, []compact.OutputBlob{}, err
29722994
}
29732995
deleteOnExit = false
2974-
return ve, compact.Stats{}, nil
2996+
return ve, compact.Stats{}, []compact.OutputBlob{}, nil
29752997
}
29762998

29772999
// applyHintOnFile applies a deleteCompactionHint to a file, and updates the
@@ -3138,7 +3160,7 @@ func fragmentDeleteCompactionHints(
31383160
// d.mu must *not* be held when calling this.
31393161
func (d *DB) runDeleteOnlyCompaction(
31403162
jobID JobID, c *tableCompaction, snapshots compact.Snapshots,
3141-
) (ve *manifest.VersionEdit, stats compact.Stats, retErr error) {
3163+
) (ve *manifest.VersionEdit, stats compact.Stats, blobs []compact.OutputBlob, retErr error) {
31423164
fragments := fragmentDeleteCompactionHints(d.cmp, c.deleteOnly.hints)
31433165
ve = &manifest.VersionEdit{
31443166
DeletedTables: map[manifest.DeletedTableEntry]*manifest.TableMetadata{},
@@ -3147,7 +3169,7 @@ func (d *DB) runDeleteOnlyCompaction(
31473169
levelMetrics := c.metrics.perLevel.level(cl.level)
31483170
err := d.runDeleteOnlyCompactionForLevel(cl, levelMetrics, ve, snapshots, fragments, c.deleteOnly.exciseEnabled)
31493171
if err != nil {
3150-
return nil, stats, err
3172+
return nil, stats, blobs, err
31513173
}
31523174
}
31533175
// Remove any files that were added and deleted in the same versionEdit.
@@ -3197,19 +3219,19 @@ func (d *DB) runDeleteOnlyCompaction(
31973219
// Refresh the disk available statistic whenever a compaction/flush
31983220
// completes, before re-acquiring the mutex.
31993221
d.calculateDiskAvailableBytes()
3200-
return ve, stats, nil
3222+
return ve, stats, blobs, nil
32013223
}
32023224

32033225
func (d *DB) runMoveCompaction(
32043226
jobID JobID, c *tableCompaction,
3205-
) (ve *manifest.VersionEdit, stats compact.Stats, _ error) {
3227+
) (ve *manifest.VersionEdit, stats compact.Stats, blobs []compact.OutputBlob, _ error) {
32063228
iter := c.startLevel.files.Iter()
32073229
meta := iter.First()
32083230
if iter.Next() != nil {
3209-
return nil, stats, base.AssertionFailedf("got more than one file for a move compaction")
3231+
return nil, stats, blobs, base.AssertionFailedf("got more than one file for a move compaction")
32103232
}
32113233
if c.cancel.Load() {
3212-
return ve, stats, ErrCancelledCompaction
3234+
return ve, stats, blobs, ErrCancelledCompaction
32133235
}
32143236
outputMetrics := c.metrics.perLevel.level(c.outputLevel.level)
32153237
outputMetrics.TableBytesMoved = meta.Size
@@ -3223,7 +3245,7 @@ func (d *DB) runMoveCompaction(
32233245
},
32243246
}
32253247

3226-
return ve, stats, nil
3248+
return ve, stats, blobs, nil
32273249
}
32283250

32293251
// runCompaction runs a compaction that produces new on-disk tables from
@@ -3235,9 +3257,14 @@ func (d *DB) runMoveCompaction(
32353257
// re-acquired during the course of this method.
32363258
func (d *DB) runCompaction(
32373259
jobID JobID, c *tableCompaction,
3238-
) (ve *manifest.VersionEdit, stats compact.Stats, retErr error) {
3260+
) (
3261+
ve *manifest.VersionEdit,
3262+
stats compact.Stats,
3263+
outputBlobs []compact.OutputBlob,
3264+
retErr error,
3265+
) {
32393266
if c.cancel.Load() {
3240-
return ve, stats, ErrCancelledCompaction
3267+
return ve, stats, outputBlobs, ErrCancelledCompaction
32413268
}
32423269
switch c.kind {
32433270
case compactionKindDeleteOnly:
@@ -3300,7 +3327,7 @@ func (d *DB) runCompaction(
33003327
// Refresh the disk available statistic whenever a compaction/flush
33013328
// completes, before re-acquiring the mutex.
33023329
d.calculateDiskAvailableBytes()
3303-
return ve, result.Stats, result.Err
3330+
return ve, result.Stats, result.Blobs, result.Err
33043331
}
33053332

33063333
// compactAndWrite runs the data part of a compaction, where we set up a

compaction_test.go

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -912,27 +912,32 @@ func runCompactionTest(
912912
}
913913
}()
914914
var compactionLog bytes.Buffer
915-
eventListener := TeeEventListener(
916-
MakeLoggingEventListener(&dbLog),
917-
EventListener{
918-
CompactionEnd: func(info CompactionInfo) {
919-
// Ensure determinism.
920-
info.JobID = 1
921-
info.Duration = time.Second
922-
info.TotalDuration = time.Second
923-
fmt.Fprintln(&compactionLog, info.String())
924-
},
915+
var flushLog bytes.Buffer
916+
logEventListener := &EventListener{
917+
CompactionEnd: func(info CompactionInfo) {
918+
// Ensure determinism.
919+
info.JobID = 1
920+
info.Duration = time.Second
921+
info.TotalDuration = time.Second
922+
fmt.Fprintln(&compactionLog, info.String())
925923
},
926-
)
927-
924+
FlushEnd: func(info FlushInfo) {
925+
// Ensure determinism.
926+
info.JobID = 1
927+
info.Duration = time.Second
928+
info.TotalDuration = time.Second
929+
info.InputBytes = 100
930+
fmt.Fprintln(&flushLog, info.String())
931+
},
932+
}
928933
concurrencyLow, concurrencyHigh := 1, 1
929934
mkOpts := func() *Options {
930935
randVersion := FormatMajorVersion(int(minVersion) + rng.IntN(int(maxVersion)-int(minVersion)+1))
931936
opts := &Options{
932937
FS: vfs.NewMem(),
933938
DebugCheck: DebugCheckLevels,
934939
DisableAutomaticCompactions: true,
935-
EventListener: &eventListener,
940+
EventListener: logEventListener,
936941
FormatMajorVersion: randVersion,
937942
Logger: testutils.Logger{T: t},
938943
Comparer: cmp,
@@ -1440,6 +1445,24 @@ func runCompactionTest(
14401445
}
14411446
return s
14421447

1448+
case "flush-log":
1449+
defer flushLog.Reset()
1450+
s := flushLog.String()
1451+
if td.HasArg("sort") {
1452+
lines := strings.Split(s, "\n")
1453+
sort.Strings(lines)
1454+
// Remove empty lines.
1455+
i := 0
1456+
for ; i < len(lines); i++ {
1457+
if len(lines[i]) != 0 {
1458+
break
1459+
}
1460+
}
1461+
lines = lines[i:]
1462+
s = strings.Join(lines, "\n")
1463+
}
1464+
return s
1465+
14431466
default:
14441467
return fmt.Sprintf("unknown command: %s", td.Cmd)
14451468
}

data_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -947,7 +947,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
947947
// to the user-defined boundaries.
948948
c.maxOutputFileSize = math.MaxUint64
949949

950-
newVE, _, err := d.runCompaction(0, c)
950+
newVE, _, _, err := d.runCompaction(0, c)
951951
if err != nil {
952952
return err
953953
}

0 commit comments

Comments
 (0)