Skip to content

Commit 2c04bce

Browse files
committed
db: collect blob file properties
Collect blob file properties as part of initial table stats collection. Properties for newly written blob files are populated directly.
1 parent e39a868 commit 2c04bce

File tree

11 files changed

+145
-42
lines changed

11 files changed

+145
-42
lines changed

blob_rewrite.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,14 @@ func (d *DB) runBlobFileRewriteLocked(
308308
return objstorage.ObjectMetadata{}, nil, err
309309
}
310310

311+
physical := &manifest.PhysicalBlobFile{
312+
FileNum: objMeta.DiskFileNum,
313+
Size: stats.FileLen,
314+
ValueSize: stats.UncompressedValueBytes,
315+
CreationTime: uint64(d.timeNow().Unix()),
316+
}
317+
physical.PopulateProperties(&stats.Properties)
318+
311319
ve := &manifest.VersionEdit{
312320
DeletedBlobFiles: map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile{
313321
{
@@ -317,13 +325,8 @@ func (d *DB) runBlobFileRewriteLocked(
317325
},
318326
NewBlobFiles: []manifest.BlobFileMetadata{
319327
{
320-
FileID: c.input.FileID,
321-
Physical: &manifest.PhysicalBlobFile{
322-
FileNum: objMeta.DiskFileNum,
323-
Size: stats.FileLen,
324-
ValueSize: stats.UncompressedValueBytes,
325-
CreationTime: uint64(d.timeNow().Unix()),
326-
},
328+
FileID: c.input.FileID,
329+
Physical: physical,
327330
},
328331
},
329332
}

data_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -903,6 +903,8 @@ func runDBDefineCmdReuseFS(td *datadriven.TestData, opts *Options) (*DB, error)
903903
}
904904
d.mu.Lock()
905905
defer d.mu.Unlock()
906+
// Make sure the table stats finished loading.
907+
d.waitTableStatsLocked()
906908
for i := range snapshots {
907909
s := &Snapshot{db: d}
908910
s.seqNum = snapshots[i]
@@ -1206,7 +1208,11 @@ func runDBDefineCmdReuseFS(td *datadriven.TestData, opts *Options) (*DB, error)
12061208
return nil, err
12071209
}
12081210
d.updateReadStateLocked(nil)
1209-
d.updateTableStatsLocked(ve.NewTables)
1211+
1212+
// Force a re-read of table and blob file stats.
1213+
d.mu.tableStats.loadedInitial = false
1214+
d.maybeCollectTableStatsLocked()
1215+
d.waitTableStatsLocked()
12101216

12111217
return d, nil
12121218
}

