Skip to content

Commit 7cb70d4

Browse files
committed
db: delete obsolete files before scheduling flushes, compactions
During Open, scan the directory listing for obsolete files before scheduling a memtable flush or compaction. Previously, contrary to its documentation, DB.scanObsoleteFiles could run in parallel to compactions. Subtly, scanObsoleteFiles would block, waiting for compactions to queisce before proceeding. This ensured that once it proceeded it had a consistent view of the set of live files. This structure was subtle, less-deterministic and blocked Open from proceeding until compactions quiesced. This commit moves the scanning of the directory listing before the scheduling of flushes or compactions, and adds an assertion that no flushes or compactions are running during scanObsoleteFiles. Fix #5420.
1 parent aa9627b commit 7cb70d4

File tree

21 files changed

+435
-347
lines changed

21 files changed

+435
-347
lines changed

obsolete_files.go

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -357,23 +357,16 @@ func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
357357
return pacerInfo
358358
}
359359

360-
// scanObsoleteFiles scans the filesystem for files that are no longer needed
361-
// and adds those to the internal lists of obsolete files. Note that the files
362-
// are not actually deleted by this method. A subsequent call to
363-
// deleteObsoleteFiles must be performed. Must be not be called concurrently
364-
// with compactions and flushes. db.mu must be held when calling this function.
360+
// scanObsoleteFiles compares the provided directory listing to the set of
361+
// known, in-use files to find files no longer needed and adds those to the
362+
// internal lists of obsolete files. Note that the files are not actually
363+
// deleted by this method. A subsequent call to deleteObsoleteFiles must be
364+
// performed. Must be not be called concurrently with compactions and flushes
365+
// and will panic if any are in-progress. db.mu must be held when calling this
366+
// function.
365367
func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlushable) {
366-
// Disable automatic compactions temporarily to avoid concurrent compactions /
367-
// flushes from interfering. The original value is restored on completion.
368-
disabledPrev := d.opts.DisableAutomaticCompactions
369-
defer func() {
370-
d.opts.DisableAutomaticCompactions = disabledPrev
371-
}()
372-
d.opts.DisableAutomaticCompactions = true
373-
374-
// Wait for any ongoing compaction to complete before continuing.
375-
for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
376-
d.mu.compact.cond.Wait()
368+
if d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
369+
panic(errors.AssertionFailedf("compaction or flush in progress"))
377370
}
378371

379372
liveFileNums := make(map[base.DiskFileNum]struct{})

open.go

Lines changed: 39 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,45 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
512512
}
513513
d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
514514

515+
if !d.opts.ReadOnly {
516+
// Write the current options to disk.
517+
d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
518+
tmpPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeTemp, d.optionsFileNum)
519+
optionsPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeOptions, d.optionsFileNum)
520+
521+
// Write them to a temporary file first, in case we crash before
522+
// we're done. A corrupt options file prevents opening the
523+
// database.
524+
optionsFile, err := opts.FS.Create(tmpPath, vfs.WriteCategoryUnspecified)
525+
if err != nil {
526+
return nil, err
527+
}
528+
serializedOpts := []byte(opts.String())
529+
if _, err := optionsFile.Write(serializedOpts); err != nil {
530+
return nil, errors.CombineErrors(err, optionsFile.Close())
531+
}
532+
d.optionsFileSize = uint64(len(serializedOpts))
533+
if err := optionsFile.Sync(); err != nil {
534+
return nil, errors.CombineErrors(err, optionsFile.Close())
535+
}
536+
if err := optionsFile.Close(); err != nil {
537+
return nil, err
538+
}
539+
// Atomically rename to the OPTIONS-XXXXXX path. This rename is
540+
// guaranteed to be atomic because the destination path does not
541+
// exist.
542+
if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
543+
return nil, err
544+
}
545+
if err := d.dataDir.Sync(); err != nil {
546+
return nil, err
547+
}
548+
549+
// Delete any obsolete files.
550+
d.scanObsoleteFiles(rs.ls, flushableIngests)
551+
d.deleteObsoleteFiles(jobID)
552+
}
553+
515554
// Register with the CompactionScheduler before calling
516555
// d.maybeScheduleFlush, since completion of the flush can trigger
517556
// compactions.
@@ -534,7 +573,6 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
534573
d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
535574
}
536575
d.updateReadStateLocked(d.opts.DebugCheck)
537-
538576
if !d.opts.ReadOnly {
539577
// If the Options specify a format major version higher than the
540578
// loaded database's, upgrade it. If this is a new database, this
@@ -554,52 +592,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
554592
return nil, err
555593
}
556594
}
557-
558-
// Write the current options to disk.
559-
d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
560-
tmpPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeTemp, d.optionsFileNum)
561-
optionsPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeOptions, d.optionsFileNum)
562-
563-
// Write them to a temporary file first, in case we crash before
564-
// we're done. A corrupt options file prevents opening the
565-
// database.
566-
optionsFile, err := opts.FS.Create(tmpPath, vfs.WriteCategoryUnspecified)
567-
if err != nil {
568-
return nil, err
569-
}
570-
serializedOpts := []byte(opts.String())
571-
if _, err := optionsFile.Write(serializedOpts); err != nil {
572-
return nil, errors.CombineErrors(err, optionsFile.Close())
573-
}
574-
d.optionsFileSize = uint64(len(serializedOpts))
575-
if err := optionsFile.Sync(); err != nil {
576-
return nil, errors.CombineErrors(err, optionsFile.Close())
577-
}
578-
if err := optionsFile.Close(); err != nil {
579-
return nil, err
580-
}
581-
// Atomically rename to the OPTIONS-XXXXXX path. This rename is
582-
// guaranteed to be atomic because the destination path does not
583-
// exist.
584-
if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
585-
return nil, err
586-
}
587-
if err := d.dataDir.Sync(); err != nil {
588-
return nil, err
589-
}
590-
}
591-
592-
if !d.opts.ReadOnly {
593-
// Get a fresh list of files, in case some of the earlier flushes/compactions
594-
// have deleted some files.
595-
ls, err := opts.FS.List(dirname)
596-
if err != nil {
597-
return nil, err
598-
}
599-
d.scanObsoleteFiles(ls, flushableIngests)
600-
d.deleteObsoleteFiles(jobID)
601595
}
602-
// Else, nothing is obsolete.
603596

