Skip to content

Commit 9630202

Browse files
committed
db: embed DataCorruptionInfo into corruption errors
We now add the `DataCorruptionInfo` to corruption errors and allow users to extract it using `ExtractDataCorruptionInfo()`.
1 parent a3c4df0 commit 9630202

File tree

5 files changed

+49
-21
lines changed

5 files changed

+49
-21
lines changed

event.go

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/cockroachdb/crlib/crtime"
1414
"github.com/cockroachdb/errors"
15+
errorsjoin "github.com/cockroachdb/errors/join"
1516
"github.com/cockroachdb/pebble/internal/base"
1617
"github.com/cockroachdb/pebble/internal/humanize"
1718
"github.com/cockroachdb/pebble/internal/invariants"
@@ -1144,23 +1145,25 @@ func (r *lowDiskSpaceReporter) findThreshold(
11441145
return threshold, ok
11451146
}
11461147

1147-
func (d *DB) reportCorruption(meta any, err error) {
1148+
// reportCorruption reports a corruption of a TableMetadata or BlobFileMetadata
1149+
// to the event listener and also adds a DataCorruptionInfo payload to the error.
1150+
func (d *DB) reportCorruption(meta any, err error) error {
11481151
switch meta := meta.(type) {
11491152
case *manifest.TableMetadata:
1150-
d.reportFileCorruption(base.FileTypeTable, meta.FileBacking.DiskFileNum, meta.UserKeyBounds(), err)
1153+
return d.reportFileCorruption(base.FileTypeTable, meta.FileBacking.DiskFileNum, meta.UserKeyBounds(), err)
11511154
case *manifest.BlobFileMetadata:
11521155
// TODO(jackson): Add bounds for blob files.
1153-
d.reportFileCorruption(base.FileTypeBlob, meta.FileNum, base.UserKeyBounds{}, err)
1156+
return d.reportFileCorruption(base.FileTypeBlob, meta.FileNum, base.UserKeyBounds{}, err)
11541157
default:
11551158
panic(fmt.Sprintf("unknown metadata type: %T", meta))
11561159
}
11571160
}
11581161

11591162
func (d *DB) reportFileCorruption(
11601163
fileType base.FileType, fileNum base.DiskFileNum, userKeyBounds base.UserKeyBounds, err error,
1161-
) {
1162-
if invariants.Enabled && err == nil {
1163-
panic("nil error")
1164+
) error {
1165+
if invariants.Enabled && !IsCorruptionError(err) {
1166+
panic("not a corruption error")
11641167
}
11651168

11661169
objMeta, lookupErr := d.objProvider.Lookup(fileType, fileNum)
@@ -1189,4 +1192,25 @@ func (d *DB) reportFileCorruption(
11891192
Details: err,
11901193
}
11911194
d.opts.EventListener.DataCorruption(info)
1195+
// We don't use errors.Join() because it would also annotate the result with
1196+
// a stack trace captured here, which would not be useful.
1197+
return errorsjoin.Join(err, &corruptionDetailError{info: info})
1198+
}
1199+
1200+
type corruptionDetailError struct {
1201+
info DataCorruptionInfo
1202+
}
1203+
1204+
func (e *corruptionDetailError) Error() string {
1205+
return "<corruption detail carrier>"
1206+
}
1207+
1208+
// ExtractDataCorruptionInfo extracts the DataCorruptionInfo details from a
1209+
// corruption error. Returns nil if there is no such detail.
1210+
func ExtractDataCorruptionInfo(err error) *DataCorruptionInfo {
1211+
var e *corruptionDetailError
1212+
if errors.As(err, &e) {
1213+
return &e.info
1214+
}
1215+
return nil
11921216
}

event_listener_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,11 +590,15 @@ func TestSSTCorruptionEvent(t *testing.T) {
590590
}
591591
_, _, err = d.Get(key(5))
592592
require.Error(t, err)
593+
require.True(t, IsCorruptionError(err))
594+
infoInError := ExtractDataCorruptionInfo(err)
595+
require.NotNil(t, infoInError)
593596
require.Greater(t, len(events), 0)
594597
info := events[0]
595598
require.Equal(t, info.Path, sstFileName)
596599
require.False(t, info.IsRemote)
597600
require.Equal(t, base.UserKeyBoundsInclusive(key(0), key(99)), info.Bounds)
601+
require.Equal(t, info, *infoInError)
598602

599603
d.Close()
600604
})