format_major_version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ const (
117117
// FormatMinTableFormatPebblev1 is a format major version that guarantees that
118118
// tables created by or ingested into the DB at or above this format major
119119
// version will have a table format version of at least Pebblev1 (Block
120-
// Properties).
120+
// Properties).
121121
// Deprecated.
122122
_ // FormatMinTableFormatPebblev1
123123

internal/genericcache/cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ type ValueRef[K Key, V any, InitOpts any] struct {
107107
}
108108

109109
// Value returns the value. This method and the returned value can only be used
110-
// until ref.Close() is called.
110+
// until ref.Unref() is called.
111111
func (ref ValueRef[K, V, InitOpts]) Value() *V {
112112
if invariants.Enabled && ref.value.err != nil {
113113
panic("ValueRef with error")

internal/manifest/blob_metadata.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/cockroachdb/pebble/internal/invariants"
2222
"github.com/cockroachdb/pebble/internal/strparse"
2323
"github.com/cockroachdb/pebble/sstable"
24+
"github.com/cockroachdb/pebble/sstable/blob"
2425
"github.com/cockroachdb/redact"
2526
)
2627

@@ -134,6 +135,10 @@ type PhysicalBlobFile struct {
134135
// referencing TableMetadata is installed in a Version and decremented when
135136
// that TableMetadata becomes obsolete.
136137
refs atomic.Int32
138+
139+
propsValid atomic.Bool
140+
// props are populated exactly once; propsValid is set once they are available.
141+
props blob.FileProperties
137142
}
138143

139144
// SafeFormat implements redact.SafeFormatter.
@@ -159,6 +164,27 @@ func (m *PhysicalBlobFile) UserKeyBounds() base.UserKeyBounds {
159164
return base.UserKeyBounds{}
160165
}
161166

167+
// Properties returns the blob file properties if they have been populated, or
168+
// nil and ok=false if they were not.
169+
//
170+
// The caller must not modify the returned stats.
171+
func (m *PhysicalBlobFile) Properties() (_ *blob.FileProperties, ok bool) {
172+
if !m.propsValid.Load() {
173+
return nil, false
174+
}
175+
return &m.props, true
176+
}
177+
178+
// PopulateProperties populates the bob file properties. Can be called at most
179+
// once for a TableBacking.
180+
func (m *PhysicalBlobFile) PopulateProperties(props *blob.FileProperties) {
181+
m.props = *props
182+
oldPropsValid := m.propsValid.Swap(true)
183+
if invariants.Enabled && oldPropsValid {
184+
panic("props set twice")
185+
}
186+
}
187+
162188
// ref increments the reference count for the blob file.
163189
func (m *PhysicalBlobFile) ref() {
164190
m.refs.Add(+1)

metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ type LevelMetrics struct {
134134

135135
// Additional contains misc additional metrics that are not always printed.
136136
Additional struct {
137-
// The sum of Properties.ValueBlocksSize for all the sstables in this
137+
// The sum of Properties.ValueBlocksSize for all the sstables in this
138138
// level. Printed by LevelMetrics.format iff there is at least one level
139139
// with a non-zero value.
140140
ValueBlocksSize uint64

sstable/blob/blob.go

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/cockroachdb/errors"
1515
"github.com/cockroachdb/pebble/internal/base"
1616
"github.com/cockroachdb/pebble/internal/crc"
17+
"github.com/cockroachdb/pebble/internal/invariants"
1718
"github.com/cockroachdb/pebble/objstorage"
1819
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
1920
"github.com/cockroachdb/pebble/sstable/block"
@@ -76,8 +77,8 @@ const (
7677
// 20 Checksum type (1 byte)
7778
// 21 File format version (1 byte)
7879
// 22:30 Original file number (8 bytes)
79-
// 30:38 Properties block offset (8 bytes)
80-
// 38:46 Properties block length (8 bytes)
80+
// 30:38 FileProperties block offset (8 bytes)
81+
// 38:46 FileProperties block length (8 bytes)
8182
// 46:54 Metaindex block offset (8 bytes); optional (for future use)
8283
// 54:62 Metaindex block length (8 bytes); optional (for future use)
8384
// 62:70 Blob file magic string (8 bytes)
@@ -123,6 +124,7 @@ type FileWriterStats struct {
123124
ValueCount uint32
124125
UncompressedValueBytes uint64
125126
FileLen uint64
127+
Properties FileProperties
126128
}
127129

128130
// String implements the fmt.Stringer interface.
@@ -325,14 +327,14 @@ func (w *FileWriter) Close() (FileWriterStats, error) {
325327
}
326328

327329
// Write the properties and the v2 footer if the file format is v2.
330+
w.stats.Properties = FileProperties{
331+
CompressionStats: w.physBlockMaker.Compressor.Stats().Clone(),
332+
}
328333
var propBlockHandle block.Handle
329334
if w.format >= FileFormatV2 {
330335
var cw colblk.KeyValueBlockWriter
331336
cw.Init()
332-
p := Properties{
333-
CompressionStats: w.physBlockMaker.Compressor.Stats().String(),
334-
}
335-
p.writeTo(&cw)
337+
w.stats.Properties.writeTo(&cw)
336338
propBlockHandle, err = w.writeMetadataBlock(cw.Finish(cw.Rows()))
337339
if err != nil {
338340
return err
@@ -616,40 +618,45 @@ func (r *FileReader) Layout() (string, error) {
616618
return buf.String(), nil
617619
}
618620

619-
type Properties struct {
620-
CompressionStats string
621+
type FileProperties struct {
622+
CompressionStats block.CompressionStats
621623
}
622624

623625
// String returns any set properties, one per line.
624-
func (p *Properties) String() string {
626+
func (p *FileProperties) String() string {
625627
var buf bytes.Buffer
626-
if p.CompressionStats != "" {
627-
fmt.Fprintf(&buf, "%s: %s\n", propertyKeyCompressionStats, p.CompressionStats)
628+
if !p.CompressionStats.IsEmpty() {
629+
fmt.Fprintf(&buf, "%s: %s\n", propertyKeyCompressionStats, p.CompressionStats.String())
628630
}
629631
return buf.String()
630632
}
631633

632-
func (p *Properties) set(key []byte, value []byte) {
634+
func (p *FileProperties) set(key []byte, value []byte) {
633635
switch string(key) {
634636
case propertyKeyCompressionStats:
635-
p.CompressionStats = string(value)
637+
var err error
638+
p.CompressionStats, err = block.ParseCompressionStats(string(value))
639+
if invariants.Enabled && err != nil {
640+
panic(errors.AssertionFailedf("pebble: error parsing blob file compression stats %q", string(value)))
641+
}
642+
636643
default:
637644
// Ignore unknown properties (for forward compatibility).
638645
}
639646
}
640647

641-
func (p *Properties) writeTo(w *colblk.KeyValueBlockWriter) {
642-
if p.CompressionStats != "" {
643-
w.AddKV([]byte(propertyKeyCompressionStats), []byte(p.CompressionStats))
648+
func (p *FileProperties) writeTo(w *colblk.KeyValueBlockWriter) {
649+
if !p.CompressionStats.IsEmpty() {
650+
w.AddKV([]byte(propertyKeyCompressionStats), []byte(p.CompressionStats.String()))
644651
}
645652
}
646653

647654
const propertyKeyCompressionStats = "compression_stats"
648655

649656
// ReadProperties reads the properties block from the file, if it exists.
650-
func (r *FileReader) ReadProperties(ctx context.Context) (Properties, error) {
657+
func (r *FileReader) ReadProperties(ctx context.Context) (FileProperties, error) {
651658
if r.footer.format < FileFormatV2 {
652-
return Properties{}, nil
659+
return FileProperties{}, nil
653660
}
654661
// We don't want the property block to go into the block cache, so we use a
655662
// buffer pool.
@@ -661,12 +668,12 @@ func (r *FileReader) ReadProperties(ctx context.Context) (Properties, error) {
661668
func(*block.Metadata, []byte) error { return nil },
662669
)
663670
if err != nil {
664-
return Properties{}, err
671+
return FileProperties{}, err
665672
}
666673
defer b.Release()
667674
var decoder colblk.KeyValueBlockDecoder
668675
decoder.Init(b.BlockData())
669-
var p Properties
676+
var p FileProperties
670677
for k, v := range decoder.All() {
671678
p.set(k, v)
672679
}

sstable/block/compression_stats.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,20 @@ func (c CompressionStats) String() string {
154154
return buf.String()
155155
}
156156

157+
// Clone returns a copy of the CompressionStats.
158+
func (c *CompressionStats) Clone() CompressionStats {
159+
var out CompressionStats
160+
out.noCompressionBytes = c.noCompressionBytes
161+
out.fastest = c.fastest
162+
if len(c.others) > 0 {
163+
out.others = make(map[compression.Setting]CompressionStatsForSetting, len(c.others))
164+
for s, cs := range c.others {
165+
out.others[s] = cs
166+
}
167+
}
168+
return out
169+
}
170+
157171
// Scale the stats by (size/backingSize). Used to obtain an approximation of the
158172
// stats for a virtual table.
159173
//

table_stats.go

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ func (d *DB) collectTableStats() bool {
9999
d.mu.tableStats.pending = nil
100100
d.mu.tableStats.loading = true
101101
jobID := d.newJobIDLocked()
102-
loadedInitial := d.mu.tableStats.loadedInitial
103102
// Drop DB.mu before performing IO.
104103
d.mu.Unlock()
105104

@@ -111,22 +110,29 @@ func (d *DB) collectTableStats() bool {
111110
rs := d.loadReadState()
112111
var collected []collectedStats
113112
var hints []deleteCompactionHint
113+
initialLoadCompleted := false
114114
if len(pending) > 0 {
115115
collected, hints = d.loadNewFileStats(ctx, rs, pending)
116116
} else {
117117
var moreRemain bool
118118
var buf [maxTableStatsPerScan]collectedStats
119-
collected, hints, moreRemain = d.scanReadStateTableStats(ctx, rs, buf[:0])
120-
loadedInitial = !moreRemain
119+
collected, hints, moreRemain = d.scanReadStateTableStats(ctx, rs.current, buf[:0])
120+
if !moreRemain {
121+
// Once we're done with table stats, load blob file properties.
122+
moreRemain = d.scanBlobFileProperties(ctx, rs.current, maxTableStatsPerScan-len(collected))
123+
if !moreRemain {
124+
initialLoadCompleted = true
125+
}
126+
}
121127
}
122128
rs.unref()
123129

124130
// Update the TableMetadata with the loaded stats while holding d.mu.
125131
d.mu.Lock()
126132
defer d.mu.Unlock()
127133
d.mu.tableStats.loading = false
128-
if loadedInitial && !d.mu.tableStats.loadedInitial {
129-
d.mu.tableStats.loadedInitial = loadedInitial
134+
if initialLoadCompleted && !d.mu.tableStats.loadedInitial {
135+
d.mu.tableStats.loadedInitial = true
130136
d.opts.EventListener.TableStatsLoaded(TableStatsInfo{
131137
JobID: int(jobID),
132138
})
@@ -216,12 +222,12 @@ func (d *DB) loadNewFileStats(
216222
// are no pending new files, but there might be files that existed at Open for
217223
// which we haven't loaded table stats.
218224
func (d *DB) scanReadStateTableStats(
219-
ctx context.Context, rs *readState, fill []collectedStats,
220-
) ([]collectedStats, []deleteCompactionHint, bool) {
221-
moreRemain := false
225+
ctx context.Context, version *manifest.Version, fill []collectedStats,
226+
) (_ []collectedStats, _ []deleteCompactionHint, moreRemain bool) {
222227
var hints []deleteCompactionHint
223228
sizesChecked := make(map[base.DiskFileNum]struct{})
224-
for l, levelMetadata := range rs.current.Levels {
229+
// TODO(radu): an O(#tables) scan every time could be problematic.
230+
for l, levelMetadata := range version.Levels {
225231
for f := range levelMetadata.All() {
226232
// NB: Only the active stats collection job updates f.Stats for active
227233
// files, and we ensure only one goroutine runs it at a time through
@@ -278,7 +284,7 @@ func (d *DB) scanReadStateTableStats(
278284
sizesChecked[f.TableBacking.DiskFileNum] = struct{}{}
279285
}
280286

281-
stats, newHints, err := d.loadTableStats(ctx, rs.current, l, f)
287+
stats, newHints, err := d.loadTableStats(ctx, version, l, f)
282288
if err != nil {
283289
// Set `moreRemain` so we'll try again.
284290
moreRemain = true
@@ -295,6 +301,43 @@ func (d *DB) scanReadStateTableStats(
295301
return fill, hints, moreRemain
296302
}
297303

304+
// populateBlobFileProperties reads at most maxNum blob file properties for blob
305+
// files that don't have them populated. Returns false once all properties have
306+
// been populated.
307+
func (d *DB) scanBlobFileProperties(
308+
ctx context.Context, version *manifest.Version, maxNum int,
309+
) (moreRemain bool) {
310+
// TODO(radu): an O(#files) scan every time could be problematic.
311+
// We could remember the last blob file ID and scan from there.
312+
for f := range version.BlobFiles.All() {
313+
if _, propsValid := f.Physical.Properties(); propsValid {
314+
// Properties are already populated.
315+
continue
316+
}
317+
if maxNum == 0 {
318+
return moreRemain
319+
}
320+
v, err := d.fileCache.findOrCreateBlob(ctx, f.Physical, block.InitFileReadStats{})
321+
if err != nil {
322+
moreRemain = true
323+
continue
324+
}
325+
blobReader := v.Value().mustBlob()
326+
blobProps, err := blobReader.ReadProperties(ctx)
327+
v.Unref()
328+
if err != nil {
329+
moreRemain = true
330+
continue
331+
}
332+
// It is ok to call PopulateProperties here because this function runs as
333+
// part of a table statistics job, and at most one goroutine runs this at a
334+
// time (see d.mu.tableStats.loading).
335+
f.Physical.PopulateProperties(&blobProps)
336+
maxNum--
337+
}
338+
return moreRemain
339+
}
340+
298341
func (d *DB) loadTableStats(
299342
ctx context.Context, v *manifest.Version, level int, meta *manifest.TableMetadata,
300343
) (manifest.TableStats, []deleteCompactionHint, error) {
@@ -525,6 +568,9 @@ func (d *DB) estimateSizesBeneath(
525568
if err != nil {
526569
return err
527570
}
571+
// It is ok to call PopulateProperties here because this function runs as part of
572+
// a table statistics job, and at most one goroutine runs this at a
573+
// time (see d.mu.tableStats.loading).
528574
backingProps = tableBeneath.TableBacking.PopulateProperties(&loadedProps)
529575
return nil
530576
})

testdata/compaction/value_separation

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ ITERATORS
158158
block cache | file cache | filter | sst iters | snapshots
159159
entries | hit rate | entries | hit rate | util | open | open
160160
-------------+-------------+--------------+-------------+--------------+-------------+------------
161-
5 (1.6KB) | 81.8% | 1 (392B) | 89.2% | 0.0% | 0 | 0
161+
5 (1.6KB) | 81.8% | 1 (392B) | 89.6% | 0.0% | 0 | 0
162162

163163
FILES tables | blob files | blob values
164164
stats prog | backing | zombie | live | zombie | total | refed

0 commit comments

Comments
 (0)