Skip to content

Commit 222c46b

Browse files
committed
db: propagate *PhysicalBlobFile to reportCorruption
If corruption is encountered while reading a blob file, propagate the blob file's *PhysicalBlobFile metadata struct to DB.reportCorruption. This fixes a nil pointer that would previously result while handling blob file corruption. Fixes #5127.
1 parent bbd7e03 commit 222c46b

20 files changed

+240
-104
lines changed

blob_rewrite.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,12 +181,12 @@ func (c *blobFileRewriteCompaction) Execute(jobID JobID, d *DB) error {
181181
// Now that we have the manifest lock, check if the blob file is
182182
// still current. If not, we bubble up ErrCancelledCompaction.
183183
v := d.mu.versions.currentVersion()
184-
currentDiskFile, ok := v.BlobFiles.Lookup(c.input.FileID)
184+
currentDiskFile, ok := v.BlobFiles.LookupPhysical(c.input.FileID)
185185
if !ok {
186186
return versionUpdate{}, errors.Wrapf(ErrCancelledCompaction,
187187
"blob file %s became unreferenced", c.input.FileID)
188188
}
189-
currentDiskFileNum := currentDiskFile.DiskFileNum()
189+
currentDiskFileNum := currentDiskFile.FileNum
190190
// Assert that the current version's disk file number for the blob
191191
// matches the one we rewrote. This compaction should be the only
192192
// rewrite compaction running for this blob file.
@@ -391,7 +391,7 @@ func newBlobFileRewriter(
391391
sstables []*manifest.TableMetadata,
392392
inputBlob manifest.BlobFileMetadata,
393393
) *blobFileRewriter {
394-
rw := blob.NewFileRewriter(inputBlob.FileID, inputBlob.Physical.FileNum, fc, readEnv, outputFileNum, w, opts)
394+
rw := blob.NewFileRewriter(inputBlob.FileID, inputBlob.Physical, fc, readEnv, outputFileNum, w, opts)
395395
return &blobFileRewriter{
396396
fc: fc,
397397
readEnv: readEnv,

blob_rewrite_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func TestBlobRewrite(t *testing.T) {
173173
objStore,
174174
&base.LoggerWithNoopTracer{Logger: base.DefaultLogger},
175175
sstable.ReaderOptions{},
176-
func(any, error) error { return nil },
176+
func(base.ObjectInfo, error) error { return nil },
177177
)
178178
var sstables []*manifest.TableMetadata
179179
for _, sstFileNum := range sstableFileNums {
@@ -421,6 +421,10 @@ type constantFileMapping base.DiskFileNum
421421
// Assert that (*inputFileMapping) implements base.BlobFileMapping.
422422
var _ base.BlobFileMapping = constantFileMapping(0)
423423

424-
func (m constantFileMapping) Lookup(fileID base.BlobFileID) (base.DiskFile, bool) {
425-
return base.DiskFileNum(m), true
424+
func (m constantFileMapping) Lookup(fileID base.BlobFileID) (base.ObjectInfo, bool) {
425+
return base.ObjectInfoLiteral{
426+
FileType: base.FileTypeBlob,
427+
DiskFileNum: base.DiskFileNum(m),
428+
Bounds: base.UserKeyBounds{},
429+
}, true
426430
}

checkpoint.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,11 +326,11 @@ func (d *DB) Checkpoint(
326326
includedBlobFiles[ref.FileID] = struct{}{}
327327

328328
// Map the BlobFileID to a DiskFileNum in the current version.
329-
diskFile, ok := current.BlobFiles.Lookup(ref.FileID)
329+
obj, ok := current.BlobFiles.Lookup(ref.FileID)
330330
if !ok {
331331
return errors.Errorf("blob file %s not found", ref.FileID)
332332
}
333-
ckErr = copyFile(base.FileTypeBlob, diskFile.DiskFileNum())
333+
ckErr = copyFile(obj.FileInfo())
334334
if ckErr != nil {
335335
return ckErr
336336
}

db.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,11 @@ func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer,
633633
}
634634
return nil, nil, ErrNotFound
635635
}
636-
return i.Value(), i, nil
636+
val, err := i.ValueAndErr()
637+
if err != nil {
638+
return nil, nil, errors.CombineErrors(err, i.Close())
639+
}
640+
return val, i, nil
637641
}
638642

639643
// Set sets the value for the given key. It overwrites any previous value

event.go

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1343,24 +1343,11 @@ func (r *lowDiskSpaceReporter) findThreshold(
13431343

13441344
// reportCorruption reports a corruption of a TableMetadata or BlobFileMetadata
13451345
// to the event listener and also adds a DataCorruptionInfo payload to the error.
1346-
func (d *DB) reportCorruption(meta any, err error) error {
1347-
switch meta := meta.(type) {
1348-
case *manifest.TableMetadata:
1349-
return d.reportFileCorruption(base.FileTypeTable, meta.TableBacking.DiskFileNum, meta.UserKeyBounds(), err)
1350-
case *manifest.PhysicalBlobFile:
1351-
// TODO(jackson): Add bounds for blob files.
1352-
return d.reportFileCorruption(base.FileTypeBlob, meta.FileNum, base.UserKeyBounds{}, err)
1353-
default:
1354-
panic(fmt.Sprintf("unknown metadata type: %T", meta))
1355-
}
1356-
}
1357-
1358-
func (d *DB) reportFileCorruption(
1359-
fileType base.FileType, fileNum base.DiskFileNum, userKeyBounds base.UserKeyBounds, err error,
1360-
) error {
1346+
func (d *DB) reportCorruption(meta base.ObjectInfo, err error) error {
13611347
if invariants.Enabled && !IsCorruptionError(err) {
13621348
panic("not a corruption error")
13631349
}
1350+
fileType, fileNum := meta.FileInfo()
13641351

13651352
objMeta, lookupErr := d.objProvider.Lookup(fileType, fileNum)
13661353
if lookupErr != nil {
@@ -1384,7 +1371,7 @@ func (d *DB) reportFileCorruption(
13841371
Path: path,
13851372
IsRemote: objMeta.IsRemote(),
13861373
Locator: objMeta.Remote.Locator,
1387-
Bounds: userKeyBounds,
1374+
Bounds: meta.UserKeyBounds(),
13881375
Details: err,
13891376
}
13901377
d.opts.EventListener.DataCorruption(info)

event_listener_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,3 +613,80 @@ func TestSSTCorruptionEvent(t *testing.T) {
613613
})
614614
}
615615
}
616+
617+
func TestBlobCorruptionEvent(t *testing.T) {
618+
for _, test := range []string{"missing-file"} {
619+
t.Run(test, func(t *testing.T) {
620+
var mu sync.Mutex
621+
var events []DataCorruptionInfo
622+
fs := vfs.NewMem()
623+
opts := &Options{
624+
FS: fs,
625+
Logger: testLogger{t},
626+
FormatMajorVersion: FormatValueSeparation,
627+
EventListener: &EventListener{
628+
DataCorruption: func(info DataCorruptionInfo) {
629+
mu.Lock()
630+
defer mu.Unlock()
631+
events = append(events, info)
632+
},
633+
},
634+
DisableAutomaticCompactions: true,
635+
}
636+
opts.Experimental.ValueSeparationPolicy = func() ValueSeparationPolicy {
637+
return ValueSeparationPolicy{
638+
Enabled: true,
639+
MinimumSize: 1,
640+
MaxBlobReferenceDepth: 10,
641+
}
642+
}
643+
d, err := Open("", opts)
644+
require.NoError(t, err)
645+
646+
key := func(k int) []byte {
647+
return []byte(fmt.Sprintf("key-%05d", k))
648+
}
649+
650+
// Create large values to ensure they get separated into blob files
651+
largeValue := make([]byte, 1000) // 1KB values
652+
for i := range largeValue {
653+
largeValue[i] = byte('a' + (i % 26))
654+
}
655+
656+
for i := 0; i < 10; i++ {
657+
require.NoError(t, d.Set(key(i), largeValue, nil))
658+
}
659+
require.NoError(t, d.Flush())
660+
661+
// We expect a blob file to be created.
662+
files := testutils.CheckErr(fs.List(""))
663+
blobFiles := slices.DeleteFunc(files, func(name string) bool {
664+
return !strings.HasSuffix(name, ".blob")
665+
})
666+
require.Greaterf(t, len(blobFiles), 0, "expected at least one blob file, got %v", files)
667+
for _, blobFileName := range blobFiles {
668+
switch test {
669+
case "missing-file":
670+
require.NoError(t, fs.Remove(blobFileName))
671+
default:
672+
t.Fatalf("invalid test")
673+
}
674+
}
675+
676+
// Try to read a value that should be in the blob file
677+
_, _, err = d.Get(key(5))
678+
require.Error(t, err)
679+
require.True(t, IsCorruptionError(err))
680+
infoInError := ExtractDataCorruptionInfo(err)
681+
require.NotNil(t, infoInError)
682+
require.Greater(t, len(events), 0)
683+
info := events[0]
684+
require.False(t, info.IsRemote)
685+
// Note: Blob files don't currently have user key bounds, so they're empty
686+
require.Equal(t, base.UserKeyBounds{}, info.Bounds)
687+
require.Equal(t, info, *infoInError)
688+
689+
d.Close()
690+
})
691+
}
692+
}

file_cache.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,11 @@ type fileCacheHandle struct {
118118
iterCount atomic.Int32
119119
sstStatsCollector block.CategoryStatsCollector
120120

121-
// reportCorruptionFn is used for block.ReadEnv.ReportCorruptionFn. It expects
122-
// the first argument to be a `*TableMetadata`. It returns an error that
123-
// contains more details.
124-
reportCorruptionFn func(any, error) error
121+
// reportCorruptionFn is used for block.ReadEnv.ReportCorruptionFn. The
122+
// first argument must implement the ObjectInfo interface. Typically callers
123+
// use *TableMetadata or *PhysicalBlobFile to satisfy this interface.
124+
// reportCorruptionFn returns an error that contains more details.
125+
reportCorruptionFn func(base.ObjectInfo, error) error
125126

126127
// This struct is only populated in race builds.
127128
raceMu struct {
@@ -146,7 +147,7 @@ func (c *FileCache) newHandle(
146147
objProvider objstorage.Provider,
147148
loggerAndTracer LoggerAndTracer,
148149
readerOpts sstable.ReaderOptions,
149-
reportCorruptionFn func(any, error) error,
150+
reportCorruptionFn func(base.ObjectInfo, error) error,
150151
) *fileCacheHandle {
151152
c.Ref()
152153

@@ -268,17 +269,17 @@ func (h *fileCacheHandle) findOrCreateTable(
268269
// the given blob file. If a corruption error is encountered,
269270
// reportCorruptionFn() is called.
270271
func (h *fileCacheHandle) findOrCreateBlob(
271-
ctx context.Context, fileNum base.DiskFileNum,
272+
ctx context.Context, info base.ObjectInfo,
272273
) (genericcache.ValueRef[fileCacheKey, fileCacheValue], error) {
274+
ftyp, fileNum := info.FileInfo()
273275
key := fileCacheKey{
274276
handle: h,
275277
fileNum: fileNum,
276-
fileType: base.FileTypeBlob,
278+
fileType: ftyp,
277279
}
278280
valRef, err := h.fileCache.c.FindOrCreate(ctx, key)
279-
// TODO(jackson): Propagate a blob metadata object here.
280281
if err != nil && IsCorruptionError(err) {
281-
err = h.reportCorruptionFn(nil, err)
282+
err = h.reportCorruptionFn(info, err)
282283
}
283284
return valRef, err
284285
}
@@ -388,9 +389,9 @@ func (h *fileCacheHandle) IterCount() int64 {
388389

389390
// GetValueReader returns a blob.ValueReader for blob file identified by fileNum.
390391
func (h *fileCacheHandle) GetValueReader(
391-
ctx context.Context, fileNum base.DiskFileNum,
392+
ctx context.Context, diskFile base.ObjectInfo,
392393
) (r blob.ValueReader, closeFunc func(), err error) {
393-
ref, err := h.findOrCreateBlob(ctx, fileNum)
394+
ref, err := h.findOrCreateBlob(ctx, diskFile)
394395
if err != nil {
395396
return nil, nil, err
396397
}
@@ -803,7 +804,7 @@ func SetupBlobReaderProvider(
803804
objProvider,
804805
opts.LoggerAndTracer,
805806
readOpts,
806-
func(any, error) error { return nil },
807+
func(base.ObjectInfo, error) error { return nil },
807808
)
808809

809810
return provider, cleanup, nil

file_cache_test.go

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -179,11 +179,17 @@ func newFileCacheTest(
179179
}
180180
}
181181

182-
func (t *fileCacheTest) fileByIdx(i int) (base.DiskFileNum, base.FileType) {
182+
func (t *fileCacheTest) fileByIdx(i int) base.ObjectInfo {
183183
if i < fileCacheTestNumTables {
184-
return base.DiskFileNum(i), base.FileTypeTable
184+
return base.ObjectInfoLiteral{
185+
FileType: base.FileTypeTable,
186+
DiskFileNum: base.DiskFileNum(i),
187+
}
188+
}
189+
return base.ObjectInfoLiteral{
190+
FileType: base.FileTypeBlob,
191+
DiskFileNum: base.DiskFileNum(i),
185192
}
186-
return base.DiskFileNum(i), base.FileTypeBlob
187193
}
188194

189195
func (t *fileCacheTest) cleanup() {
@@ -192,7 +198,7 @@ func (t *fileCacheTest) cleanup() {
192198
t.blockCache.Unref()
193199
}
194200

195-
func noopCorruptionFn(_ any, err error) error { return err }
201+
func noopCorruptionFn(_ base.ObjectInfo, err error) error { return err }
196202

197203
// newTestHandle creates a filesystem with a set of test tables and an
198204
// associated file cache handle. The caller must close the handle.
@@ -727,9 +733,10 @@ func testFileCacheFrequentlyUsedInternal(t *testing.T, rangeIter bool) {
727733

728734
for i := 0; i < N; i++ {
729735
for _, j := range [...]int{pinned0, i % fileCacheTestNumFiles, pinned1} {
730-
fn, typ := fct.fileByIdx(j)
731-
if typ == base.FileTypeBlob {
732-
_, closeFunc, err := h.GetValueReader(context.Background(), fn)
736+
obj := fct.fileByIdx(j)
737+
ftyp, fn := obj.FileInfo()
738+
if ftyp == base.FileTypeBlob {
739+
_, closeFunc, err := h.GetValueReader(context.Background(), obj)
733740
if err != nil {
734741
t.Fatalf("i=%d, j=%d: get value reader: %v", i, j, err)
735742
}
@@ -787,9 +794,10 @@ func TestSharedFileCacheFrequentlyUsed(t *testing.T) {
787794

788795
for i := 0; i < N; i++ {
789796
for _, j := range [...]int{pinned0, i % fileCacheTestNumFiles, pinned1} {
790-
fn, typ := fct.fileByIdx(j)
791-
if typ == base.FileTypeBlob {
792-
_, closeFunc, err := h1.GetValueReader(context.Background(), fn)
797+
obj := fct.fileByIdx(j)
798+
ftyp, fn := obj.FileInfo()
799+
if ftyp == base.FileTypeBlob {
800+
_, closeFunc, err := h1.GetValueReader(context.Background(), obj)
793801
if err != nil {
794802
t.Fatalf("i=%d, j=%d: get value reader: %v", i, j, err)
795803
}
@@ -847,9 +855,10 @@ func testFileCacheEvictionsInternal(t *testing.T, rangeIter bool) {
847855

848856
rng := rand.New(rand.NewPCG(2, 2))
849857
for i := 0; i < N; i++ {
850-
fn, typ := fct.fileByIdx(rng.IntN(fileCacheTestNumFiles))
851-
if typ == base.FileTypeBlob {
852-
_, closeFunc, err := h.GetValueReader(context.Background(), fn)
858+
obj := fct.fileByIdx(rng.IntN(fileCacheTestNumFiles))
859+
ftyp, fn := obj.FileInfo()
860+
if ftyp == base.FileTypeBlob {
861+
_, closeFunc, err := h.GetValueReader(context.Background(), obj)
853862
if err != nil {
854863
t.Fatalf("i=%d, fn=%d: get value reader: %v", i, fn, err)
855864
}
@@ -873,7 +882,9 @@ func testFileCacheEvictionsInternal(t *testing.T, rangeIter bool) {
873882
}
874883
}
875884

876-
h.Evict(fct.fileByIdx(int(lo + rng.Uint64N(hi-lo))))
885+
obj = fct.fileByIdx(int(lo + rng.Uint64N(hi-lo)))
886+
ftyp, fn = obj.FileInfo()
887+
h.Evict(fn, ftyp)
877888
}
878889

879890
sumEvicted, nEvicted := 0, 0
@@ -923,13 +934,14 @@ func TestSharedFileCacheEvictions(t *testing.T) {
923934
rng := rand.New(rand.NewPCG(0, 0))
924935
for i := 0; i < N; i++ {
925936
j := rng.IntN(fileCacheTestNumFiles)
926-
fn, typ := fct.fileByIdx(j)
927-
if typ == base.FileTypeBlob {
928-
_, closeFunc1, err := h1.GetValueReader(context.Background(), fn)
937+
obj := fct.fileByIdx(j)
938+
ftyp, fn := obj.FileInfo()
939+
if ftyp == base.FileTypeBlob {
940+
_, closeFunc1, err := h1.GetValueReader(context.Background(), obj)
929941
if err != nil {
930942
t.Fatalf("i=%d, fn=%d: get value reader: %v", i, fn, err)
931943
}
932-
_, closeFunc2, err := h2.GetValueReader(context.Background(), fn)
944+
_, closeFunc2, err := h2.GetValueReader(context.Background(), obj)
933945
if err != nil {
934946
t.Fatalf("i=%d, fn=%d: get value reader: %v", i, fn, err)
935947
}
@@ -955,8 +967,12 @@ func TestSharedFileCacheEvictions(t *testing.T) {
955967
}
956968
}
957969

958-
h1.Evict(fct.fileByIdx(int(lo + rng.Uint64N(hi-lo))))
959-
h2.Evict(fct.fileByIdx(int(lo + rng.Uint64N(hi-lo))))
970+
obj = fct.fileByIdx(int(lo + rng.Uint64N(hi-lo)))
971+
ftyp, fn = obj.FileInfo()
972+
h1.Evict(fn, ftyp)
973+
obj = fct.fileByIdx(int(lo + rng.Uint64N(hi-lo)))
974+
ftyp, fn = obj.FileInfo()
975+
h2.Evict(fn, ftyp)
960976
}
961977

962978
check := func(fs *fileCacheTestFS, h *fileCacheHandle) (float64, float64, float64) {
@@ -1089,13 +1105,14 @@ func TestFileCacheRetryAfterFailure(t *testing.T) {
10891105
h, fs := fct.newTestHandle()
10901106

10911107
fs.setOpenError(true /* enabled */)
1092-
_, _, err := h.GetValueReader(ctx, fileCacheTestNumTables)
1108+
obj := fct.fileByIdx(fileCacheTestNumTables)
1109+
_, _, err := h.GetValueReader(ctx, obj)
10931110
if err == nil {
10941111
t.Fatalf("expected failure, but found success")
10951112
}
10961113
require.Equal(t, "pebble: backing file 000200 error: injected error", err.Error())
10971114
fs.setOpenError(false /* enabled */)
1098-
_, closeFunc, err := h.GetValueReader(ctx, fileCacheTestNumTables)
1115+
_, closeFunc, err := h.GetValueReader(ctx, obj)
10991116
require.NoError(t, err)
11001117
closeFunc()
11011118
fs.validateAndCloseHandle(t, h, nil)

0 commit comments

Comments
 (0)