Skip to content

Commit 7d5f9b9

Browse files
committed
blob: vary max cached readers by read amplification
The blob.ValueFetcher maintains a small cache of open blob file readers. This allows a scan that's retrieving values to avoid repeatedly re-opening the same blob file and re-retrieving the index block from the block cache. This cache previously used a fixed size of 5 in all contexts. This commit updates the fetcher to configure the size of the cache dynamically. Iterator construction now sets a cache size of 5 times the number of non-empty levels in the LSM, capturing the fact that deeper LSMs will reference more blob files over a span. Close #5181. ``` │ all.txt │ │ ops/sec │ ycsb/A/values=1024 383.6k ± ∞ ¹ ycsb/B/values=1024 619.7k ± ∞ ¹ ycsb/C/values=1024 1.005M ± ∞ ¹ ycsb/D/values=1024 232.8k ± ∞ ¹ ycsb/E/values=1024 96.19k ± ∞ ¹ ycsb/F/values=1024 82.91k ± ∞ ¹ geomean 276.2k ```
1 parent b82cc06 commit 7d5f9b9

File tree

13 files changed

+65
-31
lines changed

13 files changed

+65
-31
lines changed

blob_rewrite_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,8 @@ func TestBlobRewriteRandomized(t *testing.T) {
410410
// Verify that the rewritten blob file contains the correct values, and
411411
// that they may still be accessed using the original handles.
412412
var valueFetcher blob.ValueFetcher
413-
valueFetcher.Init(constantFileMapping(newBlobFileNum), fch, readEnv)
413+
valueFetcher.Init(constantFileMapping(newBlobFileNum), fch, readEnv,
414+
blob.SuggestedCachedReaders(1))
414415
func() {
415416
defer func() { _ = valueFetcher.Close() }()
416417
for _, valueIndex := range newFile.valueIndices {

compaction.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3321,7 +3321,8 @@ func (d *DB) compactAndWrite(
33213321
),
33223322
}
33233323
if c.version != nil {
3324-
c.iterationState.valueFetcher.Init(&c.version.BlobFiles, d.fileCache, blockReadEnv)
3324+
c.iterationState.valueFetcher.Init(&c.version.BlobFiles, d.fileCache, blockReadEnv,
3325+
blob.SuggestedCachedReaders(len(c.inputs)))
33253326
}
33263327
iiopts := internalIterOpts{
33273328
compaction: true,

compaction_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1175,7 +1175,8 @@ func TestCompaction(t *testing.T) {
11751175
}
11761176
}
11771177
vf := &blob.ValueFetcher{}
1178-
vf.Init(&d.mu.versions.currentVersion().BlobFiles, d.fileCache, block.ReadEnv{})
1178+
vf.Init(&d.mu.versions.currentVersion().BlobFiles, d.fileCache, block.ReadEnv{},
1179+
blob.SuggestedCachedReaders(d.mu.versions.currentVersion().MaxReadAmp()))
11791180
defer func() { _ = vf.Close() }()
11801181
err := validateBlobValueLiveness(inputTables, d.fileCache, block.ReadEnv{}, vf)
11811182
if err != nil {

db.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/cockroachdb/pebble/rangekey"
3434
"github.com/cockroachdb/pebble/record"
3535
"github.com/cockroachdb/pebble/sstable"
36+
"github.com/cockroachdb/pebble/sstable/blob"
3637
"github.com/cockroachdb/pebble/sstable/block"
3738
"github.com/cockroachdb/pebble/vfs"
3839
"github.com/cockroachdb/pebble/vfs/atomicfs"
@@ -1217,9 +1218,11 @@ func (i *Iterator) constructPointIter(
12171218
),
12181219
}
12191220
if i.readState != nil {
1220-
i.blobValueFetcher.Init(&i.readState.current.BlobFiles, i.fc, readEnv)
1221+
i.blobValueFetcher.Init(&i.readState.current.BlobFiles, i.fc, readEnv,
1222+
blob.SuggestedCachedReaders(i.readState.current.MaxReadAmp()))
12211223
} else if i.version != nil {
1222-
i.blobValueFetcher.Init(&i.version.BlobFiles, i.fc, readEnv)
1224+
i.blobValueFetcher.Init(&i.version.BlobFiles, i.fc, readEnv,
1225+
blob.SuggestedCachedReaders(i.version.MaxReadAmp()))
12231226
}
12241227
internalOpts := internalIterOpts{
12251228
readEnv: sstable.ReadEnv{Block: readEnv},
@@ -1245,20 +1248,13 @@ func (i *Iterator) constructPointIter(
12451248
var current *manifest.Version
12461249
if !i.batchOnlyIter {
12471250
numMergingLevels += len(memtables)
1248-
12491251
current = i.version
12501252
if current == nil {
12511253
current = i.readState.current
12521254
}
1253-
numMergingLevels += len(current.L0SublevelFiles)
1254-
numLevelIters += len(current.L0SublevelFiles)
1255-
for level := 1; level < len(current.Levels); level++ {
1256-
if current.Levels[level].Empty() {
1257-
continue
1258-
}
1259-
numMergingLevels++
1260-
numLevelIters++
1261-
}
1255+
maxReadAmp := current.MaxReadAmp()
1256+
numMergingLevels += maxReadAmp
1257+
numLevelIters += maxReadAmp
12621258
}
12631259

12641260
if numMergingLevels > cap(mlevels) {

get.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cockroachdb/pebble/internal/keyspan"
1616
"github.com/cockroachdb/pebble/internal/manifest"
1717
"github.com/cockroachdb/pebble/internal/treeprinter"
18+
"github.com/cockroachdb/pebble/sstable/blob"
1819
"github.com/cockroachdb/pebble/sstable/block"
1920
)
2021

@@ -120,7 +121,8 @@ func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer,
120121
keyBuf: buf.keyBuf,
121122
}
122123
// Set up a blob value fetcher to use for retrieving values from blob files.
123-
i.blobValueFetcher.Init(&readState.current.BlobFiles, d.fileCache, block.NoReadEnv)
124+
i.blobValueFetcher.Init(&readState.current.BlobFiles, d.fileCache, block.NoReadEnv,
125+
blob.SuggestedCachedReaders(readState.current.MaxReadAmp()))
124126
get.iiopts.blobValueFetcher = &i.blobValueFetcher
125127

126128
if !i.First() {

internal/manifest/version.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,19 @@ func (v *Version) unrefFiles() ObsoleteFiles {
311311
return obsoleteFiles
312312
}
313313

314+
// MaxReadAmp returns the number of non-empty sublevels and levels in the
315+
// Version. This is the maximum number of sstables that a point read must
316+
// consult.
317+
func (v *Version) MaxReadAmp() int {
318+
readAmp := len(v.L0SublevelFiles)
319+
for i := 1; i < len(v.Levels); i++ {
320+
if !v.Levels[i].Empty() {
321+
readAmp++
322+
}
323+
}
324+
return readAmp
325+
}
326+
314327
// ObsoleteFiles holds a set of files that are no longer referenced by any
315328
// referenced Version.
316329
type ObsoleteFiles struct {

level_checker.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,11 @@ func (d *DB) CheckLevels(stats *CheckLevelsStats) error {
554554
},
555555
fileCache: d.fileCache,
556556
}
557-
checkConfig.blobValueFetcher.Init(&readState.current.BlobFiles, checkConfig.fileCache, checkConfig.readEnv)
557+
checkConfig.blobValueFetcher.Init(
558+
&readState.current.BlobFiles,
559+
checkConfig.fileCache,
560+
checkConfig.readEnv,
561+
blob.SuggestedCachedReaders(readState.current.MaxReadAmp()))
558562
defer func() { _ = checkConfig.blobValueFetcher.Close() }()
559563
return checkLevelsInternal(checkConfig)
560564
}

scan_internal.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,8 @@ func (d *DB) newInternalIter(
195195
seqNum: seqNum,
196196
mergingIter: &buf.merging,
197197
}
198-
dbi.blobValueFetcher.Init(&vers.BlobFiles, d.fileCache, block.ReadEnv{})
198+
dbi.blobValueFetcher.Init(&vers.BlobFiles, d.fileCache, block.ReadEnv{},
199+
blob.SuggestedCachedReaders(vers.MaxReadAmp()))
199200

200201
dbi.opts = *o
201202
dbi.opts.logger = d.opts.Logger

sstable/blob/fetcher.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package blob
66

77
import (
88
"context"
9+
"slices"
910

1011
"github.com/cockroachdb/errors"
1112
"github.com/cockroachdb/pebble/internal/base"
@@ -15,8 +16,6 @@ import (
1516
"github.com/cockroachdb/pebble/sstable/block"
1617
)
1718

18-
const maxCachedReaders = 5
19-
2019
// A ValueReader is an interface defined over a file that can be used to read
2120
// value blocks.
2221
type ValueReader interface {
@@ -47,6 +46,12 @@ type ReaderProvider interface {
4746
) (r ValueReader, closeFunc func(), err error)
4847
}
4948

49+
// SuggestedCachedReaders returns the suggested default number of cached readers
50+
// to use for ValueFetcher.Init given the number of non-empty levels in the LSM.
51+
func SuggestedCachedReaders(readAmp int) int {
52+
return 5 * max(1, readAmp)
53+
}
54+
5055
// A ValueFetcher retrieves values stored out-of-band in separate blob files.
5156
// The ValueFetcher caches accessed file readers to avoid redundant file cache
5257
// and block cache lookups when performing consecutive value retrievals.
@@ -62,7 +67,7 @@ type ValueFetcher struct {
6267
env block.ReadEnv
6368
fetchCount int
6469
bufMangler invariants.BufMangler
65-
readers [maxCachedReaders]cachedReader
70+
readers []cachedReader
6671
}
6772

6873
// TODO(jackson): Support setting up a read handle for compaction when relevant.
@@ -71,13 +76,16 @@ type ValueFetcher struct {
7176
var _ base.ValueFetcher = (*ValueFetcher)(nil)
7277

7378
// Init initializes the ValueFetcher.
74-
func (r *ValueFetcher) Init(fm base.BlobFileMapping, rp ReaderProvider, env block.ReadEnv) {
79+
func (r *ValueFetcher) Init(
80+
fm base.BlobFileMapping, rp ReaderProvider, env block.ReadEnv, maxCachedReaders int,
81+
) {
7582
r.fileMapping = fm
7683
r.readerProvider = rp
7784
r.env = env
7885
if r.readerProvider == nil {
7986
panic("readerProvider is nil")
8087
}
88+
r.readers = slices.Grow(r.readers[:0], maxCachedReaders)[:maxCachedReaders]
8189
}
8290

8391
// FetchHandle returns the value, given the handle. FetchHandle must not be
@@ -189,6 +197,7 @@ func (r *ValueFetcher) Close() error {
189197
r.env = block.ReadEnv{}
190198
r.fetchCount = 0
191199
r.bufMangler = invariants.BufMangler{}
200+
r.readers = r.readers[:0]
192201
return err
193202
}
194203

sstable/blob/fetcher_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,10 @@ func TestValueFetcher(t *testing.T) {
123123
case "new-fetcher":
124124
var name string
125125
td.ScanArgs(t, "name", &name)
126+
maxCachedReaders := 5
127+
td.MaybeScanArgs(t, "maxCachedReaders", &maxCachedReaders)
126128
fetchers[name] = &ValueFetcher{}
127-
fetchers[name].Init(identityFileMapping{}, rp, block.ReadEnv{})
129+
fetchers[name].Init(identityFileMapping{}, rp, block.ReadEnv{}, maxCachedReaders)
128130
return ""
129131
case "fetch":
130132
var (
@@ -209,7 +211,7 @@ func TestValueFetcherRetrieveRandomized(t *testing.T) {
209211

210212
t.Run("sequential", func(t *testing.T) {
211213
var fetcher ValueFetcher
212-
fetcher.Init(identityFileMapping{}, rp, block.ReadEnv{})
214+
fetcher.Init(identityFileMapping{}, rp, block.ReadEnv{}, 5)
213215
defer fetcher.Close()
214216
for i := 0; i < len(handles); i++ {
215217
val, err := fetcher.retrieve(ctx, handles[i])
@@ -219,7 +221,7 @@ func TestValueFetcherRetrieveRandomized(t *testing.T) {
219221
})
220222
t.Run("random", func(t *testing.T) {
221223
var fetcher ValueFetcher
222-
fetcher.Init(identityFileMapping{}, rp, block.ReadEnv{})
224+
fetcher.Init(identityFileMapping{}, rp, block.ReadEnv{}, 5)
223225
defer fetcher.Close()
224226
for _, i := range rng.Perm(len(handles)) {
225227
val, err := fetcher.retrieve(ctx, handles[i])
@@ -281,7 +283,7 @@ func benchmarkValueFetcherRetrieve(b *testing.B, valueSize int, cacheSize int64)
281283
rp := makeMockReaderProvider(b, obj, cacheSize, handles)
282284
defer rp.Close()
283285
var fetcher ValueFetcher
284-
fetcher.Init(identityFileMapping{}, rp, block.ReadEnv{})
286+
fetcher.Init(identityFileMapping{}, rp, block.ReadEnv{}, 5)
285287
defer fetcher.Close()
286288
b.ResetTimer()
287289
for i := 0; i < b.N; i++ {
@@ -301,7 +303,7 @@ func benchmarkValueFetcherRetrieve(b *testing.B, valueSize int, cacheSize int64)
301303
indices[i] = testutils.RandIntInRange(rng, 0, len(handles))
302304
}
303305
var fetcher ValueFetcher
304-
fetcher.Init(identityFileMapping{}, rp, block.ReadEnv{})
306+
fetcher.Init(identityFileMapping{}, rp, block.ReadEnv{}, 5)
305307
defer fetcher.Close()
306308
b.ResetTimer()
307309
for i := 0; i < b.N; i++ {
@@ -339,7 +341,7 @@ func makeMockReaderProvider(
339341
// blocks.
340342
if cacheSize > 0 {
341343
var fetcher ValueFetcher
342-
fetcher.Init(identityFileMapping{}, rp, block.ReadEnv{})
344+
fetcher.Init(identityFileMapping{}, rp, block.ReadEnv{}, 5)
343345
defer fetcher.Close()
344346
for i, h := range handles {
345347
if i > 0 && handles[i-1].BlockID == h.BlockID {

0 commit comments

Comments
 (0)