Skip to content

Commit 4f87919

Browse files
committed
db: implement blob.ReaderProvider on fileCacheHandle
Adapt the file cache to implement the blob.ReaderProvider interface, supporting the caching of blob files alongside sstable files. The two file types will share a cache since open sstables and blob files both contribute towards the same open file descriptor limits. Informs #112.
1 parent 9dfc42a commit 4f87919

File tree

7 files changed

+278
-109
lines changed

7 files changed

+278
-109
lines changed

event.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1150,17 +1150,32 @@ func (r *lowDiskSpaceReporter) findThreshold(
11501150
return threshold, ok
11511151
}
11521152

1153-
func (d *DB) reportSSTableCorruption(meta *manifest.TableMetadata, err error) {
1153+
func (d *DB) reportCorruption(meta any, err error) {
1154+
switch meta := meta.(type) {
1155+
case *manifest.TableMetadata:
1156+
d.reportFileCorruption(base.FileTypeTable, meta.FileBacking.DiskFileNum, meta.UserKeyBounds(), err)
1157+
case *manifest.BlobFileMetadata:
1158+
// TODO(jackson): Add bounds for blob files.
1159+
d.reportFileCorruption(base.FileTypeBlob, meta.FileNum, base.UserKeyBounds{}, err)
1160+
default:
1161+
panic(fmt.Sprintf("unknown metadata type: %T", meta))
1162+
}
1163+
}
1164+
1165+
func (d *DB) reportFileCorruption(
1166+
fileType base.FileType, fileNum base.DiskFileNum, userKeyBounds base.UserKeyBounds, err error,
1167+
) {
11541168
if invariants.Enabled && err == nil {
11551169
panic("nil error")
11561170
}
1157-
objMeta, lookupErr := d.objProvider.Lookup(base.FileTypeTable, meta.FileBacking.DiskFileNum)
1171+
1172+
objMeta, lookupErr := d.objProvider.Lookup(fileType, fileNum)
11581173
if lookupErr != nil {
11591174
// If the object is not known to the provider, it must be a local object
11601175
// that was missing when we opened the store. Remote objects have their
11611176
// metadata in a catalog, so even if the backing object is deleted, the
11621177
// DiskFileNum would still be known.
1163-
objMeta = objstorage.ObjectMetadata{DiskFileNum: meta.FileBacking.DiskFileNum, FileType: base.FileTypeTable}
1178+
objMeta = objstorage.ObjectMetadata{DiskFileNum: fileNum, FileType: fileType}
11641179
}
11651180
path := d.objProvider.Path(objMeta)
11661181
if objMeta.IsRemote() {
@@ -1176,7 +1191,7 @@ func (d *DB) reportSSTableCorruption(meta *manifest.TableMetadata, err error) {
11761191
Path: path,
11771192
IsRemote: objMeta.IsRemote(),
11781193
Locator: objMeta.Remote.Locator,
1179-
Bounds: meta.UserKeyBounds(),
1194+
Bounds: userKeyBounds,
11801195
Details: err,
11811196
}
11821197
d.opts.EventListener.DataCorruption(info)

file_cache.go

Lines changed: 128 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/cockroachdb/pebble/objstorage"
2727
"github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
2828
"github.com/cockroachdb/pebble/sstable"
29+
"github.com/cockroachdb/pebble/sstable/blob"
2930
"github.com/cockroachdb/pebble/sstable/block"
3031
"github.com/cockroachdb/pebble/sstable/valblk"
3132
"github.com/cockroachdb/redact"
@@ -120,6 +121,9 @@ type fileCacheHandle struct {
120121
}
121122
}
122123

124+
// Assert that *fileCacheHandle implements blob.ReaderProvider.
125+
var _ blob.ReaderProvider = (*fileCacheHandle)(nil)
126+
123127
// newHandle creates a handle for the FileCache which has its own options. Each
124128
// handle has its own set of files in the cache, separate from those of other
125129
// handles.
@@ -128,7 +132,7 @@ func (c *FileCache) newHandle(
128132
objProvider objstorage.Provider,
129133
loggerAndTracer LoggerAndTracer,
130134
readerOpts sstable.ReaderOptions,
131-
reportSSTableCorruptionFn func(*manifest.TableMetadata, error),
135+
reportCorruptionFn func(any, error),
132136
) *fileCacheHandle {
133137
c.Ref()
134138

@@ -140,11 +144,7 @@ func (c *FileCache) newHandle(
140144
}
141145
t.readerOpts = readerOpts
142146
t.readerOpts.FilterMetricsTracker = &sstable.FilterMetricsTracker{}
143-
if reportSSTableCorruptionFn != nil {
144-
t.reportCorruptionFn = func(arg any, err error) {
145-
reportSSTableCorruptionFn(arg.(*manifest.TableMetadata), err)
146-
}
147-
}
147+
t.reportCorruptionFn = reportCorruptionFn
148148
if invariants.RaceEnabled {
149149
t.raceMu.openRefs = make(map[uint64][]byte)
150150
}
@@ -190,40 +190,54 @@ func (h *fileCacheHandle) Close() error {
190190

191191
// openFile is called when we insert a new entry in the file cache.
192192
func (h *fileCacheHandle) openFile(
193-
ctx context.Context, fileNum base.DiskFileNum,
194-
) (*sstable.Reader, objstorage.ObjectMetadata, error) {
193+
ctx context.Context, fileNum base.DiskFileNum, fileType base.FileType,
194+
) (io.Closer, objstorage.ObjectMetadata, error) {
195195
f, err := h.objProvider.OpenForReading(
196-
ctx, base.FileTypeTable, fileNum, objstorage.OpenOptions{MustExist: true},
196+
ctx, fileType, fileNum, objstorage.OpenOptions{MustExist: true},
197197
)
198198
if err != nil {
199199
return nil, objstorage.ObjectMetadata{}, err
200200
}
201+
objMeta, err := h.objProvider.Lookup(fileType, fileNum)
202+
if err != nil {
203+
return nil, objstorage.ObjectMetadata{}, err
204+
}
205+
201206
o := h.readerOpts
202207
o.CacheOpts = sstableinternal.CacheOptions{
203208
CacheHandle: h.blockCacheHandle,
204209
FileNum: fileNum,
205210
}
206-
r, err := sstable.NewReader(ctx, f, o)
207-
if err != nil {
208-
return nil, objstorage.ObjectMetadata{}, err
209-
}
210-
objMeta, err := h.objProvider.Lookup(base.FileTypeTable, fileNum)
211-
if err != nil {
212-
r.Close()
213-
return nil, objstorage.ObjectMetadata{}, err
211+
switch fileType {
212+
case base.FileTypeTable:
213+
r, err := sstable.NewReader(ctx, f, o)
214+
if err != nil {
215+
return nil, objMeta, err
216+
}
217+
return r, objMeta, nil
218+
case base.FileTypeBlob:
219+
r, err := blob.NewFileReader(ctx, f, blob.FileReaderOptions{
220+
ReaderOptions: o.ReaderOptions,
221+
})
222+
if err != nil {
223+
return nil, objMeta, err
224+
}
225+
return r, objMeta, nil
226+
default:
227+
panic(errors.AssertionFailedf("pebble: unexpected file cache file type: %s", fileType))
214228
}
215-
return r, objMeta, nil
216229
}
217230

218-
// findOrCreate retrieves an existing reader or creates a new one for the
219-
// backing file of the given table. If a corruption error is encountered,
220-
// reportCorruptionFn() is called.
221-
func (h *fileCacheHandle) findOrCreate(
231+
// findOrCreateTable retrieves an existing sstable reader or creates a new one
232+
// for the backing file of the given table. If a corruption error is
233+
// encountered, reportCorruptionFn() is called.
234+
func (h *fileCacheHandle) findOrCreateTable(
222235
ctx context.Context, meta *manifest.TableMetadata,
223236
) (genericcache.ValueRef[fileCacheKey, fileCacheValue], error) {
224237
key := fileCacheKey{
225-
handle: h,
226-
fileNum: meta.FileBacking.DiskFileNum,
238+
handle: h,
239+
fileNum: meta.FileBacking.DiskFileNum,
240+
fileType: base.FileTypeTable,
227241
}
228242
valRef, err := h.fileCache.c.FindOrCreate(ctx, key)
229243
if err != nil && IsCorruptionError(err) {
@@ -232,9 +246,28 @@ func (h *fileCacheHandle) findOrCreate(
232246
return valRef, err
233247
}
234248

249+
// findOrCreateBlob retrieves an existing blob reader or creates a new one for
250+
// the given blob file. If a corruption error is encountered,
251+
// reportCorruptionFn() is called.
252+
func (h *fileCacheHandle) findOrCreateBlob(
253+
ctx context.Context, fileNum base.DiskFileNum,
254+
) (genericcache.ValueRef[fileCacheKey, fileCacheValue], error) {
255+
key := fileCacheKey{
256+
handle: h,
257+
fileNum: fileNum,
258+
fileType: base.FileTypeBlob,
259+
}
260+
valRef, err := h.fileCache.c.FindOrCreate(ctx, key)
261+
// TODO(jackson): Propagate a blob metadata object here.
262+
if err != nil && IsCorruptionError(err) {
263+
h.reportCorruptionFn(nil, err)
264+
}
265+
return valRef, err
266+
}
267+
235268
// Evict the given file from the file cache and the block cache.
236-
func (h *fileCacheHandle) Evict(fileNum base.DiskFileNum) {
237-
h.fileCache.c.Evict(fileCacheKey{handle: h, fileNum: fileNum})
269+
func (h *fileCacheHandle) Evict(fileNum base.DiskFileNum, fileType base.FileType) {
270+
h.fileCache.c.Evict(fileCacheKey{handle: h, fileNum: fileNum, fileType: fileType})
238271
h.blockCacheHandle.EvictFile(fileNum)
239272
}
240273

@@ -247,11 +280,21 @@ func (h *fileCacheHandle) SSTStatsCollector() *block.CategoryStatsCollector {
247280
// FilterMetrics are per-handle.
248281
func (h *fileCacheHandle) Metrics() (CacheMetrics, FilterMetrics) {
249282
m := h.fileCache.c.Metrics()
283+
284+
// The generic cache maintains a count of entries, but it doesn't know which
285+
// entries are sstables and which are blob files, which affects the memory
286+
// footprint of the table cache. So the FileCache maintains its own counts,
287+
// incremented when initializing a new value and decremented by the
288+
// releasing func.
289+
countSSTables := h.fileCache.counts.sstables.Load()
290+
countBlobFiles := h.fileCache.counts.blobFiles.Load()
291+
250292
cm := CacheMetrics{
251293
Hits: m.Hits,
252294
Misses: m.Misses,
253-
Count: m.Count,
254-
Size: m.Size + m.Count*int64(unsafe.Sizeof(sstable.Reader{})),
295+
Count: countSSTables + countBlobFiles,
296+
Size: m.Size + countSSTables*int64(unsafe.Sizeof(sstable.Reader{})) +
297+
countBlobFiles*int64(unsafe.Sizeof(blob.FileReader{})),
255298
}
256299
fm := h.readerOpts.FilterMetricsTracker.Load()
257300
return cm, fm
@@ -287,7 +330,7 @@ func (h *fileCacheHandle) withCommonReader(
287330
meta *tableMetadata,
288331
fn func(sstable.CommonReader, block.ReadEnv) error,
289332
) error {
290-
ref, err := h.findOrCreate(ctx, meta)
333+
ref, err := h.findOrCreateTable(ctx, meta)
291334
if err != nil {
292335
return err
293336
}
@@ -303,7 +346,7 @@ func (h *fileCacheHandle) withReader(
303346
meta physicalMeta,
304347
fn func(*sstable.Reader, block.ReadEnv) error,
305348
) error {
306-
ref, err := h.findOrCreate(ctx, meta.TableMetadata)
349+
ref, err := h.findOrCreateTable(ctx, meta.TableMetadata)
307350
if err != nil {
308351
return err
309352
}
@@ -320,7 +363,7 @@ func (h *fileCacheHandle) withVirtualReader(
320363
meta virtualMeta,
321364
fn func(sstable.VirtualReader, block.ReadEnv) error,
322365
) error {
323-
ref, err := h.findOrCreate(ctx, meta.TableMetadata)
366+
ref, err := h.findOrCreateTable(ctx, meta.TableMetadata)
324367
if err != nil {
325368
return err
326369
}
@@ -335,10 +378,32 @@ func (h *fileCacheHandle) IterCount() int64 {
335378
return int64(h.iterCount.Load())
336379
}
337380

381+
// GetValueReader returns a blob.ValueReader for blob file identified by fileNum.
382+
func (h *fileCacheHandle) GetValueReader(
383+
ctx context.Context, fileNum base.DiskFileNum,
384+
) (r blob.ValueReader, closeFunc func(), err error) {
385+
ref, err := h.findOrCreateBlob(ctx, fileNum)
386+
if err != nil {
387+
return nil, nil, err
388+
}
389+
v := ref.Value()
390+
r = v.mustBlob()
391+
// NB: The call to findOrCreateBlob incremented the value's reference count.
392+
// The closeHook (v.closeHook) takes responsibility for unreferencing the
393+
// value. Take care to avoid introducing an allocation here by adding a
394+
// closure.
395+
closeHook := h.addReference(v)
396+
return r, closeHook, nil
397+
}
398+
338399
// FileCache is a shareable cache for open files. Open files may be sstables or
339400
// blob files; both kinds share the same cache.
340401
type FileCache struct {
341-
refs atomic.Int64
402+
refs atomic.Int64
403+
counts struct {
404+
sstables atomic.Int64
405+
blobFiles atomic.Int64
406+
}
342407

343408
c genericcache.Cache[fileCacheKey, fileCacheValue]
344409
}
@@ -390,17 +455,31 @@ func NewFileCache(numShards int, size int) *FileCache {
390455
vRef.Unref()
391456
handle.iterCount.Add(-1)
392457
}
393-
reader, objMeta, err := handle.openFile(ctx, key.fileNum)
458+
reader, objMeta, err := handle.openFile(ctx, key.fileNum, key.fileType)
394459
if err != nil {
395460
return errors.Wrapf(err, "pebble: backing file %s error", redact.Safe(key.fileNum))
396461
}
397462
v.reader = reader
398463
v.isShared = objMeta.IsShared()
464+
switch key.fileType {
465+
case base.FileTypeTable:
466+
c.counts.sstables.Add(1)
467+
case base.FileTypeBlob:
468+
c.counts.blobFiles.Add(1)
469+
default:
470+
panic("unexpected file type")
471+
}
399472
return nil
400473
}
401474

402475
releaseFn := func(v *fileCacheValue) {
403476
if v.reader != nil {
477+
switch v.reader.(type) {
478+
case *sstable.Reader:
479+
c.counts.sstables.Add(-1)
480+
case *blob.FileReader:
481+
c.counts.blobFiles.Add(-1)
482+
}
404483
_ = v.reader.Close()
405484
v.reader = nil
406485
}
@@ -417,6 +496,11 @@ func NewFileCache(numShards int, size int) *FileCache {
417496
type fileCacheKey struct {
418497
handle *fileCacheHandle
419498
fileNum base.DiskFileNum
499+
// fileType describes the type of file being cached (blob or sstable). A
500+
// file number alone uniquely identifies every file within a DB, but we need
501+
// to propagate the type so the file cache looks for the correct file in
502+
// object storage / the filesystem.
503+
fileType base.FileType
420504
}
421505

422506
// Shard implements the genericcache.Key interface.
@@ -458,7 +542,7 @@ func (h *fileCacheHandle) newIters(
458542
kinds iterKinds,
459543
) (iterSet, error) {
460544
// Calling findOrCreateTable gives us the responsibility of Unref()ing vRef.
461-
vRef, err := h.findOrCreate(ctx, file)
545+
vRef, err := h.findOrCreateTable(ctx, file)
462546
if err != nil {
463547
return iterSet{}, err
464548
}
@@ -774,8 +858,9 @@ func (rp *tableCacheShardReaderProvider) Close() {
774858
// WARNING! If file is a virtual table, we return the properties of the physical
775859
// table.
776860
func (h *fileCacheHandle) getTableProperties(file *tableMetadata) (*sstable.Properties, error) {
777-
// Calling findOrCreate gives us the responsibility of decrementing v's refCount here
778-
v, err := h.findOrCreate(context.TODO(), file)
861+
// Calling findOrCreateTable gives us the responsibility of decrementing v's
862+
// refCount here
863+
v, err := h.findOrCreateTable(context.TODO(), file)
779864
if err != nil {
780865
return nil, err
781866
}
@@ -787,7 +872,7 @@ func (h *fileCacheHandle) getTableProperties(file *tableMetadata) (*sstable.Prop
787872

788873
type fileCacheValue struct {
789874
closeHook func()
790-
reader io.Closer // *sstable.Reader
875+
reader io.Closer // *sstable.Reader or *blob.FileReader
791876
isShared bool
792877

793878
// readerProvider is embedded here so that we only allocate it once as long as
@@ -804,6 +889,12 @@ func (v *fileCacheValue) mustSSTableReader() *sstable.Reader {
804889
return v.reader.(*sstable.Reader)
805890
}
806891

892+
// mustBlob retrieves the value's *blob.FileReader. It panics if the cached file
893+
// is not a blob file.
894+
func (v *fileCacheValue) mustBlob() *blob.FileReader {
895+
return v.reader.(*blob.FileReader)
896+
}
897+
807898
// iterSet holds a set of iterators of various key kinds, all constructed over
808899
// the same data structure (eg, an sstable). A subset of the fields may be
809900
// populated depending on the `iterKinds` passed to newIters.

0 commit comments

Comments
 (0)