Skip to content

Commit

Permalink
make PubSub tests more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
paskal committed May 28, 2020
1 parent b9b9add commit f227a26
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 21 deletions.
21 changes: 18 additions & 3 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package lcw
import (
"errors"
"fmt"
"log"
"math/rand"
"strings"
"sync"
Expand Down Expand Up @@ -661,19 +660,35 @@ func (s sizedString) MarshalBinary() (data []byte, err error) {
// mockPubSub is a test double for the cache event bus. It records every
// published key and fans each Publish out to all subscribed callbacks in
// separate goroutines. The embedded Mutex guards calledKeys and fns; the
// embedded WaitGroup lets tests Wait() until all callback goroutines finish.
type mockPubSub struct {
	calledKeys []string
	fns        []func(fromID string, key string)
	sync.Mutex
	sync.WaitGroup
}

// CalledKeys returns a snapshot of the keys published so far.
// A copy is returned so callers can't race with later Publish appends.
func (m *mockPubSub) CalledKeys() []string {
	m.Lock()
	defer m.Unlock()
	return append([]string(nil), m.calledKeys...)
}

// Subscribe registers fn to be invoked on every subsequent Publish.
// It never fails; the error return only satisfies the event-bus interface.
func (m *mockPubSub) Subscribe(fn func(fromID string, key string)) error {
	m.Lock()
	defer m.Unlock()
	m.fns = append(m.fns, fn)
	return nil
}

// Publish records key and invokes every subscriber asynchronously.
// Tests call m.Wait() to block until all callback goroutines have returned.
func (m *mockPubSub) Publish(fromID, key string) error {
	m.Lock()
	defer m.Unlock()
	m.calledKeys = append(m.calledKeys, key)
	for _, fn := range m.fns {
		fn := fn // capture loop variable for the goroutine (pre-Go 1.22 semantics)
		m.Add(1)
		// run in goroutine to prevent deadlock: a subscriber may re-enter
		// this mock (or the cache) and try to take the same lock
		go func() {
			fn(fromID, key)
			m.Done()
		}()
	}
	return nil
}
41 changes: 33 additions & 8 deletions expirable_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,14 @@ func TestExpirableCacheWithBus(t *testing.T) {
ps := &mockPubSub{}
lc1, err := NewExpirableCache(MaxKeys(5), TTL(time.Millisecond*100), EventBus(ps))
require.NoError(t, err)
defer lc1.Close()

lc2, err := NewExpirableCache(MaxKeys(50), TTL(time.Millisecond*5000), EventBus(ps))
require.NoError(t, err)
defer lc2.Close()

start := time.Now()
t.Log("start: ", start)

// add 5 keys to the first node cache
for i := 0; i < 5; i++ {
Expand All @@ -131,26 +136,46 @@ func TestExpirableCacheWithBus(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}

assert.Equal(t, 0, len(ps.calledKeys), "no events")
t.Log("after 5 keys add: ", time.Since(start))

assert.Equal(t, 0, len(ps.CalledKeys()), "no events")
assert.Equal(t, 5, lc1.Stat().Keys)
assert.Equal(t, int64(5), lc1.Stat().Misses)

t.Log("after checks for lc1: ", time.Since(start))

// add key-1 key to the second node
_, e := lc2.Get(fmt.Sprintf("key-1"), func() (Value, error) {
_, e := lc2.Get("key-1", func() (Value, error) {
return "result-111", nil
})
assert.NoError(t, e)
assert.Equal(t, 1, lc2.Stat().Keys)
assert.Equal(t, int64(1), lc2.Stat().Misses, lc2.Stat())

time.Sleep(55 * time.Millisecond) // let key-1 expire
assert.Equal(t, 1, len(ps.calledKeys), "1 event, key-0 expired")
assert.Equal(t, 4, lc1.Stat().Keys)
assert.Equal(t, 1, lc2.Stat().Keys, "key-1 still in cache2")
t.Log("after add key to lc2: ", time.Since(start))
newStart := time.Now()

// let key-1 expire
for {
t.Log("wait key-1 expiring: ", time.Since(newStart))
if len(ps.CalledKeys()) > 0 {
t.Log("key-1 expired: ", time.Since(newStart), time.Since(start))
assert.Equal(t, 4, lc1.Stat().Keys)
assert.Equal(t, 1, lc2.Stat().Keys, "key-1 still in cache2")
break
}
lc1.backend.DeleteExpired() // enforce DeleteExpired for GitHub earlier than TTL/2
ps.Wait() // wait for onBusEvent goroutines to finish
time.Sleep(time.Millisecond * 10)
}

t.Log("going to all keys expire: ", time.Since(start))
time.Sleep(210 * time.Millisecond) // let all keys expire
assert.Equal(t, 6, len(ps.calledKeys), "6 events, key-1 expired %+v", ps.calledKeys)
t.Log("last sleep: ", time.Since(start))
ps.Wait() // wait for onBusEvent goroutines to finish
t.Log("after wait: ", time.Since(start))
assert.Equal(t, 6, len(ps.CalledKeys()), "6 events, key-1 expired %+v", ps.calledKeys)
assert.Equal(t, 0, lc1.Stat().Keys)
assert.Equal(t, 0, lc2.Stat().Keys, "key-1 removed from cache2")

t.Log("end: ", time.Since(start))
}
2 changes: 0 additions & 2 deletions lru_cache.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package lcw

import (
"log"
"sync/atomic"

"github.com/google/uuid"
Expand Down Expand Up @@ -151,7 +150,6 @@ func (c *LruCache) Close() error {
// onBusEvent reacts on invalidation message triggered by event bus from another cache instance
func (c *LruCache) onBusEvent(id, key string) {
if id != c.id && c.backend.Contains(key) { // prevent reaction on event from this cache
log.Println("!! ", id, key)
c.backend.Remove(key)
}
}
Expand Down
28 changes: 20 additions & 8 deletions lru_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,11 @@ func TestLruCache_MaxKeysWithBus(t *testing.T) {
var coldCalls int32
lc1, err := NewLruCache(MaxKeys(5), MaxValSize(10), EventBus(ps))
require.Nil(t, err)
defer lc1.Close()

lc2, err := NewLruCache(MaxKeys(50), MaxValSize(100), EventBus(ps))
require.Nil(t, err)
defer lc2.Close()

// put 5 keys to cache1
for i := 0; i < 5; i++ {
Expand All @@ -99,41 +101,51 @@ func TestLruCache_MaxKeysWithBus(t *testing.T) {
atomic.AddInt32(&coldCalls, 1)
return fmt.Sprintf("result-%d", i), nil
})
assert.Nil(t, e)
assert.NoError(t, e)
assert.Equal(t, fmt.Sprintf("result-%d", i), res.(string))
assert.Equal(t, int32(i+1), atomic.LoadInt32(&coldCalls))
}
// check if really cached
res, err := lc1.Get("key-3", func() (Value, error) {
return "result-blah", nil
})
assert.Nil(t, err)
assert.NoError(t, err)
assert.Equal(t, "result-3", res.(string), "should be cached")

assert.Equal(t, 0, len(ps.CalledKeys()), "no events")

// put 1 key to cache2
res, e := lc2.Get(fmt.Sprintf("key-1"), func() (Value, error) {
return fmt.Sprintf("result-111"), nil
})
assert.Nil(t, e)
assert.Equal(t, fmt.Sprintf("result-111"), res.(string))
assert.NoError(t, e)
assert.Equal(t, "result-111", res.(string))

// try to cache1 after maxKeys reached, will remove key-0
res, err = lc1.Get("key-X", func() (Value, error) {
return "result-X", nil
})
assert.Nil(t, err)
assert.NoError(t, err)
assert.Equal(t, "result-X", res.(string))
assert.Equal(t, 5, lc1.backend.Len())

assert.Equal(t, 1, len(ps.CalledKeys()), "1 event, key-0 expired")

assert.Equal(t, 1, lc2.backend.Len(), "cache2 still has key-1")

// try to cache1 after maxKeys reached, will remove key-1
res, err = lc1.Get("key-X2", func() (Value, error) {
return "result-X", nil
})
assert.Nil(t, err)
assert.Equal(t, "result-2", res.(string))
assert.Equal(t, 1, lc2.backend.Len(), "cache2 removed key-1")
assert.NoError(t, err)
assert.Equal(t, "result-X", res.(string))

assert.Equal(t, 2, len(ps.CalledKeys()), "2 events, key-1 expired")

// wait for onBusEvent goroutines to finish
ps.Wait()

assert.Equal(t, 0, lc2.backend.Len(), "cache2 removed key-1")
}

// LruCache illustrates the use of LRU loading cache
Expand Down

0 comments on commit f227a26

Please sign in to comment.