From 4375d49f82b69bac831b5734c5f1cbb318e6f3c4 Mon Sep 17 00:00:00 2001 From: Dmitry Verkhoturov Date: Thu, 11 Jan 2024 08:59:20 +0100 Subject: [PATCH] add v2 with generics --- .github/workflows/ci.yml | 19 +- README.md | 15 +- redis_cache.go | 11 +- v2/cache.go | 84 ++++ v2/cache_test.go | 687 ++++++++++++++++++++++++++++++++ v2/eventbus/pubsub.go | 24 ++ v2/eventbus/pubsub_test.go | 13 + v2/eventbus/redis.go | 79 ++++ v2/eventbus/redis_test.go | 39 ++ v2/expirable_cache.go | 175 ++++++++ v2/expirable_cache_test.go | 171 ++++++++ v2/go.mod | 25 ++ v2/go.sum | 40 ++ v2/internal/cache/cache.go | 251 ++++++++++++ v2/internal/cache/cache_test.go | 209 ++++++++++ v2/internal/cache/options.go | 50 +++ v2/lru_cache.go | 169 ++++++++ v2/lru_cache_test.go | 304 ++++++++++++++ v2/options.go | 114 ++++++ v2/redis_cache.go | 184 +++++++++ v2/redis_cache_test.go | 201 ++++++++++ v2/scache.go | 145 +++++++ v2/scache_test.go | 303 ++++++++++++++ v2/url.go | 137 +++++++ v2/url_test.go | 140 +++++++ 25 files changed, 3576 insertions(+), 13 deletions(-) create mode 100644 v2/cache.go create mode 100644 v2/cache_test.go create mode 100644 v2/eventbus/pubsub.go create mode 100644 v2/eventbus/pubsub_test.go create mode 100644 v2/eventbus/redis.go create mode 100644 v2/eventbus/redis_test.go create mode 100644 v2/expirable_cache.go create mode 100644 v2/expirable_cache_test.go create mode 100644 v2/go.mod create mode 100644 v2/go.sum create mode 100644 v2/internal/cache/cache.go create mode 100644 v2/internal/cache/cache_test.go create mode 100644 v2/internal/cache/options.go create mode 100644 v2/lru_cache.go create mode 100644 v2/lru_cache_test.go create mode 100644 v2/options.go create mode 100644 v2/redis_cache.go create mode 100644 v2/redis_cache_test.go create mode 100644 v2/scache.go create mode 100644 v2/scache_test.go create mode 100644 v2/url.go create mode 100644 v2/url_test.go diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1ce783a..90bd769 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,11 +33,28 @@ jobs: TZ: "America/Chicago" ENABLE_REDIS_TESTS: "true" + - name: build and test for v2 + run: | + go get -v + go test -timeout=60s -race -covermode=atomic -coverprofile=$GITHUB_WORKSPACE/profile.cov_tmp + # combine the coverage files + cat $GITHUB_WORKSPACE/profile.cov_tmp | grep -v "_mock.go" | grep -v "mode:" >> $GITHUB_WORKSPACE/profile.cov + go build -race + env: + TZ: "America/Chicago" + ENABLE_REDIS_TESTS: "true" + working-directory: v2 - name: golangci-lint - uses: golangci/golangci-lint-action@v3 + uses: golangci/golangci-lint-action@v4 + with: + version: latest + + - name: golangci-lint for v2 + uses: golangci/golangci-lint-action@v4 with: version: latest + working-directory: v2 - name: submit coverage run: | diff --git a/README.md b/README.md index 6a553a1..7af5bca 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Loading Cache Wrapper [![Build Status](https://github.com/go-pkgz/lcw/workflows/build/badge.svg)](https://github.com/go-pkgz/lcw/actions) [![Coverage Status](https://coveralls.io/repos/github/go-pkgz/lcw/badge.svg?branch=master)](https://coveralls.io/github/go-pkgz/lcw?branch=master) [![godoc](https://godoc.org/github.com/go-pkgz/lcw?status.svg)](https://godoc.org/github.com/go-pkgz/lcw) +# Loading Cache Wrapper [![Build Status](https://github.com/go-pkgz/lcw/workflows/build/badge.svg)](https://github.com/go-pkgz/lcw/actions) [![Coverage 
Status](https://coveralls.io/repos/github/go-pkgz/lcw/badge.svg?branch=master)](https://coveralls.io/github/go-pkgz/lcw?branch=master) [![godoc](https://godoc.org/github.com/go-pkgz/lcw?status.svg)](https://godoc.org/github.com/go-pkgz/lcw/v2) The library adds a thin layer on top of [lru cache](https://github.com/hashicorp/golang-lru) and internal implementation of expirable cache. @@ -25,7 +25,7 @@ Main features: ## Install and update -`go get -u github.com/go-pkgz/lcw` +`go get -u github.com/go-pkgz/lcw/v2` ## Usage @@ -33,18 +33,19 @@ Main features: package main import ( - "github.com/go-pkgz/lcw" + "github.com/go-pkgz/lcw/v2" ) func main() { - cache, err := lcw.NewLruCache(lcw.MaxKeys(500), lcw.MaxCacheSize(65536), lcw.MaxValSize(200), lcw.MaxKeySize(32)) + o := lcw.NewOpts[int]() + cache, err := lcw.NewLruCache(o.MaxKeys(500), o.MaxCacheSize(65536), o.MaxValSize(200), o.MaxKeySize(32)) if err != nil { panic("failed to create cache") } defer cache.Close() - val, err := cache.Get("key123", func() (interface{}, error) { - res, err := getDataFromSomeSource(params) // returns string + val, err := cache.Get("key123", func() (int, error) { + res, err := getDataFromSomeSource(params) // returns int return res, err }) @@ -52,7 +53,7 @@ func main() { panic("failed to get data") } - s := val.(string) // cached value + s := val // cached value } ``` diff --git a/redis_cache.go b/redis_cache.go index dd68c30..5c693b7 100644 --- a/redis_cache.go +++ b/redis_cache.go @@ -2,6 +2,7 @@ package lcw import ( "context" + "errors" "fmt" "sync/atomic" "time" @@ -44,18 +45,18 @@ func NewRedisCache(backend *redis.Client, opts ...Option) (*RedisCache, error) { // Get gets value by key or load with fn if not found in cache func (c *RedisCache) Get(key string, fn func() (interface{}, error)) (data interface{}, err error) { v, getErr := c.backend.Get(context.Background(), key).Result() - switch getErr { + switch { // RedisClient returns nil when find a key in DB - case nil: + case getErr == nil: atomic.AddInt64(&c.Hits, 1) return v, nil - // RedisClient returns redis.Nil when doesn't find a key in DB - case redis.Nil: + // RedisClient returns redis.Nil when doesn't find a key in DB + case errors.Is(getErr, redis.Nil): if data, err = fn(); err != nil { atomic.AddInt64(&c.Errors, 1) return data, err } - // RedisClient returns !nil when something goes wrong while get data + // RedisClient returns !nil when something goes wrong while get data default: atomic.AddInt64(&c.Errors, 1) return v, getErr diff --git a/v2/cache.go b/v2/cache.go new file mode 100644 index 0000000..58e259b --- /dev/null +++ b/v2/cache.go @@ -0,0 +1,84 @@ +// Package lcw adds a thin layer on top of lru and expirable cache providing more limits and common interface. +// The primary method to get (and set) data to/from the cache is LoadingCache.Get returning stored data for a given key or +// call provided func to retrieve and store, similar to Guava loading cache. +// Limits allow max values for key size, number of keys, value size and total size of values in the cache. +// CacheStat gives general stats on cache performance. +// 3 flavors of cache provided - NoP (do-nothing cache), ExpirableCache (TTL based), and LruCache +package lcw + +import ( + "fmt" +) + +// Sizer allows to perform size-based restrictions, optional. 
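+// Cached values may implement it to report their approximate size.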
+// If not defined both maxValueSize and maxCacheSize checks will be ignored +type Sizer interface { + Size() int +} + +// LoadingCache defines guava-like cache with Get method returning cached value ao retrieving it if not in cache +type LoadingCache[V any] interface { + Get(key string, fn func() (V, error)) (val V, err error) // load or get from cache + Peek(key string) (V, bool) // get from cache by key + Invalidate(fn func(key string) bool) // invalidate items for func(key) == true + Delete(key string) // delete by key + Purge() // clear cache + Stat() CacheStat // cache stats + Keys() []string // list of all keys + Close() error // close open connections +} + +// CacheStat represent stats values +type CacheStat struct { + Hits int64 + Misses int64 + Keys int + Size int64 + Errors int64 +} + +// String formats cache stats +func (s CacheStat) String() string { + ratio := 0.0 + if s.Hits+s.Misses > 0 { + ratio = float64(s.Hits) / float64(s.Hits+s.Misses) + } + return fmt.Sprintf("{hits:%d, misses:%d, ratio:%.2f, keys:%d, size:%d, errors:%d}", + s.Hits, s.Misses, ratio, s.Keys, s.Size, s.Errors) +} + +// Nop is do-nothing implementation of LoadingCache +type Nop[V any] struct{} + +// NewNopCache makes new do-nothing cache +func NewNopCache[V any]() *Nop[V] { + return &Nop[V]{} +} + +// Get calls fn without any caching +func (n *Nop[V]) Get(_ string, fn func() (V, error)) (V, error) { return fn() } + +// Peek does nothing and always returns false +func (n *Nop[V]) Peek(string) (V, bool) { var emptyValue V; return emptyValue, false } + +// Invalidate does nothing for nop cache +func (n *Nop[V]) Invalidate(func(key string) bool) {} + +// Purge does nothing for nop cache +func (n *Nop[V]) Purge() {} + +// Delete does nothing for nop cache +func (n *Nop[V]) Delete(string) {} + +// Keys does nothing for nop cache +func (n *Nop[V]) Keys() []string { return nil } + +// Stat always 0s for nop cache +func (n *Nop[V]) Stat() CacheStat { + return CacheStat{} +} + +// Close does nothing for nop cache +func (n *Nop[V]) Close() error { + return nil +} diff --git a/v2/cache_test.go b/v2/cache_test.go new file mode 100644 index 0000000..77df628 --- /dev/null +++ b/v2/cache_test.go @@ -0,0 +1,687 @@ +package lcw + +import ( + "fmt" + "math/rand" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNop_Get(t *testing.T) { + var coldCalls int32 + var c LoadingCache[string] = NewNopCache[string]() + res, err := c.Get("key1", func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return "result", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result", res) + assert.Equal(t, int32(1), atomic.LoadInt32(&coldCalls)) + + res, err = c.Get("key1", func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return "result2", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result2", res) + assert.Equal(t, int32(2), atomic.LoadInt32(&coldCalls)) + + assert.Equal(t, CacheStat{}, c.Stat()) +} + +func TestNop_Peek(t *testing.T) { + var coldCalls int32 + c := NewNopCache[string]() + res, err := c.Get("key1", func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return "result", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result", res) + assert.Equal(t, int32(1), atomic.LoadInt32(&coldCalls)) + + _, ok := c.Peek("key1") + assert.False(t, ok) +} + +func TestStat_String(t *testing.T) { + s := CacheStat{Keys: 100, Hits: 60, Misses: 10, Size: 12345, Errors: 5} + 
assert.Equal(t, "{hits:60, misses:10, ratio:0.86, keys:100, size:12345, errors:5}", s.String()) +} + +func TestCache_Get(t *testing.T) { + caches, teardown := cachesTestList[string](t) + defer teardown() + + for _, c := range caches { + c := c + t.Run(strings.Replace(fmt.Sprintf("%T", c), "*lcw.", "", 1), func(t *testing.T) { + var coldCalls int32 + res, err := c.Get("key", func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return "result", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result", res) + assert.Equal(t, int32(1), atomic.LoadInt32(&coldCalls)) + + res, err = c.Get("key", func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return "result2", nil + }) + + assert.NoError(t, err) + assert.Equal(t, "result", res) + assert.Equal(t, int32(1), atomic.LoadInt32(&coldCalls), "cache hit") + + _, err = c.Get("key-2", func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return "result2", fmt.Errorf("some error") + }) + assert.Error(t, err) + assert.Equal(t, int32(2), atomic.LoadInt32(&coldCalls), "cache hit") + + _, err = c.Get("key-2", func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return "result2", fmt.Errorf("some error") + }) + assert.Error(t, err) + assert.Equal(t, int32(3), atomic.LoadInt32(&coldCalls), "cache hit") + }) + } +} + +func TestCache_MaxValueSize(t *testing.T) { + o := NewOpts[sizedString]() + caches, teardown := cachesTestList(t, o.MaxKeys(5), o.MaxValSize(10), o.StrToV(func(s string) sizedString { return sizedString(s) })) + defer teardown() + + for _, c := range caches { + c := c + t.Run(strings.Replace(fmt.Sprintf("%T", c), "*lcw.", "", 1), func(t *testing.T) { + // put good size value to cache and make sure it cached + res, err := c.Get("key-Z", func() (sizedString, error) { + return "result-Z", nil + }) + assert.NoError(t, err) + assert.Equal(t, sizedString("result-Z"), res) + + res, err = c.Get("key-Z", func() (sizedString, error) { + return "result-Zzzz", nil + }) + assert.NoError(t, err) + assert.Equal(t, sizedString("result-Z"), res, "got cached value") + + // put too big value to cache and make sure it is not cached + res, err = c.Get("key-Big", func() (sizedString, error) { + return "1234567890", nil + }) + assert.NoError(t, err) + assert.Equal(t, sizedString("1234567890"), res) + + res, err = c.Get("key-Big", func() (sizedString, error) { + return "result-big", nil + }) + assert.NoError(t, err) + assert.Equal(t, sizedString("result-big"), res, "got not cached value") + + // put too big value to cache + res, err = c.Get("key-Big2", func() (sizedString, error) { + return "1234567890", nil + }) + assert.NoError(t, err) + assert.Equal(t, sizedString("1234567890"), res) + + res, err = c.Get("key-Big2", func() (sizedString, error) { + return "xyz", nil + }) + assert.NoError(t, err) + assert.Equal(t, sizedString("xyz"), res, "too long, but not Sizer. 
from cache") + }) + } +} + +func TestCache_MaxCacheSize(t *testing.T) { + o := NewOpts[sizedString]() + caches, teardown := cachesTestList(t, o.MaxKeys(50), o.MaxCacheSize(20), o.StrToV(func(s string) sizedString { return sizedString(s) })) + defer teardown() + + for _, c := range caches { + c := c + t.Run(strings.Replace(fmt.Sprintf("%T", c), "*lcw.", "", 1), func(t *testing.T) { + // put good size value to cache and make sure it cached + res, err := c.Get("key-Z", func() (sizedString, error) { + return "result-Z", nil + }) + assert.NoError(t, err) + assert.Equal(t, sizedString("result-Z"), res) + res, err = c.Get("key-Z", func() (sizedString, error) { + return "result-Zzzz", nil + }) + assert.NoError(t, err) + assert.Equal(t, sizedString("result-Z"), res, "got cached value") + if _, ok := c.(*RedisCache[sizedString]); !ok { + assert.Equal(t, int64(8), c.size()) + } + _, err = c.Get("key-Z2", func() (sizedString, error) { + return "result-Y", nil + }) + assert.NoError(t, err) + if _, ok := c.(*RedisCache[sizedString]); !ok { + assert.Equal(t, int64(16), c.size()) + } + + // this will cause removal + _, err = c.Get("key-Z3", func() (sizedString, error) { + return "result-Z", nil + }) + assert.NoError(t, err) + if _, ok := c.(*RedisCache[sizedString]); !ok { + assert.Equal(t, int64(16), c.size()) + // Due RedisCache[sizedString] does not support MaxCacheSize this assert should be skipped + assert.Equal(t, 2, c.keys()) + } + }) + } +} + +func TestCache_MaxCacheSizeParallel(t *testing.T) { + o := NewOpts[sizedString]() + caches, teardown := cachesTestList(t, o.MaxCacheSize(123), o.MaxKeys(10000), o.StrToV(func(s string) sizedString { return sizedString(s) })) + defer teardown() + + for _, c := range caches { + c := c + t.Run(strings.Replace(fmt.Sprintf("%T", c), "*lcw.", "", 1), func(t *testing.T) { + wg := sync.WaitGroup{} + for i := 0; i < 1000; i++ { + wg.Add(1) + i := i + go func() { + //nolint:gosec // not used for security purpose + time.Sleep(time.Duration(rand.Intn(100)) * time.Nanosecond) + defer wg.Done() + res, err := c.Get(fmt.Sprintf("key-%d", i), func() (sizedString, error) { + return sizedString(fmt.Sprintf("result-%d", i)), nil + }) + require.NoError(t, err) + require.Equal(t, sizedString(fmt.Sprintf("result-%d", i)), res) + }() + } + wg.Wait() + assert.True(t, c.size() < 123 && c.size() >= 0) + t.Log("size", c.size()) + }) + } + +} + +func TestCache_MaxKeySize(t *testing.T) { + o := NewOpts[sizedString]() + caches, teardown := cachesTestList(t, o.MaxKeySize(5), o.StrToV(func(s string) sizedString { return sizedString(s) })) + defer teardown() + + for _, c := range caches { + c := c + t.Run(strings.Replace(fmt.Sprintf("%T", c), "*lcw.", "", 1), func(t *testing.T) { + res, err := c.Get("key", func() (sizedString, error) { + return "value", nil + }) + assert.NoError(t, err) + assert.Equal(t, sizedString("value"), res) + + res, err = c.Get("key", func() (sizedString, error) { + return "valueXXX", nil + }) + assert.NoError(t, err) + assert.Equal(t, sizedString("value"), res, "cached") + + res, err = c.Get("key1234", func() (sizedString, error) { + return "value", nil + }) + assert.NoError(t, err) + assert.Equal(t, sizedString("value"), res) + + res, err = c.Get("key1234", func() (sizedString, error) { + return "valueXYZ", nil + }) + assert.NoError(t, err) + assert.Equal(t, sizedString("valueXYZ"), res, "not cached") + }) + } +} + +func TestCache_Peek(t *testing.T) { + caches, teardown := cachesTestList[string](t) + defer teardown() + + for _, c := range caches { + c := c + 
t.Run(strings.Replace(fmt.Sprintf("%T", c), "*lcw.", "", 1), func(t *testing.T) { + var coldCalls int32 + res, err := c.Get("key", func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return "result", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result", res) + assert.Equal(t, int32(1), atomic.LoadInt32(&coldCalls)) + + r, ok := c.Peek("key") + assert.True(t, ok) + assert.Equal(t, "result", r) + }) + } +} + +func TestLruCache_ParallelHits(t *testing.T) { + caches, teardown := cachesTestList[string](t) + defer teardown() + + for _, c := range caches { + c := c + t.Run(strings.Replace(fmt.Sprintf("%T", c), "*lcw.", "", 1), func(t *testing.T) { + var coldCalls int32 + + res, err := c.Get("key", func() (string, error) { + return "value", nil + }) + assert.NoError(t, err) + assert.Equal(t, "value", res) + + wg := sync.WaitGroup{} + for i := 0; i < 1000; i++ { + wg.Add(1) + i := i + go func() { + defer wg.Done() + res, err := c.Get("key", func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return fmt.Sprintf("result-%d", i), nil + }) + require.NoError(t, err) + require.Equal(t, "value", res) + }() + } + wg.Wait() + assert.Equal(t, int32(0), atomic.LoadInt32(&coldCalls)) + }) + } +} + +func TestCache_Purge(t *testing.T) { + caches, teardown := cachesTestList[string](t) + defer teardown() + + for _, c := range caches { + c := c + t.Run(strings.Replace(fmt.Sprintf("%T", c), "*lcw.", "", 1), func(t *testing.T) { + var coldCalls int32 + // fill cache + for i := 0; i < 1000; i++ { + i := i + _, err := c.Get(fmt.Sprintf("key-%d", i), func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return fmt.Sprintf("result-%d", i), nil + }) + require.NoError(t, err) + } + assert.Equal(t, int32(1000), atomic.LoadInt32(&coldCalls)) + assert.Equal(t, 1000, c.keys()) + + c.Purge() + assert.Equal(t, 0, c.keys(), "all keys removed") + }) + } +} + +func TestCache_Invalidate(t *testing.T) { + caches, teardown := cachesTestList[string](t) + defer teardown() + + for _, c := range caches { + c := c + t.Run(strings.Replace(fmt.Sprintf("%T", c), "*lcw.", "", 1), func(t *testing.T) { + var coldCalls int32 + + // fill cache + for i := 0; i < 1000; i++ { + i := i + _, err := c.Get(fmt.Sprintf("key-%d", i), func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return fmt.Sprintf("result-%d", i), nil + }) + require.NoError(t, err) + } + assert.Equal(t, int32(1000), atomic.LoadInt32(&coldCalls)) + assert.Equal(t, 1000, c.keys()) + + c.Invalidate(func(key string) bool { + return strings.HasSuffix(key, "0") + }) + + assert.Equal(t, 900, c.keys(), "100 keys removed") + res, err := c.Get("key-1", func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return "result-xxx", nil + }) + require.NoError(t, err) + assert.Equal(t, "result-1", res, "from the cache") + + res, err = c.Get("key-10", func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return "result-xxx", nil + }) + require.NoError(t, err) + assert.Equal(t, "result-xxx", res, "not from the cache") + }) + } +} + +func TestCache_Delete(t *testing.T) { + o := NewOpts[sizedString]() + caches, teardown := cachesTestList[sizedString](t, o.StrToV(func(s string) sizedString { return sizedString(s) })) + defer teardown() + + for _, c := range caches { + c := c + t.Run(strings.Replace(fmt.Sprintf("%T", c), "*lcw.", "", 1), func(t *testing.T) { + // fill cache + for i := 0; i < 1000; i++ { + i := i + _, err := c.Get(fmt.Sprintf("key-%d", i), func() (sizedString, error) { + return sizedString(fmt.Sprintf("result-%d", i)), nil + }) + 
require.NoError(t, err) + } + assert.Equal(t, 1000, c.Stat().Keys) + if _, ok := c.(*RedisCache[sizedString]); !ok { + assert.Equal(t, int64(9890), c.Stat().Size) + } + c.Delete("key-2") + assert.Equal(t, 999, c.Stat().Keys) + if _, ok := c.(*RedisCache[sizedString]); !ok { + assert.Equal(t, int64(9890-8), c.Stat().Size) + } + }) + } +} + +func TestCache_DeleteWithEvent(t *testing.T) { + var evKey string + var evVal sizedString + var evCount int + onEvict := func(key string, value sizedString) { + evKey = key + evVal = value + evCount++ + } + + o := NewOpts[sizedString]() + caches, teardown := cachesTestList(t, o.OnEvicted(onEvict), o.StrToV(func(s string) sizedString { return sizedString(s) })) + defer teardown() + + for _, c := range caches { + c := c + + evKey, evVal, evCount = "", "", 0 + t.Run(strings.Replace(fmt.Sprintf("%T", c), "*lcw.", "", 1), func(t *testing.T) { + if _, ok := c.(*RedisCache[sizedString]); ok { + t.Skip("RedisCache[sizedString] doesn't support delete events") + } + // fill cache + for i := 0; i < 1000; i++ { + i := i + _, err := c.Get(fmt.Sprintf("key-%d", i), func() (sizedString, error) { + return sizedString(fmt.Sprintf("result-%d", i)), nil + }) + require.NoError(t, err) + } + assert.Equal(t, 1000, c.Stat().Keys) + assert.Equal(t, int64(9890), c.Stat().Size) + + c.Delete("key-2") + assert.Equal(t, 999, c.Stat().Keys) + assert.Equal(t, "key-2", evKey) + assert.Equal(t, sizedString("result-2"), evVal) + assert.Equal(t, 1, evCount) + }) + } +} + +func TestCache_Stats(t *testing.T) { + o := NewOpts[sizedString]() + caches, teardown := cachesTestList[sizedString](t, o.StrToV(func(s string) sizedString { return sizedString(s) })) + defer teardown() + + for _, c := range caches { + c := c + t.Run(strings.Replace(fmt.Sprintf("%T", c), "*lcw.", "", 1), func(t *testing.T) { + // fill cache + for i := 0; i < 100; i++ { + i := i + _, err := c.Get(fmt.Sprintf("key-%d", i), func() (sizedString, error) { + return sizedString(fmt.Sprintf("result-%d", i)), nil + }) + require.NoError(t, err) + } + stats := c.Stat() + switch c.(type) { + case *RedisCache[sizedString]: + assert.Equal(t, CacheStat{Hits: 0, Misses: 100, Keys: 100, Size: 0}, stats) + default: + assert.Equal(t, CacheStat{Hits: 0, Misses: 100, Keys: 100, Size: 890}, stats) + } + + _, err := c.Get("key-1", func() (sizedString, error) { + return "xyz", nil + }) + require.NoError(t, err) + switch c.(type) { + case *RedisCache[sizedString]: + assert.Equal(t, CacheStat{Hits: 1, Misses: 100, Keys: 100, Size: 0}, c.Stat()) + default: + assert.Equal(t, CacheStat{Hits: 1, Misses: 100, Keys: 100, Size: 890}, c.Stat()) + } + + _, err = c.Get("key-1123", func() (sizedString, error) { + return sizedString("xyz"), nil + }) + require.NoError(t, err) + switch c.(type) { + case *RedisCache[sizedString]: + assert.Equal(t, CacheStat{Hits: 1, Misses: 101, Keys: 101, Size: 0}, c.Stat()) + default: + assert.Equal(t, CacheStat{Hits: 1, Misses: 101, Keys: 101, Size: 893}, c.Stat()) + } + + _, err = c.Get("key-9999", func() (sizedString, error) { + return "", fmt.Errorf("err") + }) + require.Error(t, err) + switch c.(type) { + case *RedisCache[sizedString]: + assert.Equal(t, CacheStat{Hits: 1, Misses: 101, Keys: 101, Size: 0, Errors: 1}, c.Stat()) + default: + assert.Equal(t, CacheStat{Hits: 1, Misses: 101, Keys: 101, Size: 893, Errors: 1}, c.Stat()) + } + }) + } +} + +// ExampleLoadingCache_Get illustrates creation of a cache and loading value from it +func ExampleLoadingCache_Get() { + o := NewOpts[string]() + c, err := 
NewExpirableCache(o.MaxKeys(10), o.TTL(time.Minute*30)) // make expirable cache (30m o.TTL) with up to 10 keys + if err != nil { + panic("can' make cache") + } + defer c.Close() + + // try to get from cache and because mykey is not in will put it + _, _ = c.Get("mykey", func() (string, error) { + fmt.Println("cache miss 1") + return "myval-1", nil + }) + + // get from cache, func won't run because mykey in + v, err := c.Get("mykey", func() (string, error) { + fmt.Println("cache miss 2") + return "myval-2", nil + }) + + if err != nil { + panic("can't get from cache") + } + fmt.Printf("got %s from cache, stats: %s", v, c.Stat()) + // Output: cache miss 1 + // got myval-1 from cache, stats: {hits:1, misses:1, ratio:0.50, keys:1, size:0, errors:0} +} + +// ExampleLoadingCache_Delete illustrates cache value eviction and OnEvicted function usage. +func ExampleLoadingCache_Delete() { + // make expirable cache (30m TTL) with up to 10 keys. Set callback on eviction event + o := NewOpts[string]() + c, err := NewExpirableCache(o.MaxKeys(10), o.TTL(time.Minute*30), o.OnEvicted(func(key string, _ string) { + fmt.Println("key " + key + " evicted") + })) + if err != nil { + panic("can' make cache") + } + defer c.Close() + + // try to get from cache and because mykey is not in will put it + _, _ = c.Get("mykey", func() (string, error) { + return "myval-1", nil + }) + + c.Delete("mykey") + fmt.Println("stats: " + c.Stat().String()) + // Output: key mykey evicted + // stats: {hits:0, misses:1, ratio:0.00, keys:0, size:0, errors:0} +} + +// nolint:govet //false positive due to example name +// ExampleLoadingCacheMutability illustrates changing mutable stored item outside of cache, works only for non-Redis cache. +func Example_loadingCacheMutability() { + o := NewOpts[[]string]() + c, err := NewExpirableCache(o.MaxKeys(10), o.TTL(time.Minute*30)) // make expirable cache (30m o.TTL) with up to 10 keys + if err != nil { + panic("can' make cache") + } + defer c.Close() + + mutableSlice := []string{"key1", "key2"} + + // put mutableSlice in "mutableSlice" cache key + _, _ = c.Get("mutableSlice", func() ([]string, error) { + return mutableSlice, nil + }) + + // get from cache, func won't run because mutableSlice is cached + // value is original now + v, _ := c.Get("mutableSlice", func() ([]string, error) { + return nil, nil + }) + fmt.Printf("got %v slice from cache\n", v) + + mutableSlice[0] = "another_key_1" + mutableSlice[1] = "another_key_2" + + // get from cache, func won't run because mutableSlice is cached + // value is changed inside the cache now because mutableSlice stored as-is, in mutable state + v, _ = c.Get("mutableSlice", func() ([]string, error) { + return nil, nil + }) + fmt.Printf("got %v slice from cache after it's change outside of cache\n", v) + + // Output: + // got [key1 key2] slice from cache + // got [another_key_1 another_key_2] slice from cache after it's change outside of cache +} + +type counts interface { + size() int64 // cache size in bytes + keys() int // number of keys in cache +} + +type countedCache[V any] interface { + LoadingCache[V] + counts +} + +func cachesTestList[V any](t *testing.T, opts ...Option[V]) (c []countedCache[V], teardown func()) { + var caches []countedCache[V] + ec, err := NewExpirableCache(opts...) + require.NoError(t, err, "can't make exp cache") + caches = append(caches, ec) + lc, err := NewLruCache(opts...) 
+ require.NoError(t, err, "can't make lru cache") + caches = append(caches, lc) + + server := newTestRedisServer() + client := redis.NewClient(&redis.Options{ + Addr: server.Addr()}) + rc, err := NewRedisCache(client, opts...) + require.NoError(t, err, "can't make redis cache") + caches = append(caches, rc) + + return caches, func() { + _ = client.Close() + _ = ec.Close() + _ = lc.Close() + _ = rc.Close() + server.Close() + } +} + +type sizedString string + +func (s sizedString) Size() int { return len(s) } + +func (s sizedString) MarshalBinary() (data []byte, err error) { + return []byte(s), nil +} + +type mockPubSub struct { + calledKeys []string + fns []func(fromID, key string) + sync.Mutex + sync.WaitGroup +} + +func (m *mockPubSub) CalledKeys() []string { + m.Lock() + defer m.Unlock() + return m.calledKeys +} + +func (m *mockPubSub) Subscribe(fn func(fromID, key string)) error { + m.Lock() + defer m.Unlock() + m.fns = append(m.fns, fn) + return nil +} + +func (m *mockPubSub) Publish(fromID, key string) error { + m.Lock() + defer m.Unlock() + m.calledKeys = append(m.calledKeys, key) + for _, fn := range m.fns { + fn := fn + m.Add(1) + // run in goroutine to prevent deadlock + go func() { + fn(fromID, key) + m.Done() + }() + } + return nil +} diff --git a/v2/eventbus/pubsub.go b/v2/eventbus/pubsub.go new file mode 100644 index 0000000..c0afcfb --- /dev/null +++ b/v2/eventbus/pubsub.go @@ -0,0 +1,24 @@ +// Package eventbus provides PubSub interface used for distributed cache invalidation, +// as well as NopPubSub and RedisPubSub implementations. +package eventbus + +// PubSub interface is used for distributed cache invalidation. +// Publish is called on each entry invalidation, +// Subscribe is used for subscription for these events. +type PubSub interface { + Publish(fromID, key string) error + Subscribe(fn func(fromID, key string)) error +} + +// NopPubSub implements default do-nothing pub-sub (event bus) +type NopPubSub struct{} + +// Subscribe does nothing for NopPubSub +func (n *NopPubSub) Subscribe(func(fromID string, key string)) error { + return nil +} + +// Publish does nothing for NopPubSub +func (n *NopPubSub) Publish(string, string) error { + return nil +} diff --git a/v2/eventbus/pubsub_test.go b/v2/eventbus/pubsub_test.go new file mode 100644 index 0000000..a154733 --- /dev/null +++ b/v2/eventbus/pubsub_test.go @@ -0,0 +1,13 @@ +package eventbus + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNopPubSub(t *testing.T) { + nopPubSub := NopPubSub{} + assert.NoError(t, nopPubSub.Subscribe(nil)) + assert.NoError(t, nopPubSub.Publish("", "")) +} diff --git a/v2/eventbus/redis.go b/v2/eventbus/redis.go new file mode 100644 index 0000000..5762abb --- /dev/null +++ b/v2/eventbus/redis.go @@ -0,0 +1,79 @@ +package eventbus + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/redis/go-redis/v9" + + "github.com/hashicorp/go-multierror" +) + +// NewRedisPubSub creates new RedisPubSub with given parameters. +// Returns an error in case of problems with creating PubSub client for specified channel. 
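+// Close should be called on the returned RedisPubSub to stop the subscription and close the underlying Redis client.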
+func NewRedisPubSub(addr, channel string) (*RedisPubSub, error) { + client := redis.NewClient(&redis.Options{Addr: addr}) + pubSub := client.Subscribe(context.Background(), channel) + // wait for subscription to be created and ignore the message + if _, err := pubSub.Receive(context.Background()); err != nil { + _ = client.Close() + return nil, fmt.Errorf("problem subscribing to channel %s on address %s: %w", channel, addr, err) + } + return &RedisPubSub{client: client, pubSub: pubSub, channel: channel, done: make(chan struct{})}, nil +} + +// RedisPubSub provides Redis implementation for PubSub interface +type RedisPubSub struct { + client *redis.Client + pubSub *redis.PubSub + channel string + + done chan struct{} +} + +// Subscribe calls provided function on subscription channel provided on new RedisPubSub instance creation. +// Should not be called more than once. Spawns a goroutine and does not return an error. +func (m *RedisPubSub) Subscribe(fn func(fromID, key string)) error { + go func(done <-chan struct{}, pubsub *redis.PubSub) { + for { + select { + case <-done: + return + default: + } + msg, err := pubsub.ReceiveTimeout(context.Background(), time.Second*10) + if err != nil { + continue + } + + // Process the message + if msg, ok := msg.(*redis.Message); ok { + payload := strings.Split(msg.Payload, "$") + fn(payload[0], strings.Join(payload[1:], "$")) + } + } + }(m.done, m.pubSub) + + return nil +} + +// Publish publishes provided message to channel provided on new RedisPubSub instance creation +func (m *RedisPubSub) Publish(fromID, key string) error { + return m.client.Publish(context.Background(), m.channel, fromID+"$"+key).Err() +} + +// Close cleans up running goroutines and closes Redis clients +func (m *RedisPubSub) Close() error { + close(m.done) + + errs := new(multierror.Error) + if err := m.pubSub.Close(); err != nil { + errs = multierror.Append(errs, fmt.Errorf("problem closing pubSub client: %w", err)) + } + if err := m.client.Close(); err != nil { + errs = multierror.Append(errs, fmt.Errorf("problem closing redis client: %w", err)) + } + return errs.ErrorOrNil() +} diff --git a/v2/eventbus/redis_test.go b/v2/eventbus/redis_test.go new file mode 100644 index 0000000..231f9ba --- /dev/null +++ b/v2/eventbus/redis_test.go @@ -0,0 +1,39 @@ +package eventbus + +import ( + "math/rand" + "os" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewRedisPubSub_Error(t *testing.T) { + redisPubSub, err := NewRedisPubSub("127.0.0.1:99999", "test") + require.Error(t, err) + require.Nil(t, redisPubSub) +} + +func TestRedisPubSub(t *testing.T) { + if _, ok := os.LookupEnv("ENABLE_REDIS_TESTS"); !ok { + t.Skip("ENABLE_REDIS_TESTS env variable is not set, not expecting Redis to be ready at 127.0.0.1:6379") + } + + //nolint:gosec // not used for security purpose + channel := "lcw-test-" + strconv.Itoa(rand.Intn(1000000)) + redisPubSub, err := NewRedisPubSub("127.0.0.1:6379", channel) + require.NoError(t, err) + require.NotNil(t, redisPubSub) + var called []string + assert.Nil(t, redisPubSub.Subscribe(func(fromID, key string) { + called = append(called, fromID, key) + })) + assert.NoError(t, redisPubSub.Publish("test_fromID", "$test$key$")) + // Sleep which waits for Subscribe goroutine to pick up published changes + time.Sleep(time.Second) + assert.NoError(t, redisPubSub.Close()) + assert.Equal(t, []string{"test_fromID", "$test$key$"}, called) +} diff --git a/v2/expirable_cache.go b/v2/expirable_cache.go 
new file mode 100644 index 0000000..540fc75 --- /dev/null +++ b/v2/expirable_cache.go @@ -0,0 +1,175 @@ +package lcw + +import ( + "fmt" + "sync/atomic" + "time" + + "github.com/google/uuid" +) + +import ( + "github.com/go-pkgz/lcw/v2/eventbus" + "github.com/go-pkgz/lcw/v2/internal/cache" +) + +// ExpirableCache implements LoadingCache with TTL. +type ExpirableCache[V any] struct { + Workers[V] + CacheStat + currentSize int64 + id string + backend *cache.LoadingCache[V] +} + +// NewExpirableCache makes expirable LoadingCache implementation, 1000 max keys by default and 5m TTL +func NewExpirableCache[V any](opts ...Option[V]) (*ExpirableCache[V], error) { + res := ExpirableCache[V]{ + Workers: Workers[V]{ + maxKeys: 1000, + maxValueSize: 0, + ttl: 5 * time.Minute, + eventBus: &eventbus.NopPubSub{}, + }, + id: uuid.New().String(), + } + + for _, opt := range opts { + if err := opt(&res.Workers); err != nil { + return nil, fmt.Errorf("failed to set cache option: %w", err) + } + } + + if err := res.eventBus.Subscribe(res.onBusEvent); err != nil { + return nil, fmt.Errorf("can't subscribe to event bus: %w", err) + } + + o := cache.NewOpts[V]() + backend, err := cache.NewLoadingCache( + o.MaxKeys(res.maxKeys), + o.TTL(res.ttl), + o.PurgeEvery(res.ttl/2), + o.OnEvicted(func(key string, value V) { + if res.onEvicted != nil { + res.onEvicted(key, value) + } + if s, ok := any(value).(Sizer); ok { + size := s.Size() + atomic.AddInt64(&res.currentSize, -1*int64(size)) + } + // ignore the error on Publish as we don't have log inside the module and + // there is no other way to handle it: we publish the cache invalidation + // and hope for the best + _ = res.eventBus.Publish(res.id, key) + }), + ) + if err != nil { + return nil, fmt.Errorf("error creating backend: %w", err) + } + res.backend = backend + + return &res, nil +} + +// Get gets value by key or load with fn if not found in cache +func (c *ExpirableCache[V]) Get(key string, fn func() (V, error)) (data V, err error) { + if v, ok := c.backend.Get(key); ok { + atomic.AddInt64(&c.Hits, 1) + return v, nil + } + + if data, err = fn(); err != nil { + atomic.AddInt64(&c.Errors, 1) + return data, err + } + atomic.AddInt64(&c.Misses, 1) + + if !c.allowed(key, data) { + return data, nil + } + + if s, ok := any(data).(Sizer); ok { + if c.maxCacheSize > 0 && atomic.LoadInt64(&c.currentSize)+int64(s.Size()) >= c.maxCacheSize { + c.backend.DeleteExpired() + return data, nil + } + atomic.AddInt64(&c.currentSize, int64(s.Size())) + } + + c.backend.Set(key, data) + + return data, nil +} + +// Invalidate removes keys with passed predicate fn, i.e. fn(key) should be true to get evicted +func (c *ExpirableCache[V]) Invalidate(fn func(key string) bool) { + c.backend.InvalidateFn(fn) +} + +// Peek returns the key value (or undefined if not found) without updating the "recently used"-ness of the key. +func (c *ExpirableCache[V]) Peek(key string) (V, bool) { + return c.backend.Peek(key) +} + +// Purge clears the cache completely. 
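+// It also resets the accounted cache size to zero; OnEvicted is called for every removed entry.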
+func (c *ExpirableCache[V]) Purge() { + c.backend.Purge() + atomic.StoreInt64(&c.currentSize, 0) +} + +// Delete cache item by key +func (c *ExpirableCache[V]) Delete(key string) { + c.backend.Invalidate(key) +} + +// Keys returns cache keys +func (c *ExpirableCache[V]) Keys() (res []string) { + return c.backend.Keys() +} + +// Stat returns cache statistics +func (c *ExpirableCache[V]) Stat() CacheStat { + return CacheStat{ + Hits: c.Hits, + Misses: c.Misses, + Size: c.size(), + Keys: c.keys(), + Errors: c.Errors, + } +} + +// Close kills cleanup goroutine +func (c *ExpirableCache[V]) Close() error { + c.backend.Close() + return nil +} + +// onBusEvent reacts on invalidation message triggered by event bus from another cache instance +func (c *ExpirableCache[V]) onBusEvent(id, key string) { + if id != c.id { + c.backend.Invalidate(key) + } +} + +func (c *ExpirableCache[V]) size() int64 { + return atomic.LoadInt64(&c.currentSize) +} + +func (c *ExpirableCache[V]) keys() int { + return c.backend.ItemCount() +} + +func (c *ExpirableCache[V]) allowed(key string, data V) bool { + if c.backend.ItemCount() >= c.maxKeys { + return false + } + if c.maxKeySize > 0 && len(key) > c.maxKeySize { + return false + } + if s, ok := any(data).(Sizer); ok { + if c.maxValueSize > 0 && s.Size() >= c.maxValueSize { + return false + } + } + return true +} diff --git a/v2/expirable_cache_test.go b/v2/expirable_cache_test.go new file mode 100644 index 0000000..061d1d0 --- /dev/null +++ b/v2/expirable_cache_test.go @@ -0,0 +1,171 @@ +package lcw + +import ( + "fmt" + "sort" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestExpirableCache(t *testing.T) { + o := NewOpts[string]() + lc, err := NewExpirableCache(o.MaxKeys(5), o.TTL(time.Millisecond*100)) + require.NoError(t, err) + for i := 0; i < 5; i++ { + i := i + _, e := lc.Get(fmt.Sprintf("key-%d", i), func() (string, error) { + return fmt.Sprintf("result-%d", i), nil + }) + assert.NoError(t, e) + time.Sleep(10 * time.Millisecond) + } + + assert.Equal(t, 5, lc.Stat().Keys) + assert.Equal(t, int64(5), lc.Stat().Misses) + + keys := lc.Keys() + sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + assert.EqualValues(t, []string{"key-0", "key-1", "key-2", "key-3", "key-4"}, keys) + + _, e := lc.Get("key-xx", func() (string, error) { + return "result-xx", nil + }) + assert.NoError(t, e) + assert.Equal(t, 5, lc.Stat().Keys) + assert.Equal(t, int64(6), lc.Stat().Misses) + + // let key-0 expire, GitHub Actions friendly way + for lc.Stat().Keys > 4 { + lc.backend.DeleteExpired() // enforce DeleteExpired for GitHub earlier than o.TTL/2 + time.Sleep(time.Millisecond * 10) + } + assert.Equal(t, 4, lc.Stat().Keys) + + time.Sleep(210 * time.Millisecond) + assert.Equal(t, 0, lc.keys()) + assert.Equal(t, []string{}, lc.Keys()) + + assert.NoError(t, lc.Close()) +} + +func TestExpirableCache_MaxKeys(t *testing.T) { + var coldCalls int32 + o := NewOpts[string]() + lc, err := NewExpirableCache(o.MaxKeys(5), o.MaxValSize(10)) + require.NoError(t, err) + + // put 5 keys to cache + for i := 0; i < 5; i++ { + i := i + res, e := lc.Get(fmt.Sprintf("key-%d", i), func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return fmt.Sprintf("result-%d", i), nil + }) + assert.NoError(t, e) + assert.Equal(t, fmt.Sprintf("result-%d", i), res) + assert.Equal(t, int32(i+1), atomic.LoadInt32(&coldCalls)) + } + + // check if really cached + res, err := lc.Get("key-3", func() (string, error) { + return 
"result-blah", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-3", res, "should be cached") + + // try to cache after maxKeys reached + res, err = lc.Get("key-X", func() (string, error) { + return "result-X", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-X", res) + assert.Equal(t, 5, lc.keys()) + + // put to cache and make sure it cached + res, err = lc.Get("key-Z", func() (string, error) { + return "result-Z", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-Z", res) + + res, err = lc.Get("key-Z", func() (string, error) { + return "result-Zzzz", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-Zzzz", res, "got non-cached value") + assert.Equal(t, 5, lc.keys()) + + assert.NoError(t, lc.Close()) +} + +func TestExpirableCache_BadOptions(t *testing.T) { + o := NewOpts[string]() + _, err := NewExpirableCache(o.MaxCacheSize(-1)) + assert.EqualError(t, err, "failed to set cache option: negative max cache size") + + _, err = NewExpirableCache(o.MaxKeySize(-1)) + assert.EqualError(t, err, "failed to set cache option: negative max key size") + + _, err = NewExpirableCache(o.MaxKeys(-1)) + assert.EqualError(t, err, "failed to set cache option: negative max keys") + + _, err = NewExpirableCache(o.MaxValSize(-1)) + assert.EqualError(t, err, "failed to set cache option: negative max value size") + + _, err = NewExpirableCache(o.TTL(-1)) + assert.EqualError(t, err, "failed to set cache option: negative ttl") +} + +func TestExpirableCacheWithBus(t *testing.T) { + ps := &mockPubSub{} + o := NewOpts[string]() + lc1, err := NewExpirableCache(o.MaxKeys(5), o.TTL(time.Millisecond*100), o.EventBus(ps)) + require.NoError(t, err) + defer lc1.Close() + + lc2, err := NewExpirableCache(o.MaxKeys(50), o.TTL(time.Millisecond*5000), o.EventBus(ps)) + require.NoError(t, err) + defer lc2.Close() + + // add 5 keys to the first node cache + for i := 0; i < 5; i++ { + i := i + _, e := lc1.Get(fmt.Sprintf("key-%d", i), func() (string, error) { + return fmt.Sprintf("result-%d", i), nil + }) + assert.NoError(t, e) + time.Sleep(10 * time.Millisecond) + } + + assert.Equal(t, 0, len(ps.CalledKeys()), "no events") + assert.Equal(t, 5, lc1.Stat().Keys) + assert.Equal(t, int64(5), lc1.Stat().Misses) + + // add key-1 key to the second node + _, e := lc2.Get("key-1", func() (string, error) { + return "result-111", nil + }) + assert.NoError(t, e) + assert.Equal(t, 1, lc2.Stat().Keys) + assert.Equal(t, int64(1), lc2.Stat().Misses, lc2.Stat()) + + // let key-0 expire, GitHub Actions friendly way + for lc1.Stat().Keys > 4 { + lc1.backend.DeleteExpired() // enforce DeleteExpired for GitHub earlier than TTL/2 + ps.Wait() // wait for onBusEvent goroutines to finish + time.Sleep(time.Millisecond * 10) + } + assert.Equal(t, 4, lc1.Stat().Keys) + assert.Equal(t, 1, lc2.Stat().Keys, "key-1 still in cache2") + assert.Equal(t, 1, len(ps.CalledKeys())) + + time.Sleep(210 * time.Millisecond) // let all keys expire + ps.Wait() // wait for onBusEvent goroutines to finish + assert.Equal(t, 6, len(ps.CalledKeys()), "6 events, key-1 expired %+v", ps.calledKeys) + assert.Equal(t, 0, lc1.Stat().Keys) + assert.Equal(t, 0, lc2.Stat().Keys, "key-1 removed from cache2") +} diff --git a/v2/go.mod b/v2/go.mod new file mode 100644 index 0000000..7cc1156 --- /dev/null +++ b/v2/go.mod @@ -0,0 +1,25 @@ +module github.com/go-pkgz/lcw/v2 + +go 1.21 + +toolchain go1.21.6 + +require ( + github.com/alicebob/miniredis/v2 v2.31.1 + github.com/google/uuid v1.5.0 + github.com/hashicorp/go-multierror v1.1.1 + 
github.com/hashicorp/golang-lru/v2 v2.0.7 + github.com/redis/go-redis/v9 v9.4.0 + github.com/stretchr/testify v1.8.4 +) + +require ( + github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/yuin/gopher-lua v1.1.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/v2/go.sum b/v2/go.sum new file mode 100644 index 0000000..fe8d608 --- /dev/null +++ b/v2/go.sum @@ -0,0 +1,40 @@ +github.com/DmitriyVTitov/size v1.5.0/go.mod h1:le6rNI4CoLQV1b9gzp1+3d7hMAD/uu2QcJ+aYbNgiU0= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/miniredis/v2 v2.31.1 h1:7XAt0uUg3DtwEKW5ZAGa+K7FZV2DdKQo5K/6TTnfX8Y= +github.com/alicebob/miniredis/v2 v2.31.1/go.mod h1:UB/T2Uztp7MlFSDakaX1sTXUv5CASoprx0wulRT6HBg= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.4.0 
h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk= +github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE= +github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/v2/internal/cache/cache.go b/v2/internal/cache/cache.go new file mode 100644 index 0000000..5bf5584 --- /dev/null +++ b/v2/internal/cache/cache.go @@ -0,0 +1,251 @@ +// Package cache implements LoadingCache. +// +// Support LRC TTL-based eviction. +package cache + +import ( + "fmt" + "sort" + "sync" + "time" +) + +// LoadingCache provides expirable loading cache with LRC eviction. +type LoadingCache[V any] struct { + purgeEvery time.Duration + ttl time.Duration + maxKeys int64 + done chan struct{} + onEvicted func(key string, value V) + + mu sync.Mutex + data map[string]*cacheItem[V] +} + +// noEvictionTTL - very long ttl to prevent eviction +const noEvictionTTL = time.Hour * 24 * 365 * 10 + +// NewLoadingCache returns a new expirable LRC cache, activates purge with purgeEvery (0 to never purge). +// Default MaxKeys is unlimited (0). +func NewLoadingCache[V any](options ...Option[V]) (*LoadingCache[V], error) { + res := LoadingCache[V]{ + data: map[string]*cacheItem[V]{}, + ttl: noEvictionTTL, + purgeEvery: 0, + maxKeys: 0, + done: make(chan struct{}), + } + + for _, opt := range options { + if err := opt(&res); err != nil { + return nil, fmt.Errorf("failed to set cache option: %w", err) + } + } + + if res.maxKeys > 0 || res.purgeEvery > 0 { + if res.purgeEvery == 0 { + res.purgeEvery = time.Minute * 5 // non-zero purge enforced because maxKeys defined + } + go func(done <-chan struct{}) { + ticker := time.NewTicker(res.purgeEvery) + for { + select { + case <-done: + return + case <-ticker.C: + res.mu.Lock() + res.purge(res.maxKeys) + res.mu.Unlock() + } + } + }(res.done) + } + return &res, nil +} + +// Set key +func (c *LoadingCache[V]) Set(key string, value V) { + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + if _, ok := c.data[key]; !ok { + c.data[key] = &cacheItem[V]{} + } + c.data[key].data = value + c.data[key].expiresAt = now.Add(c.ttl) + + // Enforced purge call in addition the one from the ticker + // to limit the worst-case scenario with a lot of sets in the + // short period of time (between two timed purge calls) + if c.maxKeys > 0 && int64(len(c.data)) >= c.maxKeys*2 { + c.purge(c.maxKeys) + } +} + +// Get returns the key value +func (c *LoadingCache[V]) Get(key string) (V, bool) { + c.mu.Lock() + defer c.mu.Unlock() + value, ok := c.getValue(key) + if !ok { + var emptyValue V + return emptyValue, false + } + return value, ok +} + +// Peek returns the key value (or undefined if not found) without updating the "recently used"-ness of the key. 
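+// Expired entries are reported as missing even if they have not been purged yet.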
+func (c *LoadingCache[V]) Peek(key string) (V, bool) { + c.mu.Lock() + defer c.mu.Unlock() + value, ok := c.getValue(key) + if !ok { + var emptyValue V + return emptyValue, false + } + return value, ok +} + +// Invalidate key (item) from the cache +func (c *LoadingCache[V]) Invalidate(key string) { + c.mu.Lock() + if value, ok := c.data[key]; ok { + delete(c.data, key) + if c.onEvicted != nil { + c.onEvicted(key, value.data) + } + } + c.mu.Unlock() +} + +// InvalidateFn deletes multiple keys if predicate is true +func (c *LoadingCache[V]) InvalidateFn(fn func(key string) bool) { + c.mu.Lock() + for key, value := range c.data { + if fn(key) { + delete(c.data, key) + if c.onEvicted != nil { + c.onEvicted(key, value.data) + } + } + } + c.mu.Unlock() +} + +// Keys return slice of current keys in the cache +func (c *LoadingCache[V]) Keys() []string { + c.mu.Lock() + defer c.mu.Unlock() + keys := make([]string, 0, len(c.data)) + for k := range c.data { + keys = append(keys, k) + } + return keys +} + +// get value respecting the expiration, should be called with lock +func (c *LoadingCache[V]) getValue(key string) (V, bool) { + value, ok := c.data[key] + if !ok { + var emptyValue V + return emptyValue, false + } + if time.Now().After(c.data[key].expiresAt) { + var emptyValue V + return emptyValue, false + } + return value.data, ok +} + +// Purge clears the cache completely. +func (c *LoadingCache[V]) Purge() { + c.mu.Lock() + defer c.mu.Unlock() + for k, v := range c.data { + delete(c.data, k) + if c.onEvicted != nil { + c.onEvicted(k, v.data) + } + } +} + +// DeleteExpired clears cache of expired items +func (c *LoadingCache[V]) DeleteExpired() { + c.mu.Lock() + defer c.mu.Unlock() + c.purge(0) +} + +// ItemCount return count of items in cache +func (c *LoadingCache[V]) ItemCount() int { + c.mu.Lock() + n := len(c.data) + c.mu.Unlock() + return n +} + +// Close cleans the cache and destroys running goroutines +func (c *LoadingCache[V]) Close() { + c.mu.Lock() + defer c.mu.Unlock() + // don't panic in case service is already closed + select { + case <-c.done: + return + default: + } + close(c.done) +} + +// keysWithTS includes list of keys with ts. This is for sorting keys +// in order to provide least recently added sorting for size-based eviction +type keysWithTS []struct { + key string + ts time.Time +} + +// purge records > maxKeys. Has to be called with lock! +// call with maxKeys 0 will only clear expired entries. 
+func (c *LoadingCache[V]) purge(maxKeys int64) { + kts := keysWithTS{} + + for key, value := range c.data { + // ttl eviction + if time.Now().After(c.data[key].expiresAt) { + delete(c.data, key) + if c.onEvicted != nil { + c.onEvicted(key, value.data) + } + } + + // prepare list of keysWithTS for size eviction + if maxKeys > 0 && int64(len(c.data)) > maxKeys { + ts := c.data[key].expiresAt + + kts = append(kts, struct { + key string + ts time.Time + }{key, ts}) + } + } + + // size eviction + size := int64(len(c.data)) + if len(kts) > 0 { + sort.Slice(kts, func(i int, j int) bool { return kts[i].ts.Before(kts[j].ts) }) + for d := 0; int64(d) < size-maxKeys; d++ { + key := kts[d].key + value := c.data[key].data + delete(c.data, key) + if c.onEvicted != nil { + c.onEvicted(key, value) + } + } + } +} + +type cacheItem[V any] struct { + expiresAt time.Time + data V +} diff --git a/v2/internal/cache/cache_test.go b/v2/internal/cache/cache_test.go new file mode 100644 index 0000000..a3d89cc --- /dev/null +++ b/v2/internal/cache/cache_test.go @@ -0,0 +1,209 @@ +package cache + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestLoadingCacheNoPurge(t *testing.T) { + lc, err := NewLoadingCache[string]() + assert.NoError(t, err) + defer lc.Close() + + lc.Set("key1", "val1") + assert.Equal(t, 1, lc.ItemCount()) + + v, ok := lc.Peek("key1") + assert.Equal(t, "val1", v) + assert.True(t, ok) + + v, ok = lc.Peek("key2") + assert.Empty(t, v) + assert.False(t, ok) + + assert.Equal(t, []string{"key1"}, lc.Keys()) +} + +func TestLoadingCacheWithPurge(t *testing.T) { + var evicted []string + o := NewOpts[string]() + lc, err := NewLoadingCache( + o.PurgeEvery(time.Millisecond*100), + o.TTL(150*time.Millisecond), + o.OnEvicted(func(key string, value string) { evicted = append(evicted, key, value) }), + ) + assert.NoError(t, err) + defer lc.Close() + + lc.Set("key1", "val1") + + time.Sleep(100 * time.Millisecond) // not enough to expire + assert.Equal(t, 1, lc.ItemCount()) + + v, ok := lc.Get("key1") + assert.Equal(t, "val1", v) + assert.True(t, ok) + + time.Sleep(200 * time.Millisecond) // expire + v, ok = lc.Get("key1") + assert.False(t, ok) + assert.Zero(t, v) + + assert.Equal(t, 0, lc.ItemCount()) + assert.Equal(t, []string{"key1", "val1"}, evicted) + + // add new entry + lc.Set("key2", "val2") + assert.Equal(t, 1, lc.ItemCount()) + + time.Sleep(200 * time.Millisecond) // expire key2 + + // DeleteExpired, key2 deleted + lc.DeleteExpired() + assert.Equal(t, 0, lc.ItemCount()) + assert.Equal(t, []string{"key1", "val1", "key2", "val2"}, evicted) + + // add third entry + lc.Set("key3", "val3") + assert.Equal(t, 1, lc.ItemCount()) + + // Purge, cache should be clean + lc.Purge() + assert.Equal(t, 0, lc.ItemCount()) + assert.Equal(t, []string{"key1", "val1", "key2", "val2", "key3", "val3"}, evicted) +} + +func TestLoadingCacheWithPurgeEnforcedBySize(t *testing.T) { + o := NewOpts[string]() + lc, err := NewLoadingCache(o.MaxKeys(10)) + assert.NoError(t, err) + defer lc.Close() + + for i := 0; i < 100; i++ { + i := i + lc.Set(fmt.Sprintf("key%d", i), fmt.Sprintf("val%d", i)) + v, ok := lc.Get(fmt.Sprintf("key%d", i)) + assert.Equal(t, fmt.Sprintf("val%d", i), v) + assert.True(t, ok) + assert.True(t, lc.ItemCount() < 20) + } + + assert.Equal(t, 10, lc.ItemCount()) +} + +func TestLoadingCacheWithPurgeMax(t *testing.T) { + o := NewOpts[string]() + lc, err := NewLoadingCache(o.PurgeEvery(time.Millisecond*50), o.MaxKeys(2)) + assert.NoError(t, err) + defer lc.Close() + 
+ lc.Set("key1", "val1") + lc.Set("key2", "val2") + lc.Set("key3", "val3") + assert.Equal(t, 3, lc.ItemCount()) + + time.Sleep(100 * time.Millisecond) + assert.Equal(t, 2, lc.ItemCount()) + + _, found := lc.Get("key1") + assert.False(t, found, "key1 should be deleted") +} + +func TestLoadingCacheConcurrency(t *testing.T) { + lc, err := NewLoadingCache[string]() + assert.NoError(t, err) + defer lc.Close() + wg := sync.WaitGroup{} + wg.Add(1000) + for i := 0; i < 1000; i++ { + go func(i int) { + lc.Set(fmt.Sprintf("key-%d", i/10), fmt.Sprintf("val-%d", i/10)) + wg.Done() + }(i) + } + wg.Wait() + assert.Equal(t, 100, lc.ItemCount()) +} + +func TestLoadingCacheInvalidateAndEvict(t *testing.T) { + var evicted int + o := NewOpts[string]() + lc, err := NewLoadingCache(o.OnEvicted(func(_ string, _ string) { evicted++ })) + assert.NoError(t, err) + defer lc.Close() + + lc.Set("key1", "val1") + lc.Set("key2", "val2") + + val, ok := lc.Get("key1") + assert.True(t, ok) + assert.Equal(t, "val1", val) + assert.Equal(t, 0, evicted) + + lc.Invalidate("key1") + assert.Equal(t, 1, evicted) + val, ok = lc.Get("key1") + assert.Empty(t, val) + assert.False(t, ok) + + val, ok = lc.Get("key2") + assert.True(t, ok) + assert.Equal(t, "val2", val) + + lc.InvalidateFn(func(key string) bool { + return key == "key2" + }) + assert.Equal(t, 2, evicted) + _, ok = lc.Get("key2") + assert.False(t, ok) + assert.Equal(t, 0, lc.ItemCount()) +} + +func TestLoadingCacheBadOption(t *testing.T) { + lc, err := NewLoadingCache(func(_ *LoadingCache[string]) error { + return fmt.Errorf("mock err") + }) + assert.EqualError(t, err, "failed to set cache option: mock err") + assert.Nil(t, lc) +} + +func TestLoadingExpired(t *testing.T) { + o := NewOpts[string]() + lc, err := NewLoadingCache(o.TTL(time.Millisecond * 5)) + assert.NoError(t, err) + defer lc.Close() + + lc.Set("key1", "val1") + assert.Equal(t, 1, lc.ItemCount()) + + v, ok := lc.Peek("key1") + assert.Equal(t, v, "val1") + assert.True(t, ok) + + v, ok = lc.Get("key1") + assert.Equal(t, v, "val1") + assert.True(t, ok) + + time.Sleep(time.Millisecond * 10) // wait for entry to expire + assert.Equal(t, 1, lc.ItemCount()) // but not purged + + v, ok = lc.Peek("key1") + assert.Empty(t, v) + assert.False(t, ok) + + v, ok = lc.Get("key1") + assert.Empty(t, v) + assert.False(t, ok) +} + +func TestDoubleClose(t *testing.T) { + o := NewOpts[string]() + lc, err := NewLoadingCache(o.TTL(time.Millisecond * 5)) + assert.NoError(t, err) + lc.Close() + lc.Close() // don't panic in case service is already closed +} diff --git a/v2/internal/cache/options.go b/v2/internal/cache/options.go new file mode 100644 index 0000000..4aff94b --- /dev/null +++ b/v2/internal/cache/options.go @@ -0,0 +1,50 @@ +package cache + +import "time" + +// Option func type +type Option[V any] func(lc *LoadingCache[V]) error + +// WorkerOptions holds the option setting methods +type WorkerOptions[T any] struct{} + +// NewOpts creates a new WorkerOptions instance +func NewOpts[T any]() *WorkerOptions[T] { + return &WorkerOptions[T]{} +} + +// OnEvicted called automatically for expired and manually deleted entries +func (o *WorkerOptions[V]) OnEvicted(fn func(key string, value V)) Option[V] { + return func(lc *LoadingCache[V]) error { + lc.onEvicted = fn + return nil + } +} + +// PurgeEvery functional option defines purge interval +// by default it is 0, i.e. never. 
If MaxKeys set to any non-zero this default will be 5minutes +func (o *WorkerOptions[V]) PurgeEvery(interval time.Duration) Option[V] { + return func(lc *LoadingCache[V]) error { + lc.purgeEvery = interval + return nil + } +} + +// MaxKeys functional option defines how many keys to keep. +// By default, it is 0, which means unlimited. +// If any non-zero MaxKeys set, default PurgeEvery will be set to 5 minutes +func (o *WorkerOptions[V]) MaxKeys(max int) Option[V] { + return func(lc *LoadingCache[V]) error { + lc.maxKeys = int64(max) + return nil + } +} + +// TTL functional option defines TTL for all cache entries. +// By default, it is set to 10 years, sane option for expirable cache might be 5 minutes. +func (o *WorkerOptions[V]) TTL(ttl time.Duration) Option[V] { + return func(lc *LoadingCache[V]) error { + lc.ttl = ttl + return nil + } +} diff --git a/v2/lru_cache.go b/v2/lru_cache.go new file mode 100644 index 0000000..b69990b --- /dev/null +++ b/v2/lru_cache.go @@ -0,0 +1,169 @@ +package lcw + +import ( + "fmt" + "sync/atomic" + + "github.com/go-pkgz/lcw/v2/eventbus" + "github.com/google/uuid" + lru "github.com/hashicorp/golang-lru/v2" +) + +// LruCache wraps lru.LruCache with loading cache Get and size limits +type LruCache[V any] struct { + Workers[V] + CacheStat + backend *lru.Cache[string, V] + currentSize int64 + id string // uuid identifying cache instance +} + +// NewLruCache makes LRU LoadingCache implementation, 1000 max keys by default +func NewLruCache[V any](opts ...Option[V]) (*LruCache[V], error) { + res := LruCache[V]{ + Workers: Workers[V]{ + maxKeys: 1000, + maxValueSize: 0, + eventBus: &eventbus.NopPubSub{}, + }, + id: uuid.New().String(), + } + for _, opt := range opts { + if err := opt(&res.Workers); err != nil { + return nil, fmt.Errorf("failed to set cache option: %w", err) + } + } + + err := res.init() + return &res, err +} + +func (c *LruCache[V]) init() error { + if err := c.eventBus.Subscribe(c.onBusEvent); err != nil { + return fmt.Errorf("can't subscribe to event bus: %w", err) + } + + onEvicted := func(key string, value V) { + if c.onEvicted != nil { + c.onEvicted(key, value) + } + if s, ok := any(value).(Sizer); ok { + size := s.Size() + atomic.AddInt64(&c.currentSize, -1*int64(size)) + } + _ = c.eventBus.Publish(c.id, key) // signal invalidation to other nodes + } + + var err error + // OnEvicted called automatically for expired and manually deleted + if c.backend, err = lru.NewWithEvict[string, V](c.maxKeys, onEvicted); err != nil { + return fmt.Errorf("failed to make lru cache backend: %w", err) + } + + return nil +} + +// Get gets value by key or load with fn if not found in cache +func (c *LruCache[V]) Get(key string, fn func() (V, error)) (data V, err error) { + if v, ok := c.backend.Get(key); ok { + atomic.AddInt64(&c.Hits, 1) + return v, nil + } + + if data, err = fn(); err != nil { + atomic.AddInt64(&c.Errors, 1) + return data, err + } + + atomic.AddInt64(&c.Misses, 1) + + if !c.allowed(key, data) { + return data, nil + } + + c.backend.Add(key, data) + + if s, ok := any(data).(Sizer); ok { + atomic.AddInt64(&c.currentSize, int64(s.Size())) + if c.maxCacheSize > 0 && atomic.LoadInt64(&c.currentSize) > c.maxCacheSize { + for atomic.LoadInt64(&c.currentSize) > c.maxCacheSize { + c.backend.RemoveOldest() + } + } + } + + return data, nil +} + +// Peek returns the key value (or undefined if not found) without updating the "recently used"-ness of the key. 
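+// For a missing key the zero value of V and false are returned (a descriptive note added here; it reflects the behavior of the underlying golang-lru v2 Peek).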
+func (c *LruCache[V]) Peek(key string) (V, bool) { + return c.backend.Peek(key) +} + +// Purge clears the cache completely. +func (c *LruCache[V]) Purge() { + c.backend.Purge() + atomic.StoreInt64(&c.currentSize, 0) +} + +// Invalidate removes keys with passed predicate fn, i.e. fn(key) should be true to get evicted +func (c *LruCache[V]) Invalidate(fn func(key string) bool) { + for _, k := range c.backend.Keys() { // Keys() returns copy of cache's key, safe to remove directly + if fn(k) { + c.backend.Remove(k) + } + } +} + +// Delete cache item by key +func (c *LruCache[V]) Delete(key string) { + c.backend.Remove(key) +} + +// Keys returns cache keys +func (c *LruCache[V]) Keys() (res []string) { + return c.backend.Keys() +} + +// Stat returns cache statistics +func (c *LruCache[V]) Stat() CacheStat { + return CacheStat{ + Hits: c.Hits, + Misses: c.Misses, + Size: c.size(), + Keys: c.keys(), + Errors: c.Errors, + } +} + +// Close does nothing for this type of cache +func (c *LruCache[V]) Close() error { + return nil +} + +// onBusEvent reacts on invalidation message triggered by event bus from another cache instance +func (c *LruCache[V]) onBusEvent(id, key string) { + if id != c.id && c.backend.Contains(key) { // prevent reaction on event from this cache + c.backend.Remove(key) + } +} + +func (c *LruCache[V]) size() int64 { + return atomic.LoadInt64(&c.currentSize) +} + +func (c *LruCache[V]) keys() int { + return c.backend.Len() +} + +func (c *LruCache[V]) allowed(key string, data V) bool { + if c.maxKeySize > 0 && len(key) > c.maxKeySize { + return false + } + if s, ok := any(data).(Sizer); ok { + if c.maxValueSize > 0 && s.Size() >= c.maxValueSize { + return false + } + } + return true +} diff --git a/v2/lru_cache_test.go b/v2/lru_cache_test.go new file mode 100644 index 0000000..684b03f --- /dev/null +++ b/v2/lru_cache_test.go @@ -0,0 +1,304 @@ +package lcw + +import ( + "fmt" + "io" + "log" + "math/rand" + "net/http" + "net/http/httptest" + "os" + "sort" + "strconv" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/go-pkgz/lcw/v2/eventbus" +) + +func TestLruCache_MaxKeys(t *testing.T) { + var coldCalls int32 + o := NewOpts[string]() + lc, err := NewLruCache(o.MaxKeys(5), o.MaxValSize(10)) + require.NoError(t, err) + + // put 5 keys to cache + for i := 0; i < 5; i++ { + i := i + res, e := lc.Get(fmt.Sprintf("key-%d", i), func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return fmt.Sprintf("result-%d", i), nil + }) + assert.NoError(t, e) + assert.Equal(t, fmt.Sprintf("result-%d", i), res) + assert.Equal(t, int32(i+1), atomic.LoadInt32(&coldCalls)) + } + + keys := lc.Keys() + sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + assert.EqualValues(t, []string{"key-0", "key-1", "key-2", "key-3", "key-4"}, keys) + + // check if really cached + res, err := lc.Get("key-3", func() (string, error) { + return "result-blah", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-3", res, "should be cached") + + // try to cache after maxKeys reached + res, err = lc.Get("key-X", func() (string, error) { + return "result-X", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-X", res) + assert.Equal(t, 5, lc.backend.Len()) + + // put to cache and make sure it cached + res, err = lc.Get("key-Z", func() (string, error) { + return "result-Z", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-Z", res) + + res, err = lc.Get("key-Z", func() (string, error) { + return 
"result-Zzzz", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-Z", res, "got cached value") + assert.Equal(t, 5, lc.backend.Len()) +} + +func TestLruCache_BadOptions(t *testing.T) { + o := NewOpts[string]() + _, err := NewLruCache(o.MaxCacheSize(-1)) + assert.EqualError(t, err, "failed to set cache option: negative max cache size") + + _, err = NewLruCache(o.MaxKeySize(-1)) + assert.EqualError(t, err, "failed to set cache option: negative max key size") + + _, err = NewLruCache(o.MaxKeys(-1)) + assert.EqualError(t, err, "failed to set cache option: negative max keys") + + _, err = NewLruCache(o.MaxValSize(-1)) + assert.EqualError(t, err, "failed to set cache option: negative max value size") + + _, err = NewLruCache(o.TTL(-1)) + assert.EqualError(t, err, "failed to set cache option: negative ttl") +} + +func TestLruCache_MaxKeysWithBus(t *testing.T) { + ps := &mockPubSub{} + o := NewOpts[string]() + + var coldCalls int32 + lc1, err := NewLruCache(o.MaxKeys(5), o.MaxValSize(10), o.EventBus(ps)) + require.NoError(t, err) + defer lc1.Close() + + lc2, err := NewLruCache(o.MaxKeys(50), o.MaxValSize(100), o.EventBus(ps)) + require.NoError(t, err) + defer lc2.Close() + + // put 5 keys to cache1 + for i := 0; i < 5; i++ { + i := i + res, e := lc1.Get(fmt.Sprintf("key-%d", i), func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return fmt.Sprintf("result-%d", i), nil + }) + assert.NoError(t, e) + assert.Equal(t, fmt.Sprintf("result-%d", i), res) + assert.Equal(t, int32(i+1), atomic.LoadInt32(&coldCalls)) + } + // check if really cached + res, err := lc1.Get("key-3", func() (string, error) { + return "result-blah", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-3", res, "should be cached") + + assert.Equal(t, 0, len(ps.CalledKeys()), "no events") + + // put 1 key to cache2 + res, e := lc2.Get("key-1", func() (string, error) { + return "result-111", nil + }) + assert.NoError(t, e) + assert.Equal(t, "result-111", res) + + // try to cache1 after maxKeys reached, will remove key-0 + res, err = lc1.Get("key-X", func() (string, error) { + return "result-X", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-X", res) + assert.Equal(t, 5, lc1.backend.Len()) + + assert.Equal(t, 1, len(ps.CalledKeys()), "1 event, key-0 expired") + + assert.Equal(t, 1, lc2.backend.Len(), "cache2 still has key-1") + + // try to cache1 after maxKeys reached, will remove key-1 + res, err = lc1.Get("key-X2", func() (string, error) { + return "result-X", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-X", res) + + assert.Equal(t, 2, len(ps.CalledKeys()), "2 events, key-1 expired") + + // wait for onBusEvent goroutines to finish + ps.Wait() + + assert.Equal(t, 0, lc2.backend.Len(), "cache2 removed key-1") +} + +func TestLruCache_MaxKeysWithRedis(t *testing.T) { + if _, ok := os.LookupEnv("ENABLE_REDIS_TESTS"); !ok { + t.Skip("ENABLE_REDIS_TESTS env variable is not set, not expecting Redis to be ready at 127.0.0.1:6379") + } + + var coldCalls int32 + + //nolint:gosec // not used for security purpose + channel := "lcw-test-" + strconv.Itoa(rand.Intn(1000000)) + + redisPubSub1, err := eventbus.NewRedisPubSub("127.0.0.1:6379", channel) + require.NoError(t, err) + o := NewOpts[string]() + lc1, err := NewLruCache(o.MaxKeys(5), o.MaxValSize(10), o.EventBus(redisPubSub1)) + require.NoError(t, err) + defer lc1.Close() + + redisPubSub2, err := eventbus.NewRedisPubSub("127.0.0.1:6379", channel) + require.NoError(t, err) + lc2, err := NewLruCache(o.MaxKeys(50), o.MaxValSize(100), 
o.EventBus(redisPubSub2)) + require.NoError(t, err) + defer lc2.Close() + + // put 5 keys to cache1 + for i := 0; i < 5; i++ { + i := i + res, e := lc1.Get(fmt.Sprintf("key-%d", i), func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return fmt.Sprintf("result-%d", i), nil + }) + assert.NoError(t, e) + assert.Equal(t, fmt.Sprintf("result-%d", i), res) + assert.Equal(t, int32(i+1), atomic.LoadInt32(&coldCalls)) + } + // check if really cached + res, err := lc1.Get("key-3", func() (string, error) { + return "result-blah", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-3", res, "should be cached") + + // put 1 key to cache2 + res, e := lc2.Get("key-1", func() (string, error) { + return "result-111", nil + }) + assert.NoError(t, e) + assert.Equal(t, "result-111", res) + + // try to cache1 after maxKeys reached, will remove key-0 + res, err = lc1.Get("key-X", func() (string, error) { + return "result-X", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-X", res) + assert.Equal(t, 5, lc1.backend.Len()) + + assert.Equal(t, 1, lc2.backend.Len(), "cache2 still has key-1") + + // try to cache1 after maxKeys reached, will remove key-1 + res, err = lc1.Get("key-X2", func() (string, error) { + return "result-X", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-X", res) + + time.Sleep(time.Second) + assert.Equal(t, 0, lc2.backend.Len(), "cache2 removed key-1") + assert.NoError(t, redisPubSub1.Close()) + assert.NoError(t, redisPubSub2.Close()) +} + +// LruCache illustrates the use of LRU loading cache +func ExampleLruCache() { + // set up test server for single response + var hitCount int + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.String() == "/post/42" && hitCount == 0 { + _, _ = w.Write([]byte("test response")) + return + } + w.WriteHeader(404) + })) + + // load page function + loadURL := func(url string) (string, error) { + resp, err := http.Get(url) // nolint + if err != nil { + return "", err + } + b, err := io.ReadAll(resp.Body) + _ = resp.Body.Close() + if err != nil { + return "", err + } + return string(b), nil + } + + // fixed size LRU cache, 100 items, up to 10k in total size + o := NewOpts[string]() + cache, err := NewLruCache(o.MaxKeys(100), o.MaxCacheSize(10*1024)) + if err != nil { + log.Printf("can't make lru cache, %v", err) + } + + // url not in cache, load data + url := ts.URL + "/post/42" + val, err := cache.Get(url, func() (val string, err error) { + return loadURL(url) + }) + if err != nil { + log.Fatalf("can't load url %s, %v", url, err) + } + fmt.Println(val) + + // url not in cache, load data + val, err = cache.Get(url, func() (val string, err error) { + return loadURL(url) + }) + if err != nil { + log.Fatalf("can't load url %s, %v", url, err) + } + fmt.Println(val) + + // url cached, skip load and get from the cache + val, err = cache.Get(url, func() (val string, err error) { + return loadURL(url) + }) + if err != nil { + log.Fatalf("can't load url %s, %v", url, err) + } + fmt.Println(val) + + // get cache stats + stats := cache.Stat() + fmt.Printf("%+v\n", stats) + + // close test HTTP server after all log.Fatalf are passed + ts.Close() + + // Output: + // test response + // test response + // test response + // {hits:2, misses:1, ratio:0.67, keys:1, size:0, errors:0} +} diff --git a/v2/options.go b/v2/options.go new file mode 100644 index 0000000..c960d13 --- /dev/null +++ b/v2/options.go @@ -0,0 +1,114 @@ +package lcw + +import ( + "fmt" + "time" + + 
"github.com/go-pkgz/lcw/v2/eventbus" +) + +type Workers[V any] struct { + maxKeys int + maxValueSize int + maxKeySize int + maxCacheSize int64 + ttl time.Duration + onEvicted func(key string, value V) + eventBus eventbus.PubSub + strToV func(string) V +} + +// Option func type +type Option[V any] func(o *Workers[V]) error + +// WorkerOptions holds the option setting methods +type WorkerOptions[T any] struct{} + +// NewOpts creates a new WorkerOptions instance +func NewOpts[T any]() *WorkerOptions[T] { + return &WorkerOptions[T]{} +} + +// MaxValSize functional option defines the largest value's size allowed to be cached +// By default it is 0, which means unlimited. +func (o *WorkerOptions[V]) MaxValSize(max int) Option[V] { + return func(o *Workers[V]) error { + if max < 0 { + return fmt.Errorf("negative max value size") + } + o.maxValueSize = max + return nil + } +} + +// MaxKeySize functional option defines the largest key's size allowed to be used in cache +// By default it is 0, which means unlimited. +func (o *WorkerOptions[V]) MaxKeySize(max int) Option[V] { + return func(o *Workers[V]) error { + if max < 0 { + return fmt.Errorf("negative max key size") + } + o.maxKeySize = max + return nil + } +} + +// MaxKeys functional option defines how many keys to keep. +// By default, it is 0, which means unlimited. +func (o *WorkerOptions[V]) MaxKeys(max int) Option[V] { + return func(o *Workers[V]) error { + if max < 0 { + return fmt.Errorf("negative max keys") + } + o.maxKeys = max + return nil + } +} + +// MaxCacheSize functional option defines the total size of cached data. +// By default, it is 0, which means unlimited. +func (o *WorkerOptions[V]) MaxCacheSize(max int64) Option[V] { + return func(o *Workers[V]) error { + if max < 0 { + return fmt.Errorf("negative max cache size") + } + o.maxCacheSize = max + return nil + } +} + +// TTL functional option defines duration. +// Works for ExpirableCache only +func (o *WorkerOptions[V]) TTL(ttl time.Duration) Option[V] { + return func(o *Workers[V]) error { + if ttl < 0 { + return fmt.Errorf("negative ttl") + } + o.ttl = ttl + return nil + } +} + +// OnEvicted sets callback on invalidation event +func (o *WorkerOptions[V]) OnEvicted(fn func(key string, value V)) Option[V] { + return func(o *Workers[V]) error { + o.onEvicted = fn + return nil + } +} + +// EventBus sets PubSub for distributed cache invalidation +func (o *WorkerOptions[V]) EventBus(pubSub eventbus.PubSub) Option[V] { + return func(o *Workers[V]) error { + o.eventBus = pubSub + return nil + } +} + +// StrToV sets strToV function for RedisCache +func (o *WorkerOptions[V]) StrToV(fn func(string) V) Option[V] { + return func(o *Workers[V]) error { + o.strToV = fn + return nil + } +} diff --git a/v2/redis_cache.go b/v2/redis_cache.go new file mode 100644 index 0000000..5f78f79 --- /dev/null +++ b/v2/redis_cache.go @@ -0,0 +1,184 @@ +package lcw + +import ( + "context" + "errors" + "fmt" + "reflect" + "sync/atomic" + "time" + + "github.com/redis/go-redis/v9" +) + +// RedisValueSizeLimit is maximum allowed value size in Redis +const RedisValueSizeLimit = 512 * 1024 * 1024 + +// RedisCache implements LoadingCache for Redis. +type RedisCache[V any] struct { + Workers[V] + CacheStat + backend *redis.Client +} + +// NewRedisCache makes Redis LoadingCache implementation. +// Supports only string and string-based types and will return error otherwise. 
+func NewRedisCache[V any](backend *redis.Client, opts ...Option[V]) (*RedisCache[V], error) { + // check if V is string, not underlying type but directly, and otherwise return error if strToV is nil as it should be defined + + res := RedisCache[V]{ + Workers: Workers[V]{ + ttl: 5 * time.Minute, + }, + } + for _, opt := range opts { + if err := opt(&res.Workers); err != nil { + return nil, fmt.Errorf("failed to set cache option: %w", err) + } + } + + // check if underlying type is string, so we can safely store it in Redis + var v V + if reflect.TypeOf(v).Kind() != reflect.String { + return nil, fmt.Errorf("can't store non-string types in Redis cache") + } + switch any(v).(type) { + case string: + // check strToV option only for string-like but non string types + default: + if res.strToV == nil { + return nil, fmt.Errorf("StrToV option should be set for string-like type") + } + } + + if res.maxValueSize <= 0 || res.maxValueSize > RedisValueSizeLimit { + res.maxValueSize = RedisValueSizeLimit + } + + res.backend = backend + + return &res, nil +} + +// Get gets value by key or load with fn if not found in cache +func (c *RedisCache[V]) Get(key string, fn func() (V, error)) (data V, err error) { + v, getErr := c.backend.Get(context.Background(), key).Result() + switch { + // RedisClient returns nil when find a key in DB + case getErr == nil: + atomic.AddInt64(&c.Hits, 1) + switch any(data).(type) { + case string: + return any(v).(V), nil + default: + return c.strToV(v), nil + } + // RedisClient returns redis.Nil when doesn't find a key in DB + case errors.Is(getErr, redis.Nil): + if data, err = fn(); err != nil { + atomic.AddInt64(&c.Errors, 1) + return data, err + } + // RedisClient returns !nil when something goes wrong while get data + default: + atomic.AddInt64(&c.Errors, 1) + switch any(data).(type) { + case string: + return any(v).(V), getErr + default: + return any(v).(V), getErr + } + } + atomic.AddInt64(&c.Misses, 1) + + if !c.allowed(key, data) { + return data, nil + } + + _, setErr := c.backend.Set(context.Background(), key, data, c.ttl).Result() + if setErr != nil { + atomic.AddInt64(&c.Errors, 1) + return data, setErr + } + + return data, nil +} + +// Invalidate removes keys with passed predicate fn, i.e. fn(key) should be true to get evicted +func (c *RedisCache[V]) Invalidate(fn func(key string) bool) { + for _, key := range c.backend.Keys(context.Background(), "*").Val() { // Keys() returns copy of cache's key, safe to remove directly + if fn(key) { + c.backend.Del(context.Background(), key) + } + } +} + +// Peek returns the key value (or undefined if not found) without updating the "recently used"-ness of the key. +func (c *RedisCache[V]) Peek(key string) (data V, found bool) { + ret, err := c.backend.Get(context.Background(), key).Result() + if err != nil { + var emptyValue V + return emptyValue, false + } + switch any(data).(type) { + case string: + return any(ret).(V), true + default: + return any(ret).(V), true + } +} + +// Purge clears the cache completely. 
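+// Note: the underlying FlushDB call clears the whole Redis database this client
+// is connected to, not only the keys written through this cache.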
+func (c *RedisCache[V]) Purge() { + c.backend.FlushDB(context.Background()) + +} + +// Delete cache item by key +func (c *RedisCache[V]) Delete(key string) { + c.backend.Del(context.Background(), key) +} + +// Keys gets all keys for the cache +func (c *RedisCache[V]) Keys() (res []string) { + return c.backend.Keys(context.Background(), "*").Val() +} + +// Stat returns cache statistics +func (c *RedisCache[V]) Stat() CacheStat { + return CacheStat{ + Hits: c.Hits, + Misses: c.Misses, + Size: c.size(), + Keys: c.keys(), + Errors: c.Errors, + } +} + +// Close closes underlying connections +func (c *RedisCache[V]) Close() error { + return c.backend.Close() +} + +func (c *RedisCache[V]) size() int64 { + return 0 +} + +func (c *RedisCache[V]) keys() int { + return int(c.backend.DBSize(context.Background()).Val()) +} + +func (c *RedisCache[V]) allowed(key string, data V) bool { + if c.maxKeys > 0 && c.backend.DBSize(context.Background()).Val() >= int64(c.maxKeys) { + return false + } + if c.maxKeySize > 0 && len(key) > c.maxKeySize { + return false + } + if s, ok := any(data).(Sizer); ok { + if c.maxValueSize > 0 && (s.Size() >= c.maxValueSize) { + return false + } + } + return true +} diff --git a/v2/redis_cache_test.go b/v2/redis_cache_test.go new file mode 100644 index 0000000..ebc0b9a --- /dev/null +++ b/v2/redis_cache_test.go @@ -0,0 +1,201 @@ +package lcw + +import ( + "context" + "fmt" + "sort" + "sync/atomic" + "testing" + "time" + + "github.com/alicebob/miniredis/v2" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// newTestRedis returns a redis.Cmdable. +func newTestRedisServer() *miniredis.Miniredis { + mr, err := miniredis.Run() + if err != nil { + panic(err) + } + + return mr +} + +func TestExpirableRedisCache(t *testing.T) { + server := newTestRedisServer() + defer server.Close() + client := redis.NewClient(&redis.Options{ + Addr: server.Addr()}) + defer client.Close() + o := NewOpts[string]() + rc, err := NewRedisCache(client, o.MaxKeys(5), o.TTL(time.Second*6)) + require.NoError(t, err) + defer rc.Close() + require.NoError(t, err) + for i := 0; i < 5; i++ { + i := i + _, e := rc.Get(fmt.Sprintf("key-%d", i), func() (string, error) { + return fmt.Sprintf("result-%d", i), nil + }) + assert.NoError(t, e) + server.FastForward(1000 * time.Millisecond) + } + + assert.Equal(t, 5, rc.Stat().Keys) + assert.Equal(t, int64(5), rc.Stat().Misses) + + keys := rc.Keys() + sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + assert.EqualValues(t, []string{"key-0", "key-1", "key-2", "key-3", "key-4"}, keys) + + _, e := rc.Get("key-xx", func() (string, error) { + return "result-xx", nil + }) + assert.NoError(t, e) + assert.Equal(t, 5, rc.Stat().Keys) + assert.Equal(t, int64(6), rc.Stat().Misses) + + server.FastForward(1000 * time.Millisecond) + assert.Equal(t, 4, rc.Stat().Keys) + + server.FastForward(4000 * time.Millisecond) + assert.Equal(t, 0, rc.keys()) + +} + +func TestRedisCache(t *testing.T) { + var coldCalls int32 + + server := newTestRedisServer() + defer server.Close() + client := redis.NewClient(&redis.Options{ + Addr: server.Addr()}) + defer client.Close() + o := NewOpts[string]() + rc, err := NewRedisCache(client, o.MaxKeys(5), o.MaxValSize(10), o.MaxKeySize(10)) + require.NoError(t, err) + defer rc.Close() + // put 5 keys to cache + for i := 0; i < 5; i++ { + i := i + res, e := rc.Get(fmt.Sprintf("key-%d", i), func() (string, error) { + atomic.AddInt32(&coldCalls, 1) + return 
fmt.Sprintf("result-%d", i), nil + }) + assert.NoError(t, e) + assert.Equal(t, fmt.Sprintf("result-%d", i), res) + assert.Equal(t, int32(i+1), atomic.LoadInt32(&coldCalls)) + } + + // check if really cached + res, err := rc.Get("key-3", func() (string, error) { + return "result-blah", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-3", res, "should be cached") + + // try to cache after maxKeys reached + res, err = rc.Get("key-X", func() (string, error) { + return "result-X", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-X", res) + assert.Equal(t, int64(5), rc.backend.DBSize(context.Background()).Val()) + + // put to cache and make sure it cached + res, err = rc.Get("key-Z", func() (string, error) { + return "result-Z", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-Z", res) + + res, err = rc.Get("key-Z", func() (string, error) { + return "result-Zzzz", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-Zzzz", res, "got non-cached value") + assert.Equal(t, 5, rc.keys()) + + res, err = rc.Get("key-Zzzzzzz", func() (string, error) { + return "result-Zzzz", nil + }) + assert.NoError(t, err) + assert.Equal(t, "result-Zzzz", res, "got non-cached value") + assert.Equal(t, 5, rc.keys()) + + res, ok := rc.Peek("error-key-Z2") + assert.False(t, ok) + assert.Empty(t, res) +} + +func TestRedisCacheErrors(t *testing.T) { + server := newTestRedisServer() + defer server.Close() + client := redis.NewClient(&redis.Options{ + Addr: server.Addr()}) + defer client.Close() + rc, err := NewRedisCache[string](client) + require.NoError(t, err) + defer rc.Close() + + res, err := rc.Get("error-key-Z", func() (string, error) { + return "error-result-Z", fmt.Errorf("some error") + }) + assert.Error(t, err) + assert.Equal(t, "error-result-Z", res) + assert.Equal(t, int64(1), rc.Stat().Errors) +} + +// should not work with non-string types +func TestRedisCacheCreationErrors(t *testing.T) { + // string case, no error + // no close is needed as it will call client.Close(), which will cause panic + rcString, err := NewRedisCache[string](nil) + require.NoError(t, err) + assert.NotNil(t, rcString) + // string-based type but no StrToV option, error expected + rcSizedString, err := NewRedisCache[sizedString](nil) + require.EqualError(t, err, "StrToV option should be set for string-like type") + assert.Nil(t, rcSizedString) + // string-based type with StrToV option, no error + // no close is needed as it will call client.Close(), which will cause panic + o := NewOpts[sizedString]() + rcSizedString, err = NewRedisCache[sizedString](nil, o.StrToV(func(s string) sizedString { return sizedString(s) })) + require.NoError(t, err) + assert.NotNil(t, rcSizedString) + // non-string based type, error expected + rcInt, err := NewRedisCache[int](nil) + require.EqualError(t, err, "can't store non-string types in Redis cache") + assert.Nil(t, rcInt) +} + +func TestRedisCache_BadOptions(t *testing.T) { + server := newTestRedisServer() + defer server.Close() + client := redis.NewClient(&redis.Options{ + Addr: server.Addr()}) + defer client.Close() + + o := NewOpts[string]() + _, err := NewRedisCache(client, o.MaxCacheSize(-1)) + assert.EqualError(t, err, "failed to set cache option: negative max cache size") + + _, err = NewRedisCache(client, o.MaxCacheSize(-1)) + assert.EqualError(t, err, "failed to set cache option: negative max cache size") + + _, err = NewRedisCache(client, o.MaxKeys(-1)) + assert.EqualError(t, err, "failed to set cache option: negative max keys") + + _, err = 
NewRedisCache(client, o.MaxValSize(-1)) + assert.EqualError(t, err, "failed to set cache option: negative max value size") + + _, err = NewRedisCache(client, o.TTL(-1)) + assert.EqualError(t, err, "failed to set cache option: negative ttl") + + _, err = NewRedisCache(client, o.MaxKeySize(-1)) + assert.EqualError(t, err, "failed to set cache option: negative max key size") + +} diff --git a/v2/scache.go b/v2/scache.go new file mode 100644 index 0000000..d60cfc3 --- /dev/null +++ b/v2/scache.go @@ -0,0 +1,145 @@ +package lcw + +import ( + "fmt" + "strings" +) + +// Scache wraps LoadingCache with partitions (sub-system), and scopes. +// Simplified interface with just 4 funcs - Get, Flush, Stats and Close +type Scache[V any] struct { + lc LoadingCache[V] +} + +// NewScache creates Scache on top of LoadingCache +func NewScache[V any](lc LoadingCache[V]) *Scache[V] { + return &Scache[V]{lc: lc} +} + +// Get retrieves a key from underlying backend +func (m *Scache[V]) Get(key Key, fn func() (V, error)) (data V, err error) { + keyStr := key.String() + val, err := m.lc.Get(keyStr, func() (value V, e error) { + return fn() + }) + return val, err +} + +// Stat delegates the call to the underlying cache backend +func (m *Scache[V]) Stat() CacheStat { + return m.lc.Stat() +} + +// Close calls Close function of the underlying cache +func (m *Scache[V]) Close() error { + return m.lc.Close() +} + +// Flush clears cache and calls postFlushFn async +func (m *Scache[V]) Flush(req FlusherRequest) { + if len(req.scopes) == 0 { + m.lc.Purge() + return + } + + // check if fullKey has matching scopes + inScope := func(fullKey string) bool { + key, err := parseKey(fullKey) + if err != nil { + return false + } + for _, s := range req.scopes { + for _, ks := range key.scopes { + if ks == s { + return true + } + } + } + return false + } + + for _, k := range m.lc.Keys() { + if inScope(k) { + m.lc.Delete(k) // Keys() returns copy of cache's key, safe to remove directly + } + } +} + +// Key for scoped cache. Created foe given partition (can be empty) and set with ID and Scopes. +// example: k := NewKey("sys1").ID(postID).Scopes("last_posts", customer_id) +type Key struct { + id string // the primary part of the key, i.e. usual cache's key + partition string // optional id for a subsystem or cache partition + scopes []string // list of scopes to use in invalidation +} + +// NewKey makes base key for given partition. Partition can be omitted. +func NewKey(partition ...string) Key { + if len(partition) == 0 { + return Key{partition: ""} + } + return Key{partition: partition[0]} +} + +// ID sets key id +func (k Key) ID(id string) Key { + k.id = id + return k +} + +// Scopes of the key +func (k Key) Scopes(scopes ...string) Key { + k.scopes = scopes + return k +} + +// String makes full string key from primary key, partition and scopes +// key string made as @@@@$$.... +func (k Key) String() string { + bld := strings.Builder{} + _, _ = bld.WriteString(k.partition) + _, _ = bld.WriteString("@@") + _, _ = bld.WriteString(k.id) + _, _ = bld.WriteString("@@") + _, _ = bld.WriteString(strings.Join(k.scopes, "$$")) + return bld.String() +} + +// parseKey gets compound key string created by Key func and split it to the actual key, partition and scopes +// key string made as @@@@$$.... 
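+// e.g. "p1@@key1@@s1$$s2" yields partition "p1", id "key1" and scopes ["s1", "s2"]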
+func parseKey(keyStr string) (Key, error) { + elems := strings.Split(keyStr, "@@") + if len(elems) != 3 { + return Key{}, fmt.Errorf("can't parse cache key %s, invalid number of segments %d", keyStr, len(elems)) + } + + scopes := strings.Split(elems[2], "$$") + if len(scopes) == 1 && scopes[0] == "" { + scopes = []string{} + } + key := Key{ + partition: elems[0], + id: elems[1], + scopes: scopes, + } + + return key, nil +} + +// FlusherRequest used as input for cache.Flush +type FlusherRequest struct { + partition string + scopes []string +} + +// Flusher makes new FlusherRequest with empty scopes +func Flusher(partition string) FlusherRequest { + res := FlusherRequest{partition: partition} + return res +} + +// Scopes adds scopes to FlusherRequest +func (f FlusherRequest) Scopes(scopes ...string) FlusherRequest { + f.scopes = scopes + return f +} diff --git a/v2/scache_test.go b/v2/scache_test.go new file mode 100644 index 0000000..d7ca092 --- /dev/null +++ b/v2/scache_test.go @@ -0,0 +1,303 @@ +package lcw + +import ( + "fmt" + "io" + "log" + "net/http" + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestScache_Get(t *testing.T) { + lru, err := NewLruCache[[]byte]() + require.NoError(t, err) + lc := NewScache[[]byte](lru) + defer lc.Close() + + var coldCalls int32 + + res, err := lc.Get(NewKey("site").ID("key"), func() ([]byte, error) { + atomic.AddInt32(&coldCalls, 1) + return []byte("result"), nil + }) + assert.NoError(t, err) + assert.Equal(t, "result", string(res)) + assert.Equal(t, int32(1), atomic.LoadInt32(&coldCalls)) + + res, err = lc.Get(NewKey("site").ID("key"), func() ([]byte, error) { + atomic.AddInt32(&coldCalls, 1) + return []byte("result"), nil + }) + assert.NoError(t, err) + assert.Equal(t, "result", string(res)) + assert.Equal(t, int32(1), atomic.LoadInt32(&coldCalls)) + + lc.Flush(Flusher("site")) + time.Sleep(100 * time.Millisecond) // let postFn to do its thing + + _, err = lc.Get(NewKey("site").ID("key"), func() ([]byte, error) { + return nil, fmt.Errorf("err") + }) + assert.Error(t, err) +} + +func TestScache_Scopes(t *testing.T) { + lru, err := NewLruCache[[]byte]() + require.NoError(t, err) + lc := NewScache[[]byte](lru) + defer lc.Close() + + res, err := lc.Get(NewKey("site").ID("key").Scopes("s1", "s2"), func() ([]byte, error) { + return []byte("value"), nil + }) + assert.NoError(t, err) + assert.Equal(t, "value", string(res)) + + res, err = lc.Get(NewKey("site").ID("key2").Scopes("s2"), func() ([]byte, error) { + return []byte("value2"), nil + }) + assert.NoError(t, err) + assert.Equal(t, "value2", string(res)) + + assert.Equal(t, 2, len(lc.lc.Keys())) + lc.Flush(Flusher("site").Scopes("s1")) + assert.Equal(t, 1, len(lc.lc.Keys())) + + _, err = lc.Get(NewKey("site").ID("key2").Scopes("s2"), func() ([]byte, error) { + assert.Fail(t, "should stay") + return nil, nil + }) + assert.NoError(t, err) + res, err = lc.Get(NewKey("site").ID("key").Scopes("s1", "s2"), func() ([]byte, error) { + return []byte("value-upd"), nil + }) + assert.NoError(t, err) + assert.Equal(t, "value-upd", string(res), "was deleted, update") + + assert.Equal(t, CacheStat{Hits: 1, Misses: 3, Keys: 2, Size: 0, Errors: 0}, lc.Stat()) +} + +func TestScache_Flush(t *testing.T) { + lru, err := NewLruCache[[]byte]() + require.NoError(t, err) + lc := NewScache[[]byte](lru) + + addToCache := func(id string, scopes ...string) { + res, err := lc.Get(NewKey("site").ID(id).Scopes(scopes...), 
func() ([]byte, error) { + return []byte("value" + id), nil + }) + require.NoError(t, err) + require.Equal(t, "value"+id, string(res)) + } + + init := func() { + lc.Flush(Flusher("site")) + addToCache("key1", "s1", "s2") + addToCache("key2", "s1", "s2", "s3") + addToCache("key3", "s1", "s2", "s3") + addToCache("key4", "s2", "s3") + addToCache("key5", "s2") + addToCache("key6") + addToCache("key7", "s4", "s3") + require.Equal(t, 7, len(lc.lc.Keys()), "cache init") + } + + tbl := []struct { + scopes []string + left int + msg string + }{ + {[]string{}, 0, "full flush, no scopes"}, + {[]string{"s0"}, 7, "flush wrong scope"}, + {[]string{"s1"}, 4, "flush s1 scope"}, + {[]string{"s2", "s1"}, 2, "flush s2+s1 scope"}, + {[]string{"s1", "s2"}, 2, "flush s1+s2 scope"}, + {[]string{"s1", "s2", "s4"}, 1, "flush s1+s2+s4 scope"}, + {[]string{"s1", "s2", "s3"}, 1, "flush s1+s2+s3 scope"}, + {[]string{"s1", "s2", "ss"}, 2, "flush s1+s2+wrong scope"}, + } + + for i, tt := range tbl { + tt := tt + i := i + t.Run(tt.msg, func(t *testing.T) { + init() + lc.Flush(Flusher("site").Scopes(tt.scopes...)) + assert.Equal(t, tt.left, len(lc.lc.Keys()), "keys size, %s #%d", tt.msg, i) + }) + } +} + +func TestScache_FlushFailed(t *testing.T) { + lru, err := NewLruCache[[]byte]() + require.NoError(t, err) + lc := NewScache[[]byte](lru) + + val, err := lc.Get(NewKey("site").ID("invalid-composite"), func() ([]byte, error) { + return []byte("value"), nil + }) + assert.NoError(t, err) + assert.Equal(t, "value", string(val)) + assert.Equal(t, 1, len(lc.lc.Keys())) + + lc.Flush(Flusher("site").Scopes("invalid-composite")) + assert.Equal(t, 1, len(lc.lc.Keys())) +} + +func TestScope_Key(t *testing.T) { + tbl := []struct { + key string + partition string + scopes []string + full string + }{ + {"key1", "p1", []string{"s1"}, "p1@@key1@@s1"}, + {"key2", "p2", []string{"s11", "s2"}, "p2@@key2@@s11$$s2"}, + {"key3", "", []string{}, "@@key3@@"}, + {"key3", "", []string{"xx", "yyy"}, "@@key3@@xx$$yyy"}, + } + + for _, tt := range tbl { + tt := tt + t.Run(tt.full, func(t *testing.T) { + k := NewKey(tt.partition).ID(tt.key).Scopes(tt.scopes...) 
+ assert.Equal(t, tt.full, k.String()) + k, err := parseKey(tt.full) + require.NoError(t, err) + assert.Equal(t, tt.partition, k.partition) + assert.Equal(t, tt.key, k.id) + assert.Equal(t, tt.scopes, k.scopes) + }) + } + + // without partition + k := NewKey().ID("id1").Scopes("s1", "s2") + assert.Equal(t, "@@id1@@s1$$s2", k.String()) + + // parse invalid key strings + _, err := parseKey("abc") + assert.Error(t, err) + _, err = parseKey("") + assert.Error(t, err) +} + +func TestScache_Parallel(t *testing.T) { + var coldCalls int32 + lru, err := NewLruCache[[]byte]() + require.NoError(t, err) + lc := NewScache[[]byte](lru) + + res, err := lc.Get(NewKey("site").ID("key"), func() ([]byte, error) { + return []byte("value"), nil + }) + assert.NoError(t, err) + assert.Equal(t, "value", string(res)) + + wg := sync.WaitGroup{} + for i := 0; i < 1000; i++ { + wg.Add(1) + i := i + go func() { + defer wg.Done() + res, err := lc.Get(NewKey("site").ID("key"), func() ([]byte, error) { + atomic.AddInt32(&coldCalls, 1) + return []byte(fmt.Sprintf("result-%d", i)), nil + }) + require.NoError(t, err) + require.Equal(t, "value", string(res)) + }() + } + wg.Wait() + assert.Equal(t, int32(0), atomic.LoadInt32(&coldCalls)) +} + +// LruCache illustrates the use of LRU loading cache +func ExampleScache() { + // set up test server for single response + var hitCount int + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.String() == "/post/42" && hitCount == 0 { + _, _ = w.Write([]byte("test response")) + return + } + w.WriteHeader(404) + })) + + // load page function + loadURL := func(url string) ([]byte, error) { + resp, err := http.Get(url) // nolint + if err != nil { + return nil, err + } + b, err := io.ReadAll(resp.Body) + _ = resp.Body.Close() + if err != nil { + return nil, err + } + return b, nil + } + + // fixed size LRU cache, 100 items, up to 10k in total size + o := NewOpts[[]byte]() + backend, err := NewLruCache(o.MaxKeys(100), o.MaxCacheSize(10*1024)) + if err != nil { + log.Fatalf("can't make lru cache, %v", err) + } + + cache := NewScache[[]byte](backend) + + // url not in cache, load data + url := ts.URL + "/post/42" + key := NewKey().ID(url).Scopes("test") + val, err := cache.Get(key, func() (val []byte, err error) { + return loadURL(url) + }) + if err != nil { + log.Fatalf("can't load url %s, %v", url, err) + } + fmt.Println(string(val)) + + // url not in cache, load data + key = NewKey().ID(url).Scopes("test") + val, err = cache.Get(key, func() (val []byte, err error) { + return loadURL(url) + }) + if err != nil { + log.Fatalf("can't load url %s, %v", url, err) + } + fmt.Println(string(val)) + + // url cached, skip load and get from the cache + key = NewKey().ID(url).Scopes("test") + val, err = cache.Get(key, func() (val []byte, err error) { + return loadURL(url) + }) + if err != nil { + log.Fatalf("can't load url %s, %v", url, err) + } + fmt.Println(string(val)) + + // get cache stats + stats := cache.Stat() + fmt.Printf("%+v\n", stats) + + // close cache and test HTTP server after all log.Fatalf are passed + ts.Close() + err = cache.Close() + if err != nil { + log.Fatalf("can't close cache %v", err) + } + + // Output: + // test response + // test response + // test response + // {hits:2, misses:1, ratio:0.67, keys:1, size:0, errors:0} +} diff --git a/v2/url.go b/v2/url.go new file mode 100644 index 0000000..8328696 --- /dev/null +++ b/v2/url.go @@ -0,0 +1,137 @@ +package lcw + +import ( + "fmt" + "net/url" + "strconv" + "time" + + 
"github.com/hashicorp/go-multierror" + "github.com/redis/go-redis/v9" +) + +// New parses uri and makes any of supported caches +// supported URIs: +// - redis://:?db=123&max_keys=10 +// - mem://lru?max_keys=10&max_cache_size=1024 +// - mem://expirable?ttl=30s&max_val_size=100 +// - nop:// +func New[V any](uri string) (LoadingCache[V], error) { + u, err := url.Parse(uri) + if err != nil { + return nil, fmt.Errorf("parse cache uri %s: %w", uri, err) + } + + query := u.Query() + opts, err := optionsFromQuery[V](query) + if err != nil { + return nil, fmt.Errorf("parse uri options %s: %w", uri, err) + } + + switch u.Scheme { + case "redis": + redisOpts, e := redisOptionsFromURL(u) + if e != nil { + return nil, e + } + res, e := NewRedisCache(redis.NewClient(redisOpts), opts...) + if e != nil { + return nil, fmt.Errorf("make redis for %s: %w", uri, e) + } + return res, nil + case "mem": + switch u.Hostname() { + case "lru": + return NewLruCache[V](opts...) + case "expirable": + return NewExpirableCache[V](opts...) + default: + return nil, fmt.Errorf("unsupported mem cache type %s", u.Hostname()) + } + case "nop": + return NewNopCache[V](), nil + } + return nil, fmt.Errorf("unsupported cache type %s", u.Scheme) +} + +func optionsFromQuery[V any](q url.Values) (opts []Option[V], err error) { + errs := new(multierror.Error) + o := NewOpts[V]() + + if v := q.Get("max_val_size"); v != "" { + vv, e := strconv.Atoi(v) + if e != nil { + errs = multierror.Append(errs, fmt.Errorf("max_val_size query param %s: %w", v, e)) + } else { + opts = append(opts, o.MaxValSize(vv)) + } + } + + if v := q.Get("max_key_size"); v != "" { + vv, e := strconv.Atoi(v) + if e != nil { + errs = multierror.Append(errs, fmt.Errorf("max_key_size query param %s: %w", v, e)) + } else { + opts = append(opts, o.MaxKeySize(vv)) + } + } + + if v := q.Get("max_keys"); v != "" { + vv, e := strconv.Atoi(v) + if e != nil { + errs = multierror.Append(errs, fmt.Errorf("max_keys query param %s: %w", v, e)) + } else { + opts = append(opts, o.MaxKeys(vv)) + } + } + + if v := q.Get("max_cache_size"); v != "" { + vv, e := strconv.ParseInt(v, 10, 64) + if e != nil { + errs = multierror.Append(errs, fmt.Errorf("max_cache_size query param %s: %w", v, e)) + } else { + opts = append(opts, o.MaxCacheSize(vv)) + } + } + + if v := q.Get("ttl"); v != "" { + vv, e := time.ParseDuration(v) + if e != nil { + errs = multierror.Append(errs, fmt.Errorf("ttl query param %s: %w", v, e)) + } else { + opts = append(opts, o.TTL(vv)) + } + } + + return opts, errs.ErrorOrNil() +} + +func redisOptionsFromURL(u *url.URL) (*redis.Options, error) { + query := u.Query() + + db, err := strconv.Atoi(query.Get("db")) + if err != nil { + return nil, fmt.Errorf("db from %s: %w", u, err) + } + + res := &redis.Options{ + Addr: u.Hostname() + ":" + u.Port(), + DB: db, + Password: query.Get("password"), + Network: query.Get("network"), + } + + if dialTimeout, err := time.ParseDuration(query.Get("dial_timeout")); err == nil { + res.DialTimeout = dialTimeout + } + + if readTimeout, err := time.ParseDuration(query.Get("read_timeout")); err == nil { + res.ReadTimeout = readTimeout + } + + if writeTimeout, err := time.ParseDuration(query.Get("write_timeout")); err == nil { + res.WriteTimeout = writeTimeout + } + + return res, nil +} diff --git a/v2/url_test.go b/v2/url_test.go new file mode 100644 index 0000000..ee5e0b2 --- /dev/null +++ b/v2/url_test.go @@ -0,0 +1,140 @@ +package lcw + +import ( + "fmt" + "net/url" + "strconv" + "testing" + "time" + + "github.com/redis/go-redis/v9" + 
"github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestUrl_optionsFromQuery(t *testing.T) { + tbl := []struct { + url string + num int + fail bool + }{ + {"mem://lru?ttl=26s&max_keys=100&max_val_size=1024&max_key_size=64&max_cache_size=111", 5, false}, + {"mem://lru?ttl=26s&max_keys=100&foo=bar", 2, false}, + {"mem://lru?ttl=xx26s&max_keys=100&foo=bar", 0, true}, + {"mem://lru?foo=bar", 0, false}, + {"mem://lru?foo=bar&max_keys=abcd", 0, true}, + {"mem://lru?foo=bar&max_val_size=abcd", 0, true}, + {"mem://lru?foo=bar&max_cache_size=abcd", 0, true}, + {"mem://lru?foo=bar&max_key_size=abcd", 0, true}, + } + + for i, tt := range tbl { + tt := tt + t.Run(strconv.Itoa(i), func(t *testing.T) { + u, err := url.Parse(tt.url) + require.NoError(t, err) + r, err := optionsFromQuery[string](u.Query()) + if tt.fail { + require.Error(t, err) + return + } + assert.Equal(t, tt.num, len(r)) + }) + } +} + +func TestUrl_redisOptionsFromURL(t *testing.T) { + tbl := []struct { + url string + fail bool + opts redis.Options + }{ + {"redis://127.0.0.1:12345?db=xa19", true, redis.Options{}}, + {"redis://127.0.0.1:12345?foo=bar&max_keys=abcd&db=19", false, redis.Options{Addr: "127.0.0.1:12345", DB: 19}}, + { + "redis://127.0.0.1:12345?db=19&password=xyz&network=tcp4&dial_timeout=1s&read_timeout=2s&write_timeout=3m", + false, redis.Options{Addr: "127.0.0.1:12345", DB: 19, Password: "xyz", Network: "tcp4", + DialTimeout: 1 * time.Second, ReadTimeout: 2 * time.Second, WriteTimeout: 3 * time.Minute}, + }, + } + + for i, tt := range tbl { + tt := tt + t.Run(strconv.Itoa(i), func(t *testing.T) { + u, err := url.Parse(tt.url) + require.NoError(t, err) + r, err := redisOptionsFromURL(u) + if tt.fail { + require.Error(t, err) + return + } + require.NoError(t, err) + assert.Equal(t, tt.opts, *r) + }) + } +} + +func TestUrl_NewLru(t *testing.T) { + u := "mem://lru?max_keys=10" + res, err := New[string](u) + require.NoError(t, err) + r, ok := res.(*LruCache[string]) + require.True(t, ok) + assert.Equal(t, 10, r.maxKeys) +} + +func TestUrl_NewExpirable(t *testing.T) { + u := "mem://expirable?max_keys=10&ttl=30m" + res, err := New[string](u) + require.NoError(t, err) + defer res.Close() + r, ok := res.(*ExpirableCache[string]) + require.True(t, ok) + assert.Equal(t, 10, r.maxKeys) + assert.Equal(t, 30*time.Minute, r.ttl) +} + +func TestUrl_NewNop(t *testing.T) { + u := "nop://" + res, err := New[string](u) + require.NoError(t, err) + _, ok := res.(*Nop[string]) + require.True(t, ok) +} + +func TestUrl_NewRedis(t *testing.T) { + srv := newTestRedisServer() + defer srv.Close() + u := fmt.Sprintf("redis://%s?db=1&ttl=10s", srv.Addr()) + res, err := New[string](u) + require.NoError(t, err) + defer res.Close() + r, ok := res.(*RedisCache[string]) + require.True(t, ok) + assert.Equal(t, 10*time.Second, r.ttl) + + u = fmt.Sprintf("redis://%s?db=1&ttl=zz10s", srv.Addr()) + _, err = New[string](u) + require.Error(t, err) + assert.Contains(t, err.Error(), "ttl query param zz10s: time: invalid duration") + + _, err = New[string]("redis://localhost:xxx?db=1") + require.Error(t, err) + assert.Contains(t, err.Error(), "parse cache uri redis://localhost:xxx?db=1: parse") + assert.Contains(t, err.Error(), "redis://localhost:xxx?db=1") + assert.Contains(t, err.Error(), "invalid port \":xxx\" after host") +} + +func TestUrl_NewFailed(t *testing.T) { + u := "blah://ip?foo=bar" + _, err := New[string](u) + require.EqualError(t, err, "unsupported cache type blah") + + u = "mem://blah?foo=bar" + _, err = 
New[string](u) + require.EqualError(t, err, "unsupported mem cache type blah") + + u = "mem://lru?max_keys=xyz" + _, err = New[string](u) + require.EqualError(t, err, "parse uri options mem://lru?max_keys=xyz: 1 error occurred:\n\t* max_keys query param xyz: strconv.Atoi: parsing \"xyz\": invalid syntax\n\n") +}