Skip to content

Commit

Permalink
Add cache policy
Browse files Browse the repository at this point in the history
  • Loading branch information
jhalterman committed Apr 7, 2024
1 parent 9e1f1c7 commit e6e6984
Show file tree
Hide file tree
Showing 9 changed files with 386 additions and 2 deletions.
121 changes: 121 additions & 0 deletions cachepolicy/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package cachepolicy

import (
"github.com/failsafe-go/failsafe-go"
"github.com/failsafe-go/failsafe-go/policy"
)

// CacheKey is a key to use with a Context that stores the key to be used when getting or setting items in a cache.
var CacheKey = "failsafe-cachepolicy"

// Cache is a simple interface for cached values that can be adapted to different cache backends.
type Cache[R any] interface {
// Get gets and returns a cache entry along with a flag indicating if it's present.
Get(key string) (R, bool)

// Set stores a value for the key in the cache.
Set(key string, value R)
}

// CachePolicy is a read through cachePolicy that returns a cached result for the CacheKey in the context provided with the execution, if it exists.
//
// This type is concurrency safe.
type CachePolicy[R any] interface {
failsafe.Policy[R]
}

// CachePolicyBuilder builds CachePolicy instances. In order for the cache policy to be used, a key must be provided via
// WithKey, or via a Context when the execution is performed using a value stored under the CacheKey in the Context. A
// cache key stored in a Context under the CacheKey takes precedence over a cache key configured via WithKey.
//
// This type is not concurrency safe.
type CachePolicyBuilder[R any] interface {
//failsafe.FailurePolicyBuilder[CachePolicyBuilder[R], R]

// WithKey builds caches that store successful execution results in a cache with the key.
WithKey(key string) CachePolicyBuilder[R]

// CacheIf specifies that a value result should only be cached if it satisfies the predicate.
CacheIf(predicate func(R, error) bool) CachePolicyBuilder[R]

// OnCacheHit registers the listener to be called when the cachePolicy entry is hit during an execution.
OnCacheHit(listener func(event failsafe.ExecutionEvent[R])) CachePolicyBuilder[R]

// OnCacheMiss registers the listener to be called when the cachePolicy entry is missed during an execution.
OnCacheMiss(listener func(event failsafe.ExecutionEvent[R])) CachePolicyBuilder[R]

// OnResultCached registers the listener to be called when a result is cached.
OnResultCached(listener func(event failsafe.ExecutionEvent[R])) CachePolicyBuilder[R]

// Build returns a new CachePolicy using the builder's configuration.
Build() CachePolicy[R]
}

type cachePolicyConfig[R any] struct {
*policy.BaseFailurePolicy[R]
cache Cache[R]
key string
onHit func(failsafe.ExecutionEvent[R])
onMiss func(failsafe.ExecutionEvent[R])
}

var _ CachePolicyBuilder[any] = &cachePolicyConfig[any]{}

type cachePolicy[R any] struct {
config *cachePolicyConfig[R]
}

// With returns a new CachePolicy.
func With[R any](cache Cache[R]) CachePolicy[R] {
return Builder[R](cache).Build()
}

// Builder returns a CachePolicyBuilder.
func Builder[R any](cache Cache[R]) CachePolicyBuilder[R] {
return &cachePolicyConfig[R]{
BaseFailurePolicy: &policy.BaseFailurePolicy[R]{},
cache: cache,
}
}

func (c *cachePolicyConfig[R]) CacheIf(predicate func(R, error) bool) CachePolicyBuilder[R] {
c.BaseFailurePolicy.HandleIf(predicate)
return c
}

func (c *cachePolicyConfig[R]) WithKey(key string) CachePolicyBuilder[R] {
c.key = key
return c
}

func (c *cachePolicyConfig[R]) OnResultCached(listener func(event failsafe.ExecutionEvent[R])) CachePolicyBuilder[R] {
c.BaseFailurePolicy.OnSuccess(listener)
return c
}