604597
d.mu.tableStats.cond.L = &d.mu.Mutex
605598
d.mu.tableValidation.cond.L = &d.mu.Mutex

open_test.go

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"testing"
3030
"time"
3131

32+
"github.com/cockroachdb/crlib/testutils/leaktest"
3233
"github.com/cockroachdb/datadriven"
3334
"github.com/cockroachdb/errors"
3435
"github.com/cockroachdb/metamorphic"
@@ -278,6 +279,59 @@ func TestOpen_WALFailover(t *testing.T) {
278279
})
279280
}
280281

282+
func TestOpenRecovery(t *testing.T) {
283+
defer leaktest.AfterTest(t)()
284+
285+
mkOpts := func(td *datadriven.TestData) *Options {
286+
opts := &Options{FS: vfs.NewMem(), Logger: testutils.Logger{T: t}}
287+
parseDBOptionsArgs(opts, td.CmdArgs)
288+
return opts
289+
}
290+
var d *DB
291+
var opts *Options
292+
closeDB := func() {
293+
if d != nil {
294+
require.NoError(t, d.Close())
295+
d = nil
296+
}
297+
}
298+
defer closeDB()
299+
datadriven.RunTestAny(t, "testdata/open_recovery", func(t testing.TB, td *datadriven.TestData) string {
300+
switch td.Cmd {
301+
case "batch":
302+
writeBatch := newBatch(d)
303+
if err := runBatchDefineCmd(td, writeBatch); err != nil {
304+
return err.Error()
305+
}
306+
if err := writeBatch.Commit(nil); err != nil {
307+
return err.Error()
308+
}
309+
return ""
310+
case "define":
311+
closeDB()
312+
opts = mkOpts(td)
313+
var err error
314+
d, err = runDBDefineCmd(td, opts)
315+
if err != nil {
316+
return err.Error()
317+
}
318+
return runLSMCmd(td, d)
319+
case "reopen":
320+
closeDB()
321+
var err error
322+
require.NoError(t, parseDBOptionsArgs(opts, td.CmdArgs))
323+
d, err = Open("", opts)
324+
if err != nil {
325+
return err.Error()
326+
}
327+
waitForCompactionsAndTableStats(d)
328+
return runLSMCmd(td, d)
329+
default:
330+
return fmt.Sprintf("unrecognized command %q", td.Cmd)
331+
}
332+
})
333+
}
334+
281335
// TestOpenAlreadyLocked verifies that we acquire the directory locks
282336
// required by the database during Open.
283337
// Each test case:
@@ -499,10 +553,10 @@ func TestOpenAlreadyLocked(t *testing.T) {
499553
func TestNewDBFilenames(t *testing.T) {
500554
versions := map[FormatMajorVersion][]string{
501555
internalFormatNewest: {
502-
"000002.log",
556+
"000003.log",
503557
"LOCK",
504558
"MANIFEST-000001",
505-
"OPTIONS-000003",
559+
"OPTIONS-000002",
506560
"marker.format-version.000015.028",
507561
"marker.manifest.000001.MANIFEST-000001",
508562
},

replay/testdata/corpus/findManifestStart

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ open
3737
list-files build
3838
----
3939
build:
40-
000002.log
40+
000003.log
4141
LOCK
4242
MANIFEST-000001
43-
OPTIONS-000003
43+
OPTIONS-000002
4444
marker.format-version.000001.013
4545
marker.manifest.000001.MANIFEST-000001
4646

@@ -56,12 +56,12 @@ flush
5656
list-files build
5757
----
5858
build:
59-
000002.log
59+
000003.log
6060
000004.log
6161
000005.sst
6262
LOCK
6363
MANIFEST-000001
64-
OPTIONS-000003
64+
OPTIONS-000002
6565
marker.format-version.000001.013
6666
marker.manifest.000001.MANIFEST-000001
6767

@@ -76,20 +76,20 @@ list-files build
7676
----
7777
build:
7878
000005.sst
79-
000008.log
79+
000009.log
8080
LOCK
8181
MANIFEST-000001
82-
MANIFEST-000007
83-
OPTIONS-000009
82+
MANIFEST-000008
83+
OPTIONS-000007
8484
marker.format-version.000001.013
85-
marker.manifest.000002.MANIFEST-000007
85+
marker.manifest.000002.MANIFEST-000008
8686

8787
delete-all build/MANIFEST-000007
8888
----
8989

9090
find-manifest-start build
9191
----
92-
index: 0, offset: 0, error: nil
92+
index: 1, offset: 87, error: nil
9393

9494
make-file build manifest 7
9595
bf13d7161a00010114636f636b726f6163685f636f6d70617261746f7203
@@ -122,4 +122,4 @@ created
122122

123123
find-manifest-start build
124124
----
125-
index: 1, offset: 739, error: nil
125+
index: 2, offset: 87, error: nil

replay/testdata/corpus/high_read_amp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ open
44
list-files build
55
----
66
build:
7-
000002.log
7+
000003.log
88
LOCK
99
MANIFEST-000001
10-
OPTIONS-000003
10+
OPTIONS-000002
1111
marker.format-version.000001.013
1212
marker.manifest.000001.MANIFEST-000001
1313

@@ -75,7 +75,7 @@ build:
7575
LOCK
7676
MANIFEST-000001
7777
MANIFEST-000010
78-
OPTIONS-000003
78+
OPTIONS-000002
7979
marker.format-version.000001.013
8080
marker.manifest.000002.MANIFEST-000010
8181

@@ -91,7 +91,7 @@ high_read_amp/checkpoint:
9191
000008.log
9292
000009.sst
9393
MANIFEST-000010
94-
OPTIONS-000003
94+
OPTIONS-000002
9595
marker.format-version.000001.013
9696
marker.manifest.000001.MANIFEST-000010
9797

replay/testdata/corpus/simple

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ open
44
list-files build
55
----
66
build:
7-
000002.log
7+
000003.log
88
LOCK
99
MANIFEST-000001
10-
OPTIONS-000003
10+
OPTIONS-000002
1111
marker.format-version.000001.013
1212
marker.manifest.000001.MANIFEST-000001
1313

@@ -23,12 +23,12 @@ flush
2323
list-files build
2424
----
2525
build:
26-
000002.log
26+
000003.log
2727
000004.log
2828
000005.sst
2929
LOCK
3030
MANIFEST-000001
31-
OPTIONS-000003
31+
OPTIONS-000002
3232
marker.format-version.000001.013
3333
marker.manifest.000001.MANIFEST-000001
3434

@@ -47,7 +47,7 @@ simple/checkpoint:
4747
000004.log
4848
000005.sst
4949
MANIFEST-000001
50-
OPTIONS-000003
50+
OPTIONS-000002
5151
marker.format-version.000001.013
5252
marker.manifest.000001.MANIFEST-000001
5353

replay/testdata/corpus/simple_ingest

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ open-ingest-excise
44
list-files build
55
----
66
build:
7-
000002.log
7+
000003.log
88
LOCK
99
MANIFEST-000001
10-
OPTIONS-000003
10+
OPTIONS-000002
1111
marker.format-version.000012.025
1212
marker.manifest.000001.MANIFEST-000001
1313

@@ -34,12 +34,12 @@ ingest-and-excised
3434
list-files build
3535
----
3636
build:
37-
000002.log
37+
000003.log
3838
000004.sst
3939
000005.sst
4040
LOCK
4141
MANIFEST-000001
42-
OPTIONS-000003
42+
OPTIONS-000002
4343
marker.format-version.000012.025
4444
marker.manifest.000001.MANIFEST-000001
4545

@@ -55,11 +55,11 @@ simple_ingest:
5555
list-files simple_ingest/checkpoint
5656
----
5757
simple_ingest/checkpoint:
58-
000002.log
58+
000003.log
5959
000004.sst
6060
000005.sst
6161
MANIFEST-000001
62-
OPTIONS-000003
62+
OPTIONS-000002
6363
marker.format-version.000001.025
6464
marker.manifest.000001.MANIFEST-000001
6565

0 commit comments

Comments
 (0)