Skip to content

Commit c1cf0d7

Browse files
Fix deadlock in discard stats (#1070)
Fixes #1032. Currently the discardStats flow is as follows: * Discard stats are generated during compaction. At the end, the compaction routine updates these stats in the vlog (the vlog maintains all discard stats). If the number of updates exceeds a threshold, a new request is generated and sent to the write channel. The routine then waits for the request to complete (request.Wait()). * Requests are consumed from the write channel and written to the vlog first, then to the memtable. * If the memtable is full, it is flushed to the flush channel. * From the flush channel, memtables are written to L0 only if there are at most NumLevelZeroTablesStall tables there already. Events which can lead to deadlock: compaction is running on L0, which currently has NumLevelZeroTablesStall tables, and tries to flush discard stats to the write channel. After pushing the stats to the write channel, it waits for the write request to complete, which can never happen due to the cyclic dependency. Fix: This PR introduces a buffered flush channel for discardStats. The compaction routine pushes generated discard stats to this flush channel; if the channel is full, it simply returns. This decouples compaction from writes. A separate routine consumes stats from the flush channel.
1 parent dbe1cb9 commit c1cf0d7

File tree

5 files changed

+114
-85
lines changed

5 files changed

+114
-85
lines changed

db.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -380,10 +380,6 @@ func (db *DB) Close() error {
380380
func (db *DB) close() (err error) {
381381
db.elog.Printf("Closing database")
382382

383-
if err := db.vlog.flushDiscardStats(); err != nil {
384-
return errors.Wrap(err, "failed to flush discard stats")
385-
}
386-
387383
atomic.StoreInt32(&db.blockWrites, 1)
388384

389385
// Stop value GC first.

db2_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -369,22 +369,20 @@ func TestDiscardMapTooBig(t *testing.T) {
369369
defer os.RemoveAll(dir)
370370

371371
db, err := Open(DefaultOptions(dir))
372-
require.NoError(t, err, "error while openning db")
372+
require.NoError(t, err, "error while opening db")
373373

374-
// Add some data so that memtable flush happens on close
374+
// Add some data so that memtable flush happens on close.
375375
require.NoError(t, db.Update(func(txn *Txn) error {
376376
return txn.Set([]byte("foo"), []byte("bar"))
377377
}))
378378

379379
// overwrite discardstat with large value
380-
db.vlog.lfDiscardStats = &lfDiscardStats{
381-
m: createDiscardStats(),
382-
}
380+
db.vlog.lfDiscardStats.m = createDiscardStats()
383381

384382
require.NoError(t, db.Close())
385383
// reopen the same DB
386384
db, err = Open(DefaultOptions(dir))
387-
require.NoError(t, err, "error while openning db")
385+
require.NoError(t, err, "error while opening db")
388386
require.NoError(t, db.Close())
389387
}
390388

levels.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -638,9 +638,7 @@ func (s *levelsController) compactBuildTables(
638638
sort.Slice(newTables, func(i, j int) bool {
639639
return y.CompareKeys(newTables[i].Biggest(), newTables[j].Biggest()) < 0
640640
})
641-
if err := s.kv.vlog.updateDiscardStats(discardStats); err != nil {
642-
return nil, nil, errors.Wrap(err, "failed to update discard stats")
643-
}
641+
s.kv.vlog.updateDiscardStats(discardStats)
644642
s.kv.opt.Debugf("Discard stats: %v", discardStats)
645643
return newTables, func() error { return decrRefs(newTables) }, nil
646644
}

value.go

Lines changed: 74 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -762,8 +762,10 @@ func (vlog *valueLog) dropAll() (int, error) {
762762
// lfDiscardStats keeps track of the amount of data that could be discarded for
763763
// a given logfile.
764764
type lfDiscardStats struct {
765-
sync.Mutex
765+
sync.RWMutex
766766
m map[uint32]int64
767+
flushChan chan map[uint32]int64
768+
closer *y.Closer
767769
updatesSinceFlush int
768770
}
769771

@@ -838,6 +840,7 @@ func (lf *logFile) open(path string, flags uint32) error {
838840
if lf.fd, err = y.OpenExistingFile(path, flags); err != nil {
839841
return y.Wrapf(err, "Error while opening file in logfile %s", path)
840842
}
843+
841844
fi, err := lf.fd.Stat()
842845
if err != nil {
843846
return errFile(err, lf.path, "Unable to run file.Stat")
@@ -999,7 +1002,12 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error {
9991002
vlog.elog = trace.NewEventLog("Badger", "Valuelog")
10001003
}
10011004
vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
1002-
vlog.lfDiscardStats = &lfDiscardStats{m: make(map[uint32]int64)}
1005+
vlog.lfDiscardStats = &lfDiscardStats{
1006+
m: make(map[uint32]int64),
1007+
closer: y.NewCloser(1),
1008+
flushChan: make(chan map[uint32]int64, 16),
1009+
}
1010+
go vlog.flushDiscardStats()
10031011
if err := vlog.populateFilesMap(); err != nil {
10041012
return err
10051013
}
@@ -1131,6 +1139,9 @@ func (lf *logFile) init() error {
11311139
}
11321140

11331141
func (vlog *valueLog) Close() error {
1142+
// close flushDiscardStats.
1143+
vlog.lfDiscardStats.closer.SignalAndWait()
1144+
11341145
vlog.elog.Printf("Stopping garbage collection of values.")
11351146
defer vlog.elog.Finish()
11361147

@@ -1217,7 +1228,7 @@ func (reqs requests) DecrRef() {
12171228
// sync function syncs content of latest value log file to disk. Syncing of value log directory is
12181229
// not required here as it happens every time a value log file rotation happens(check createVlogFile
12191230
// function). During rotation, previous value log file also gets synced to disk. It only syncs file
1220-
// if fid >= vlog.maxFid. In some cases such as replay(while openning db), it might be called with
1231+
// if fid >= vlog.maxFid. In some cases such as replay(while opening db), it might be called with
12211232
// fid < vlog.maxFid. To sync irrespective of file id just call it with math.MaxUint32.
12221233
func (vlog *valueLog) sync(fid uint32) error {
12231234
if vlog.opt.SyncWrites {
@@ -1443,7 +1454,7 @@ func (vlog *valueLog) pickLog(head valuePointer, tr trace.Trace) (files []*logFi
14431454
fid uint32
14441455
discard int64
14451456
}{math.MaxUint32, 0}
1446-
vlog.lfDiscardStats.Lock()
1457+
vlog.lfDiscardStats.RLock()
14471458
for _, fid := range fids {
14481459
if fid >= head.Fid {
14491460
break
@@ -1453,7 +1464,7 @@ func (vlog *valueLog) pickLog(head valuePointer, tr trace.Trace) (files []*logFi
14531464
candidate.discard = vlog.lfDiscardStats.m[fid]
14541465
}
14551466
}
1456-
vlog.lfDiscardStats.Unlock()
1467+
vlog.lfDiscardStats.RUnlock()
14571468

14581469
if candidate.fid != math.MaxUint32 { // Found a candidate
14591470
tr.LazyPrintf("Found candidate via discard stats: %v", candidate)
@@ -1682,58 +1693,72 @@ func (vlog *valueLog) runGC(discardRatio float64, head valuePointer) error {
16821693
}
16831694
}
16841695

1685-
func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) error {
1686-
vlog.lfDiscardStats.Lock()
1687-
1688-
for fid, sz := range stats {
1689-
vlog.lfDiscardStats.m[fid] += sz
1690-
vlog.lfDiscardStats.updatesSinceFlush++
1691-
}
1692-
if vlog.lfDiscardStats.updatesSinceFlush > discardStatsFlushThreshold {
1693-
vlog.lfDiscardStats.Unlock()
1694-
// flushDiscardStats also acquires lock. So, we need to unlock here.
1695-
return vlog.flushDiscardStats()
1696+
func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
1697+
select {
1698+
case vlog.lfDiscardStats.flushChan <- stats:
1699+
default:
1700+
vlog.opt.Warningf("updateDiscardStats called: discard stats flushChan full, " +
1701+
"returning without pushing to flushChan")
16961702
}
1697-
vlog.lfDiscardStats.Unlock()
1698-
return nil
16991703
}
17001704

1701-
// flushDiscardStats inserts discard stats into badger. Returns error on failure.
1702-
func (vlog *valueLog) flushDiscardStats() error {
1703-
vlog.lfDiscardStats.Lock()
1704-
defer vlog.lfDiscardStats.Unlock()
1705+
func (vlog *valueLog) flushDiscardStats() {
1706+
defer vlog.lfDiscardStats.closer.Done()
17051707

1706-
if len(vlog.lfDiscardStats.m) == 0 {
1707-
return nil
1708+
mergeStats := func(stats map[uint32]int64) ([]byte, error) {
1709+
vlog.lfDiscardStats.Lock()
1710+
defer vlog.lfDiscardStats.Unlock()
1711+
for fid, count := range stats {
1712+
vlog.lfDiscardStats.m[fid] += count
1713+
vlog.lfDiscardStats.updatesSinceFlush++
1714+
}
1715+
1716+
if vlog.lfDiscardStats.updatesSinceFlush > discardStatsFlushThreshold {
1717+
encodedDS, err := json.Marshal(vlog.lfDiscardStats.m)
1718+
if err != nil {
1719+
return nil, err
1720+
}
1721+
vlog.lfDiscardStats.updatesSinceFlush = 0
1722+
return encodedDS, nil
1723+
}
1724+
return nil, nil
17081725
}
1709-
entries := []*Entry{{
1710-
Key: y.KeyWithTs(lfDiscardStatsKey, 1),
1711-
Value: vlog.encodedDiscardStats(),
1712-
}}
1713-
req, err := vlog.db.sendToWriteCh(entries)
1714-
if err == ErrBlockedWrites {
1715-
// We'll block write while closing db.
1716-
// When L0 compaction in close may push discard stats.
1717-
// So ignoring it.
1718-
// https://github.com/dgraph-io/badger/issues/970
1719-
return nil
1720-
} else if err != nil {
1721-
return errors.Wrapf(err, "failed to push discard stats to write channel")
1726+
1727+
process := func(stats map[uint32]int64) error {
1728+
encodedDS, err := mergeStats(stats)
1729+
if err != nil || encodedDS == nil {
1730+
return err
1731+
}
1732+
1733+
entries := []*Entry{{
1734+
Key: y.KeyWithTs(lfDiscardStatsKey, 1),
1735+
Value: encodedDS,
1736+
}}
1737+
req, err := vlog.db.sendToWriteCh(entries)
1738+
// No special handling of ErrBlockedWrites is required as err is just logged in
1739+
// for loop below.
1740+
if err != nil {
1741+
return errors.Wrapf(err, "failed to push discard stats to write channel")
1742+
}
1743+
return req.Wait()
17221744
}
1723-
vlog.lfDiscardStats.updatesSinceFlush = 0
1724-
return req.Wait()
1725-
}
17261745

1727-
// encodedDiscardStats returns []byte representation of lfDiscardStats
1728-
// This will be called while storing stats in BadgerDB
1729-
// caller should acquire lock before encoding the stats.
1730-
func (vlog *valueLog) encodedDiscardStats() []byte {
1731-
encodedStats, _ := json.Marshal(vlog.lfDiscardStats.m)
1732-
return encodedStats
1746+
closer := vlog.lfDiscardStats.closer
1747+
for {
1748+
select {
1749+
case <-closer.HasBeenClosed():
1750+
// For simplicity just return without processing stats already present in flushChan.
1751+
return
1752+
case stats := <-vlog.lfDiscardStats.flushChan:
1753+
if err := process(stats); err != nil {
1754+
vlog.opt.Errorf("unable to process discardstats with error: %s", err)
1755+
}
1756+
}
1757+
}
17331758
}
17341759

1735-
// populateDiscardStats populates vlog.lfDiscardStats
1736-
// This function will be called while initializing valueLog
1760+
// populateDiscardStats populates vlog.lfDiscardStats.
1761+
// This function will be called while initializing valueLog.
17371762
func (vlog *valueLog) populateDiscardStats() error {
17381763
key := y.KeyWithTs(lfDiscardStatsKey, math.MaxUint64)
17391764
var statsMap map[uint32]int64
@@ -1785,6 +1810,6 @@ func (vlog *valueLog) populateDiscardStats() error {
17851810
return errors.Wrapf(err, "failed to unmarshal discard stats")
17861811
}
17871812
vlog.opt.Debugf("Value Log Discard stats: %v", statsMap)
1788-
vlog.lfDiscardStats = &lfDiscardStats{m: statsMap}
1813+
vlog.lfDiscardStats.flushChan <- statsMap
17891814
return nil
17901815
}

value_test.go

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package badger
1818

1919
import (
2020
"bytes"
21+
"encoding/json"
2122
"fmt"
2223
"io/ioutil"
2324
"math/rand"
@@ -470,23 +471,32 @@ func TestPersistLFDiscardStats(t *testing.T) {
470471
require.NoError(t, err)
471472
}
472473

473-
// wait for compaction to complete
474-
time.Sleep(1 * time.Second)
474+
time.Sleep(1 * time.Second) // wait for compaction to complete
475475

476476
persistedMap := make(map[uint32]int64)
477477
db.vlog.lfDiscardStats.Lock()
478+
require.True(t, len(db.vlog.lfDiscardStats.m) > 0, "some discardStats should be generated")
478479
for k, v := range db.vlog.lfDiscardStats.m {
479480
persistedMap[k] = v
480481
}
482+
db.vlog.lfDiscardStats.updatesSinceFlush = discardStatsFlushThreshold + 1
481483
db.vlog.lfDiscardStats.Unlock()
484+
485+
// db.vlog.lfDiscardStats.updatesSinceFlush is already > discardStatsFlushThreshold,
486+
// send empty map to flushChan, so that latest discardStats map can be persisted.
487+
db.vlog.lfDiscardStats.flushChan <- map[uint32]int64{}
488+
time.Sleep(1 * time.Second) // Wait for map to be persisted.
482489
err = db.Close()
483490
require.NoError(t, err)
484491

485492
db, err = Open(opt)
486493
require.NoError(t, err)
487494
defer db.Close()
495+
time.Sleep(1 * time.Second) // Wait for discardStats to be populated by populateDiscardStats().
496+
db.vlog.lfDiscardStats.RLock()
488497
require.True(t, reflect.DeepEqual(persistedMap, db.vlog.lfDiscardStats.m),
489498
"Discard maps are not equal")
499+
db.vlog.lfDiscardStats.RUnlock()
490500
}
491501

492502
func TestChecksums(t *testing.T) {
@@ -630,7 +640,6 @@ func TestPartialAppendToValueLog(t *testing.T) {
630640
kv, err = Open(opts)
631641
require.NoError(t, err)
632642
checkKeys(t, kv, [][]byte{k3})
633-
634643
// Replay value log from beginning, badger head is past k2.
635644
require.NoError(t, kv.vlog.Close())
636645
require.NoError(t,
@@ -1003,14 +1012,12 @@ func TestTruncatedDiscardStat(t *testing.T) {
10031012
for i := uint32(0); i < uint32(20); i++ {
10041013
stat[i] = 0
10051014
}
1006-
// Set discard stats.
1007-
db.vlog.lfDiscardStats = &lfDiscardStats{
1008-
m: stat,
1009-
}
1015+
db.vlog.lfDiscardStats.m = stat
1016+
encodedDS, _ := json.Marshal(db.vlog.lfDiscardStats.m)
10101017
entries := []*Entry{{
10111018
Key: y.KeyWithTs(lfDiscardStatsKey, 1),
10121019
// Insert truncated discard stats. This is important.
1013-
Value: db.vlog.encodedDiscardStats()[:10],
1020+
Value: encodedDS[:10],
10141021
}}
10151022
// Push discard stats entry to the write channel.
10161023
req, err := db.sendToWriteCh(entries)
@@ -1059,14 +1066,14 @@ func TestDiscardStatsMove(t *testing.T) {
10591066
stat[i] = 0
10601067
}
10611068

1062-
// Set discard stats.
1063-
db.vlog.lfDiscardStats = &lfDiscardStats{
1064-
m: stat,
1065-
}
1069+
db.vlog.lfDiscardStats.Lock()
1070+
db.vlog.lfDiscardStats.m = stat
1071+
encodedDS, _ := json.Marshal(db.vlog.lfDiscardStats.m)
1072+
db.vlog.lfDiscardStats.Unlock()
10661073
entries := []*Entry{{
10671074
Key: y.KeyWithTs(lfDiscardStatsKey, 1),
10681075
// The discard stat value is more than value threshold.
1069-
Value: db.vlog.encodedDiscardStats(),
1076+
Value: encodedDS,
10701077
}}
10711078
// Push discard stats entry to the write channel.
10721079
req, err := db.sendToWriteCh(entries)
@@ -1076,7 +1083,9 @@ func TestDiscardStatsMove(t *testing.T) {
10761083
// Unset discard stats. We've already pushed the stats. If we don't unset it then it will be
10771084
// pushed again on DB close. Also, the first insertion was in vlog file 1, this insertion would
10781085
// be in value log file 3.
1086+
db.vlog.lfDiscardStats.Lock()
10791087
db.vlog.lfDiscardStats.m = nil
1088+
db.vlog.lfDiscardStats.Unlock()
10801089

10811090
// Push more entries so that we get more than 1 value log files.
10821091
require.NoError(t, db.Update(func(txn *Txn) error {
@@ -1086,7 +1095,6 @@ func TestDiscardStatsMove(t *testing.T) {
10861095
require.NoError(t, db.Update(func(txn *Txn) error {
10871096
e := NewEntry([]byte("ff"), []byte("1"))
10881097
return txn.SetEntry(e)
1089-
10901098
}))
10911099

10921100
tr := trace.New("Badger.ValueLog", "GC")
@@ -1096,8 +1104,13 @@ func TestDiscardStatsMove(t *testing.T) {
10961104
require.NoError(t, db.Close())
10971105

10981106
db, err = Open(ops)
1107+
// discardStats will be populate using vlog.populateDiscardStats(), which pushes discard stats
1108+
// to vlog.lfDiscardStats.flushChan. Hence wait for some time, for discard stats to be updated.
1109+
time.Sleep(1 * time.Second)
10991110
require.NoError(t, err)
1111+
db.vlog.lfDiscardStats.RLock()
11001112
require.Equal(t, stat, db.vlog.lfDiscardStats.m)
1113+
db.vlog.lfDiscardStats.RUnlock()
11011114
require.NoError(t, db.Close())
11021115
}
11031116

@@ -1109,11 +1122,13 @@ func TestBlockedDiscardStats(t *testing.T) {
11091122
db, err := Open(getTestOptions(dir))
11101123
require.NoError(t, err)
11111124
// Set discard stats.
1112-
db.vlog.lfDiscardStats = &lfDiscardStats{
1113-
m: map[uint32]int64{0: 0},
1114-
}
1125+
db.vlog.lfDiscardStats.m = map[uint32]int64{0: 0}
11151126
db.blockWrite()
1116-
require.NoError(t, db.vlog.flushDiscardStats())
1127+
// Push discard stats more than the capacity of flushChan. This ensures at least one flush
1128+
// operation completes successfully after the writes were blocked.
1129+
for i := 0; i < cap(db.vlog.lfDiscardStats.flushChan)+2; i++ {
1130+
db.vlog.lfDiscardStats.flushChan <- db.vlog.lfDiscardStats.m
1131+
}
11171132
db.unblockWrite()
11181133
require.NoError(t, db.Close())
11191134
}
@@ -1126,11 +1141,8 @@ func TestBlockedDiscardStatsOnClose(t *testing.T) {
11261141

11271142
db, err := Open(getTestOptions(dir))
11281143
require.NoError(t, err)
1129-
// Set discard stats.
1130-
db.vlog.lfDiscardStats = &lfDiscardStats{
1131-
m: map[uint32]int64{0: 0},
1132-
}
1133-
// This is important. Set updateSinceFlush to discardStatsFlushThresold so
1144+
db.vlog.lfDiscardStats.m = map[uint32]int64{0: 0}
1145+
// This is important. Set updateSinceFlush to discardStatsFlushThreshold so
11341146
// that the next update call flushes the discard stats.
11351147
db.vlog.lfDiscardStats.updatesSinceFlush = discardStatsFlushThreshold + 1
11361148
require.NoError(t, db.Close())

0 commit comments

Comments
 (0)