Skip to content

Commit

Permalink
Simplify Raft WAL storage caching (#3102)
Browse files Browse the repository at this point in the history
Switch cache in raftwal to a sync.Map. Also put lastEntry behind the cache, considering how often it is called.
  • Loading branch information
manishrjain committed Mar 6, 2019
1 parent 760ed50 commit 840dad5
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 44 deletions.
75 changes: 33 additions & 42 deletions raftwal/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"

"github.com/dgraph-io/badger"
"github.com/gogo/protobuf/proto"
"github.com/golang/glog"
"go.etcd.io/etcd/raft"
pb "go.etcd.io/etcd/raft/raftpb"
Expand All @@ -32,48 +33,17 @@ import (
"github.com/dgraph-io/dgraph/x"
)

type localCache struct {
sync.RWMutex
firstIndex uint64
snap pb.Snapshot
}

func (c *localCache) setFirst(first uint64) {
c.Lock()
defer c.Unlock()
c.firstIndex = first
}

func (c *localCache) first() uint64 {
c.RLock()
defer c.RUnlock()
return c.firstIndex
}

func (c *localCache) setSnapshot(s pb.Snapshot) {
c.Lock()
defer c.Unlock()
c.snap = s
c.firstIndex = 0 // Reset firstIndex.
}

func (c *localCache) snapshot() pb.Snapshot {
c.RLock()
defer c.RUnlock()
return c.snap
}

type DiskStorage struct {
db *badger.DB
id uint64
gid uint32
elog trace.EventLog

cache localCache
cache *sync.Map
}

func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage {
w := &DiskStorage{db: db, id: id, gid: gid}
w := &DiskStorage{db: db, id: id, gid: gid, cache: new(sync.Map)}
x.Check(w.StoreRaftId(id))
w.elog = trace.NewEventLog("Badger", "RaftStorage")

Expand Down Expand Up @@ -212,21 +182,33 @@ func (w *DiskStorage) seekEntry(e *pb.Entry, seekTo uint64, reverse bool) (uint6
return index, err
}

var (
snapshotKey = "snapshot"
firstKey = "first"
lastKey = "last"
)

// FirstIndex returns the index of the first log entry that is
// possibly available via Entries (older entries have been incorporated
// into the latest Snapshot).
func (w *DiskStorage) FirstIndex() (uint64, error) {
snap := w.cache.snapshot()
if !raft.IsEmptySnap(snap) {
return snap.Metadata.Index + 1, nil
if val, ok := w.cache.Load(snapshotKey); ok {
snap, ok := val.(*pb.Snapshot)
if ok && !raft.IsEmptySnap(*snap) {
return snap.Metadata.Index + 1, nil
}
}
if first := w.cache.first(); first > 0 {
return first, nil
if val, ok := w.cache.Load(firstKey); ok {
if first, ok := val.(uint64); ok {
return first, nil
}
}

// Now look into Badger.
index, err := w.seekEntry(nil, 0, false)
if err == nil {
glog.V(2).Infof("Setting first index: %d", index+1)
w.cache.setFirst(index + 1)
w.cache.Store(firstKey, index+1)
} else if glog.V(2) {
glog.Errorf("While seekEntry. Error: %v", err)
}
Expand All @@ -235,6 +217,11 @@ func (w *DiskStorage) FirstIndex() (uint64, error) {

// LastIndex returns the index of the last entry in the log.
func (w *DiskStorage) LastIndex() (uint64, error) {
if val, ok := w.cache.Load(lastKey); ok {
if last, ok := val.(uint64); ok {
return last, nil
}
}
return w.seekEntry(nil, math.MaxUint64, true)
}

Expand Down Expand Up @@ -281,8 +268,11 @@ func (w *DiskStorage) deleteUntil(batch *badger.WriteBatch, until uint64) error
// so raft state machine could know that Storage needs some time to prepare
// snapshot and call Snapshot later.
func (w *DiskStorage) Snapshot() (snap pb.Snapshot, rerr error) {
if s := w.cache.snapshot(); !raft.IsEmptySnap(s) {
return s, nil
if val, ok := w.cache.Load(snapshotKey); ok {
snap, ok := val.(*pb.Snapshot)
if ok && !raft.IsEmptySnap(*snap) {
return *snap, nil
}
}
err := w.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(w.snapshotKey())
Expand Down Expand Up @@ -324,7 +314,7 @@ func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s pb.Snapshot) error
}

// Cache it.
w.cache.setSnapshot(s)
w.cache.Store(snapshotKey, proto.Clone(&s))
return nil
}

Expand Down Expand Up @@ -627,5 +617,6 @@ func (w *DiskStorage) addEntries(batch *badger.WriteBatch, entries []pb.Entry) e
if laste < last {
return w.deleteFrom(batch, laste+1)
}
w.cache.Store(lastKey, laste)
return nil
}
4 changes: 2 additions & 2 deletions raftwal/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestStorageFirstIndex(t *testing.T) {
batch := db.NewWriteBatch()
require.NoError(t, ds.deleteUntil(batch, 4))
require.NoError(t, batch.Flush())
ds.cache.firstIndex = 0
ds.cache.Store(firstKey, 0)
first, err = ds.FirstIndex()
if err != nil {
t.Errorf("err = %v, want nil", err)
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestStorageCompact(t *testing.T) {
if err != tt.werr {
t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
}
ds.cache.firstIndex = 0
ds.cache.Store(firstKey, 0)
index, err := ds.FirstIndex()
require.NoError(t, err)
// Do the minus one here to get the index of the snapshot.
Expand Down

0 comments on commit 840dad5

Please sign in to comment.