file_cache.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,9 @@ type fileCacheHandle struct {
106106
sstStatsCollector block.CategoryStatsCollector
107107

108108
// reportCorruptionFn is used for block.ReadEnv.ReportCorruptionFn. It expects
109-
// the first argument to be a `*TableMetadata`.
110-
reportCorruptionFn func(any, error)
109+
// the first argument to be a `*TableMetadata`. It returns the error that
110+
// should be propagated in place of the original; it may contain more details.
111+
reportCorruptionFn func(any, error) error
111112

112113
// This struct is only populated in race builds.
113114
raceMu struct {
@@ -132,7 +133,7 @@ func (c *FileCache) newHandle(
132133
objProvider objstorage.Provider,
133134
loggerAndTracer LoggerAndTracer,
134135
readerOpts sstable.ReaderOptions,
135-
reportCorruptionFn func(any, error),
136+
reportCorruptionFn func(any, error) error,
136137
) *fileCacheHandle {
137138
c.Ref()
138139

@@ -241,7 +242,7 @@ func (h *fileCacheHandle) findOrCreateTable(
241242
}
242243
valRef, err := h.fileCache.c.FindOrCreate(ctx, key)
243244
if err != nil && IsCorruptionError(err) {
244-
h.reportCorruptionFn(meta, err)
245+
err = h.reportCorruptionFn(meta, err)
245246
}
246247
return valRef, err
247248
}

file_cache_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func (t *fileCacheTest) cleanup() {
192192
t.blockCache.Unref()
193193
}
194194

195-
func noopCorruptionFn(any, error) {}
195+
func noopCorruptionFn(_ any, err error) error { return err }
196196

197197
// newTestHandle creates a filesystem with a set of test tables and an
198198
// associated file cache handle. The caller must close the handle.

sstable/block/block.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -377,8 +377,9 @@ type ReadEnv struct {
377377

378378
// ReportCorruptionFn is called with ReportCorruptionArg and the error
379379
// whenever an SSTable corruption is detected. The argument is used to avoid
380-
// allocating a separate function for each object.
381-
ReportCorruptionFn func(opaque any, err error)
380+
// allocating a separate function for each object. It returns the error that
381+
// should be propagated in place of the original; it may carry more details.
382+
ReportCorruptionFn func(opaque any, err error) error
382383
ReportCorruptionArg any
383384
}
384385

@@ -406,10 +407,11 @@ func (env *ReadEnv) BlockRead(blockLength uint64, readDuration time.Duration) {
406407

407408
// maybeReportCorruption calls the ReportCorruptionFn if the given error
408409
// indicates corruption.
409-
func (env *ReadEnv) maybeReportCorruption(err error) {
410+
func (env *ReadEnv) maybeReportCorruption(err error) error {
410411
if env.ReportCorruptionFn != nil && base.IsCorruptionError(err) {
411-
env.ReportCorruptionFn(env.ReportCorruptionArg, err)
412+
return env.ReportCorruptionFn(env.ReportCorruptionArg, err)
412413
}
414+
return err
413415
}
414416

415417
// A Reader reads blocks from a single file, handling caching, checksum
@@ -471,8 +473,7 @@ func (r *Reader) Read(
471473
}
472474
value, err := r.doRead(ctx, env, readHandle, bh, initBlockMetadataFn)
473475
if err != nil {
474-
env.maybeReportCorruption(err)
475-
return BufferHandle{}, err
476+
return BufferHandle{}, env.maybeReportCorruption(err)
476477
}
477478
return value.MakeHandle(), nil
478479
}
@@ -490,8 +491,7 @@ func (r *Reader) Read(
490491
// to report corruption errors separately, since the ReportCorruptionArg
491492
// could be different. In particular, we might read the same physical block
492493
// (e.g. an index block) for two different virtual tables.
493-
env.maybeReportCorruption(err)
494-
return BufferHandle{}, err
494+
return BufferHandle{}, env.maybeReportCorruption(err)
495495
}
496496

497497
if cv != nil {
@@ -507,8 +507,7 @@ func (r *Reader) Read(
507507
value, err := r.doRead(ctx, env, readHandle, bh, initBlockMetadataFn)
508508
if err != nil {
509509
crh.SetReadError(err)
510-
env.maybeReportCorruption(err)
511-
return BufferHandle{}, err
510+
return BufferHandle{}, env.maybeReportCorruption(err)
512511
}
513512
crh.SetReadValue(value.v)
514513
return value.MakeHandle(), nil

0 commit comments

Comments
 (0)