diff --git a/cache.go b/cache.go index 603647ab..0c3e272e 100644 --- a/cache.go +++ b/cache.go @@ -23,6 +23,7 @@ import ( "bytes" "errors" "fmt" + "sync" "sync/atomic" "time" @@ -36,6 +37,12 @@ const ( type onEvictFunc func(uint64, uint64, interface{}, int64) +var itemPool = &sync.Pool{ + New: func() interface{} { + return &item{} + }, +} + // Cache is a thread-safe implementation of a hashmap with a TinyLFU admission // policy and a Sampled LFU eviction policy. You can use the same Cache instance // from as many goroutines as you want. @@ -128,6 +135,10 @@ type item struct { expiration time.Time } +func (i *item) reset() { + i.flag, i.key, i.conflict, i.value, i.cost, i.expiration = 0, 0, 0, nil, 0, time.Time{} +} + // NewCache returns a new Cache instance and any configuration errors, if any. func NewCache(config *Config) (*Cache, error) { switch { @@ -216,14 +227,13 @@ func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration } keyHash, conflictHash := c.keyToHash(key) - i := &item{ - flag: itemNew, - key: keyHash, - conflict: conflictHash, - value: value, - cost: cost, - expiration: expiration, - } + i := itemPool.Get().(*item) + i.flag = itemNew + i.key = keyHash + i.conflict = conflictHash + i.value = value + i.cost = cost + i.expiration = expiration // cost is eventually updated. The expiration must also be immediately updated // to prevent items from being prematurely removed from the map. if c.store.Update(i) { @@ -251,11 +261,11 @@ func (c *Cache) Del(key interface{}) { // So we must push the same item to `setBuf` with the deletion flag. // This ensures that if a set is followed by a delete, it will be // applied in the correct order. - c.setBuf <- &item{ - flag: itemDelete, - key: keyHash, - conflict: conflictHash, - } + i := itemPool.Get().(*item) + i.flag = itemDelete + i.key = keyHash + i.conflict = conflictHash + c.setBuf <- i } // Close stops all goroutines and closes all channels. @@ -314,6 +324,8 @@ func (c *Cache) processItems() { if c.onEvict != nil { c.onEvict(victim.key, victim.conflict, victim.value, victim.cost) } + victim.reset() + itemPool.Put(victim) } case itemUpdate: @@ -323,6 +335,8 @@ func (c *Cache) processItems() { c.policy.Del(i.key) // Deals with metrics updates. c.store.Del(i.key, i.conflict) } + i.reset() + itemPool.Put(i) case <-c.cleanupTicker.C: c.store.Cleanup(c.policy, c.onEvict) case <-c.stop: diff --git a/cache_test.go b/cache_test.go index c1dc7456..ee1ed522 100644 --- a/cache_test.go +++ b/cache_test.go @@ -610,6 +610,22 @@ func TestCacheMetricsClear(t *testing.T) { c.Metrics.Clear() } +func BenchmarkConcurrentCacheSetNeverEvict(b *testing.B) { + c, err := NewCache(&Config{ + NumCounters: 100, + MaxCost: 10, + BufferItems: 64, + }) + require.NoError(b, err) + b.RunParallel(func(pb *testing.PB) { + cnt := rand.Int() + for pb.Next() { + cnt++ + c.Set(cnt, cnt, 0) + } + }) +} + func init() { // Set bucketSizeSecs to 1 to avoid waiting too much during the tests. bucketDurationSecs = 1 diff --git a/policy.go b/policy.go index 00664980..274920d7 100644 --- a/policy.go +++ b/policy.go @@ -175,11 +175,9 @@ func (p *defaultPolicy) Add(key uint64, cost int64) ([]*item, bool) { sample[minId] = sample[len(sample)-1] sample = sample[:len(sample)-1] // Store victim in evicted victims slice. - victims = append(victims, &item{ - key: minKey, - conflict: 0, - cost: minCost, - }) + i := itemPool.Get().(*item) + i.key, i.conflict, i.cost = minKey, 0, minCost + victims = append(victims, i) } p.evict.add(key, cost) p.metrics.add(costAdd, key, uint64(cost))