Skip to content

Commit

Permalink
Improve encapsulation of shards (allegro#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
cristaloleg authored and janisz committed Sep 25, 2017
1 parent e055ac8 commit 2831151
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 88 deletions.
71 changes: 4 additions & 67 deletions bigcache.go
Expand Up @@ -2,7 +2,6 @@ package bigcache

import (
"fmt"
"log"
"time"
)

Expand Down Expand Up @@ -56,7 +55,7 @@ func newBigCache(config Config, clock clock) (*BigCache, error) {
}

for i := 0; i < config.Shards; i++ {
cache.shards[i] = initNewShard(config, onRemove)
cache.shards[i] = initNewShard(config, onRemove, clock)
}

if config.CleanWindow > 0 {
Expand All @@ -74,84 +73,30 @@ func newBigCache(config Config, clock clock) (*BigCache, error) {
func (c *BigCache) Get(key string) ([]byte, error) {
hashedKey := c.hash.Sum64(key)
shard := c.getShard(hashedKey)
shard.lock.RLock()

itemIndex := shard.hashmap[hashedKey]

if itemIndex == 0 {
shard.lock.RUnlock()
return nil, notFound(key)
}

wrappedEntry, err := shard.entries.Get(int(itemIndex))
if err != nil {
shard.lock.RUnlock()
return nil, err
}
if entryKey := readKeyFromEntry(wrappedEntry); key != entryKey {
if c.config.Verbose {
log.Printf("Collision detected. Both %q and %q have the same hash %x", key, entryKey, hashedKey)
}
shard.lock.RUnlock()
return nil, notFound(key)
}
shard.lock.RUnlock()
return readEntry(wrappedEntry), nil
return shard.get(key, hashedKey)
}

// Set saves entry under the key
func (c *BigCache) Set(key string, entry []byte) error {
hashedKey := c.hash.Sum64(key)
shard := c.getShard(hashedKey)
shard.lock.Lock()

currentTimestamp := uint64(c.clock.epoch())

if previousIndex := shard.hashmap[hashedKey]; previousIndex != 0 {
if previousEntry, err := shard.entries.Get(int(previousIndex)); err == nil {
resetKeyFromEntry(previousEntry)
}
}

if oldestEntry, err := shard.entries.Peek(); err == nil {
c.onEvict(oldestEntry, currentTimestamp, shard.removeOldestEntry)
}

w := wrapEntry(currentTimestamp, hashedKey, key, entry, &shard.entryBuffer)

for {
if index, err := shard.entries.Push(w); err == nil {
shard.hashmap[hashedKey] = uint32(index)
shard.lock.Unlock()
return nil
} else if shard.removeOldestEntry() != nil {
shard.lock.Unlock()
return fmt.Errorf("Entry is bigger than max shard size.")
}
}
return shard.set(key, hashedKey, entry)
}

// Reset empties all cache shards
func (c *BigCache) Reset() error {
for _, shard := range c.shards {
shard.lock.Lock()
shard.reset(c.config)
shard.lock.Unlock()
}

return nil
}

// Len computes number of entries in cache
func (c *BigCache) Len() int {
var len int

for _, shard := range c.shards {
shard.lock.RLock()
len += shard.len()
shard.lock.RUnlock()
}

return len
}

Expand All @@ -171,15 +116,7 @@ func (c *BigCache) onEvict(oldestEntry []byte, currentTimestamp uint64, evict fu

func (c *BigCache) cleanUp(currentTimestamp uint64) {
for _, shard := range c.shards {
shard.lock.Lock()
for {
if oldestEntry, err := shard.entries.Peek(); err != nil {
break
} else if evicted := c.onEvict(oldestEntry, currentTimestamp, shard.removeOldestEntry); !evicted {
break
}
}
shard.lock.Unlock()
shard.cleanUp(currentTimestamp)
}
}

Expand Down
21 changes: 3 additions & 18 deletions iterator.go
Expand Up @@ -55,21 +55,6 @@ type EntryInfoIterator struct {
valid bool
}

func copyCurrentShardMap(shard *cacheShard) ([]uint32, int) {
shard.lock.RLock()

var elements = make([]uint32, len(shard.hashmap))
next := 0

for _, index := range shard.hashmap {
elements[next] = index
next++
}

shard.lock.RUnlock()
return elements, next
}

// SetNext moves to next element and returns true if it exists.
func (it *EntryInfoIterator) SetNext() bool {
it.mutex.Lock()
Expand All @@ -84,7 +69,7 @@ func (it *EntryInfoIterator) SetNext() bool {
}

for i := it.currentShard + 1; i < it.cache.config.Shards; i++ {
it.elements, it.elementsCount = copyCurrentShardMap(it.cache.shards[i])
it.elements, it.elementsCount = it.cache.shards[i].copyKeys()

// Non empty shard - stick with it
if it.elementsCount > 0 {
Expand All @@ -100,7 +85,7 @@ func (it *EntryInfoIterator) SetNext() bool {
}

func newIterator(cache *BigCache) *EntryInfoIterator {
elements, count := copyCurrentShardMap(cache.shards[0])
elements, count := cache.shards[0].copyKeys()

return &EntryInfoIterator{
cache: cache,
Expand All @@ -120,7 +105,7 @@ func (it *EntryInfoIterator) Value() (EntryInfo, error) {
return emptyEntryInfo, ErrInvalidIteratorState
}

entry, err := it.cache.shards[it.currentShard].entries.Get(int(it.elements[it.currentIndex]))
entry, err := it.cache.shards[it.currentShard].getEntry(int(it.elements[it.currentIndex]))

if err != nil {
it.mutex.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion iterator_test.go
Expand Up @@ -96,7 +96,7 @@ func TestEntriesIteratorWithConcurrentUpdate(t *testing.T) {

// Quite ugly but works
for i := 0; i < cache.config.Shards; i++ {
if oldestEntry, err := cache.shards[i].entries.Peek(); err == nil {
if oldestEntry, err := cache.shards[i].getOldestEntry(); err == nil {
cache.onEvict(oldestEntry, 10, cache.shards[i].removeOldestEntry)
}
}
Expand Down
116 changes: 114 additions & 2 deletions shard.go
@@ -1,6 +1,8 @@
package bigcache

import (
"fmt"
"log"
"sync"

"github.com/allegro/bigcache/queue"
Expand All @@ -12,10 +14,111 @@ type cacheShard struct {
lock sync.RWMutex
entryBuffer []byte
onRemove func(wrappedEntry []byte)

isVerbose bool
clock clock
lifeWindow uint64
}

type onRemoveCallback func(wrappedEntry []byte)

func (s *cacheShard) get(key string, hashedKey uint64) ([]byte, error) {
s.lock.RLock()
itemIndex := s.hashmap[hashedKey]

if itemIndex == 0 {
s.lock.RUnlock()
return nil, notFound(key)
}

wrappedEntry, err := s.entries.Get(int(itemIndex))
if err != nil {
s.lock.RUnlock()
return nil, err
}
if entryKey := readKeyFromEntry(wrappedEntry); key != entryKey {
if s.isVerbose {
log.Printf("Collision detected. Both %q and %q have the same hash %x", key, entryKey, hashedKey)
}
s.lock.RUnlock()
return nil, notFound(key)
}
s.lock.RUnlock()
return readEntry(wrappedEntry), nil
}

func (s *cacheShard) set(key string, hashedKey uint64, entry []byte) error {
currentTimestamp := uint64(s.clock.epoch())

s.lock.Lock()

if previousIndex := s.hashmap[hashedKey]; previousIndex != 0 {
if previousEntry, err := s.entries.Get(int(previousIndex)); err == nil {
resetKeyFromEntry(previousEntry)
}
}

if oldestEntry, err := s.entries.Peek(); err == nil {
s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry)
}

w := wrapEntry(currentTimestamp, hashedKey, key, entry, &s.entryBuffer)

for {
if index, err := s.entries.Push(w); err == nil {
s.hashmap[hashedKey] = uint32(index)
s.lock.Unlock()
return nil
} else if s.removeOldestEntry() != nil {
s.lock.Unlock()
return fmt.Errorf("Entry is bigger than max shard size.")
}
}
}

func (s *cacheShard) onEvict(oldestEntry []byte, currentTimestamp uint64, evict func() error) bool {
oldestTimestamp := readTimestampFromEntry(oldestEntry)
if currentTimestamp-oldestTimestamp > s.lifeWindow {
evict()
return true
}
return false
}

func (s *cacheShard) cleanUp(currentTimestamp uint64) {
s.lock.Lock()
for {
if oldestEntry, err := s.entries.Peek(); err != nil {
break
} else if evicted := s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry); !evicted {
break
}
}
s.lock.Unlock()
}

func (s *cacheShard) getOldestEntry() ([]byte, error) {
return s.entries.Peek()
}

func (s *cacheShard) getEntry(index int) ([]byte, error) {
return s.entries.Get(index)
}

func (s *cacheShard) copyKeys() (keys []uint32, next int) {
keys = make([]uint32, len(s.hashmap))

s.lock.RLock()

for _, index := range s.hashmap {
keys[next] = index
next++
}

s.lock.RUnlock()
return keys, next
}

func (s *cacheShard) removeOldestEntry() error {
oldest, err := s.entries.Pop()
if err == nil {
Expand All @@ -28,20 +131,29 @@ func (s *cacheShard) removeOldestEntry() error {
}

func (s *cacheShard) reset(config Config) {
s.lock.Lock()
s.hashmap = make(map[uint64]uint32, config.initialShardSize())
s.entryBuffer = make([]byte, config.MaxEntrySize+headersSizeInBytes)
s.entries.Reset()
s.lock.Unlock()
}

func (s *cacheShard) len() int {
return len(s.hashmap)
s.lock.RLock()
res := len(s.hashmap)
s.lock.RUnlock()
return res
}

func initNewShard(config Config, callback onRemoveCallback) *cacheShard {
func initNewShard(config Config, callback onRemoveCallback, clock clock) *cacheShard {
return &cacheShard{
hashmap: make(map[uint64]uint32, config.initialShardSize()),
entries: *queue.NewBytesQueue(config.initialShardSize()*config.MaxEntrySize, config.maximumShardSize(), config.Verbose),
entryBuffer: make([]byte, config.MaxEntrySize+headersSizeInBytes),
onRemove: callback,

isVerbose: config.Verbose,
clock: clock,
lifeWindow: uint64(config.LifeWindow.Seconds()),
}
}

0 comments on commit 2831151

Please sign in to comment.