Skip to content

Commit 20b0451

Browse files
committed
db: validate sstable's blob value liveness during DebugCheckLevels
In `DebugCheckLevels`, we now validate all sstables in the LSM by checking the correctness of their blob reference liveness index block (if existent). We perform this validation by scanning over each sst, gathering all blob handle references, and comparing each referenced valueID with the bitmap encoded in our blob reference livenss index block. We also ensure that the total value size for said referenced values are tracked correctly. We do not try to do this validation sometimes during invariants as validation will effectively make any test that prints file cache stats non-deterministic. Fixes: #4975
1 parent ba96148 commit 20b0451

File tree

6 files changed

+311
-12
lines changed

6 files changed

+311
-12
lines changed

compaction.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3224,10 +3224,6 @@ func (d *DB) runCompaction(
32243224
defer d.mu.Lock()
32253225

32263226
// Determine whether we should separate values into blob files.
3227-
//
3228-
// TODO(jackson): Currently we never separate values in non-tests. Choose
3229-
// and initialize the appropriate ValueSeparation implementation based on
3230-
// Options and the compaction inputs.
32313227
valueSeparation := c.getValueSeparation(jobID, c, c.tableFormat)
32323228

32333229
result := d.compactAndWrite(jobID, c, snapshots, c.tableFormat, valueSeparation)

compaction_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ import (
3838
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
3939
"github.com/cockroachdb/pebble/objstorage/remote"
4040
"github.com/cockroachdb/pebble/sstable"
41+
"github.com/cockroachdb/pebble/sstable/blob"
42+
"github.com/cockroachdb/pebble/sstable/block"
4143
"github.com/cockroachdb/pebble/vfs"
4244
"github.com/cockroachdb/pebble/vfs/errorfs"
4345
"github.com/stretchr/testify/require"
@@ -1127,6 +1129,44 @@ func TestCompaction(t *testing.T) {
11271129
}
11281130
return describeLSM(d, verbose)
11291131

1132+
case "validate-blob-reference-index-block":
1133+
var inputTables []*manifest.TableMetadata
1134+
for _, line := range crstrings.Lines(td.Input) {
1135+
// Parse the file number from the filename
1136+
fileName := strings.TrimSuffix(line, ".sst")
1137+
fileNum, err := strconv.ParseUint(fileName, 10, 64)
1138+
if err != nil {
1139+
return err.Error()
1140+
}
1141+
tableNum := base.TableNum(fileNum)
1142+
1143+
d.mu.Lock()
1144+
currentVersion := d.mu.versions.currentVersion()
1145+
d.mu.Unlock()
1146+
1147+
var tableMeta *manifest.TableMetadata
1148+
for _, levelMetadata := range currentVersion.Levels {
1149+
for f := range levelMetadata.All() {
1150+
if f.TableNum == tableNum {
1151+
tableMeta = f
1152+
break
1153+
}
1154+
}
1155+
if tableMeta != nil {
1156+
inputTables = append(inputTables, tableMeta)
1157+
break
1158+
}
1159+
}
1160+
}
1161+
vf := &blob.ValueFetcher{}
1162+
vf.Init(&d.mu.versions.currentVersion().BlobFiles, d.fileCache, block.ReadEnv{})
1163+
defer func() { _ = vf.Close() }()
1164+
err := validateBlobValueLiveness(inputTables, d.fileCache, block.ReadEnv{}, vf)
1165+
if err != nil {
1166+
return err.Error()
1167+
}
1168+
return "validated"
1169+
11301170
case "auto-compact":
11311171
expectedCount := int64(1)
11321172
td.MaybeScanArgs(t, "count", &expectedCount)

internal/compact/run.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@ import (
2020
// Result stores the result of a compaction - more specifically, the "data" part
2121
// where we use the compaction iterator to write output tables.
2222
type Result struct {
23-
// Err is the result of the compaction. On success, Err is nil and Tables
24-
// stores the output tables. On failure, Err is set and Tables stores the
25-
// tables created so far (and which need to be cleaned up).
23+
// Err is the result of the compaction. On failure, Err is set, Tables/Blobs
24+
// stores the tables/blobs created so far (and which need to be cleaned up).
2625
Err error
2726
Tables []OutputTable
2827
Blobs []OutputBlob

level_checker.go

Lines changed: 158 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"io"
1212
"iter"
13+
"maps"
1314
"slices"
1415
"sort"
1516

@@ -20,6 +21,7 @@ import (
2021
"github.com/cockroachdb/pebble/sstable"
2122
"github.com/cockroachdb/pebble/sstable/blob"
2223
"github.com/cockroachdb/pebble/sstable/block"
24+
"github.com/cockroachdb/pebble/sstable/colblk"
2325
)
2426

2527
// This file implements DB.CheckLevels() which checks that every entry in the
@@ -360,6 +362,7 @@ type checkConfig struct {
360362
// blobValueFetcher is the ValueFetcher to use when retrieving values stored
361363
// externally in blob files.
362364
blobValueFetcher blob.ValueFetcher
365+
fileCache *fileCacheHandle
363366
}
364367

365368
// cmp is shorthand for comparer.Compare.
@@ -527,6 +530,7 @@ type CheckLevelsStats struct {
527530
// - Point keys in sstables are ordered.
528531
// - Range delete tombstones in sstables are ordered and fragmented.
529532
// - Successful processing of all MERGE records.
533+
// - Each sstable's blob reference liveness block is valid.
530534
func (d *DB) CheckLevels(stats *CheckLevelsStats) error {
531535
// Grab and reference the current readState.
532536
readState := d.loadReadState()
@@ -548,8 +552,9 @@ func (d *DB) CheckLevels(stats *CheckLevelsStats) error {
548552
readEnv: block.ReadEnv{
549553
// TODO(jackson): Add categorized stats.
550554
},
555+
fileCache: d.fileCache,
551556
}
552-
checkConfig.blobValueFetcher.Init(&readState.current.BlobFiles, d.fileCache, checkConfig.readEnv)
557+
checkConfig.blobValueFetcher.Init(&readState.current.BlobFiles, checkConfig.fileCache, checkConfig.readEnv)
553558
defer func() { _ = checkConfig.blobValueFetcher.Close() }()
554559
return checkLevelsInternal(checkConfig)
555560
}
@@ -608,6 +613,8 @@ func checkLevelsInternal(c *checkConfig) (err error) {
608613
mlevels = append(mlevels, simpleMergingIterLevel{})
609614
}
610615
mlevelAlloc := mlevels[start:]
616+
var allTables []*manifest.TableMetadata
617+
611618
// Add L0 files by sublevel.
612619
for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
613620
if current.L0SublevelFiles[sublevel].Empty() {
@@ -621,19 +628,24 @@ func checkLevelsInternal(c *checkConfig) (err error) {
621628
li.initRangeDel(&mlevelAlloc[0])
622629
mlevelAlloc[0].iter = li
623630
mlevelAlloc = mlevelAlloc[1:]
631+
for f := range current.L0SublevelFiles[sublevel].All() {
632+
allTables = append(allTables, f)
633+
}
624634
}
625635
for level := 1; level < len(current.Levels); level++ {
626636
if current.Levels[level].Empty() {
627637
continue
628638
}
629-
630639
iterOpts := IterOptions{logger: c.logger}
631640
li := &levelIter{}
632641
li.init(context.Background(), iterOpts, c.comparer, c.newIters,
633642
current.Levels[level].Iter(), manifest.Level(level), internalOpts)
634643
li.initRangeDel(&mlevelAlloc[0])
635644
mlevelAlloc[0].iter = li
636645
mlevelAlloc = mlevelAlloc[1:]
646+
for f := range current.Levels[level].All() {
647+
allTables = append(allTables, f)
648+
}
637649
}
638650

639651
mergingIter := &simpleMergingIter{}
@@ -648,7 +660,150 @@ func checkLevelsInternal(c *checkConfig) (err error) {
648660
}
649661

650662
// Phase 2: Check that the tombstones are mutually consistent.
651-
return checkRangeTombstones(c)
663+
if err := checkRangeTombstones(c); err != nil {
664+
return err
665+
}
666+
667+
// Phase 3: Validate blob value liveness block for all tables in the LSM.
668+
// TODO(annie): This is a very expensive operation. We should try to reduce
669+
// the amount of work performed. One possibility is to have the caller
670+
// pass in a prng seed and use that to choose which tables to validate.
671+
if err := validateBlobValueLiveness(allTables, c.fileCache, c.readEnv, &c.blobValueFetcher); err != nil {
672+
return err
673+
}
674+
675+
return nil
676+
}
677+
678+
type valuesInfo struct {
679+
valueIDs []int
680+
totalSize int
681+
}
682+
683+
// gatherBlobHandles gathers all the blob handles in an sstable, returning a
684+
// slice of maps; indexing into the slice at `i` is equivalent to retrieving
685+
// each blob.BlockID's referenced blob.BlockValueID for the `i`th blob reference.
686+
func gatherBlobHandles(
687+
ctx context.Context,
688+
r *sstable.Reader,
689+
blobRefs manifest.BlobReferences,
690+
valueFetcher base.ValueFetcher,
691+
) ([]map[blob.BlockID]valuesInfo, error) {
692+
iter, err := r.NewPointIter(ctx, sstable.IterOptions{
693+
BlobContext: sstable.TableBlobContext{
694+
ValueFetcher: valueFetcher,
695+
References: &blobRefs,
696+
},
697+
})
698+
if err != nil {
699+
return nil, err
700+
}
701+
defer func() { _ = iter.Close() }()
702+
703+
referenced := make([]map[blob.BlockID]valuesInfo, len(blobRefs))
704+
for i := range referenced {
705+
referenced[i] = make(map[blob.BlockID]valuesInfo)
706+
}
707+
for kv := iter.First(); kv != nil; kv = iter.Next() {
708+
if kv.V.IsBlobValueHandle() {
709+
lv := kv.V.LazyValue()
710+
handleSuffix := blob.DecodeHandleSuffix(lv.ValueOrHandle)
711+
refID, ok := blobRefs.IDByBlobFileID(lv.Fetcher.BlobFileID)
712+
if !ok {
713+
return nil, errors.Errorf("blob file ID %d not found in blob references", lv.Fetcher.BlobFileID)
714+
}
715+
blockID := handleSuffix.BlockID
716+
valueID := int(handleSuffix.ValueID)
717+
vi := referenced[refID][blockID]
718+
vi.valueIDs = append(vi.valueIDs, valueID)
719+
vi.totalSize += lv.Len()
720+
referenced[refID][blockID] = vi
721+
}
722+
}
723+
return referenced, nil
724+
}
725+
726+
func performValidationForSSTable(
727+
decoder colblk.ReferenceLivenessBlockDecoder,
728+
tableNum base.TableNum,
729+
referenced []map[blob.BlockID]valuesInfo,
730+
) error {
731+
if len(referenced) != decoder.BlockDecoder().Rows() {
732+
return errors.Errorf("mismatch in number of references in blob value "+
733+
"liveness block: expected=%d found=%d", len(referenced),
734+
decoder.BlockDecoder().Rows())
735+
}
736+
for refID, blockValues := range referenced {
737+
bitmapEncodings := slices.Clone(decoder.LivenessAtReference(refID))
738+
for _, blockEnc := range sstable.DecodeBlobRefLivenessEncoding(bitmapEncodings) {
739+
blockID := blockEnc.BlockID
740+
vi, ok := blockValues[blockID]
741+
if !ok {
742+
return errors.Errorf("dangling refID=%d blockID=%d in blob value "+
743+
"liveness encoding for sstable %d", refID, blockID, tableNum)
744+
}
745+
encodedVals := slices.Collect(sstable.IterSetBitsInRunLengthBitmap(blockEnc.Bitmap))
746+
if !slices.Equal(vi.valueIDs, encodedVals) {
747+
return errors.Errorf("bitmap mismatch for refID=%d blockID=%d: "+
748+
"expected=%v encoded=%v for sstable %d", refID, blockID, vi.valueIDs,
749+
encodedVals, tableNum)
750+
}
751+
if vi.totalSize != blockEnc.ValuesSize {
752+
return errors.Errorf("value size mismatch for refID=%d blockID=%d: "+
753+
"expected=%d encoded=%d for sstable %d", refID, blockID, vi.totalSize,
754+
blockEnc.ValuesSize, tableNum)
755+
}
756+
// Remove the processed blockID from the map so that later,
757+
// we can check if we processed everything. This is to
758+
// ensure that we do not have any missing references in the
759+
// blob reference liveness block for any of the references
760+
// in the sstable.
761+
delete(blockValues, blockID)
762+
}
763+
if len(blockValues) > 0 {
764+
return errors.Errorf("refID=%d blockIDs=%v referenced by sstable %d "+
765+
"is/are not present in blob reference liveness block", refID,
766+
slices.Collect(maps.Keys(blockValues)), tableNum)
767+
}
768+
}
769+
return nil
770+
}
771+
772+
// validateBlobValueLiveness iterates through each table,
773+
// gathering all the blob handles, and then compares the values encoded in the
774+
// blob reference liveness block to the values referenced by the blob handles.
775+
func validateBlobValueLiveness(
776+
tables []*manifest.TableMetadata,
777+
fc *fileCacheHandle,
778+
readEnv block.ReadEnv,
779+
valueFetcher base.ValueFetcher,
780+
) error {
781+
ctx := context.TODO()
782+
var decoder colblk.ReferenceLivenessBlockDecoder
783+
for _, t := range tables {
784+
if len(t.BlobReferences) == 0 {
785+
continue
786+
}
787+
if err := fc.withReader(ctx, readEnv, t, func(r *sstable.Reader, readEnv sstable.ReadEnv) error {
788+
// For this sstable, gather all the blob handles -- tracking
789+
// each blob.ReferenceID + blob.BlockID's referenced
790+
// blob.BlockValueIDs.
791+
referenced, err := gatherBlobHandles(ctx, r, t.BlobReferences, valueFetcher)
792+
if err != nil {
793+
return err
794+
}
795+
h, err := r.ReadBlobRefIndexBlock(ctx, readEnv.Block)
796+
if err != nil {
797+
return err
798+
}
799+
defer h.Release()
800+
decoder.Init(h.BlockData())
801+
return performValidationForSSTable(decoder, t.TableNum, referenced)
802+
}); err != nil {
803+
return err
804+
}
805+
}
806+
return nil
652807
}
653808

654809
type simpleMergingIterItem struct {

0 commit comments

Comments
 (0)