Skip to content

Commit 2ca2aeb

Browse files
committed
blob,manifest: lookup physical blob file in Version
Configure blob.ValueFetcher to take a func to translate a base.BlobFileID to the base.DiskFileNum of the backing blob file. Add a (*BlobFileSet).Lookup function with the same signature, and configure the various users of ValueFetcher to propagate their Version's BlobFileSet's Lookup func. Informs #4802.
1 parent 2eabac8 commit 2ca2aeb

17 files changed

+249
-93
lines changed

checkpoint.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"github.com/cockroachdb/pebble/internal/base"
1515
"github.com/cockroachdb/pebble/internal/manifest"
1616
"github.com/cockroachdb/pebble/record"
17-
"github.com/cockroachdb/pebble/sstable/blob"
1817
"github.com/cockroachdb/pebble/vfs"
1918
"github.com/cockroachdb/pebble/vfs/atomicfs"
2019
)
@@ -171,6 +170,9 @@ func (d *DB) Checkpoint(
171170
}
172171

173172
// Disable file deletions.
173+
// We acquire a reference on the version down below that will prevent any
174+
// sstables or blob files from becoming "obsolete" and potentially deleted,
175+
// but this doesn't protect the current WALs or manifests.
174176
d.mu.Lock()
175177
d.disableFileDeletions()
176178
defer func() {
@@ -205,10 +207,17 @@ func (d *DB) Checkpoint(
205207
// before our call to List.
206208
allLogicalLogs := d.mu.log.manager.List()
207209

208-
// Release the manifest and DB.mu so we don't block other operations on
209-
// the database.
210+
// Release the manifest and DB.mu so we don't block other operations on the
211+
// database.
212+
//
213+
// But first reference the version to ensure that the version's in-memory
214+
// state and its physical files remain available for the checkpoint. In
215+
// particular, the Version.BlobFileSet is only valid while a version is
216+
// referenced.
217+
current.Ref()
210218
d.mu.versions.logUnlock()
211219
d.mu.Unlock()
220+
defer current.Unref()
212221

213222
// Wrap the normal filesystem with one which wraps newly created files with
214223
// vfs.NewSyncingFile.
@@ -316,10 +325,11 @@ func (d *DB) Checkpoint(
316325
if _, ok := includedBlobFiles[ref.FileID]; !ok {
317326
includedBlobFiles[ref.FileID] = struct{}{}
318327

319-
// TODO(jackson): Perform a translation to the
320-
// appropriate disk file number once we support blob
321-
// file replacement.
322-
diskFileNum := blob.DiskFileNumTODO(ref.FileID)
328+
// Map the BlobFileID to a DiskFileNum in the current version.
329+
diskFileNum, ok := current.BlobFiles.Lookup(ref.FileID)
330+
if !ok {
331+
return errors.Errorf("blob file %s not found", ref.FileID)
332+
}
323333
ckErr = copyFile(base.FileTypeBlob, diskFileNum)
324334
if ckErr != nil {
325335
return ckErr

compaction.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3215,7 +3215,7 @@ func (d *DB) compactAndWrite(
32153215
categoryCompaction,
32163216
),
32173217
}
3218-
c.valueFetcher.Init(d.fileCache, blockReadEnv)
3218+
c.valueFetcher.Init(&c.version.BlobFiles, d.fileCache, blockReadEnv)
32193219
iiopts := internalIterOpts{
32203220
compaction: true,
32213221
readEnv: sstable.ReadEnv{Block: blockReadEnv},

compaction_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,7 @@ func TestAutomaticFlush(t *testing.T) {
592592
continue
593593
}
594594
err := func() error {
595-
fetcher, blobContext := sstable.LoadValBlobContext(d.fileCache, &meta.BlobReferences)
595+
fetcher, blobContext := sstable.LoadValBlobContext(&v.BlobFiles, d.fileCache, &meta.BlobReferences)
596596
defer fetcher.Close()
597597

598598
f, err := provider.OpenForReading(context.Background(), base.FileTypeTable, meta.TableBacking.DiskFileNum, objstorage.OpenOptions{})

db.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,7 @@ func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer,
623623
keyBuf: buf.keyBuf,
624624
}
625625
// Set up a blob value fetcher to use for retrieving values from blob files.
626-
i.blobValueFetcher.Init(d.fileCache, block.NoReadEnv)
626+
i.blobValueFetcher.Init(&readState.current.BlobFiles, d.fileCache, block.NoReadEnv)
627627
get.iiopts.blobValueFetcher = &i.blobValueFetcher
628628

629629
if !i.First() {
@@ -1317,15 +1317,18 @@ func (d *DB) newInternalIter(
13171317
// files in the associated version from being deleted if there is a current
13181318
// compaction. The readState is unref'd by Iterator.Close().
13191319
var readState *readState
1320+
var vers *manifest.Version
13201321
if sOpts.vers == nil {
13211322
if sOpts.readState != nil {
13221323
readState = sOpts.readState
13231324
readState.ref()
1325+
vers = readState.current
13241326
} else {
13251327
readState = d.loadReadState()
1328+
vers = readState.current
13261329
}
1327-
}
1328-
if sOpts.vers != nil {
1330+
} else {
1331+
vers = sOpts.vers
13291332
sOpts.vers.Ref()
13301333
}
13311334

@@ -1352,7 +1355,7 @@ func (d *DB) newInternalIter(
13521355
seqNum: seqNum,
13531356
mergingIter: &buf.merging,
13541357
}
1355-
dbi.blobValueFetcher.Init(d.fileCache, block.ReadEnv{})
1358+
dbi.blobValueFetcher.Init(&vers.BlobFiles, d.fileCache, block.ReadEnv{})
13561359

13571360
dbi.opts = *o
13581361
dbi.opts.logger = d.opts.Logger
@@ -1434,7 +1437,11 @@ func (i *Iterator) constructPointIter(
14341437
i.opts.Category,
14351438
),
14361439
}
1437-
i.blobValueFetcher.Init(i.fc, readEnv)
1440+
if i.readState != nil {
1441+
i.blobValueFetcher.Init(&i.readState.current.BlobFiles, i.fc, readEnv)
1442+
} else if i.version != nil {
1443+
i.blobValueFetcher.Init(&i.version.BlobFiles, i.fc, readEnv)
1444+
}
14381445
internalOpts := internalIterOpts{
14391446
readEnv: sstable.ReadEnv{Block: readEnv},
14401447
blobValueFetcher: &i.blobValueFetcher,

internal/manifest/blob_metadata.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,56 @@ func (s *BlobFileSet) Count() int {
343343
return s.tree.Count()
344344
}
345345

346+
// Lookup returns the file number of the physical blob file backing the given
347+
// file ID. It returns false for the second return value if the FileID is not
348+
// present in the set.
349+
func (s *BlobFileSet) Lookup(fileID base.BlobFileID) (base.DiskFileNum, bool) {
350+
phys, ok := s.LookupPhysical(fileID)
351+
if !ok {
352+
return 0, false
353+
}
354+
return phys.FileNum, true
355+
}
356+
357+
// LookupPhysical returns the *PhysicalBlobFile backing the given file ID. It
358+
// returns false for the second return value if the FileID is not present in the
359+
// set.
360+
func (s *BlobFileSet) LookupPhysical(fileID base.BlobFileID) (*PhysicalBlobFile, bool) {
361+
// LookupPhysical is performed during value retrieval to determine the
362+
// physical blob file that should be read, so it's considered to be
363+
// performance sensitive. We manually inline the B-Tree traversal and binary
364+
// search with this in mind.
365+
n := s.tree.root
366+
for n != nil {
367+
var h int
368+
// Logic copied from sort.Search.
369+
i, j := 0, int(n.count)
370+
for i < j {
371+
h = int(uint(i+j) >> 1) // avoid overflow when computing h
372+
// i ≤ h < j
373+
v := stdcmp.Compare(fileID, n.items[h].FileID)
374+
if v == 0 {
375+
// Found the sought blob file.
376+
return n.items[h].Physical, true
377+
} else if v > 0 {
378+
i = h + 1
379+
} else {
380+
j = h
381+
}
382+
}
383+
// If we've reached a leaf node without finding fileID, the file is not
384+
// present.
385+
if n.leaf {
386+
return nil, false
387+
}
388+
n = n.children[i]
389+
}
390+
return nil, false
391+
}
392+
393+
// Assert that (*BlobFileSet) implements blob.FileMapping.
394+
var _ blob.FileMapping = (*BlobFileSet)(nil)
395+
346396
// clone returns a copy-on-write clone of the blob file set.
347397
func (s *BlobFileSet) clone() BlobFileSet {
348398
return BlobFileSet{tree: s.tree.Clone()}

internal/manifest/blob_metadata_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,46 @@ func TestCurrentBlobFileSet(t *testing.T) {
141141
return ""
142142
})
143143
}
144+
145+
func TestBlobFileSet_Lookup(t *testing.T) {
146+
const numBlobFiles = 10000
147+
set, files := makeTestBlobFiles(numBlobFiles)
148+
for i := 0; i < numBlobFiles; i++ {
149+
fn, ok := set.Lookup(base.BlobFileID(i))
150+
require.True(t, ok)
151+
require.Equal(t, files[i].FileNum, fn)
152+
}
153+
}
154+
155+
func makeTestBlobFiles(numBlobFiles int) (BlobFileSet, []PhysicalBlobFile) {
156+
files := make([]PhysicalBlobFile, numBlobFiles)
157+
for i := 0; i < numBlobFiles; i++ {
158+
fileNum := base.DiskFileNum(i)
159+
if i%2 == 0 {
160+
fileNum = base.DiskFileNum(2*numBlobFiles + i)
161+
}
162+
files[i] = PhysicalBlobFile{
163+
FileNum: fileNum,
164+
Size: uint64(i),
165+
ValueSize: uint64(i),
166+
CreationTime: uint64(i),
167+
}
168+
}
169+
set := MakeBlobFileSet(nil)
170+
for i := 0; i < numBlobFiles; i++ {
171+
set.insert(BlobFileMetadata{
172+
FileID: base.BlobFileID(i % numBlobFiles),
173+
Physical: &files[i],
174+
})
175+
}
176+
return set, files
177+
}
178+
179+
func BenchmarkBlobFileSet_Lookup(b *testing.B) {
180+
const numBlobFiles = 10000
181+
set, _ := makeTestBlobFiles(numBlobFiles)
182+
b.ResetTimer()
183+
for i := 0; i < b.N; i++ {
184+
_, _ = set.Lookup(base.BlobFileID(i % numBlobFiles))
185+
}
186+
}

level_checker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ func (d *DB) CheckLevels(stats *CheckLevelsStats) error {
549549
// TODO(jackson): Add categorized stats.
550550
},
551551
}
552-
checkConfig.blobValueFetcher.Init(d.fileCache, checkConfig.readEnv)
552+
checkConfig.blobValueFetcher.Init(&readState.current.BlobFiles, d.fileCache, checkConfig.readEnv)
553553
defer func() { _ = checkConfig.blobValueFetcher.Close() }()
554554
return checkLevelsInternal(checkConfig)
555555
}

replay/replay.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,7 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {
797797
}
798798
var newFiles []base.DiskFileNum
799799
blobRefMap := make(map[base.DiskFileNum]manifest.BlobReferences)
800+
blobFileMap := make(map[base.BlobFileID]base.DiskFileNum)
800801
for _, nf := range ve.NewTables {
801802
newFiles = append(newFiles, nf.Meta.TableBacking.DiskFileNum)
802803
if s.kind == ingestStepKind && (nf.Meta.SmallestSeqNum != nf.Meta.LargestSeqNum || nf.Level != 0) {
@@ -806,6 +807,9 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {
806807
blobRefMap[nf.Meta.TableBacking.DiskFileNum] = nf.Meta.BlobReferences
807808
}
808809
}
810+
for _, bf := range ve.NewBlobFiles {
811+
blobFileMap[bf.FileID] = bf.Physical.FileNum
812+
}
809813
if previousVersion != nil {
810814
// previousVersion contains the current version, and so l0Organizer is
811815
// consistent with it. The subsequent call to currentVersion will
@@ -861,7 +865,7 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {
861865
// Load all of the flushed sstables' keys into a batch.
862866
s.flushBatch = r.d.NewBatch()
863867
err := loadFlushedSSTableKeys(s.flushBatch, r.WorkloadFS, r.WorkloadPath,
864-
newFiles, blobRefMap, provider, r.readerOpts, &flushBufs)
868+
newFiles, blobRefMap, blobFileMap, provider, r.readerOpts, &flushBufs)
865869
if err != nil {
866870
return errors.Wrapf(err, "flush in %q at offset %d", manifestName, rr.Offset())
867871
}
@@ -995,6 +999,13 @@ func findManifestStart(
995999
return index, info.Size(), nil
9961000
}
9971001

1002+
type blobFileMap map[base.BlobFileID]base.DiskFileNum
1003+
1004+
func (m blobFileMap) Lookup(fileID base.BlobFileID) (base.DiskFileNum, bool) {
1005+
diskFileNum, ok := m[fileID]
1006+
return diskFileNum, ok
1007+
}
1008+
9981009
// loadFlushedSSTableKeys copies keys from the sstables specified by `fileNums`
9991010
// in the directory specified by `path` into the provided batch. Keys are
10001011
// applied to the batch in the order dictated by their sequence numbers within
@@ -1011,6 +1022,7 @@ func loadFlushedSSTableKeys(
10111022
path string,
10121023
fileNums []base.DiskFileNum,
10131024
blobRefMap map[base.DiskFileNum]manifest.BlobReferences,
1025+
blobFileMap blobFileMap,
10141026
provider blob.ReaderProvider,
10151027
readOpts sstable.ReaderOptions,
10161028
bufs *flushBuffers,
@@ -1040,7 +1052,7 @@ func loadFlushedSSTableKeys(
10401052
if bf, ok := blobRefMap[fileNum]; ok {
10411053
blobRefs = &bf
10421054
}
1043-
vf, blobContext := sstable.LoadValBlobContext(provider, blobRefs)
1055+
vf, blobContext := sstable.LoadValBlobContext(blobFileMap, provider, blobRefs)
10441056
defer func() { _ = vf.Close() }()
10451057
iter, err := r.NewIter(sstable.NoTransforms, nil, nil, blobContext)
10461058
if err != nil {

replay/replay_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ func TestLoadFlushedSSTableKeys(t *testing.T) {
211211
return err.Error()
212212
}
213213
defer closeFunc()
214-
err = loadFlushedSSTableKeys(b, opts.FS, "", diskFileNums, nil /* blobRefMap */, provider,
214+
err = loadFlushedSSTableKeys(b, opts.FS, "", diskFileNums, nil /* blobRefMap */, nil /* blobFileMap */, provider,
215215
readerOpts, &flushBufs)
216216
if err != nil {
217217
b.Close()

sstable/blob/fetcher.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,20 +39,22 @@ type ValueReader interface {
3939
ReadIndexBlock(context.Context, block.ReadEnv, objstorage.ReadHandle) (block.BufferHandle, error)
4040
}
4141

42+
// A FileMapping defines the mapping between blob file IDs and disk file numbers.
43+
// It's implemented by *manifest.BlobFileSet.
44+
type FileMapping interface {
45+
// Lookup returns the disk file number for the given blob file ID. It
46+
// returns false for the second return value if the blob file ID is not
47+
// present in the mapping.
48+
Lookup(base.BlobFileID) (base.DiskFileNum, bool)
49+
}
50+
4251
// A ReaderProvider is an interface that can be used to retrieve a ValueReader
4352
// for a given file number.
4453
type ReaderProvider interface {
4554
// GetValueReader returns a ValueReader for the given file number.
4655
GetValueReader(ctx context.Context, fileNum base.DiskFileNum) (r ValueReader, closeFunc func(), err error)
4756
}
4857

49-
// DiskFileNumTODO is a temporary function to convert a BlobFileID to a
50-
// DiskFileNum. It should be removed once the manifest.Version contains a
51-
// mapping.
52-
func DiskFileNumTODO(blobFileID base.BlobFileID) base.DiskFileNum {
53-
return base.DiskFileNum(blobFileID)
54-
}
55-
5658
// A ValueFetcher retrieves values stored out-of-band in separate blob files.
5759
// The ValueFetcher caches accessed file readers to avoid redundant file cache
5860
// and block cache lookups when performing consecutive value retrievals.
@@ -63,6 +65,7 @@ func DiskFileNumTODO(blobFileID base.BlobFileID) base.DiskFileNum {
6365
// When finished with a ValueFetcher, one must call Close to release all cached
6466
// readers and block buffers.
6567
type ValueFetcher struct {
68+
fileMapping FileMapping
6669
readerProvider ReaderProvider
6770
env block.ReadEnv
6871
fetchCount int
@@ -76,7 +79,8 @@ type ValueFetcher struct {
7679
var _ base.ValueFetcher = (*ValueFetcher)(nil)
7780

7881
// Init initializes the ValueFetcher.
79-
func (r *ValueFetcher) Init(rp ReaderProvider, env block.ReadEnv) {
82+
func (r *ValueFetcher) Init(fm FileMapping, rp ReaderProvider, env block.ReadEnv) {
83+
r.fileMapping = fm
8084
r.readerProvider = rp
8185
r.env = env
8286
if r.readerProvider == nil {
@@ -129,7 +133,10 @@ func (r *ValueFetcher) retrieve(ctx context.Context, vh Handle) (val []byte, err
129133
return nil, err
130134
}
131135
}
132-
diskFileNum := DiskFileNumTODO(vh.BlobFileID)
136+
diskFileNum, ok := r.fileMapping.Lookup(vh.BlobFileID)
137+
if !ok {
138+
return nil, errors.AssertionFailedf("blob file %s not found", vh.BlobFileID)
139+
}
133140
if cr.r, cr.closeFunc, err = r.readerProvider.GetValueReader(ctx, diskFileNum); err != nil {
134141
return nil, err
135142
}

0 commit comments

Comments
 (0)