Skip to content
Permalink
Browse files

fix two ingestion bugs

Fix a bug where ingestion was blindly updating the seqnum for the
largest key in an sstable. This unintentionally extended the bounds for
an sstable when the largest key was a range tombstone sentinel key. This
is a minor cosmetic issue as the range tombstone's effect was not
changed by the extension of the sstable bounds.

Fix a bug where concurrent ingestions could collide in choosing their
target level. This collision occurred in `DB.ingestApply` which appears
to have mutual exclusion via `DB.mu`, but that exclusion was being
thwarted by `versionSet.logAndApply` which releases `DB.mu` during the
write to the MANIFEST. Internally, `logAndApply` achieves mutual
exclusion of MANIFEST operations by a separate mechanism. That mechanism
is now made visible via `versionSet.log{Lock,Unlock}` calls.
  • Loading branch information
petermattis committed Nov 21, 2019
1 parent ef22579 commit eb9e9599a1ea744f4f2360a3f694cf37289f4709
Showing with 148 additions and 70 deletions.
  1. +2 −0 compaction.go
  2. +1 −0 data_test.go
  3. +15 −2 ingest.go
  4. +44 −1 ingest_test.go
  5. +1 −0 open.go
  6. +53 −53 testdata/ingest
  7. +32 −14 version_set.go
@@ -871,6 +871,7 @@ func (d *DB) flush1() error {
metrics.BytesIn += size
}