func (c *cachePolicyConfig[R]) OnCacheHit(listener func(event failsafe.ExecutionEvent[R])) CachePolicyBuilder[R] {
c.onHit = listener
return c
}

func (c *cachePolicyConfig[R]) OnCacheMiss(listener func(event failsafe.ExecutionEvent[R])) CachePolicyBuilder[R] {
c.onMiss = listener
return c
}

func (c *cachePolicyConfig[R]) Build() CachePolicy[R] {
return &cachePolicy[R]{
config: c, // TODO copy base fields
}
}

func (c *cachePolicy[R]) ToExecutor(_ R) any {
ce := &cacheExecutor[R]{
BaseExecutor: &policy.BaseExecutor[R]{
BaseFailurePolicy: c.config.BaseFailurePolicy,
},
cachePolicy: c,
}
ce.Executor = ce
return ce
}
59 changes: 59 additions & 0 deletions cachepolicy/cacheexecutor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package cachepolicy

import (
"context"

"github.com/failsafe-go/failsafe-go"
"github.com/failsafe-go/failsafe-go/common"
"github.com/failsafe-go/failsafe-go/policy"
)

// cacheExecutor is a policy.Executor that handles failures according to a CachePolicy.
type cacheExecutor[R any] struct {
*policy.BaseExecutor[R]
*cachePolicy[R]
}

var _ policy.Executor[any] = &cacheExecutor[any]{}

func (e *cacheExecutor[R]) PreExecute(exec policy.ExecutionInternal[R]) *common.PolicyResult[R] {
execInternal := exec.(policy.ExecutionInternal[R])
if cacheKey := e.getCacheKey(exec.Context()); cacheKey != "" {
if cacheResult, found := e.config.cache.Get(cacheKey); found {
result := &common.PolicyResult[R]{
Result: cacheResult,
Done: true,
Success: true,
SuccessAll: true,
}
if e.config.onHit != nil {
e.config.onHit(failsafe.ExecutionEvent[R]{
ExecutionAttempt: execInternal.CopyWithResult(result),
})
}
return result
}
}
if e.config.onMiss != nil {
e.config.onMiss(failsafe.ExecutionEvent[R]{
ExecutionAttempt: execInternal,
})
}
return nil
}

func (e *cacheExecutor[R]) OnSuccess(exec policy.ExecutionInternal[R], result *common.PolicyResult[R]) {
if cacheKey := e.getCacheKey(exec.Context()); cacheKey != "" {
e.config.cache.Set(cacheKey, result.Result)
e.BaseExecutor.OnSuccess(exec, result)
}
}

func (e *cacheExecutor[R]) getCacheKey(ctx context.Context) string {
if untypedKey := ctx.Value(CacheKey); untypedKey != nil {
if typedKey, ok := untypedKey.(string); ok {
return typedKey
}
}
return e.config.key
}
2 changes: 2 additions & 0 deletions cachepolicy/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package cachepolicy provides a CachePolicy policy.
package cachepolicy
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ require (
)

