Implemented new Append() method (#203)
snacker81 committed Feb 5, 2020
1 parent fcb069f · commit 75a65a8
Showing 3 changed files with 193 additions and 0 deletions.
9 changes: 9 additions & 0 deletions bigcache.go
@@ -134,6 +134,15 @@ func (c *BigCache) Set(key string, entry []byte) error {
return shard.set(key, hashedKey, entry)
}

// Append appends entry under the key if the key exists; otherwise
// it sets the key (same behaviour as Set()). With Append() you can
// concatenate multiple entries under the same key in a lock-optimized way.
func (c *BigCache) Append(key string, entry []byte) error {
hashedKey := c.hash.Sum64(key)
shard := c.getShard(hashedKey)
return shard.append(key, hashedKey, entry)
}

// Delete removes the key
func (c *BigCache) Delete(key string) error {
hashedKey := c.hash.Sum64(key)
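As a usage sketch of the new API (illustrative only: the import path shown is the pre-v2 "github.com/allegro/bigcache" and may differ for versioned releases), the first Append on a missing key behaves like Set, and later calls concatenate onto the stored value:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/allegro/bigcache"
)

func main() {
	cache, err := bigcache.NewBigCache(bigcache.DefaultConfig(10 * time.Minute))
	if err != nil {
		log.Fatal(err)
	}

	// First Append on a missing key acts like Set; later ones concatenate.
	for _, chunk := range []string{"login;", "click;", "logout;"} {
		if err := cache.Append("events", []byte(chunk)); err != nil {
			log.Fatal(err)
		}
	}

	entry, _ := cache.Get("events")
	fmt.Println(string(entry)) // login;click;logout;
}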
114 changes: 114 additions & 0 deletions bigcache_test.go
@@ -6,6 +6,7 @@ import (
"math"
"math/rand"
"runtime"
"strings"
"sync"
"testing"
"time"
@@ -27,6 +28,119 @@ func TestWriteAndGetOnCache(t *testing.T) {
assertEqual(t, value, cachedValue)
}

func TestAppendAndGetOnCache(t *testing.T) {
t.Parallel()

// given
cache, _ := NewBigCache(DefaultConfig(5 * time.Second))
key := "key"
value1 := make([]byte, 50)
rand.Read(value1)
value2 := make([]byte, 50)
rand.Read(value2)
value3 := make([]byte, 50)
rand.Read(value3)

// when
_, err := cache.Get(key)

// then
assertEqual(t, ErrEntryNotFound, err)

// when
cache.Append(key, value1)
cachedValue, err := cache.Get(key)

// then
noError(t, err)
assertEqual(t, value1, cachedValue)

// when
cache.Append(key, value2)
cachedValue, err = cache.Get(key)

// then
noError(t, err)
expectedValue := value1
expectedValue = append(expectedValue, value2...)
assertEqual(t, expectedValue, cachedValue)

// when
cache.Append(key, value3)
cachedValue, err = cache.Get(key)

// then
noError(t, err)
expectedValue = value1
expectedValue = append(expectedValue, value2...)
expectedValue = append(expectedValue, value3...)
assertEqual(t, expectedValue, cachedValue)
}

// TestAppendRandomly does simultaneous appends to check for corruption errors.
func TestAppendRandomly(t *testing.T) {
t.Parallel()

c := Config{
Shards: 1,
LifeWindow: 5 * time.Second,
CleanWindow: 1 * time.Second,
MaxEntriesInWindow: 1000 * 10 * 60,
MaxEntrySize: 500,
StatsEnabled: true,
Verbose: true,
Hasher: newDefaultHasher(),
HardMaxCacheSize: 1,
Logger: DefaultLogger(),
}
cache, err := NewBigCache(c)
noError(t, err)

nKeys := 5
nAppendsPerKey := 2000
nWorker := 10
var keys []string
for i := 0; i < nKeys; i++ {
for j := 0; j < nAppendsPerKey; j++ {
keys = append(keys, fmt.Sprintf("key%d", i))
}
}
rand.Shuffle(len(keys), func(i, j int) {
keys[i], keys[j] = keys[j], keys[i]
})

jobs := make(chan string, len(keys))
for _, key := range keys {
jobs <- key
}
close(jobs)

var wg sync.WaitGroup
for i := 0; i < nWorker; i++ {
wg.Add(1)
go func() {
for {
key, ok := <-jobs
if !ok {
break
}
cache.Append(key, []byte(key))
}
wg.Done()
}()
}
wg.Wait()

assertEqual(t, nKeys, cache.Len())
for i := 0; i < nKeys; i++ {
key := fmt.Sprintf("key%d", i)
expectedValue := []byte(strings.Repeat(key, nAppendsPerKey))
cachedValue, err := cache.Get(key)
noError(t, err)
assertEqual(t, expectedValue, cachedValue)
}
}

func TestConstructCacheWithDefaultHasher(t *testing.T) {
t.Parallel()

70 changes: 70 additions & 0 deletions shard.go
@@ -84,6 +84,24 @@ func (s *cacheShard) get(key string, hashedKey uint64) ([]byte, error) {
return entry, nil
}

func (s *cacheShard) getWithoutLock(key string, hashedKey uint64) ([]byte, error) {
wrappedEntry, err := s.getWrappedEntry(hashedKey)
if err != nil {
return nil, err
}
if entryKey := readKeyFromEntry(wrappedEntry); key != entryKey {
s.collision()
if s.isVerbose {
s.logger.Printf("Collision detected. Both %q and %q have the same hash %x", key, entryKey, hashedKey)
}
return nil, ErrEntryNotFound
}
entry := readEntry(wrappedEntry)
s.hitWithoutLock(hashedKey)

return entry, nil
}

func (s *cacheShard) getWrappedEntry(hashedKey uint64) ([]byte, error) {
itemIndex := s.hashmap[hashedKey]

@@ -131,6 +149,51 @@ func (s *cacheShard) set(key string, hashedKey uint64, entry []byte) error {
}
}

func (s *cacheShard) setWithoutLock(key string, hashedKey uint64, entry []byte) error {
currentTimestamp := uint64(s.clock.epoch())

if previousIndex := s.hashmap[hashedKey]; previousIndex != 0 {
if previousEntry, err := s.entries.Get(int(previousIndex)); err == nil {
resetKeyFromEntry(previousEntry)
}
}

if oldestEntry, err := s.entries.Peek(); err == nil {
s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry)
}

w := wrapEntry(currentTimestamp, hashedKey, key, entry, &s.entryBuffer)

for {
if index, err := s.entries.Push(w); err == nil {
s.hashmap[hashedKey] = uint32(index)
return nil
}
if s.removeOldestEntry(NoSpace) != nil {
return fmt.Errorf("entry is bigger than max shard size")
}
}
}

func (s *cacheShard) append(key string, hashedKey uint64, entry []byte) error {
s.lock.Lock()
var newEntry []byte
oldEntry, err := s.getWithoutLock(key, hashedKey)
if err != nil {
if err != ErrEntryNotFound {
s.lock.Unlock()
return err
}
} else {
newEntry = oldEntry
}

newEntry = append(newEntry, entry...)
err = s.setWithoutLock(key, hashedKey, newEntry)
s.lock.Unlock()
return err
}

func (s *cacheShard) del(hashedKey uint64) error {
// Optimistic pre-check using only readlock
s.lock.RLock()
@@ -287,6 +350,13 @@ func (s *cacheShard) hit(key uint64) {
}
}

func (s *cacheShard) hitWithoutLock(key uint64) {
atomic.AddInt64(&s.stats.Hits, 1)
if s.statsEnabled {
s.hashmapStats[key]++
}
}

func (s *cacheShard) miss() {
atomic.AddInt64(&s.stats.Misses, 1)
}
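Why does append hold the shard's write lock across the whole read-modify-write instead of composing the existing locking get and set? A minimal sketch of the lost-update race such a composition would allow (naiveAppend is hypothetical, not part of this change):

// naiveAppend composes two individually-locked calls. Between Get and
// Set another goroutine can complete its own naiveAppend on the same
// key, and its bytes are then silently overwritten (a lost update).
func naiveAppend(c *BigCache, key string, entry []byte) error {
	old, err := c.Get(key) // locks, copies the entry out, unlocks
	if err != nil && err != ErrEntryNotFound {
		return err
	}
	return c.Set(key, append(old, entry...)) // locks again; state may have changed
}

Holding s.lock around getWithoutLock plus setWithoutLock, as append in the diff above does, makes the whole operation atomic per shard; this is the lock-optimized behaviour the doc comment refers to.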