d.mu.versions.logLock()
err = d.mu.versions.logAndApply(jobID, ve, c.metrics, d.dataDir)
for _, fileNum := range pendingOutputs {
if _, ok := d.mu.compact.pendingOutputs[fileNum]; !ok {
@@ -997,6 +998,7 @@ func (d *DB) compact1() (err error) {
ve, pendingOutputs, err := d.runCompaction(jobID, c, compactionPacer)

if err == nil {
d.mu.versions.logLock()
err = d.mu.versions.logAndApply(jobID, ve, c.metrics, d.dataDir)
for _, fileNum := range pendingOutputs {
if _, ok := d.mu.compact.pendingOutputs[fileNum]; !ok {
@@ -362,6 +362,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
if len(ve.NewFiles) > 0 {
jobID := d.mu.nextJobID
d.mu.nextJobID++
d.mu.versions.logLock()
if err := d.mu.versions.logAndApply(jobID, ve, nil, d.dataDir); err != nil {
return nil, err
}
@@ -195,7 +195,9 @@ func ingestCleanup(fs vfs.FS, dirname string, meta []*fileMetadata) error {
return firstErr
}

func ingestLink(jobID int, opts *Options, dirname string, paths []string, meta []*fileMetadata) error {
func ingestLink(
jobID int, opts *Options, dirname string, paths []string, meta []*fileMetadata,
) error {
// Wrap the normal filesystem with one which wraps newly created files with
// vfs.NewSyncingFile.
fs := syncingFS{
@@ -274,7 +276,12 @@ func ingestMemtableOverlaps(cmp Compare, mem flushable, meta []*fileMetadata) bo
func ingestUpdateSeqNum(opts *Options, dirname string, seqNum uint64, meta []*fileMetadata) error {
for _, m := range meta {
m.Smallest = base.MakeInternalKey(m.Smallest.UserKey, seqNum, m.Smallest.Kind())
m.Largest = base.MakeInternalKey(m.Largest.UserKey, seqNum, m.Largest.Kind())
// Don't update the seqnum for the largest key if that key is a range
// deletion sentinel key as doing so unintentionally extends the bounds of
// the table.
if m.Largest.Trailer != InternalKeyRangeDeleteSentinel {
m.Largest = base.MakeInternalKey(m.Largest.UserKey, seqNum, m.Largest.Kind())
}
// Setting smallestSeqNum == largestSeqNum triggers the setting of
// Properties.GlobalSeqNum when an sstable is loaded.
m.SmallestSeqNum = seqNum
@@ -496,6 +503,12 @@ func (d *DB) ingestApply(jobID int, meta []*fileMetadata) (*versionEdit, error)
NewFiles: make([]newFileEntry, len(meta)),
}
metrics := make(map[int]*LevelMetrics)

// Lock the manifest for writing before we use the current version to
// determine the target level. This prevents two concurrent ingestion jobs
// from using the same version to determine the target level, and also
// provides serialization with concurrent compaction and flush jobs.
d.mu.versions.logLock()
current := d.mu.versions.currentVersion()
for i := range meta {
// Determine the lowest level in the LSM for which the sstable doesn't
@@ -610,7 +610,7 @@ func TestIngest(t *testing.T) {

case "lsm":
d.mu.Lock()
s := d.mu.versions.currentVersion().String()
s := d.mu.versions.currentVersion().DebugString(base.DefaultFormatter)
d.mu.Unlock()
return s

@@ -662,3 +662,46 @@ func TestIngestCompact(t *testing.T) {
}
}
}

func TestConcurrentIngest(t *testing.T) {
mem := vfs.NewMem()
d, err := Open("", &Options{
FS: mem,
})
if err != nil {
t.Fatal(err)
}

// Create an sstable with 2 keys. This is necessary to trigger the overlap
// bug because an sstable with a single key will not have overlap in internal
// key space and the sequence number assignment had already guaranteed
// correct ordering.
f, err := mem.Create("ext")
if err != nil {
t.Fatal(err)
}
w := sstable.NewWriter(f, sstable.WriterOptions{})
if err := w.Set([]byte("a"), nil); err != nil {
t.Fatal(err)
}
if err := w.Set([]byte("b"), nil); err != nil {
t.Fatal(err)
}
if err := w.Close(); err != nil {
t.Fatal(err)
}

// Ingest the same sstable multiple times concurrently.
errCh := make(chan error, 5)
for i := 0; i < cap(errCh); i++ {
go func() {
errCh <- d.Ingest([]string{"ext"})
}()
}
for i := 0; i < cap(errCh); i++ {
err := <-errCh
if err != nil {
t.Fatal(err)
}
}
}
@@ -225,6 +225,7 @@ func Open(dirname string, opts *Options) (*DB, error) {
// sets MinUnflushedLogNum to max-recovered-log-num + 1. We set it to the
// newLogNum. There should be no difference in using either value.
ve.MinUnflushedLogNum = newLogNum
d.mu.versions.logLock()
if err := d.mu.versions.logAndApply(jobID, &ve, nil, d.dataDir); err != nil {
return nil, err
}
@@ -27,7 +27,7 @@ ingest ext0
lsm
----
6:
6:[a-b]
6:[a#1,SET-b#1,SET]

iter
seek-ge a
@@ -56,9 +56,9 @@ ingest ext1
lsm
----
5:
7:[a-b]
7:[a#2,SET-b#2,DEL]
6:
6:[a-b]
6:[a#1,SET-b#1,SET]

iter
seek-ge a
@@ -86,11 +86,11 @@ ingest ext2
lsm
----
4:
8:[a-c]
8:[a#3,SET-c#3,SET]
5:
7:[a-b]
7:[a#2,SET-b#2,DEL]
6:
6:[a-b]
6:[a#1,SET-b#1,SET]

iter
seek-ge a
@@ -121,13 +121,13 @@ ingest ext3
lsm
----
3:
9:[b-c]
9:[b#4,MERGE-c#4,DEL]
4:
8:[a-c]
8:[a#3,SET-c#3,SET]
5:
7:[a-b]
7:[a#2,SET-b#2,DEL]
6:
6:[a-b]
6:[a#1,SET-b#1,SET]

iter
seek-ge a
@@ -158,14 +158,14 @@ ingest ext4
lsm
----
3:
9:[b-c]
9:[b#4,MERGE-c#4,DEL]
4:
8:[a-c]
8:[a#3,SET-c#3,SET]
5:
7:[a-b]
7:[a#2,SET-b#2,DEL]
6:
6:[a-b]
10:[x-y]
6:[a#1,SET-b#1,SET]
10:[x#5,SET-y#5,SET]

iter
seek-lt y
@@ -198,17 +198,17 @@ ingest ext5
lsm
----
0:
13:[j-k]
11:[k-k]
13:[j#6,SET-k#7,SET]
11:[k#8,SET-k#8,SET]
3:
9:[b-c]
9:[b#4,MERGE-c#4,DEL]
4:
8:[a-c]
8:[a#3,SET-c#3,SET]
5:
7:[a-b]
7:[a#2,SET-b#2,DEL]
6:
6:[a-b]
10:[x-y]
6:[a#1,SET-b#1,SET]
10:[x#5,SET-y#5,SET]

iter
seek-ge j
@@ -238,18 +238,18 @@ ingest ext6
lsm
----
0:
13:[j-k]
11:[k-k]
13:[j#6,SET-k#7,SET]
11:[k#8,SET-k#8,SET]
3:
9:[b-c]
9:[b#4,MERGE-c#4,DEL]
4:
8:[a-c]
8:[a#3,SET-c#3,SET]
5:
7:[a-b]
7:[a#2,SET-b#2,DEL]
6:
6:[a-b]
14:[n-n]
10:[x-y]
6:[a#1,SET-b#1,SET]
14:[n#10,SET-n#10,SET]
10:[x#5,SET-y#5,SET]

get
m
@@ -269,20 +269,20 @@ ingest ext7
lsm
----
0:
13:[j-k]
11:[k-k]
17:[m-m]
15:[a-z]
13:[j#6,SET-k#7,SET]
11:[k#8,SET-k#8,SET]
17:[m#9,SET-m#9,SET]
15:[a#11,RANGEDEL-z#72057594037927935,RANGEDEL]
3:
9:[b-c]
9:[b#4,MERGE-c#4,DEL]
4:
8:[a-c]
8:[a#3,SET-c#3,SET]
5:
7:[a-b]
7:[a#2,SET-b#2,DEL]
6:
6:[a-b]
14:[n-n]
10:[x-y]
6:[a#1,SET-b#1,SET]
14:[n#10,SET-n#10,SET]
10:[x#5,SET-y#5,SET]

get
a
@@ -353,20 +353,20 @@ y:40
lsm
----
0:
13:[j-k]
11:[k-k]
17:[m-m]
15:[a-z]
18:[j-m]
19:[a-x]
20:[y-y]
13:[j#6,SET-k#7,SET]
11:[k#8,SET-k#8,SET]
17:[m#9,SET-m#9,SET]
15:[a#11,RANGEDEL-z#72057594037927935,RANGEDEL]
18:[j#12,RANGEDEL-m#12,SET]
19:[a#13,RANGEDEL-x#72057594037927935,RANGEDEL]
20:[y#14,SET-y#14,SET]
3:
9:[b-c]
9:[b#4,MERGE-c#4,DEL]
4:
8:[a-c]
8:[a#3,SET-c#3,SET]
5:
7:[a-b]
7:[a#2,SET-b#2,DEL]
6:
6:[a-b]
14:[n-n]
10:[x-y]
6:[a#1,SET-b#1,SET]
14:[n#10,SET-n#10,SET]
10:[x#5,SET-y#5,SET]
@@ -255,26 +255,44 @@ func (vs *versionSet) close() error {
return nil
}

// logAndApply logs the version edit to the manifest, applies the version edit
// to the current version, and installs the new version. DB.mu must be held
// when calling this method and will be released temporarily while performing
// file I/O.
func (vs *versionSet) logAndApply(
jobID int,
ve *versionEdit,
metrics map[int]*LevelMetrics,
dir vfs.File,
) error {
// logLock locks the manifest for writing. The lock must be released by either
// a call to logUnlock or logAndApply.
//
// DB.mu must be held when calling this method.
func (vs *versionSet) logLock() {
// Wait for any existing writing to the manifest to complete, then mark the
// manifest as busy.
for vs.writing {
vs.writerCond.Wait()
}
vs.writing = true
defer func() {
vs.writing = false
vs.writerCond.Signal()
}()
}

// logUnlock releases the lock for manifest writing.
//
// DB.mu must be held when calling this method.
func (vs *versionSet) logUnlock() {
if !vs.writing {
vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
}
vs.writing = false
vs.writerCond.Signal()
}

// logAndApply logs the version edit to the manifest, applies the version edit
// to the current version, and installs the new version.
//
// DB.mu must be held when calling this method and will be released temporarily
// while performing file I/O. Requires that the manifest is locked for writing
// (see logLock). Will unconditionally release the manifest lock (via
// logUnlock) even if an error occurs.
func (vs *versionSet) logAndApply(
jobID int, ve *versionEdit, metrics map[int]*LevelMetrics, dir vfs.File,
) error {
if !vs.writing {
vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
}
defer vs.logUnlock()

if ve.MinUnflushedLogNum != 0 {
if ve.MinUnflushedLogNum < vs.minUnflushedLogNum ||

0 comments on commit eb9e959

Please sign in to comment.
You can’t perform that action at this time.