Skip to content

Commit

Permalink
remove concurrency test
Browse files Browse the repository at this point in the history
  • Loading branch information
hookokoko committed Dec 25, 2022
1 parent 3ca7b38 commit 0f1c6d3
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 33 deletions.
12 changes: 8 additions & 4 deletions client/cache/bloom_filter_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,21 @@ import (
"time"

"github.com/beego/beego/v2/core/berror"
"github.com/bits-and-blooms/bloom/v3"
)

type BloomFilterCache struct {
Cache
*bloom.BloomFilter
BloomFilter
loadFunc func(ctx context.Context, key string) (any, error)
expiration time.Duration // set cache expiration, default never expire
}

func NewBloomFilterCache(cache Cache, ln func(context.Context, string) (any, error), blm *bloom.BloomFilter,
type BloomFilter interface {
Test(data string) bool
Add(data string)
}

func NewBloomFilterCache(cache Cache, ln func(context.Context, string) (any, error), blm BloomFilter,
expiration time.Duration,
) (*BloomFilterCache, error) {
if cache == nil || ln == nil || blm == nil {
Expand All @@ -51,7 +55,7 @@ func (bfc *BloomFilterCache) Get(ctx context.Context, key string) (any, error) {
return nil, err
}
if errors.Is(err, ErrKeyNotExist) {
exist := bfc.BloomFilter.TestString(key)
exist := bfc.BloomFilter.Test(key)
if exist {
val, err = bfc.loadFunc(ctx, key)
if err != nil {
Expand Down
85 changes: 56 additions & 29 deletions client/cache/bloom_filter_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package cache
import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"
Expand All @@ -33,10 +32,36 @@ type MockDB struct {
loadCnt int64
}

type BloomFilterMock struct {
*bloom.BloomFilter
lock *sync.RWMutex
concurrent bool
}

func (b *BloomFilterMock) Add(data string) {
if b.concurrent {
b.lock.Lock()
defer b.lock.Unlock()
}
b.BloomFilter.AddString(data)
}

func (b *BloomFilterMock) Test(data string) bool {
if b.concurrent {
b.lock.Lock()
defer b.lock.Unlock()
}
return b.BloomFilter.TestString(data)
}

var (
mockDB = MockDB{Db: NewMemoryCache(), loadCnt: 0}
mockBloom = bloom.NewWithEstimates(20000, 0.99)
loadFunc = func(ctx context.Context, key string) (any, error) {
mockBloom = &BloomFilterMock{
BloomFilter: bloom.NewWithEstimates(20000, 0.01),
lock: &sync.RWMutex{},
concurrent: false,
}
loadFunc = func(ctx context.Context, key string) (any, error) {
mockDB.loadCnt += 1 // flag of number load data from db
v, err := mockDB.Db.Get(context.Background(), key)
if err != nil {
Expand Down Expand Up @@ -97,7 +122,7 @@ func TestBloomFilterCache_Get(t *testing.T) {
before: func() {
_ = mockDB.Db.ClearAll(context.Background())
_ = mockDB.Db.Put(context.Background(), "exist_in_DB", "exist_in_DB", 0)
mockBloom.AddString("exist_in_DB")
mockBloom.Add("exist_in_DB")
},
key: "exist_in_DB",
wantVal: "exist_in_DB",
Expand All @@ -114,7 +139,7 @@ func TestBloomFilterCache_Get(t *testing.T) {
{
name: "load db fail",
before: func() {
mockBloom.AddString("not_exist_in_DB")
mockBloom.Add("not_exist_in_DB")
},
after: func() {
assert.Equal(t, mockDB.loadCnt, int64(1))
Expand Down Expand Up @@ -150,27 +175,29 @@ func TestBloomFilterCache_Get(t *testing.T) {
}
}

func TestBloomFilterCache_Get_Concurrency(t *testing.T) {
bfc, err := NewBloomFilterCache(cacheUnderlying, loadFunc, mockBloom, time.Minute)
assert.Nil(t, err)

_ = mockDB.Db.ClearAll(context.Background())
_ = mockDB.Db.Put(context.Background(), "key_11", "value_11", 0)
mockBloom.AddString("key_11")

var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
key := fmt.Sprintf("key_%d", i)
go func(key string) {
defer wg.Done()
val, _ := bfc.Get(context.Background(), key)

if val != nil {
assert.Equal(t, "value_11", val)
}
}(key)
}
wg.Wait()
assert.Equal(t, int64(1), mockDB.loadCnt)
}
// This implementation of Bloom filters cache is NOT safe for concurrent use.
// Uncomment the following method.
// func TestBloomFilterCache_Get_Concurrency(t *testing.T) {
// bfc, err := NewBloomFilterCache(cacheUnderlying, loadFunc, mockBloom, time.Minute)
// assert.Nil(t, err)
//
// _ = mockDB.Db.ClearAll(context.Background())
// _ = mockDB.Db.Put(context.Background(), "key_11", "value_11", 0)
// mockBloom.AddString("key_11")
//
// var wg sync.WaitGroup
// wg.Add(100000)
// for i := 0; i < 100000; i++ {
// key := fmt.Sprintf("key_%d", i)
// go func(key string) {
// defer wg.Done()
// val, _ := bfc.Get(context.Background(), key)
//
// if val != nil {
// assert.Equal(t, "value_11", val)
// }
// }(key)
// }
// wg.Wait()
// assert.Equal(t, int64(1), mockDB.loadCnt)
// }

0 comments on commit 0f1c6d3

Please sign in to comment.