From 840dad5853ff79a6d6f993d65f6bca96991ed39c Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 5 Mar 2019 17:09:24 -0800 Subject: [PATCH] Simplify Raft WAL storage caching (#3102) Switch cache in raftwal to a sync.Map. Also put lastEntry behind the cache, considering how often it is called. --- raftwal/storage.go | 75 ++++++++++++++++++----------------------- raftwal/storage_test.go | 4 +-- 2 files changed, 35 insertions(+), 44 deletions(-) diff --git a/raftwal/storage.go b/raftwal/storage.go index 5ab2377a30a..aaad65e72cd 100644 --- a/raftwal/storage.go +++ b/raftwal/storage.go @@ -24,6 +24,7 @@ import ( "sync" "github.com/dgraph-io/badger" + "github.com/gogo/protobuf/proto" "github.com/golang/glog" "go.etcd.io/etcd/raft" pb "go.etcd.io/etcd/raft/raftpb" @@ -32,48 +33,17 @@ import ( "github.com/dgraph-io/dgraph/x" ) -type localCache struct { - sync.RWMutex - firstIndex uint64 - snap pb.Snapshot -} - -func (c *localCache) setFirst(first uint64) { - c.Lock() - defer c.Unlock() - c.firstIndex = first -} - -func (c *localCache) first() uint64 { - c.RLock() - defer c.RUnlock() - return c.firstIndex -} - -func (c *localCache) setSnapshot(s pb.Snapshot) { - c.Lock() - defer c.Unlock() - c.snap = s - c.firstIndex = 0 // Reset firstIndex. -} - -func (c *localCache) snapshot() pb.Snapshot { - c.RLock() - defer c.RUnlock() - return c.snap -} - type DiskStorage struct { db *badger.DB id uint64 gid uint32 elog trace.EventLog - cache localCache + cache *sync.Map } func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage { - w := &DiskStorage{db: db, id: id, gid: gid} + w := &DiskStorage{db: db, id: id, gid: gid, cache: new(sync.Map)} x.Check(w.StoreRaftId(id)) w.elog = trace.NewEventLog("Badger", "RaftStorage") @@ -212,21 +182,33 @@ func (w *DiskStorage) seekEntry(e *pb.Entry, seekTo uint64, reverse bool) (uint6 return index, err } +var ( + snapshotKey = "snapshot" + firstKey = "first" + lastKey = "last" +) + // FirstIndex returns the index of the first log entry that is // possibly available via Entries (older entries have been incorporated // into the latest Snapshot). func (w *DiskStorage) FirstIndex() (uint64, error) { - snap := w.cache.snapshot() - if !raft.IsEmptySnap(snap) { - return snap.Metadata.Index + 1, nil + if val, ok := w.cache.Load(snapshotKey); ok { + snap, ok := val.(*pb.Snapshot) + if ok && !raft.IsEmptySnap(*snap) { + return snap.Metadata.Index + 1, nil + } } - if first := w.cache.first(); first > 0 { - return first, nil + if val, ok := w.cache.Load(firstKey); ok { + if first, ok := val.(uint64); ok { + return first, nil + } } + + // Now look into Badger. index, err := w.seekEntry(nil, 0, false) if err == nil { glog.V(2).Infof("Setting first index: %d", index+1) - w.cache.setFirst(index + 1) + w.cache.Store(firstKey, index+1) } else if glog.V(2) { glog.Errorf("While seekEntry. Error: %v", err) } @@ -235,6 +217,11 @@ func (w *DiskStorage) FirstIndex() (uint64, error) { // LastIndex returns the index of the last entry in the log. func (w *DiskStorage) LastIndex() (uint64, error) { + if val, ok := w.cache.Load(lastKey); ok { + if last, ok := val.(uint64); ok { + return last, nil + } + } return w.seekEntry(nil, math.MaxUint64, true) } @@ -281,8 +268,11 @@ func (w *DiskStorage) deleteUntil(batch *badger.WriteBatch, until uint64) error // so raft state machine could know that Storage needs some time to prepare // snapshot and call Snapshot later. func (w *DiskStorage) Snapshot() (snap pb.Snapshot, rerr error) { - if s := w.cache.snapshot(); !raft.IsEmptySnap(s) { - return s, nil + if val, ok := w.cache.Load(snapshotKey); ok { + snap, ok := val.(*pb.Snapshot) + if ok && !raft.IsEmptySnap(*snap) { + return *snap, nil + } } err := w.db.View(func(txn *badger.Txn) error { item, err := txn.Get(w.snapshotKey()) @@ -324,7 +314,7 @@ func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s pb.Snapshot) error } // Cache it. - w.cache.setSnapshot(s) + w.cache.Store(snapshotKey, proto.Clone(&s)) return nil } @@ -627,5 +617,6 @@ func (w *DiskStorage) addEntries(batch *badger.WriteBatch, entries []pb.Entry) e if laste < last { return w.deleteFrom(batch, laste+1) } + w.cache.Store(lastKey, laste) return nil } diff --git a/raftwal/storage_test.go b/raftwal/storage_test.go index 02fd95c5776..a9e8a832c82 100644 --- a/raftwal/storage_test.go +++ b/raftwal/storage_test.go @@ -201,7 +201,7 @@ func TestStorageFirstIndex(t *testing.T) { batch := db.NewWriteBatch() require.NoError(t, ds.deleteUntil(batch, 4)) require.NoError(t, batch.Flush()) - ds.cache.firstIndex = 0 + ds.cache.Store(firstKey, 0) first, err = ds.FirstIndex() if err != nil { t.Errorf("err = %v, want nil", err) @@ -244,7 +244,7 @@ func TestStorageCompact(t *testing.T) { if err != tt.werr { t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) } - ds.cache.firstIndex = 0 + ds.cache.Store(firstKey, 0) index, err := ds.FirstIndex() require.NoError(t, err) // Do the minus one here to get the index of the snapshot.