Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed cache janitor logic #23

Merged
merged 3 commits into from
Aug 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 52 additions & 69 deletions cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cache

import (
"context"
"runtime"
"sync"
"time"
Expand All @@ -13,15 +14,6 @@ import (
"github.com/Code-Hex/go-generics-cache/policy/simple"
)

// janitor for collecting expired items and cleaning them
// this object is inspired from
// https://github.com/patrickmn/go-cache/blob/46f407853014144407b6c2ec7ccc76bf67958d93/cache.go
// many thanks to go-cache project
type janitor struct {
Interval time.Duration
stop chan bool
}

// Interface is a common-cache interface.
type Interface[K comparable, V any] interface {
Get(key K) (value V, ok bool)
Expand All @@ -45,7 +37,15 @@ var (
type Item[K comparable, V any] struct {
Key K
Value V
Expiration int64
Expiration time.Time
}

// Expired returns true if the item has expired.
func (item *Item[K, V]) Expired() bool {
if item.Expiration.IsZero() {
return false
}
return nowFunc().After(item.Expiration)
}

var nowFunc = time.Now
Expand All @@ -54,23 +54,14 @@ var nowFunc = time.Now
type ItemOption func(*itemOptions)

type itemOptions struct {
expiration int64 // default none
}

// Expired returns true if the item has expired.
func (item itemOptions) Expired() bool {
if item.expiration == 0 {
return false
}

return nowFunc().UnixNano() > item.expiration
expiration time.Time // default none
}

// WithExpiration is an option to set expiration time for any items.
// If the expiration is zero or negative value, it treats as w/o expiration.
func WithExpiration(exp time.Duration) ItemOption {
return func(o *itemOptions) {
o.expiration = nowFunc().Add(exp).UnixNano()
o.expiration = nowFunc().Add(exp)
}
}

Expand Down Expand Up @@ -100,12 +91,14 @@ type Cache[K comparable, V any] struct {
type Option[K comparable, V any] func(*options[K, V])

type options[K comparable, V any] struct {
cache Interface[K, *Item[K, V]]
cache Interface[K, *Item[K, V]]
janitorInterval time.Duration
}

func newOptions[K comparable, V any]() *options[K, V] {
return &options[K, V]{
cache: simple.NewCache[K, *Item[K, V]](),
cache: simple.NewCache[K, *Item[K, V]](),
janitorInterval: time.Minute,
}
}

Expand Down Expand Up @@ -144,56 +137,48 @@ func AsClock[K comparable, V any](opts ...clock.Option) Option[K, V] {
}
}

// WithJanitorInterval is an option to specify how often cache should delete expired items.
//
// Default is 1 minute.
func WithJanitorInterval[K comparable, V any](d time.Duration) Option[K, V] {
return func(o *options[K, V]) {
o.janitorInterval = d
}
}

// New creates a new thread safe Cache.
// This function will be stopped an internal janitor when the cache is
// no longer referenced anywhere.
//
// There are several Cache replacement policies available with you specified any options.
func New[K comparable, V any](opts ...Option[K, V]) *Cache[K, V] {
o := newOptions[K, V]()
for _, optFunc := range opts {
optFunc(o)
}

ctx, cancel := context.WithCancel(context.Background())
cache := &Cache[K, V]{
cache: o.cache,
cache: o.cache,
janitor: newJanitor(ctx, o.janitorInterval),
}

// @TODO change the ticker timer default value
cache.runJanitor(cache, time.Minute)
runtime.SetFinalizer(cache, cache.stopJanitor)

runtime.SetFinalizer(cache, func(self *Cache[K, V]) {
cancel()
})
return cache
}

func (_ *Cache[K, V]) stopJanitor(c *Cache[K, V]) {
if c.janitor != nil {
c.janitor.stop <- true
// NewContext creates a new thread safe Cache with context.
//
// There are several Cache replacement policies available with you specified any options.
func NewContext[K comparable, V any](ctx context.Context, opts ...Option[K, V]) *Cache[K, V] {
o := newOptions[K, V]()
for _, optFunc := range opts {
optFunc(o)
}

c.janitor = nil
}

func (_ *Cache[K, V]) runJanitor(c *Cache[K, V], ci time.Duration) {
c.stopJanitor(c)

j := &janitor{
Interval: ci,
stop: make(chan bool),
return &Cache[K, V]{
cache: o.cache,
janitor: newJanitor(ctx, o.janitorInterval),
}

c.janitor = j

go func() {
ticker := time.NewTicker(j.Interval)
for {
select {
case <-ticker.C:
c.DeleteExpired()
case <-j.stop:
ticker.Stop()
return
}
}
}()
}

// Get looks up a key's value from the cache.
Expand All @@ -206,9 +191,9 @@ func (c *Cache[K, V]) Get(key K) (value V, ok bool) {
return
}

// if is expired, delete is and return nil instead
if item.Expiration > 0 && nowFunc().UnixNano() > item.Expiration {
c.cache.Delete(key)
// Returns nil if the item has been expired.
// Do not delete here and leave it to an external process such as Janitor.
if item.Expired() {
return value, false
}

Expand All @@ -217,9 +202,12 @@ func (c *Cache[K, V]) Get(key K) (value V, ok bool) {

// DeleteExpired all expired items from the cache.
func (c *Cache[K, V]) DeleteExpired() {
for _, keys := range c.cache.Keys() {
// delete all expired items by using get method
_, _ = c.Get(keys)
for _, key := range c.cache.Keys() {
// if is expired, delete it and return nil instead
item, ok := c.cache.Get(key)
if ok && item.Expired() {
c.cache.Delete(key)
}
}
}

Expand All @@ -228,11 +216,6 @@ func (c *Cache[K, V]) Set(key K, val V, opts ...ItemOption) {
c.mu.Lock()
defer c.mu.Unlock()
item := newItem(key, val, opts...)
if item.Expiration <= 0 {
c.cache.Set(key, item)
return
}

c.cache.Set(key, item)
}

Expand Down
28 changes: 28 additions & 0 deletions cache_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package cache

import (
"context"
"testing"
"time"
)

func TestDeletedCache(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

nc := NewContext[string, int](ctx)
key := "key"
nc.Set(key, 1, WithExpiration(-time.Second))

_, ok := nc.cache.Get(key)
if !ok {
t.Fatal("want true")
}

nc.DeleteExpired()

_, ok = nc.cache.Get(key)
if ok {
t.Fatal("want false")
}
}
18 changes: 0 additions & 18 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"math/rand"
"sync"
"testing"
"time"

cache "github.com/Code-Hex/go-generics-cache"
"github.com/Code-Hex/go-generics-cache/policy/clock"
Expand All @@ -14,23 +13,6 @@ import (
"github.com/Code-Hex/go-generics-cache/policy/mru"
)

func TestExpiration(t *testing.T) {
nc := cache.New[string, int]()
nc.Set("hello", 1, cache.WithExpiration(3*time.Second))

time.Sleep(time.Second * 1)
result, got := nc.Get("hello")
if !got || result != 1 {
t.Errorf("no, expiration must exists")
}

time.Sleep(time.Second * 3)
result, got = nc.Get("hello")
if got || result == 1 {
t.Errorf("no, expiration must not exists")
}
}

func TestMultiThreadIncr(t *testing.T) {
nc := cache.NewNumber[string, int]()
nc.Set("counter", 0)
Expand Down
18 changes: 18 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cache_test

import (
"context"
"fmt"
"time"

Expand All @@ -20,6 +21,23 @@ func ExampleCache() {
// 0 false
}

func ExampleNewContext() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// use simple cache algorithm without options.
// an internal janitor will be stopped if specified the context is cancelled.
c := cache.NewContext(ctx, cache.WithJanitorInterval[string, int](3*time.Second))
c.Set("a", 1)
gota, aok := c.Get("a")
gotb, bok := c.Get("b")
fmt.Println(gota, aok)
fmt.Println(gotb, bok)
// Output:
// 1 true
// 0 false
}

func ExampleAsClock() {
// use clock cache algorithm.
c := cache.New(cache.AsClock[string, int]())
Expand Down
46 changes: 46 additions & 0 deletions janitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package cache

import (
"context"
"sync"
"time"
)

// janitor for collecting expired items and cleaning them.
type janitor struct {
ctx context.Context
interval time.Duration
done chan struct{}
once sync.Once
}

func newJanitor(ctx context.Context, interval time.Duration) *janitor {
j := &janitor{
ctx: ctx,
interval: interval,
done: make(chan struct{}),
}
return j
}

func (j *janitor) stop() {
j.once.Do(func() { close(j.done) })
}

func (j *janitor) run(cleanup func()) {
go func() {
ticker := time.NewTicker(j.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
cleanup()
case <-j.done:
cleanup() // last call
return
case <-j.ctx.Done():
j.stop()
}
}
}()
}
36 changes: 36 additions & 0 deletions janitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package cache

import (
"context"
"sync/atomic"
"testing"
"time"
)

func TestJanitor(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

janitor := newJanitor(ctx, time.Millisecond)

checkDone := make(chan struct{})
janitor.done = checkDone

calledClean := int64(0)
janitor.run(func() { atomic.AddInt64(&calledClean, 1) })

// waiting for cleanup
time.Sleep(10 * time.Millisecond)
cancel()

select {
case <-checkDone:
case <-time.After(time.Second):
t.Fatalf("failed to call done channel")
}

got := atomic.LoadInt64(&calledClean)
if got <= 1 {
t.Fatalf("failed to call clean callback in janitor: %d", got)
}
}