Skip to content

Commit 3dd8e6f

Browse files
committed
db: factor out version recovery
Refactor the replaying of the manifest to recover the latest Version of the LSM and perform this recovery as a part of recoverState during Open.
1 parent 323c049 commit 3dd8e6f

File tree

6 files changed

+250
-202
lines changed

6 files changed

+250
-202
lines changed

compaction_picker_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -622,15 +622,15 @@ func TestCompactionPickerL0(t *testing.T) {
622622
latest.l0Organizer.InitCompactingFileInfo(inProgressL0Compactions(inProgressCompactions))
623623
vs := &versionSet{
624624
opts: opts,
625-
latest: *latest,
625+
latest: latest,
626626
cmp: DefaultComparer,
627627
}
628628
vs.versions.Init(nil)
629629
vs.append(version)
630630
picker = &compactionPickerByScore{
631631
opts: opts,
632632
vers: version,
633-
latestVersionState: &vs.latest,
633+
latestVersionState: vs.latest,
634634
baseLevel: baseLevel,
635635
}
636636
vs.picker = picker
@@ -861,7 +861,7 @@ func TestCompactionPickerPickReadTriggered(t *testing.T) {
861861
vs := &versionSet{
862862
opts: opts,
863863
cmp: DefaultComparer,
864-
latest: *latest,
864+
latest: latest,
865865
}
866866
vs.versions.Init(nil)
867867
vs.append(vers)
@@ -1245,7 +1245,7 @@ func TestCompactionOutputFileSize(t *testing.T) {
12451245
vs := &versionSet{
12461246
opts: opts,
12471247
cmp: DefaultComparer,
1248-
latest: *latest,
1248+
latest: latest,
12491249
}
12501250
vs.versions.Init(nil)
12511251
vs.append(vers)

open.go

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -243,11 +243,6 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
243243
d.mu.snapshots.snapshotList.init()
244244
d.mu.snapshots.ongoingExcises = make(map[SeqNum]KeyRange)
245245
d.mu.snapshots.ongoingExcisesRemovedCond = sync.NewCond(&d.mu.Mutex)
246-
// logSeqNum is the next sequence number that will be assigned.
247-
// Start assigning sequence numbers from base.SeqNumStart to leave
248-
// room for reserved sequence numbers (see comments around
249-
// SeqNumStart).
250-
d.mu.versions.logSeqNum.Store(base.SeqNumStart)
251246
d.mu.formatVers.vers.Store(uint64(formatVersion))
252247
d.mu.formatVers.marker = rs.fmvMarker
253248
d.openedAt = d.opts.private.timeNow()
@@ -257,29 +252,29 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
257252

258253
jobID := d.newJobIDLocked()
259254

260-
blobRewriteHeuristic := manifest.BlobRewriteHeuristic{
261-
CurrentTime: d.opts.private.timeNow,
262-
MinimumAge: opts.Experimental.ValueSeparationPolicy().RewriteMinimumAge,
263-
}
264-
265-
if !rs.manifestExists {
255+
if rs.recoveredVersion == nil {
266256
// DB does not exist.
267257
if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
268258
return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
269259
}
270-
271-
// Create the DB.
272-
if err := d.mu.versions.create(
260+
// Create a fresh version set and create an initial manifest file.
261+
blobRewriteHeuristic := manifest.BlobRewriteHeuristic{
262+
CurrentTime: d.opts.private.timeNow,
263+
MinimumAge: opts.Experimental.ValueSeparationPolicy().RewriteMinimumAge,
264+
}
265+
if err := d.mu.versions.initNewDB(
273266
jobID, dirname, d.objProvider, opts, rs.manifestMarker, d.FormatMajorVersion, blobRewriteHeuristic, &d.mu.Mutex); err != nil {
274267
return nil, err
275268
}
276269
} else {
277270
if opts.ErrorIfExists {
278271
return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
279272
}
280-
// Load the version set.
281-
if err := d.mu.versions.load(
282-
dirname, d.objProvider, opts, rs.manifestFileNum, rs.manifestMarker, d.FormatMajorVersion, blobRewriteHeuristic, &d.mu.Mutex); err != nil {
273+
// Initialize the version set from the recovered version.
274+
if err := d.mu.versions.initRecoveredDB(
275+
dirname, d.objProvider, opts, rs.recoveredVersion, rs.manifestMarker,
276+
d.FormatMajorVersion, &d.mu.Mutex,
277+
); err != nil {
283278
return nil, err
284279
}
285280
if opts.ErrorIfNotPristine {
@@ -437,13 +432,6 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
437432

438433
d.cleanupManager = openCleanupManager(opts, d.objProvider, d.getDeletionPacerInfo)
439434

440-
if rs.manifestExists && !opts.DisableConsistencyCheck {
441-
curVersion := d.mu.versions.currentVersion()
442-
if err := checkConsistency(curVersion, d.objProvider); err != nil {
443-
return nil, err
444-
}
445-
}
446-
447435
fileCacheSize := FileCacheSize(opts.MaxOpenFiles)
448436
if opts.FileCache == nil {
449437
opts.FileCache = NewFileCache(opts.Experimental.FileCacheShards, fileCacheSize)

recovery.go

Lines changed: 200 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,15 @@
55
package pebble
66

77
import (
8+
"io"
9+
810
"github.com/cockroachdb/errors"
911
"github.com/cockroachdb/pebble/internal/base"
12+
"github.com/cockroachdb/pebble/internal/invariants"
13+
"github.com/cockroachdb/pebble/internal/manifest"
1014
"github.com/cockroachdb/pebble/objstorage"
1115
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
16+
"github.com/cockroachdb/pebble/record"
1217
"github.com/cockroachdb/pebble/vfs"
1318
"github.com/cockroachdb/pebble/vfs/atomicfs"
1419
)
@@ -39,10 +44,7 @@ func (rs *recoveredState) init(opts *Options, dirname string) error {
3944
if err != nil {
4045
return errors.Wrapf(err, "pebble: database %q", dirname)
4146
}
42-
rs.manifestMarker, rs.manifestFileNum, rs.manifestExists, err = findCurrentManifest(opts.FS, dirname, rs.ls)
43-
if err != nil {
44-
return errors.Wrapf(err, "pebble: database %q", dirname)
45-
}
47+
4648
// Open the object storage provider.
4749
providerSettings := opts.MakeObjStorageProviderSettings(dirname)
4850
providerSettings.FSDirInitialListing = rs.ls
@@ -51,6 +53,26 @@ func (rs *recoveredState) init(opts *Options, dirname string) error {
5153
return errors.Wrapf(err, "pebble: database %q", dirname)
5254
}
5355

56+
// Determine which manifest is current, and if one exists, replay it to
57+
// recover the current Version of the LSM.
58+
var manifestExists bool
59+
rs.manifestMarker, rs.manifestFileNum, manifestExists, err = findCurrentManifest(opts.FS, dirname, rs.ls)
60+
if err != nil {
61+
return errors.Wrapf(err, "pebble: database %q", dirname)
62+
}
63+
if manifestExists {
64+
recoveredVersion, err := recoverVersion(opts, dirname, rs.objProvider, rs.manifestFileNum)
65+
if err != nil {
66+
return err
67+
}
68+
if !opts.DisableConsistencyCheck {
69+
if err := checkConsistency(recoveredVersion.version, rs.objProvider); err != nil {
70+
return err
71+
}
72+
}
73+
rs.recoveredVersion = recoveredVersion
74+
}
75+
5476
// Identify the maximal file number in the directory. We do not want to
5577
// reuse any existing file numbers even if they are obsolete file numbers to
5678
// avoid modifying an ingested sstable's original external file.
@@ -90,11 +112,11 @@ type recoveredState struct {
90112
ls []string
91113
manifestMarker *atomicfs.Marker
92114
manifestFileNum base.DiskFileNum
93-
manifestExists bool
94115
maxFilenumUsed base.DiskFileNum
95116
obsoleteTempFilenames []string
96117
objProvider objstorage.Provider
97118
previousOptionsFilename string
119+
recoveredVersion *recoveredVersion
98120
}
99121

100122
// RemoveObsolete removes obsolete files uncovered during recovery.
@@ -132,3 +154,176 @@ func (rs *recoveredState) Close() error {
132154
}
133155
return err
134156
}
157+
158+
// recoveredVersion describes the latest Version of the LSM recovered by
159+
// replaying a manifest file.
160+
type recoveredVersion struct {
161+
manifestFileNum base.DiskFileNum
162+
minUnflushedLogNum base.DiskFileNum
163+
nextFileNum base.DiskFileNum
164+
logSeqNum base.SeqNum
165+
latest *latestVersionState
166+
metrics Metrics
167+
version *manifest.Version
168+
}
169+
170+
// recoverVersion replays the named manifest file to recover the latest version
171+
// of the LSM from persisted state.
172+
func recoverVersion(
173+
opts *Options, dirname string, provider objstorage.Provider, manifestFileNum base.DiskFileNum,
174+
) (*recoveredVersion, error) {
175+
vs := &recoveredVersion{
176+
manifestFileNum: manifestFileNum,
177+
nextFileNum: 1,
178+
logSeqNum: base.SeqNumStart,
179+
latest: &latestVersionState{
180+
l0Organizer: manifest.NewL0Organizer(opts.Comparer, opts.FlushSplitBytes),
181+
virtualBackings: manifest.MakeVirtualBackings(),
182+
},
183+
}
184+
manifestPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeManifest, vs.manifestFileNum)
185+
manifestFilename := opts.FS.PathBase(manifestPath)
186+
187+
// Read the versionEdits in the manifest file.
188+
var bve manifest.BulkVersionEdit
189+
bve.AllAddedTables = make(map[base.TableNum]*manifest.TableMetadata)
190+
manifestFile, err := opts.FS.Open(manifestPath)
191+
if err != nil {
192+
return nil, errors.Wrapf(err, "pebble: could not open manifest file %q for DB %q",
193+
errors.Safe(manifestFilename), dirname)
194+
}
195+
defer manifestFile.Close()
196+
rr := record.NewReader(manifestFile, 0 /* logNum */)
197+
for {
198+
r, err := rr.Next()
199+
if err == io.EOF || record.IsInvalidRecord(err) {
200+
break
201+
}
202+
if err != nil {
203+
return nil, errors.Wrapf(err, "pebble: error when loading manifest file %q",
204+
errors.Safe(manifestFilename))
205+
}
206+
var ve manifest.VersionEdit
207+
err = ve.Decode(r)
208+
if err != nil {
209+
// Break instead of returning an error if the record is corrupted
210+
// or invalid.
211+
if err == io.EOF || record.IsInvalidRecord(err) {
212+
break
213+
}
214+
return nil, err
215+
}
216+
if ve.ComparerName != "" {
217+
if ve.ComparerName != opts.Comparer.Name {
218+
return nil, errors.Errorf("pebble: manifest file %q for DB %q: "+
219+
"comparer name from file %q != comparer name from Options %q",
220+
errors.Safe(manifestFilename), dirname, errors.Safe(ve.ComparerName), errors.Safe(opts.Comparer.Name))
221+
}
222+
}
223+
if err := bve.Accumulate(&ve); err != nil {
224+
return nil, err
225+
}
226+
if ve.MinUnflushedLogNum != 0 {
227+
vs.minUnflushedLogNum = ve.MinUnflushedLogNum
228+
}
229+
if ve.NextFileNum != 0 {
230+
vs.nextFileNum = base.DiskFileNum(ve.NextFileNum)
231+
}
232+
if ve.LastSeqNum != 0 {
233+
// logSeqNum is the _next_ sequence number that will be assigned,
234+
// while LastSeqNum is the last assigned sequence number. Note that
235+
// this behaviour mimics that in RocksDB; the first sequence number
236+
// assigned is one greater than the one present in the manifest
237+
// (assuming no WALs contain higher sequence numbers than the
238+
// manifest's LastSeqNum). Increment LastSeqNum by 1 to get the
239+
// next sequence number that will be assigned.
240+
//
241+
// If LastSeqNum is less than SeqNumStart, increase it to at least
242+
// SeqNumStart to leave ample room for reserved sequence numbers.
243+
vs.logSeqNum = max(ve.LastSeqNum+1, base.SeqNumStart)
244+
}
245+
}
246+
247+
// We have already set vs.nextFileNum=1 at the beginning of the function and
248+
// could have only updated it to some other non-zero value, so it cannot be
249+
// 0 here.
250+
if vs.minUnflushedLogNum == 0 {
251+
if vs.nextFileNum >= 2 {
252+
// We either have a freshly created DB, or a DB created by RocksDB
253+
// that has not had a single flushed SSTable yet. This is because
254+
// RocksDB bumps up nextFileNum in this case without bumping up
255+
// minUnflushedLogNum, even if WALs with non-zero file numbers are
256+
// present in the directory.
257+
} else {
258+
return nil, base.CorruptionErrorf("pebble: malformed manifest file %q for DB %q",
259+
errors.Safe(manifestFilename), dirname)
260+
}
261+
}
262+
vs.nextFileNum = max(vs.nextFileNum, vs.minUnflushedLogNum+1)
263+
264+
// Populate the virtual backings for virtual sstables since we have finished
265+
// version edit accumulation.
266+
for _, b := range bve.AddedFileBacking {
267+
isLocal := objstorage.IsLocalTable(provider, b.DiskFileNum)
268+
vs.latest.virtualBackings.AddAndRef(b, isLocal)
269+
}
270+
for l, addedLevel := range bve.AddedTables {
271+
for _, m := range addedLevel {
272+
if m.Virtual {
273+
vs.latest.virtualBackings.AddTable(m, l)
274+
}
275+
}
276+
}
277+
278+
if invariants.Enabled {
279+
// There should be no deleted tables or backings, since we're starting
280+
// from an empty state.
281+
for _, deletedLevel := range bve.DeletedTables {
282+
if len(deletedLevel) != 0 {
283+
panic("deleted files after manifest replay")
284+
}
285+
}
286+
if len(bve.RemovedFileBacking) > 0 {
287+
panic("deleted backings after manifest replay")
288+
}
289+
}
290+
291+
emptyVersion := manifest.NewInitialVersion(opts.Comparer)
292+
newVersion, err := bve.Apply(emptyVersion, opts.Experimental.ReadCompactionRate)
293+
if err != nil {
294+
return nil, err
295+
}
296+
vs.latest.l0Organizer.PerformUpdate(vs.latest.l0Organizer.PrepareUpdate(&bve, newVersion), newVersion)
297+
vs.latest.l0Organizer.InitCompactingFileInfo(nil /* in-progress compactions */)
298+
vs.latest.blobFiles.Init(&bve, manifest.BlobRewriteHeuristic{
299+
CurrentTime: opts.private.timeNow,
300+
MinimumAge: opts.Experimental.ValueSeparationPolicy().RewriteMinimumAge,
301+
})
302+
vs.version = newVersion
303+
304+
for i := range vs.metrics.Levels {
305+
l := &vs.metrics.Levels[i]
306+
l.TablesCount = int64(newVersion.Levels[i].Len())
307+
files := newVersion.Levels[i].Slice()
308+
l.TablesSize = int64(files.TableSizeSum())
309+
}
310+
for _, l := range newVersion.Levels {
311+
for f := range l.All() {
312+
if !f.Virtual {
313+
isLocal, localSize := sizeIfLocal(f.TableBacking, provider)
314+
vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
315+
if isLocal {
316+
vs.metrics.Table.Local.LiveCount++
317+
}
318+
}
319+
}
320+
}
321+
for backing := range vs.latest.virtualBackings.All() {
322+
isLocal, localSize := sizeIfLocal(backing, provider)
323+
vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
324+
if isLocal {
325+
vs.metrics.Table.Local.LiveCount++
326+
}
327+
}
328+
return vs, nil
329+
}

testdata/checkpoint_shared

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,9 +329,9 @@ lock: checkpoints/checkpoint1/LOCK
329329
open-dir: checkpoints/checkpoint1
330330
open-dir: checkpoints/checkpoint1
331331
open-dir: checkpoints/checkpoint1
332-
open-dir: checkpoints/checkpoint1
333332
open: checkpoints/checkpoint1/REMOTE-OBJ-CATALOG-000001
334333
close: checkpoints/checkpoint1/REMOTE-OBJ-CATALOG-000001
334+
open-dir: checkpoints/checkpoint1
335335
open: checkpoints/checkpoint1/MANIFEST-000001
336336
close: checkpoints/checkpoint1/MANIFEST-000001
337337
open-dir: checkpoints/checkpoint1
@@ -381,9 +381,9 @@ lock: checkpoints/checkpoint2/LOCK
381381
open-dir: checkpoints/checkpoint2
382382
open-dir: checkpoints/checkpoint2
383383
open-dir: checkpoints/checkpoint2
384-
open-dir: checkpoints/checkpoint2
385384
open: checkpoints/checkpoint2/REMOTE-OBJ-CATALOG-000001
386385
close: checkpoints/checkpoint2/REMOTE-OBJ-CATALOG-000001
386+
open-dir: checkpoints/checkpoint2
387387
open: checkpoints/checkpoint2/MANIFEST-000001
388388
close: checkpoints/checkpoint2/MANIFEST-000001
389389
open-dir: checkpoints/checkpoint2

0 commit comments

Comments
 (0)