Skip to content

Commit 459c8ae

Browse files
committed
db: scan and filter WALs during recovery
Move the scanning of the WAL directories' files into recoverState, since discovering and filtering WALs is semantically a part of recovery.
1 parent 91dfead commit 459c8ae

File tree

2 files changed

+55
-57
lines changed

2 files changed

+55
-57
lines changed

open.go

Lines changed: 12 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -102,18 +102,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
102102
}
103103
}
104104

105-
// Open the database and WAL directories first.
106-
dirs, err := prepareOpenAndLockDirs(dirname, opts)
107-
if err != nil {
108-
err = errors.Wrapf(err, "error opening database at %q", dirname)
109-
err = errors.CombineErrors(err, dirs.Close())
110-
return nil, err
111-
}
112-
// Locks in RecoveryDirLocks can be closed as soon as we've finished opening
113-
// the database.
114-
defer func() { _ = dirs.RecoveryDirLocks.Close() }()
115-
defer maybeCleanUp(dirs.Close)
116-
105+
// Recover the current database state.
117106
rs, err := recoverState(opts, dirname)
118107
if err != nil {
119108
return nil, err
@@ -149,7 +138,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
149138
}
150139

151140
if !opts.ReadOnly {
152-
if err := rs.RemoveObsolete(); err != nil {
141+
if err := rs.RemoveObsolete(opts); err != nil {
153142
return nil, err
154143
}
155144
}
@@ -169,7 +158,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
169158
split: opts.Comparer.Split,
170159
abbreviatedKey: opts.Comparer.AbbreviatedKey,
171160
largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
172-
dirs: dirs,
161+
dirs: rs.dirs,
173162
objProvider: rs.objProvider,
174163
closed: new(atomic.Value),
175164
closedCh: make(chan struct{}),
@@ -297,8 +286,8 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
297286
})
298287

299288
walOpts := wal.Options{
300-
Primary: dirs.WALPrimary,
301-
Secondary: dirs.WALSecondary,
289+
Primary: rs.dirs.WALPrimary,
290+
Secondary: rs.dirs.WALSecondary,
302291
MinUnflushedWALNum: wal.NumWAL(d.mu.versions.minUnflushedLogNum),
303292
MaxNumRecyclableLogs: opts.MemTableStopWritesThreshold + 1,
304293
NoSyncOnClose: opts.NoSyncOnClose,
@@ -323,45 +312,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
323312
return nil, err
324313
}
325314

326-
// Remove obsolete WAL files now (as opposed to relying on asynchronous
327-
// cleanup) to prevent crash loops due to no disk space (ENOSPC).
328-
var retainedWALs wal.Logs
329-
for _, w := range wals {
330-
// Any WALs with file numbers ≥ minUnflushedLogNum must be replayed to
331-
// recover the state.
332-
if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
333-
retainedWALs = append(retainedWALs, w)
334-
continue
335-
}
336-
// Skip removal of obsolete WALs in read-only mode.
337-
if opts.ReadOnly {
338-
continue
339-
}
340-
// Remove obsolete WALs, logging each removal.
341-
for i := range w.NumSegments() {
342-
fs, path := w.SegmentLocation(i)
343-
if err := fs.Remove(path); err != nil {
344-
// It's not a big deal if we can't delete the file now.
345-
// We'll try to remove it later in the cleanup process.
346-
d.opts.EventListener.WALDeleted(WALDeleteInfo{
347-
JobID: 0,
348-
Path: path,
349-
FileNum: base.DiskFileNum(w.Num),
350-
Err: err,
351-
})
352-
retainedWALs = append(retainedWALs, w)
353-
} else {
354-
d.opts.EventListener.WALDeleted(WALDeleteInfo{
355-
JobID: 0,
356-
Path: path,
357-
FileNum: base.DiskFileNum(w.Num),
358-
Err: nil,
359-
})
360-
}
361-
}
362-
}
363-
364-
walManager, err := wal.Init(walOpts, retainedWALs)
315+
walManager, err := wal.Init(walOpts, rs.walsReplay)
365316
if err != nil {
366317
return nil, err
367318
}
@@ -536,6 +487,12 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
536487
d.maybeScheduleFlush()
537488
d.maybeScheduleCompaction()
538489

