Skip to content

Commit 6e324c2

Browse files
committed
db: include blob files in checkpoints
When checkpointing the database through (*pebble.DB).Checkpoint, link or copy blob files referenced by sstables included in the checkpoint. When a checkpoint operation is limited by key range, the checkpoint will exclude blob files that are part of the current Version of the LSM but not referenced by any sstable overlapping the checkpointed key ranges. Similarly to excluded sstables, these excluded blob files will be marked as removed by a final version edit appended to the checkpoint's MANIFEST so that the MANIFEST accurately reflects the set of blob files in the checkpoint. Close #4555.
1 parent 0ef2427 commit 6e324c2

File tree

4 files changed

+350
-28
lines changed

4 files changed

+350
-28
lines changed

checkpoint.go

Lines changed: 59 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/cockroachdb/errors"
1313
"github.com/cockroachdb/errors/oserror"
1414
"github.com/cockroachdb/pebble/internal/base"
15+
"github.com/cockroachdb/pebble/internal/manifest"
1516
"github.com/cockroachdb/pebble/record"
1617
"github.com/cockroachdb/pebble/vfs"
1718
"github.com/cockroachdb/pebble/vfs/atomicfs"
@@ -196,6 +197,7 @@ func (d *DB) Checkpoint(
196197
d.mu.versions.virtualBackings.ForEach(func(backing *fileBacking) {
197198
virtualBackingFiles[backing.DiskFileNum] = struct{}{}
198199
})
200+
versionBlobFiles := d.mu.versions.blobFiles.Metadatas()
199201

200202
// Acquire the logs while holding mutexes to ensure we don't race with a
201203
// flush that might mark a log that's relevant to `current` as obsolete
@@ -263,10 +265,32 @@ func (d *DB) Checkpoint(
263265
}
264266

265267
var excludedTables map[deletedFileEntry]*tableMetadata
268+
var includedBlobFiles map[base.DiskFileNum]struct{}
266269
var remoteFiles []base.DiskFileNum
267270
// Set of FileBacking.DiskFileNum which will be required by virtual sstables
268271
// in the checkpoint.
269272
requiredVirtualBackingFiles := make(map[base.DiskFileNum]struct{})
273+
274+
copyFile := func(typ base.FileType, fileNum base.DiskFileNum) error {
275+
meta, err := d.objProvider.Lookup(typ, fileNum)
276+
if err != nil {
277+
return err
278+
}
279+
if meta.IsRemote() {
280+
// We don't copy remote files. This is desirable as checkpointing is
281+
// supposed to be a fast operation, and references to remote files can
282+
// always be resolved by any checkpoint readers by reading the object
283+
// catalog. We don't add this file to excludedFiles either, as that'd
284+
// cause it to be deleted in the second manifest entry which is also
285+
// inaccurate.
286+
remoteFiles = append(remoteFiles, meta.DiskFileNum)
287+
return nil
288+
}
289+
srcPath := base.MakeFilepath(fs, d.dirname, typ, fileNum)
290+
destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
291+
return vfs.LinkOrCopy(fs, srcPath, destPath)
292+
}
293+
270294
// Link or copy the sstables.
271295
for l := range current.Levels {
272296
iter := current.Levels[l].Iter()
@@ -282,32 +306,30 @@ func (d *DB) Checkpoint(
282306
continue
283307
}
284308

309+
// Copy any referenced blob files that have not already been copied.
310+
if len(f.BlobReferences) > 0 {
311+
if includedBlobFiles == nil {
312+
includedBlobFiles = make(map[base.DiskFileNum]struct{})
313+
}
314+
for _, ref := range f.BlobReferences {
315+
if _, ok := includedBlobFiles[ref.FileNum]; !ok {
316+
includedBlobFiles[ref.FileNum] = struct{}{}
317+
ckErr = copyFile(base.FileTypeBlob, ref.FileNum)
318+
if ckErr != nil {
319+
return ckErr
320+
}
321+
}
322+
}
323+
}
324+
285325
fileBacking := f.FileBacking
286326
if f.Virtual {
287327
if _, ok := requiredVirtualBackingFiles[fileBacking.DiskFileNum]; ok {
288328
continue
289329
}
290330
requiredVirtualBackingFiles[fileBacking.DiskFileNum] = struct{}{}
291331
}
292-
meta, err := d.objProvider.Lookup(base.FileTypeTable, fileBacking.DiskFileNum)
293-
if err != nil {
294-
ckErr = err
295-
return ckErr
296-
}
297-
if meta.IsRemote() {
298-
// We don't copy remote files. This is desirable as checkpointing is
299-
// supposed to be a fast operation, and references to remote files can
300-
// always be resolved by any checkpoint readers by reading the object
301-
// catalog. We don't add this file to excludedFiles either, as that'd
302-
// cause it to be deleted in the second manifest entry which is also
303-
// inaccurate.
304-
remoteFiles = append(remoteFiles, meta.DiskFileNum)
305-
continue
306-
}
307-
308-
srcPath := base.MakeFilepath(fs, d.dirname, base.FileTypeTable, fileBacking.DiskFileNum)
309-
destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
310-
ckErr = vfs.LinkOrCopy(fs, srcPath, destPath)
332+
ckErr = copyFile(base.FileTypeTable, fileBacking.DiskFileNum)
311333
if ckErr != nil {
312334
return ckErr
313335
}
@@ -322,16 +344,29 @@ func (d *DB) Checkpoint(
322344
removeBackingTables = append(removeBackingTables, diskFileNum)
323345
}
324346
}
347+
// Record the blob files that are not referenced by any included sstables.
348+
// When we write the MANIFEST of the checkpoint, we'll include a final
349+
// VersionEdit that removes these blob files so that the checkpointed
350+
// manifest is consistent.
351+
var excludedBlobFiles map[base.DiskFileNum]*manifest.BlobFileMetadata
352+
if len(includedBlobFiles) < len(versionBlobFiles) {
353+
excludedBlobFiles = make(map[base.DiskFileNum]*manifest.BlobFileMetadata, len(versionBlobFiles)-len(includedBlobFiles))
354+
for _, blobFile := range versionBlobFiles {
355+
if _, ok := includedBlobFiles[blobFile.FileNum]; !ok {
356+
excludedBlobFiles[blobFile.FileNum] = blobFile
357+
}
358+
}
359+
}
325360

326361
ckErr = d.writeCheckpointManifest(
327362
fs, formatVers, destDir, dir, manifestFileNum, manifestSize,
328-
excludedTables, removeBackingTables,
363+
excludedTables, removeBackingTables, excludedBlobFiles,
329364
)
330365
if ckErr != nil {
331366
return ckErr
332367
}
333368
if len(remoteFiles) > 0 {
334-
ckErr = d.objProvider.CheckpointState(fs, destDir, base.FileTypeTable, remoteFiles)
369+
ckErr = d.objProvider.CheckpointState(fs, destDir, remoteFiles)
335370
if ckErr != nil {
336371
return ckErr
337372
}
@@ -429,6 +464,7 @@ func (d *DB) writeCheckpointManifest(
429464
manifestSize int64,
430465
excludedTables map[deletedFileEntry]*tableMetadata,
431466
removeBackingTables []base.DiskFileNum,
467+
excludedBlobFiles map[base.DiskFileNum]*manifest.BlobFileMetadata,
432468
) error {
433469
// Copy the MANIFEST, and create a pointer to it. We copy rather
434470
// than link because additional version edits added to the
@@ -477,11 +513,12 @@ func (d *DB) writeCheckpointManifest(
477513
}
478514
}
479515

480-
if len(excludedTables) > 0 {
516+
if len(excludedTables) > 0 || len(excludedBlobFiles) > 0 {
481517
// Write out an additional VersionEdit that deletes the excluded SST files.
482518
ve := versionEdit{
483519
DeletedTables: excludedTables,
484520
RemovedBackingTables: removeBackingTables,
521+
DeletedBlobFiles: excludedBlobFiles,
485522
}
486523

487524
rw, err := w.Next()

objstorage/objstorage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ type Provider interface {
338338
// CheckpointState saves any saved state on local disk to the specified
339339
// directory on the specified VFS. A new Pebble instance instantiated at that
340340
// path should be able to resolve references to the specified files.
341-
CheckpointState(fs vfs.FS, dir string, fileType base.FileType, fileNums []base.DiskFileNum) error
341+
CheckpointState(fs vfs.FS, dir string, fileNums []base.DiskFileNum) error
342342

343343
// Metrics returns metrics about objstorage. Currently, it only returns metrics
344344
// about the shared cache.

objstorage/objstorageprovider/provider.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -541,17 +541,15 @@ func (p *provider) Metrics() sharedcache.Metrics {
541541
}
542542

543543
// CheckpointState is part of the objstorage.Provider interface.
544-
func (p *provider) CheckpointState(
545-
fs vfs.FS, dir string, fileType base.FileType, fileNums []base.DiskFileNum,
546-
) error {
544+
func (p *provider) CheckpointState(fs vfs.FS, dir string, fileNums []base.DiskFileNum) error {
547545
p.mu.Lock()
548546
defer p.mu.Unlock()
549547
for i := range fileNums {
550548
if _, ok := p.mu.knownObjects[fileNums[i]]; !ok {
551549
return errors.Wrapf(
552550
os.ErrNotExist,
553-
"file %s (type %s) unknown to the objstorage provider",
554-
fileNums[i], fileType,
551+
"file %s unknown to the objstorage provider",
552+
fileNums[i],
555553
)
556554
}
557555
// Prevent this object from deletion, at least for the life of this instance.

0 commit comments

Comments
 (0)