require (
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.0.0-20221010170243-090e33056c14 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
17 changes: 17 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
github.com/bits-and-blooms/bitset v1.13.0 h1:bAQ9OPNFYbGHV6Nez0tmNI0RiEu7/hxlYJRUA0wFAVE=
github.com/bits-and-blooms/bitset v1.13.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14 h1:k5II8e6QD8mITdi+okbbmR/cIyEbeXLBhy5Ha4nevyc=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
30 changes: 30 additions & 0 deletions internal/policytesting/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package policytesting

import (
"github.com/dgraph-io/ristretto"

cache "github.com/failsafe-go/failsafe-go/cachepolicy"
)

type ristrettoCache[R any] struct {
cache *ristretto.Cache
}

func (r *ristrettoCache[R]) Get(key string) (R, bool) {
if result, found := r.cache.Get(key); found {
if typedResult, ok := result.(R); ok {
return typedResult, true
}
}
return *(new(R)), false
}

func (r *ristrettoCache[R]) Set(key string, value R) {
r.cache.Set(key, value, 0)
}

func NewCache[R any](cache *ristretto.Cache) cache.Cache[R] {
return &ristrettoCache[R]{
cache: cache,
}
}
37 changes: 36 additions & 1 deletion internal/policytesting/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/failsafe-go/failsafe-go"
"github.com/failsafe-go/failsafe-go/bulkhead"
"github.com/failsafe-go/failsafe-go/cachepolicy"
"github.com/failsafe-go/failsafe-go/circuitbreaker"
"github.com/failsafe-go/failsafe-go/fallback"
"github.com/failsafe-go/failsafe-go/hedgepolicy"
Expand Down Expand Up @@ -118,7 +119,7 @@ func WithHedgeStatsAndLogs[R any](hp hedgepolicy.HedgePolicyBuilder[R], stats *S
return hp
}

func BulkheadStatsAndLogs[R any](bh bulkhead.BulkheadBuilder[R], stats *Stats, withLogging bool) bulkhead.BulkheadBuilder[R] {
func WithBulkheadStatsAndLogs[R any](bh bulkhead.BulkheadBuilder[R], stats *Stats, withLogging bool) bulkhead.BulkheadBuilder[R] {
bh.OnFull(func(event failsafe.ExecutionEvent[R]) {
if withLogging {
stats.fullCount.Add(1)
Expand All @@ -128,6 +129,17 @@ func BulkheadStatsAndLogs[R any](bh bulkhead.BulkheadBuilder[R], stats *Stats, w
return bh
}

func WithCacheStats[R any](cp cachepolicy.CachePolicyBuilder[R], stats *Stats) cachepolicy.CachePolicyBuilder[R] {
cp.OnCacheHit(func(e failsafe.ExecutionEvent[R]) {
stats.cacheHits.Add(1)
}).OnCacheMiss(func(e failsafe.ExecutionEvent[R]) {
stats.cacheMisses.Add(1)
}).OnResultCached(func(event failsafe.ExecutionEvent[R]) {
stats.caches.Add(1)
})
return cp
}

func withStatsAndLogs[P any, R any](policy failsafe.FailurePolicyBuilder[P, R], stats *Stats, withLogging bool) {
policy.OnSuccess(func(e failsafe.ExecutionEvent[R]) {
stats.executionCount.Add(1)
Expand Down Expand Up @@ -162,6 +174,12 @@ type Stats struct {

// Bulkhead specific stats
fullCount atomic.Int32

// Cache specific stats
caches atomic.Int32
cacheHits atomic.Int32
cacheMisses atomic.Int32
cachedCount atomic.Int32
}

func (s *Stats) Executions() int {
Expand Down Expand Up @@ -196,6 +214,18 @@ func (s *Stats) Fulls() int {
return int(s.fullCount.Load())
}

func (s *Stats) CacheHits() int {
return int(s.cacheHits.Load())
}

func (s *Stats) CacheMisses() int {
return int(s.cacheMisses.Load())
}

func (s *Stats) Caches() int {
return int(s.caches.Load())
}

func (s *Stats) Reset() {
s.executionCount.Store(0)
s.successCount.Store(0)
Expand All @@ -211,6 +241,11 @@ func (s *Stats) Reset() {

// Bulkhead specific stats
s.fullCount.Store(0)

// Cache specific stats
s.caches.Store(0)
s.cacheHits.Store(0)
s.cacheMisses.Store(0)
}

func SetupFn(stats *Stats) func() context.Context {
Expand Down
2 changes: 1 addition & 1 deletion test/bulkhead_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestBulkheadPermitAcquiredAfterWait(t *testing.T) {
func TestBulkheadFull(t *testing.T) {
// Given
stats := &policytesting.Stats{}
bh := policytesting.BulkheadStatsAndLogs(bulkhead.Builder[any](2), stats, true).Build()
bh := policytesting.WithBulkheadStatsAndLogs(bulkhead.Builder[any](2), stats, true).Build()
bh.TryAcquirePermit()
bh.TryAcquirePermit() // bulkhead should be full

Expand Down
Loading

0 comments on commit e6e6984

Please sign in to comment.