490+
// Locks in RecoveryDirLocks can be closed as soon as we've finished opening
491+
// the database.
492+
if err = rs.dirs.RecoveryDirLocks.Close(); err != nil {
493+
return nil, err
494+
}
495+
539496
// Note: this is a no-op if invariants are disabled or race is enabled.
540497
//
541498
// Setting a finalizer on *DB causes *DB to never be reclaimed and the

recovery.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/pebble/record"
1818
"github.com/cockroachdb/pebble/vfs"
1919
"github.com/cockroachdb/pebble/vfs/atomicfs"
20+
"github.com/cockroachdb/pebble/wal"
2021
)
2122

2223
// recoverState reads the named database directory and recovers the set of files
@@ -34,9 +35,16 @@ func recoverState(opts *Options, dirname string) (s *recoveredState, err error)
3435
}
3536

3637
func (rs *recoveredState) init(opts *Options, dirname string) error {
38+
dirs, err := prepareOpenAndLockDirs(dirname, opts)
39+
if err != nil {
40+
err = errors.Wrapf(err, "error opening database at %q", dirname)
41+
err = errors.CombineErrors(err, dirs.Close())
42+
return err
43+
}
44+
rs.dirs = dirs
45+
3746
// List the directory contents. This also happens to include WAL log files,
3847
// if they are in the same dir.
39-
var err error
4048
if rs.ls, err = opts.FS.List(dirname); err != nil {
4149
return errors.Wrapf(err, "pebble: database %q", dirname)
4250
}
@@ -100,13 +108,27 @@ func (rs *recoveredState) init(opts *Options, dirname string) error {
100108
rs.obsoleteTempFilenames = append(rs.obsoleteTempFilenames, filename)
101109
}
102110
}
111+
112+
// Find all the WAL files across the various WAL directories.
113+
wals, err := wal.Scan(rs.dirs.WALDirs()...)
114+
if err != nil {
115+
return err
116+
}
117+
for _, w := range wals {
118+
if rs.recoveredVersion == nil || base.DiskFileNum(w.Num) >= rs.recoveredVersion.minUnflushedLogNum {
119+
rs.walsReplay = append(rs.walsReplay, w)
120+
} else {
121+
rs.walsObsolete = append(rs.walsObsolete, w)
122+
}
123+
}
103124
return nil
104125
}
105126

106127
// recoveredState encapsulates state recovered from reading the database
107128
// directory.
108129
type recoveredState struct {
109130
dirname string
131+
dirs *resolvedDirs
110132
fmv FormatMajorVersion
111133
fmvMarker *atomicfs.Marker
112134
fs vfs.FS
@@ -118,10 +140,12 @@ type recoveredState struct {
118140
objProvider objstorage.Provider
119141
previousOptionsFilename string
120142
recoveredVersion *recoveredVersion
143+
walsObsolete wal.Logs
144+
walsReplay wal.Logs
121145
}
122146

123147
// RemoveObsolete removes obsolete files uncovered during recovery.
124-
func (rs *recoveredState) RemoveObsolete() error {
148+
func (rs *recoveredState) RemoveObsolete(opts *Options) error {
125149
var err error
126150
// Atomic markers may leave behind obsolete files if there's a crash
127151
// mid-update.
@@ -137,6 +161,20 @@ func (rs *recoveredState) RemoveObsolete() error {
137161
for _, filename := range rs.obsoleteTempFilenames {
138162
err = errors.CombineErrors(err, rs.fs.Remove(rs.fs.PathJoin(rs.dirname, filename)))
139163
}
164+
// Remove any WAL files that are already obsolete. Pebble keeps some old WAL
165+
// files around for recycling.
166+
for _, w := range rs.walsObsolete {
167+
for i := range w.NumSegments() {
168+
fs, path := w.SegmentLocation(i)
169+
rmErr := fs.Remove(path)
170+
opts.EventListener.WALDeleted(WALDeleteInfo{
171+
JobID: 0,
172+
Path: path,
173+
FileNum: base.DiskFileNum(w.Num),
174+
Err: rmErr,
175+
})
176+
}
177+
}
140178
return err
141179
}
142180

@@ -153,6 +191,9 @@ func (rs *recoveredState) Close() error {
153191
if rs.objProvider != nil {
154192
err = errors.CombineErrors(err, rs.objProvider.Close())
155193
}
194+
if rs.dirs != nil {
195+
err = errors.CombineErrors(err, rs.dirs.Close())
196+
}
156197
return err
157198
}
158199

0 commit comments

Comments
 (0)