Skip to content

Commit da6410b

Browse files
committed
manifest: track current blob file state in CurrentBlobFileSet
Previously, although machinery existed to support removal of blob files through a VersionEdit's DeletedBlobFiles, the field was never populated because there was no logic to notice when the final reference to a blob file was removed. This commit introduces a new type, CurrentBlobFileSet, that tracks the set of blob files that are referenced within the latest Version of the LSM. The versionSet is updated to maintain a CurrentBlobFileSet. Before applying a new version edit, we pass the version edit through CurrentBlobFileSet's ApplyAndUpdateVersionEdit. This method updates the CurrentBlobFileSet's accounting of all extant references to blob files and adds new entries to VersionEdit.DeletedBlobFiles if any blob file is now unreferenced. In the future, CurrentBlobFileSet's index of blob files can be used to pick blob files to be rewritten in place to reduce space amplification. Informs #112.
1 parent 42bd015 commit da6410b

File tree

7 files changed

+414
-140
lines changed

7 files changed

+414
-140
lines changed

db.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2061,7 +2061,12 @@ func (d *DB) Metrics() *Metrics {
20612061
backingCount, backingTotalSize := d.mu.versions.virtualBackings.Stats()
20622062
metrics.Table.BackingTableCount = uint64(backingCount)
20632063
metrics.Table.BackingTableSize = backingTotalSize
2064+
blobStats := d.mu.versions.blobFiles.Stats()
20642065
d.mu.versions.logUnlock()
2066+
metrics.BlobFiles.LiveCount = blobStats.Count
2067+
metrics.BlobFiles.LiveSize = blobStats.PhysicalSize
2068+
metrics.BlobFiles.ValueSize = blobStats.ValueSize
2069+
metrics.BlobFiles.ReferencedValueSize = blobStats.ReferencedValueSize
20652070

20662071
metrics.LogWriter.FsyncLatency = d.mu.log.metrics.fsyncLatency
20672072
if err := metrics.LogWriter.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {

internal/manifest/blob_metadata.go

Lines changed: 207 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
package manifest
66

77
import (
8+
stdcmp "cmp"
9+
"fmt"
10+
"slices"
811
"sync/atomic"
912

1013
"github.com/cockroachdb/errors"
@@ -57,55 +60,6 @@ type BlobFileMetadata struct {
5760
// referencing TableMetadata is installed in a Version and decremented when
5861
// that TableMetadata becomes obsolete.
5962
refs atomic.Int32
60-
61-
// ActiveRefs holds state that describes the latest (the 'live') Version
62-
// containing this blob file and the references to the file within that
63-
// version.
64-
ActiveRefs BlobFileActiveRefs
65-
}
66-
67-
// BlobFileActiveRefs describes the state of a blob file within the latest
68-
// Version and the references to the file within that version.
69-
type BlobFileActiveRefs struct {
70-
// count holds the number of tables in the latest version that reference
71-
// this blob file. When this reference count falls to zero, the blob file is
72-
// either a zombie file (if BlobFileMetadata.refs > 0), or obsolete and
73-
// ready to be deleted.
74-
//
75-
// INVARIANT: BlobFileMetadata.refs > BlobFileMetadata.ActiveRefs.count
76-
count int32
77-
// valueSize is the sum of the length of uncompressed values in this blob
78-
// file that are still live (i.e., referred to by sstables in the latest
79-
// version).
80-
valueSize uint64
81-
}
82-
83-
// AddRef records a new reference to the blob file from a sstable in the latest
84-
// Version. The provided valueSize should be the value of the sstable's
85-
// BlobReference.ValueSize.
86-
//
87-
// Requires the manifest logLock be held.
88-
func (r *BlobFileActiveRefs) AddRef(valueSize uint64) {
89-
r.count++
90-
r.valueSize += valueSize
91-
}
92-
93-
// RemoveRef removes a reference that no longer exists in the latest Version.
94-
// The provided valueSize should be the value of the removed sstable's
95-
// BlobReference.ValueSize.
96-
//
97-
// Requires the manifest logLock be held.
98-
func (r *BlobFileActiveRefs) RemoveRef(valueSize uint64) {
99-
if invariants.Enabled {
100-
if r.count <= 0 {
101-
panic(errors.AssertionFailedf("pebble: negative active ref count"))
102-
}
103-
if valueSize > r.valueSize {
104-
panic(errors.AssertionFailedf("pebble: negative active value size"))
105-
}
106-
}
107-
r.count--
108-
r.valueSize -= valueSize
10963
}
11064

11165
// SafeFormat implements redact.SafeFormatter.
@@ -244,3 +198,207 @@ func (br BlobReferences) IDByFileNum(fileNum base.DiskFileNum) (blob.ReferenceID
244198
}
245199
return blob.ReferenceID(len(br)), false
246200
}
201+
202+
// AggregateBlobFileStats records cumulative stats across a set of blob files.
203+
type AggregateBlobFileStats struct {
204+
// Count is the number of blob files in the set.
205+
Count uint64
206+
// PhysicalSize is the sum of the size of all blob files in the set. This
207+
// is the size of the blob files on physical storage. Data within blob files
208+
// is compressed, so this value may be less than ValueSize.
209+
PhysicalSize uint64
210+
// ValueSize is the sum of the length of the uncompressed values in all blob
211+
// files in the set, whether or not those values are still referenced.
212+
ValueSize uint64
213+
// ReferencedValueSize is the sum of the length of the uncompressed values
214+
// in all blob files in the set that are still referenced by live tables
215+
// (i.e., in the latest version).
216+
ReferencedValueSize uint64
217+
// ReferencesCount is the total number of tracked references in live tables
218+
// (i.e., in the latest version). When virtual sstables are present, each
219+
// virtual sstable is counted separately (not once per backing physical sstable).
220+
ReferencesCount uint64
221+
}
222+
223+
// String implements fmt.Stringer. It renders the file-level counters (Count, PhysicalSize, ValueSize) followed by the reference-level counters (ReferencedValueSize, ReferencesCount) on a single line.
224+
func (s AggregateBlobFileStats) String() string {
225+
return fmt.Sprintf("Files:{Count: %d, Size: %d, ValueSize: %d}, References:{ValueSize: %d, Count: %d}",
226+
s.Count, s.PhysicalSize, s.ValueSize, s.ReferencedValueSize, s.ReferencesCount)
227+
}
228+
229+
// CurrentBlobFileSet describes the set of blob files that are currently live in
230+
// the latest Version. CurrentBlobFileSet is not thread-safe. In practice its
231+
// use is protected by the versionSet logLock.
//
// The zero value has a nil files map; call Init before applying version edits.
232+
type CurrentBlobFileSet struct {
233+
// files is a map of blob file numbers to a *currentBlobFile, recording
234+
// metadata about the blob file's active references in the latest Version.
235+
files map[base.DiskFileNum]*currentBlobFile
236+
// stats records cumulative stats across all blob files in the set. It is
237+
// updated incrementally as files and references are added and removed.
stats AggregateBlobFileStats
238+
}
239+
240+
// currentBlobFile pairs a blob file's metadata with the set of tables in the
// latest version that reference it and the total size of the referenced values.
type currentBlobFile struct {
241+
// metadata is the blob file's metadata, as supplied when the file was added.
metadata *BlobFileMetadata
242+
// references holds pointers to TableMetadatas that exist in the latest
243+
// version and reference this blob file. When the length of references falls
244+
// to zero, the blob file is either a zombie file (if BlobFileMetadata.refs
245+
// > 0), or obsolete and ready to be deleted.
246+
//
247+
// INVARIANT: BlobFileMetadata.refs >= len(references)
248+
//
249+
// TODO(jackson): Rather than using 1 map per blob file which needs to grow
250+
// and shrink over the lifetime of the blob file, we could use a single
251+
// B-Tree that holds all blob references, sorted by blob file then by
252+
// referencing table number. This would likely be more memory efficient,
253+
// reduce overall number of pointers to chase and suffer fewer allocations
254+
// (and we can pool the B-Tree nodes to further reduce allocs).
255+
references map[*TableMetadata]struct{}
256+
// referencedValueSize is the sum of the length of uncompressed values in
257+
// this blob file that are still live (the sum of BlobReference.ValueSize over the tracked references).
258+
referencedValueSize uint64
259+
}
260+
261+
// Init initializes the CurrentBlobFileSet with the state of the provided
262+
// BulkVersionEdit. This is used after replaying a manifest.
//
// Any previous contents of s are discarded. A nil bve yields an empty set.
// Init panics if an added table references a blob file that is not present
// in bve.BlobFiles.Added.
263+
func (s *CurrentBlobFileSet) Init(bve *BulkVersionEdit) {
264+
*s = CurrentBlobFileSet{files: make(map[base.DiskFileNum]*currentBlobFile)}
265+
if bve == nil {
266+
return
267+
}
268+
// Register every added blob file with an empty reference set and fold its sizes into the aggregate stats.
for _, m := range bve.BlobFiles.Added {
269+
s.files[m.FileNum] = &currentBlobFile{
270+
metadata: m,
271+
references: make(map[*TableMetadata]struct{}),
272+
}
273+
s.stats.Count++
274+
s.stats.PhysicalSize += m.Size
275+
s.stats.ValueSize += m.ValueSize
276+
}
277+
// Record references to blob files from extant tables. Any referenced blob
278+
// files should already exist in s.files.
279+
for _, levelFiles := range bve.AddedTables {
280+
for _, m := range levelFiles {
281+
for _, ref := range m.BlobReferences {
282+
cbf, ok := s.files[ref.FileNum]
283+
if !ok {
284+
panic(errors.AssertionFailedf("pebble: referenced blob file %d not found", ref.FileNum))
285+
}
286+
cbf.references[m] = struct{}{}
287+
cbf.referencedValueSize += ref.ValueSize
288+
s.stats.ReferencedValueSize += ref.ValueSize
289+
s.stats.ReferencesCount++
290+
}
291+
}
292+
}
293+
}
294+
295+
// Stats returns a copy of the cumulative stats across all blob files in the set.
296+
func (s *CurrentBlobFileSet) Stats() AggregateBlobFileStats {
297+
return s.stats
298+
}
299+
300+
// Metadatas returns a newly allocated slice of all blob file metadata in the set, sorted by
301+
// ascending file number for determinism (map iteration order is otherwise unspecified).
302+
func (s *CurrentBlobFileSet) Metadatas() []*BlobFileMetadata {
303+
m := make([]*BlobFileMetadata, 0, len(s.files))
304+
for _, cbf := range s.files {
305+
m = append(m, cbf.metadata)
306+
}
307+
slices.SortFunc(m, func(a, b *BlobFileMetadata) int {
308+
return stdcmp.Compare(a.FileNum, b.FileNum)
309+
})
310+
return m
311+
}
312+
313+
// ApplyAndUpdateVersionEdit applies a version edit to the current blob file
314+
// set, updating its internal tracking of extant blob file references. If after
315+
// applying the version edit a blob file has no more references, the version
316+
// edit is modified to record the blob file removal.
//
// It returns an assertion error if a new blob file already exists in the set
// or if a new or deleted table references a blob file that is not in the set.
// Mutations made to the set before an error is returned are not rolled back.
317+
func (s *CurrentBlobFileSet) ApplyAndUpdateVersionEdit(ve *VersionEdit) error {
318+
// Insert new blob files into the set.
319+
for _, nf := range ve.NewBlobFiles {
320+
if _, ok := s.files[nf.FileNum]; ok {
321+
return errors.AssertionFailedf("pebble: new blob file %d already exists", nf.FileNum)
322+
}
323+
cbf := &currentBlobFile{references: make(map[*TableMetadata]struct{})}
324+
cbf.metadata = nf
325+
s.files[nf.FileNum] = cbf
326+
s.stats.Count++
327+
s.stats.PhysicalSize += nf.Size
328+
s.stats.ValueSize += nf.ValueSize
329+
}
330+
331+
// Update references to blob files from new tables. Any referenced blob
332+
// files should already exist in s.files. newTables records the file numbers
// of all added tables so that moved tables can be recognized below.
333+
newTables := make(map[base.FileNum]struct{})
334+
for _, e := range ve.NewTables {
335+
newTables[e.Meta.FileNum] = struct{}{}
336+
for _, ref := range e.Meta.BlobReferences {
337+
cbf, ok := s.files[ref.FileNum]
338+
if !ok {
339+
return errors.AssertionFailedf("pebble: referenced blob file %d not found", ref.FileNum)
340+
}
341+
cbf.references[e.Meta] = struct{}{}
342+
cbf.referencedValueSize += ref.ValueSize
343+
s.stats.ReferencedValueSize += ref.ValueSize
344+
s.stats.ReferencesCount++
345+
}
346+
}
347+
348+
// Remove references to blob files from deleted tables. Any referenced blob
349+
// files should already exist in s.files. If the removal of a reference
350+
// leaves the blob file with no referencing tables in the latest version,
351+
// the file is no longer live. We update the version edit to record the blob
352+
// file removal and remove it from the set.
353+
for _, meta := range ve.DeletedTables {
354+
for _, ref := range meta.BlobReferences {
355+
cbf, ok := s.files[ref.FileNum]
356+
if !ok {
357+
return errors.AssertionFailedf("pebble: referenced blob file %d not found", ref.FileNum)
358+
}
359+
if invariants.Enabled {
360+
if ref.ValueSize > cbf.referencedValueSize {
361+
return errors.AssertionFailedf("pebble: referenced value size %d for blob file %d is greater than the referenced value size %d",
362+
ref.ValueSize, cbf.metadata.FileNum, cbf.referencedValueSize)
363+
}
364+
if _, ok := cbf.references[meta]; !ok {
365+
return errors.AssertionFailedf("pebble: deleted table %s's reference to blob file %d not known",
366+
meta.FileNum, ref.FileNum)
367+
}
368+
}
369+
370+
// Decrement the stats for this reference.
371+
cbf.referencedValueSize -= ref.ValueSize
372+
s.stats.ReferencedValueSize -= ref.ValueSize
373+
s.stats.ReferencesCount--
374+
if _, ok := newTables[meta.FileNum]; ok {
375+
// This table's file number also appears in NewTables, so the
376+
// table is being moved within the same version edit. We
377+
// preserve the existing reference. We still had to decrement the
378+
// counts above because the reference was double-counted when
379+
// stats were incremented for the table's NewTables entry.
380+
continue
381+
}
382+
// Remove the reference of this table to this blob file.
383+
delete(cbf.references, meta)
384+
385+
// If there are no more references to the blob file, remove it from
386+
// the set and add the removal of the blob file to the version edit.
387+
if len(cbf.references) == 0 {
388+
if cbf.referencedValueSize != 0 {
389+
return errors.AssertionFailedf("pebble: referenced value size %d is non-zero for blob file %s with no refs",
390+
cbf.referencedValueSize, cbf.metadata.FileNum)
391+
}
392+
if ve.DeletedBlobFiles == nil {
393+
ve.DeletedBlobFiles = make(map[base.DiskFileNum]*BlobFileMetadata)
394+
}
395+
ve.DeletedBlobFiles[cbf.metadata.FileNum] = cbf.metadata
396+
s.stats.Count--
397+
s.stats.PhysicalSize -= cbf.metadata.Size
398+
s.stats.ValueSize -= cbf.metadata.ValueSize
399+
delete(s.files, cbf.metadata.FileNum)
400+
}
401+
}
402+
}
403+
return nil
404+
}

internal/manifest/blob_metadata_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@
55
package manifest
66

77
import (
8+
"bytes"
9+
"fmt"
810
"testing"
911

12+
"github.com/cockroachdb/datadriven"
13+
"github.com/cockroachdb/pebble/internal/base"
1014
"github.com/stretchr/testify/require"
1115
)
1216

@@ -44,3 +48,61 @@ func TestBlobFileMetadata_ParseRoundTrip(t *testing.T) {
4448
})
4549
}
4650
}
51+
52+
// TestCurrentBlobFileSet is a datadriven test that drives a single
// CurrentBlobFileSet through the init, applyAndUpdateVersionEdit, metadatas
// and stats commands read from testdata/current_blob_file_set.
func TestCurrentBlobFileSet(t *testing.T) {
53+
var (
54+
buf bytes.Buffer
55+
set CurrentBlobFileSet
56+
// tableMetas maps a table's file number to the first *TableMetadata parsed for it.
tableMetas = make(map[base.FileNum]*TableMetadata)
57+
)
58+
// parseAndFillVersionEdit parses s as a version edit and rewrites its table
// metadata pointers to the canonical ones stored in tableMetas.
parseAndFillVersionEdit := func(s string) *VersionEdit {
59+
ve, err := ParseVersionEditDebug(s)
60+
require.NoError(t, err)
61+
for i, m := range ve.NewTables {
62+
if existingMeta, ok := tableMetas[m.Meta.FileNum]; ok {
63+
// Ensure pointer equality of the *TableMetadata.
64+
// ParseVersionEditDebug will return a new *TableMetadata every
65+
// time it decodes it.
66+
ve.NewTables[i].Meta = existingMeta
67+
} else {
68+
tableMetas[m.Meta.FileNum] = m.Meta
69+
}
70+
}
71+
// Point each deleted table at the canonical metadata recorded when it was
// added. A table never seen in NewTables maps to a nil *TableMetadata.
for dte := range ve.DeletedTables {
72+
ve.DeletedTables[dte] = tableMetas[dte.FileNum]
73+
}
74+
return ve
75+
}
76+
77+
datadriven.RunTest(t, "testdata/current_blob_file_set", func(t *testing.T, d *datadriven.TestData) string {
78+
buf.Reset()
79+
switch d.Cmd {
80+
case "init":
81+
ve := parseAndFillVersionEdit(d.Input)
82+
bve := &BulkVersionEdit{}
83+
if err := bve.Accumulate(ve); err != nil {
84+
return fmt.Sprintf("error accumulating version edit: %s", err)
85+
}
86+
set.Init(bve)
87+
return set.Stats().String()
88+
case "applyAndUpdateVersionEdit":
89+
ve := parseAndFillVersionEdit(d.Input)
90+
if err := set.ApplyAndUpdateVersionEdit(ve); err != nil {
91+
return fmt.Sprintf("error applying and updating version edit: %s", err)
92+
}
93+
// Print the edit after the call so that any DeletedBlobFiles entries added by the set are visible.
fmt.Fprintf(&buf, "modified version edit:\n%s", ve.DebugString(base.DefaultFormatter))
94+
fmt.Fprintf(&buf, "current blob file set:\n%s", set.Stats().String())
95+
return buf.String()
96+
case "metadatas":
97+
for _, m := range set.Metadatas() {
98+
fmt.Fprintf(&buf, "%s\n", m)
99+
}
100+
return buf.String()
101+
case "stats":
102+
return set.Stats().String()
103+
default:
104+
t.Fatalf("unknown command: %s", d.Cmd)
105+
}
106+
return ""
107+
})
108+
}

0 commit comments

Comments
 (0)