diff --git a/manifest.go b/manifest.go index 75204569d..0b15badb1 100644 --- a/manifest.go +++ b/manifest.go @@ -212,19 +212,17 @@ func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error { } // Maybe we could use O_APPEND instead (on certain file systems) mf.appendLock.Lock() + defer mf.appendLock.Unlock() if err := applyChangeSet(&mf.manifest, &changes); err != nil { - mf.appendLock.Unlock() return err } if mf.inMemory { - mf.appendLock.Unlock() return nil } // Rewrite manifest if it'd shrink by 1/10 and it's big enough to care if mf.manifest.Deletions > mf.deletionsRewriteThreshold && mf.manifest.Deletions > manifestDeletionsRatio*(mf.manifest.Creations-mf.manifest.Deletions) { if err := mf.rewrite(); err != nil { - mf.appendLock.Unlock() return err } } else { @@ -233,15 +231,16 @@ func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error { binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(buf, y.CastagnoliCrcTable)) buf = append(lenCrcBuf[:], buf...) if _, err := mf.fp.Write(buf); err != nil { - mf.appendLock.Unlock() return err } } - mf.appendLock.Unlock() - return mf.fp.Sync() + return syncFunc(mf.fp) } +// this function is saved here to allow injection of fake filesystem latency at test time. +var syncFunc = func(f *os.File) error { return f.Sync() } + // Has to be 4 bytes. The value can never change, ever, anyway. var magicText = [4]byte{'B', 'd', 'g', 'r'} diff --git a/manifest_test.go b/manifest_test.go index 05814bc88..a521d8444 100644 --- a/manifest_test.go +++ b/manifest_test.go @@ -24,7 +24,9 @@ import ( "os" "path/filepath" "sort" + "sync" "testing" + "time" otrace "go.opencensus.io/trace" @@ -245,3 +247,44 @@ func TestManifestRewrite(t *testing.T) { uint64(deletionsThreshold * 3): {Level: 0}, }, m.Tables) } + +func TestConcurrentManifestCompaction(t *testing.T) { + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer removeDir(dir) + + // set this low so rewrites will happen more often + deletionsThreshold := 1 + + // overwrite the sync function to make this race condition easily reproducible + syncFunc = func(f *os.File) error { + // effectively making the Sync() take around 1s makes this reproduce every time + time.Sleep(1 * time.Second) + return f.Sync() + } + + mf, _, err := helpOpenOrCreateManifestFile(dir, false, 0, deletionsThreshold) + require.NoError(t, err) + + cs := &pb.ManifestChangeSet{} + for i := uint64(0); i < 1000; i++ { + cs.Changes = append(cs.Changes, + newCreateChange(i, 0, 0, 0), + newDeleteChange(i), + ) + } + + // simulate 2 concurrent compaction threads + n := 2 + wg := sync.WaitGroup{} + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + require.NoError(t, mf.addChanges(cs.Changes)) + }() + } + wg.Wait() + + require.NoError(t, mf.close()) +}