Skip to content

Commit

Permalink
Merge pull request #13487 from petermattis/pmattis/raft-entry-cache
Browse files Browse the repository at this point in the history
storage: tweak raftEntryCache allocation
  • Loading branch information
petermattis committed Feb 8, 2017
2 parents ff184d2 + 9d8eb1a commit ddbcd15
Showing 1 changed file with 31 additions and 17 deletions.
48 changes: 31 additions & 17 deletions pkg/storage/entry_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ type entryCacheKey struct {

// Compare implements the llrb.Comparable interface for entryCacheKey, so that
// it can be used as a key for util.OrderedCache.
func (a entryCacheKey) Compare(b llrb.Comparable) int {
bk := b.(entryCacheKey)
func (a *entryCacheKey) Compare(b llrb.Comparable) int {
bk := b.(*entryCacheKey)
switch {
case a.RangeID < bk.RangeID:
return -1
Expand All @@ -53,9 +53,9 @@ func (a entryCacheKey) Compare(b llrb.Comparable) int {
// recently-written log entries between log append and application
// to the FSM.
type raftEntryCache struct {
syncutil.RWMutex // protects Cache for concurrent access.
bytes uint64 // total size of the cache in bytes
cache *cache.OrderedCache // LRU cache of log entries, keyed by rangeID / log index
syncutil.Mutex // protects Cache for concurrent access.
bytes uint64 // total size of the cache in bytes
cache *cache.OrderedCache // LRU cache of log entries, keyed by rangeID / log index
}

// newRaftEntryCache returns a new RaftEntryCache with the given
Expand All @@ -75,13 +75,27 @@ func newRaftEntryCache(maxBytes uint64) *raftEntryCache {
return rec.bytes > maxBytes && n >= 1
}
rec.cache.Config.OnEvicted = func(k, v interface{}) {
ent := v.(raftpb.Entry)
ent := v.(*raftpb.Entry)
rec.bytes -= uint64(ent.Size())
}

return rec
}

func (rec *raftEntryCache) makeCacheEntry(key entryCacheKey, value raftpb.Entry) *cache.Entry {
alloc := struct {
key entryCacheKey
value raftpb.Entry
entry cache.Entry
}{
key: key,
value: value,
}
alloc.entry.Key = &alloc.key
alloc.entry.Value = &alloc.value
return &alloc.entry
}

// addEntries adds the slice of raft entries, using the range ID and the
// entry indexes as each cached entry's key.
func (rec *raftEntryCache) addEntries(rangeID roachpb.RangeID, ents []raftpb.Entry) {
Expand All @@ -93,7 +107,8 @@ func (rec *raftEntryCache) addEntries(rangeID roachpb.RangeID, ents []raftpb.Ent

for _, e := range ents {
rec.bytes += uint64(e.Size())
rec.cache.Add(entryCacheKey{RangeID: rangeID, Index: e.Index}, e)
entry := rec.makeCacheEntry(entryCacheKey{RangeID: rangeID, Index: e.Index}, e)
rec.cache.AddEntry(entry)
}
}

Expand All @@ -107,19 +122,18 @@ func (rec *raftEntryCache) getEntries(
) ([]raftpb.Entry, uint64, uint64) {
rec.Lock()
defer rec.Unlock()
var ent raftpb.Entry
var bytes uint64
nextIndex := lo

fromKey := entryCacheKey{RangeID: rangeID, Index: lo}
toKey := entryCacheKey{RangeID: rangeID, Index: hi}
fromKey := &entryCacheKey{RangeID: rangeID, Index: lo}
toKey := &entryCacheKey{RangeID: rangeID, Index: hi}
rec.cache.DoRange(func(k, v interface{}) bool {
ecKey := k.(entryCacheKey)
ecKey := k.(*entryCacheKey)
if ecKey.Index != nextIndex {
return true
}
ent = v.(raftpb.Entry)
ents = append(ents, ent)
ent := v.(*raftpb.Entry)
ents = append(ents, *ent)
bytes += uint64(ent.Size())
nextIndex++
if maxBytes > 0 && bytes > maxBytes {
Expand All @@ -138,11 +152,11 @@ func (rec *raftEntryCache) delEntries(rangeID roachpb.RangeID, lo, hi uint64) {
if lo >= hi {
return
}
var keys []entryCacheKey
fromKey := entryCacheKey{RangeID: rangeID, Index: lo}
toKey := entryCacheKey{RangeID: rangeID, Index: hi}
var keys []*entryCacheKey
fromKey := &entryCacheKey{RangeID: rangeID, Index: lo}
toKey := &entryCacheKey{RangeID: rangeID, Index: hi}
rec.cache.DoRange(func(k, v interface{}) bool {
keys = append(keys, k.(entryCacheKey))
keys = append(keys, k.(*entryCacheKey))
return false
}, fromKey, toKey)

Expand Down

0 comments on commit ddbcd15

Please sign in to comment.