Skip to content

Commit

Permalink
rewrite internal cache package for lcw requirements
Browse files Browse the repository at this point in the history
  • Loading branch information
paskal committed May 2, 2020
1 parent 3f55de9 commit 1a4431f
Show file tree
Hide file tree
Showing 3 changed files with 284 additions and 389 deletions.
255 changes: 144 additions & 111 deletions cache/cache.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
// Package cache implements LoadingCache.
//
// Support size-based eviction with LRC and LRU, and TTL based eviction.
package cache

import (
"fmt"
"log"
"sort"
"sync"
"time"

"github.com/pkg/errors"
)

// LoadingCache defines the loading cache interface: a key/value store with
// TTL expiration, optional LRU/LRC size-based eviction and explicit invalidation.
type LoadingCache interface {
	Get(key string) (interface{}, bool)  // returns value and marks it recently used (in LRU mode)
	Peek(key string) (interface{}, bool) // returns value without updating recently-used state
	Set(key string, value interface{})
	Keys() []string
	ItemCount() int
	Invalidate(key string)
	InvalidateFn(fn func(key string) bool)
	Purge()
	DeleteExpired()
	Close()
}

// loadingCacheImpl provides loading cache with on-demand loading, similar to Guava LoadingCache.
Expand All @@ -29,129 +31,182 @@ type LoadingCache interface {
// consistency loadingCacheImpl locking on particular key, but no locks across multiple keys.
type loadingCacheImpl struct {
purgeEvery time.Duration
maxKeys int
allowError bool
ttl time.Duration
maxKeys int64
isLRU bool
done chan struct{}
onEvicted func(key string, value interface{})

sync.Mutex
stat Stats
data map[string]*cacheItem
}

// Stats provides statistics for cache
type Stats struct {
Size int // number of records in cache
Hits, Misses int // cache effectiveness
Added, Evicted int // number of added and auto-evicted records
}

const noEvictionTTL = time.Hour * 24 * 365 * 10

// NewLoadingCache returns a new cache, activates purge with purgeEvery (0 to never purge)
func NewLoadingCache(options ...Option) LoadingCache {
// NewLoadingCache returns a new cache, activates purge with purgeEvery (0 to never purge).
// Default MaxKeys is unlimited (0).
func NewLoadingCache(options ...Option) (LoadingCache, error) {
res := loadingCacheImpl{
data: map[string]*cacheItem{},
purgeEvery: 0,
maxKeys: 0,
done: make(chan struct{}),
}

for _, opt := range options {
if err := opt(&res); err != nil {
log.Printf("[WARN] failed to set cache option, %v", err)
return nil, errors.Wrap(err, "failed to set cache option")
}
}

if res.ttl == 0 {
res.ttl = noEvictionTTL // very long ttl to prevent eviction
}

if res.maxKeys > 0 || res.purgeEvery > 0 {
if res.purgeEvery == 0 {
res.purgeEvery = time.Minute * 5 // non-zero purge enforced because maxKeys defined
}
go func() {
go func(done <-chan struct{}) {
ticker := time.NewTicker(res.purgeEvery)
for range ticker.C {
res.Lock()
res.purge(res.maxKeys)
res.Unlock()
for {
select {
case <-done:
return
case <-ticker.C:
res.Lock()
res.purge(res.maxKeys)
res.Unlock()
}
}
}()
}(res.done)
}
log.Printf("[DEBUG] cache created. purge=%v, max=%d", res.purgeEvery, res.maxKeys)
return &res
return &res, nil
}

// Get by key if found, or get value and return
func (c *loadingCacheImpl) Get(key string, ttl time.Duration, f func() (interface{}, error)) (interface{}, error) {

// Set key
func (c *loadingCacheImpl) Set(key string, value interface{}) {
c.Lock()
var ci *cacheItem
if ci = c.data[key]; ci == nil {

itemTTL := ttl
if ttl == 0 {
itemTTL = noEvictionTTL // very long ttl to prevent eviction
}
defer c.Unlock()

ci = &cacheItem{fun: f, ttl: itemTTL}
c.data[key] = ci
c.stat.Added++
now := time.Now()
_, ok := c.data[key]
if !ok {
c.data[key] = &cacheItem{}
}
c.data[key].data = value
c.data[key].expiresAt = now.Add(c.ttl)
if c.isLRU {
c.data[key].lastReadAt = now
}
c.Unlock()

// we don't want to block the possible call as this may take time
// and also cause dead-lock if one cached value calls (the same cache) for another
data, called, err := ci.value(c.allowError)
if c.maxKeys > 0 && int64(len(c.data)) >= c.maxKeys*2 {
c.purge(c.maxKeys)
}
}

// Get returns the key value
func (c *loadingCacheImpl) Get(key string) (interface{}, bool) {
c.Lock()
defer c.Unlock()
c.stat.Size = len(c.data)
if called {
c.stat.Misses++
} else {
c.stat.Hits++
value, ok := c.getValue(key)
if !ok {
return nil, false
}
if c.maxKeys > 0 && c.stat.Size >= c.maxKeys*2 {
c.purge(c.maxKeys)
if c.isLRU {
c.data[key].lastReadAt = time.Now()
}
return data, err
return value, ok
}

// GetValue by key from cache
func (c *loadingCacheImpl) GetValue(key string) (interface{}, bool) {
// Peek returns the key value (or undefined if not found) without updating the "recently used"-ness of the key.
func (c *loadingCacheImpl) Peek(key string) (interface{}, bool) {
c.Lock()
defer c.Unlock()
res, ok := c.data[key]
value, ok := c.getValue(key)
if !ok {
return nil, false
}
return res.data, ok
return value, ok
}

// Invalidate key (item) from the cache, notifying onEvicted if set.
func (c *loadingCacheImpl) Invalidate(key string) {
	c.Lock()
	if value, ok := c.data[key]; ok {
		if c.onEvicted != nil {
			// hand the callback the stored data, not the internal *cacheItem wrapper
			c.onEvicted(key, value.data)
		}
		delete(c.data, key)
	}
	c.Unlock()
}

// InvalidateFn deletes multiple keys if predicate is true
func (c *loadingCacheImpl) InvalidateFn(fn func(key string) bool) {
c.Lock()
for key := range c.data {
for key, value := range c.data {
if fn(key) {
if c.onEvicted != nil {
c.onEvicted(key, value)
}
delete(c.data, key)
}
}
c.Unlock()
}

// Stat gets the current stats for cache
func (c *loadingCacheImpl) Stat() Stats {
// Keys return slice of current keys in the cache
func (c *loadingCacheImpl) Keys() []string {
c.Lock()
defer c.Unlock()
return c.stat
keys := make([]string, 0, len(c.data))
for k := range c.data {
keys = append(keys, k)
}
return keys
}

func (c *loadingCacheImpl) String() string {
return fmt.Sprintf("%+v (%0.1f%%)", c.Stat(), 100*float64(c.Stat().Hits)/float64(c.stat.Hits+c.stat.Misses))
// get value respecting the expiration, should be called with lock
func (c *loadingCacheImpl) getValue(key string) (interface{}, bool) {
value, ok := c.data[key]
if !ok {
return nil, false
}
if time.Now().After(c.data[key].expiresAt) {
return nil, false
}
return value.data, ok
}

// Purge drops every entry from the cache, regardless of expiration.
func (c *loadingCacheImpl) Purge() {
	c.Lock()
	c.purge(-1) // -1 asks purge to clear everything
	c.Unlock()
}

// DeleteExpired removes only the entries whose TTL has passed.
func (c *loadingCacheImpl) DeleteExpired() {
	c.Lock()
	c.purge(0) // maxKeys 0 limits purge to expired entries only
	c.Unlock()
}

// ItemCount reports how many entries the cache currently holds.
func (c *loadingCacheImpl) ItemCount() int {
	c.Lock()
	defer c.Unlock()
	return len(c.data)
}

// Close cleans the cache and destroys running goroutines
// by closing the done channel watched by the purge goroutine (see NewLoadingCache).
// NOTE(review): closing c.done a second time panics — Close does not look safe to
// call more than once; confirm callers invoke it exactly once.
func (c *loadingCacheImpl) Close() {
	c.Lock()
	defer c.Unlock()
	close(c.done)
	c.purge(-1) // -1 clears all entries
}

// keysWithTs includes list of keys with ts. This is for sorting keys
// in order to provide size based eviction (oldest ts evicted first).
type keysWithTs []struct {
	key string
	ts  time.Time
}

// purge records > max.size. Has to be called with lock!
func (c *loadingCacheImpl) purge(maxKeys int) {

// purge records > maxKeys. Has to be called with lock!
// call with maxKeys 0 will only clear expired entries, with -1 will clear everything.
func (c *loadingCacheImpl) purge(maxKeys int64) {
kts := keysWithTs{}

for key, cacheItem := range c.data {

for key, value := range c.data {
// ttl eviction
if cacheItem.hasExpired() {
if time.Now().After(c.data[key].expiresAt) {
if c.onEvicted != nil {
c.onEvicted(key, value)
}
delete(c.data, key)
c.stat.Evicted++
}

// prepare list of keysWithTs for size eviction
if maxKeys > 0 && len(c.data) > maxKeys {
ts := cacheItem.expiresAt // for no-LRU sort by expiration time
if c.isLRU { // for LRU sort by read time
ts = cacheItem.lastReadAt
if maxKeys == -1 || (maxKeys > 0 && int64(len(c.data)) > maxKeys) {
ts := c.data[key].expiresAt // for no-LRU sort by expiration time
if c.isLRU { // for LRU sort by read time
ts = c.data[key].lastReadAt
}

kts = append(kts, struct {
Expand All @@ -189,47 +245,24 @@ func (c *loadingCacheImpl) purge(maxKeys int) {
}

// size eviction
size := len(c.data)
size := int64(len(c.data))
if maxKeys == -1 { // clean everything in case maxKeys is -1
maxKeys = 0
}
if len(kts) > 0 {
sort.Slice(kts, func(i int, j int) bool { return kts[i].ts.Before(kts[j].ts) })
for d := 0; d < size-maxKeys; d++ {
delete(c.data, kts[d].key)
c.stat.Evicted++
for d := 0; int64(d) < size-maxKeys; d++ {
key := kts[d].key
if c.onEvicted != nil {
c.onEvicted(key, c.data[key])
}
delete(c.data, key)
}
}
c.stat.Size = len(c.data)
}

// cacheItem is a single cache entry with its eviction bookkeeping.
type cacheItem struct {
	expiresAt  time.Time   // TTL eviction deadline, set on Set
	lastReadAt time.Time   // updated on Get when LRU mode is on
	data       interface{} // stored value
}
Loading

0 comments on commit 1a4431f

Please sign in to comment.