Skip to content

Commit 716613c

Browse files
committed
db: support blob values in the level checker
Update the level checker to initialize a blob value fetcher and use it.
1 parent 1fec080 commit 716613c

File tree

1 file changed

+19
-2
lines changed

1 file changed

+19
-2
lines changed

level_checker.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ import (
1717
"github.com/cockroachdb/pebble/internal/base"
1818
"github.com/cockroachdb/pebble/internal/keyspan"
1919
"github.com/cockroachdb/pebble/internal/manifest"
20+
"github.com/cockroachdb/pebble/sstable"
21+
"github.com/cockroachdb/pebble/sstable/blob"
22+
"github.com/cockroachdb/pebble/sstable/block"
2023
)
2124

2225
// This file implements DB.CheckLevels() which checks that every entry in the
@@ -353,6 +356,10 @@ type checkConfig struct {
353356
stats *CheckLevelsStats
354357
merge Merge
355358
formatKey base.FormatKey
359+
readEnv block.ReadEnv
360+
// blobValueFetcher is the ValueFetcher to use when retrieving values stored
361+
// externally in blob files.
362+
blobValueFetcher blob.ValueFetcher
356363
}
357364

358365
// cmp is shorthand for comparer.Compare.
@@ -538,11 +545,21 @@ func (d *DB) CheckLevels(stats *CheckLevelsStats) error {
538545
stats: stats,
539546
merge: d.merge,
540547
formatKey: d.opts.Comparer.FormatKey,
548+
readEnv: block.ReadEnv{
549+
// TODO(jackson): Add categorized stats.
550+
},
541551
}
552+
checkConfig.blobValueFetcher.Init(d.fileCache, checkConfig.readEnv)
553+
defer func() { _ = checkConfig.blobValueFetcher.Close() }()
542554
return checkLevelsInternal(checkConfig)
543555
}
544556

545557
func checkLevelsInternal(c *checkConfig) (err error) {
558+
internalOpts := internalIterOpts{
559+
readEnv: sstable.ReadEnv{Block: c.readEnv},
560+
blobValueFetcher: &c.blobValueFetcher,
561+
}
562+
546563
// Phase 1: Use a simpleMergingIter to step through all the points and ensure
547564
// that points with the same user key at different levels are not inverted
548565
// wrt sequence numbers and the same holds for tombstones that cover points.
@@ -600,7 +617,7 @@ func checkLevelsInternal(c *checkConfig) (err error) {
600617
iterOpts := IterOptions{logger: c.logger}
601618
li := &levelIter{}
602619
li.init(context.Background(), iterOpts, c.comparer, c.newIters, manifestIter,
603-
manifest.L0Sublevel(sublevel), internalIterOpts{})
620+
manifest.L0Sublevel(sublevel), internalOpts)
604621
li.initRangeDel(&mlevelAlloc[0])
605622
mlevelAlloc[0].iter = li
606623
mlevelAlloc = mlevelAlloc[1:]
@@ -613,7 +630,7 @@ func checkLevelsInternal(c *checkConfig) (err error) {
613630
iterOpts := IterOptions{logger: c.logger}
614631
li := &levelIter{}
615632
li.init(context.Background(), iterOpts, c.comparer, c.newIters,
616-
current.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
633+
current.Levels[level].Iter(), manifest.Level(level), internalOpts)
617634
li.initRangeDel(&mlevelAlloc[0])
618635
mlevelAlloc[0].iter = li
619636
mlevelAlloc = mlevelAlloc[1:]

0 commit comments

Comments
 (0)