Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/ccpgames/groupcache/v2
go 1.25.0

require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/golang/protobuf v1.5.4
github.com/mailgun/holster/v4 v4.20.3
github.com/prometheus/client_golang v1.20.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/ahmetb/go-linq v3.0.0+incompatible h1:qQkjjOXKrKOTy83X8OpRmnKflXKQIL/
github.com/ahmetb/go-linq v3.0.0+incompatible/go.mod h1:PFffvbdbtw+QTB0WKRP0cNht7vnCfnGlEpak/DVg5cY=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
241 changes: 153 additions & 88 deletions groupcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,22 @@ import (
pb "github.com/ccpgames/groupcache/v2/groupcachepb"
"github.com/ccpgames/groupcache/v2/lru"
"github.com/ccpgames/groupcache/v2/singleflight"
"github.com/cenkalti/backoff/v4"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

const (
remoteOperationTimeout = 30 * time.Second
remoteOperationTimeout = 30 * time.Second
maxRemoteCallErrorRetryTime = 250 * time.Millisecond
remoteCallErrorBackoffInitial = time.Millisecond
remoteCallErrorBackoffMaxInterval = 20 * time.Millisecond

otelAttributePeerURL = attribute.Key("app.groupcache.peer_url")
)

var logger Logger
var logger Logger = NoopLogger{}

// SetLogger - this is legacy to provide backwards compatibility with logrus.
func SetLogger(log *logrus.Entry) {
Expand Down Expand Up @@ -319,33 +327,19 @@ func (g *Group) Set(ctx context.Context, key string, value []byte, expire time.T
}

_, err := g.setGroup.Do(ctx, key, func() (interface{}, error) {
// If remote peer owns this key
owner, ok := g.peers.PickPeer(key)
if ok {
// we don't own the key
// first, let's set the value remotely to the owner peer
if err := g.setFromPeer(ctx, owner, key, value, expire); err != nil {
return nil, err
}
// If hotCache is true, we set local hot cache.
// TODO(thrawn01): Not sure if this is useful outside of tests...
// maybe we should ALWAYS update the local cache?
if hotCache {
g.localHotCacheSet(key, value, expire)
}
// and then we clear the key from all the peers that are not key owner to prevent stale values from being served from those peers.
err := g.removeFromPeers(ctx, key, owner)
if err != nil {
return nil, err
}
executed, err := g.setIfRemoteOwner(ctx, key, value, expire, hotCache)
if err != nil {
return nil, err
}
if executed {
return nil, nil
}
// we own this key so we populate the main cache
g.localMainCacheSet(key, value, expire)
// and we always populate the hot cache to maximize chances of early cache hits
g.localHotCacheSet(key, value, expire)
// and then let's call all other peers to clear the key from their caches
err := g.removeFromPeers(ctx, key, nil)
err = g.removeFromPeers(ctx, key, nil)
if err != nil {
return nil, err
}
Expand All @@ -360,21 +354,15 @@ func (g *Group) Remove(ctx context.Context, key string) error {
g.peersOnce.Do(g.initPeers)

_, err := g.removeGroup.Do(ctx, key, func() (interface{}, error) {
var ownerRemoveErr error
remoteOwner, remoteOwnerRemoveErr := g.removeFromRemoteOwner(ctx, key)

// let's check who own the key
owner, ok := g.peers.PickPeer(key)
if ok {
// if we don't own the key let's attempt first to remotely remove it from the owner peer
ownerRemoveErr = g.removeFromPeer(ctx, owner, key)
}
// remove from our cache next
g.localRemove(key)

// then clear the key from all hot and main caches of peers
peersRemoveErr := g.removeFromPeers(ctx, key, owner)
peersRemoveErr := g.removeFromPeers(ctx, key, remoteOwner)

return nil, errors.Join(ownerRemoveErr, peersRemoveErr)
return nil, errors.Join(remoteOwnerRemoveErr, peersRemoveErr)
})
return err
}
Expand Down Expand Up @@ -452,62 +440,14 @@ func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView
g.Stats.LoadsDeduped.Add(1)
var value ByteView
var err error
if peer, ok := g.peers.PickPeer(key); ok {
// we do not own the key
// let's check the hot cache again because it might got populated before we entered
// singleflight
if value, cacheHit := g.lookupHotCache(key); cacheHit {
g.Stats.CacheHits.Add(1)
return value, nil
}
// metrics duration start
start := time.Now()

// get value from peers
value, err = g.getFromPeer(ctx, peer, key)

// metrics duration compute
duration := int64(time.Since(start)) / int64(time.Millisecond)

// metrics only store the slowest duration
if g.Stats.GetFromPeersLatencyLower.Get() < duration {
g.Stats.GetFromPeersLatencyLower.Store(duration)
}

if err == nil {
// populate hot cache with the value retrieved from peer
g.populateHotCache(key, value)
g.Stats.PeerLoads.Add(1)
return value, nil
}

if errors.Is(err, context.Canceled) {
return nil, err
}

if errors.Is(err, &ErrNotFound{}) {
return nil, err
}

if errors.Is(err, &ErrRemoteCall{}) {
return nil, err
}

if logger != nil {
logger.Error().
WithFields(map[string]interface{}{
"err": err,
"key": key,
"category": "groupcache",
}).Printf("error retrieving key from peer '%s'", peer.GetURL())
}

g.Stats.PeerErrors.Add(1)
if ctx != nil && ctx.Err() != nil {
// if the context has expired, we return the context error instead of the peer error to avoid retrying on peer errors
// when the context has already expired
return nil, ctx.Err()
}
var loadedFromRemote bool
loadedFromRemote, value, err = g.loadFromRemoteIfRemoteOwner(ctx, key)
if loadedFromRemote && err == nil {
return value, nil
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, &ErrNotFound{}) {
return nil, err
// otherwise we want to attempt to load locally
}

value, destPopulated, err = g.loadLocally(ctx, key, dest)
Expand All @@ -522,7 +462,7 @@ func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView
return
}

// load loads key either by invoking the getter locally or by sending it to another machine.
// loadLocally loads key either by invoking the getter locally or by sending it to another machine.
func (g *Group) loadLocally(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
g.Stats.LocalLoads.Add(1)
// Load locally:
Expand Down Expand Up @@ -588,6 +528,131 @@ func (g *Group) getLocally(ctx context.Context, key string, dest Sink) (ByteView
return dest.view()
}

func (g *Group) loadFromRemoteIfRemoteOwner(ctx context.Context, key string) (loaded bool, value ByteView, err error) {
loaded, _, err = g.callRemoteIfRemoteOwner(ctx, key, func(ctx context.Context, peer ProtoGetter) error {
// we do not own the key
// let's check the hot cache again because it might got populated before we entered
// singleflight
var cacheHit bool
if value, cacheHit = g.lookupHotCache(key); cacheHit {
g.Stats.CacheHits.Add(1)
return nil
}

// metrics duration start
start := time.Now()

// get value from peers
value, err = g.getFromPeer(ctx, peer, key)

// metrics duration compute
duration := int64(time.Since(start)) / int64(time.Millisecond)

// metrics only store the slowest duration
if g.Stats.GetFromPeersLatencyLower.Get() < duration {
g.Stats.GetFromPeersLatencyLower.Store(duration)
}
if err == nil {
// populate hot cache with the value retrieved from peer
g.populateHotCache(key, value)
g.Stats.PeerLoads.Add(1)
return nil
}
return err
})
return loaded, value, err
}

func (g *Group) setIfRemoteOwner(ctx context.Context, key string, value []byte, expire time.Time, hotCache bool) (bool, error) {
remoteOwner, owner, err := g.callRemoteIfRemoteOwner(ctx, key, func(ctx context.Context, peer ProtoGetter) error {
// we do not own the key, so we set the value remotely to the owner peer
return g.setFromPeer(ctx, peer, key, value, expire)
})
if err != nil {
return remoteOwner, err
}
if remoteOwner {
// If hotCache is true, we set local hot cache.
// TODO(thrawn01): Not sure if this is useful outside of tests...
// maybe we should ALWAYS update the local cache?
if hotCache {
g.localHotCacheSet(key, value, expire)
}
// and then we clear the key from all the peers that are not key owner to prevent stale values from being served from those peers.
err := g.removeFromPeers(ctx, key, owner)
if err != nil {
return true, err
}
return true, nil
}
return false, nil
}

func (g *Group) removeFromRemoteOwner(ctx context.Context, key string) (ProtoGetter, error) {
remoteOwner, owner, err := g.callRemoteIfRemoteOwner(ctx, key, func(ctx context.Context, peer ProtoGetter) error {
// we do not own the key, so we remove the key remotely from the owner peer
return g.removeFromPeer(ctx, peer, key)
})
if err != nil {
return owner, err
}
if remoteOwner {
return owner, nil
}
return nil, nil
}

// callRemoteIfRemoteOwner calls the given function if the key is owned by a remote peer. It retries the call if there is an remote call error while calling the remote peer
// until maxRemoteCallErrorRetryTime is reached.
// It returns a boolean indicating whether the owner is remote, the ProtoGetter of the remote owner if the owner is remote, and an error if any error occurs during the call.
func (g *Group) callRemoteIfRemoteOwner(ctx context.Context, key string, fn func(ctx context.Context, peer ProtoGetter) error) (bool, ProtoGetter, error) {
eb := backoff.NewExponentialBackOff()
eb.InitialInterval = remoteCallErrorBackoffInitial
eb.MaxInterval = remoteCallErrorBackoffMaxInterval
eb.MaxElapsedTime = maxRemoteCallErrorRetryTime
boCtx := backoff.WithContext(eb, ctx)

var remoteOwner bool
var owner ProtoGetter

err := backoff.Retry(func() error {
var ok bool
owner, ok = g.peers.PickPeer(key)
if !ok {
remoteOwner = false
return nil
}
remoteOwner = true

err := fn(ctx, owner)
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, &ErrNotFound{}) {
// we don't want to record or log them, this is not really an error
return backoff.Permanent(err)
}
g.Stats.PeerErrors.Add(1)
trace.SpanFromContext(ctx).RecordError(err, trace.WithAttributes(otelAttributePeerURL.String(owner.GetURL())))
if errors.Is(err, &ErrRemoteCall{}) {
logger.Info().WithFields(map[string]interface{}{
"err": err,
"key": key,
"category": "groupcache",
}).Printf("error calling peer '%s'", owner.GetURL())
return err
}
logger.Error().WithFields(map[string]interface{}{
"err": err,
"key": key,
"category": "groupcache",
}).Printf("unexpected error calling peer '%s'", owner.GetURL())
return backoff.Permanent(err)
}

return nil
}, boCtx)
return remoteOwner, owner, err
}

func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) {
ctx, cancel := context.WithTimeout(ctx, remoteOperationTimeout)
defer cancel()
Expand Down
19 changes: 12 additions & 7 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ import (
const defaultBasePath = "/_groupcache/"
const defaultReplicas = 50

var (
errPeerClosed = errors.New("groupcache: peer is closed")
)

// HTTPPool implements PeerPicker for a pool of HTTP peers.
type HTTPPool struct {
// this peer's base URL, e.g. "https://example.net:8000"
Expand Down Expand Up @@ -137,8 +133,8 @@ func (p *HTTPPool) Set(peers ...string) {
if _, ok := peerSet[peer]; !ok {
// close the peers being removed to terminate all ongoing requests to them
v.Close()
delete(p.httpGetters, peer)
}
delete(p.httpGetters, peer)
}

for _, peer := range peers {
Expand Down Expand Up @@ -311,10 +307,19 @@ func (h *httpGetter) makeRequest(ctx context.Context, m string, in request, b io
url.PathEscape(in.GetGroup()),
url.PathEscape(in.GetKey()),
)

ctx, cancel := context.WithCancelCause(ctx)
context.AfterFunc(h.peerLifetimeCtx, func() {
cancel(errPeerClosed)
defer cancel(nil)
stop := context.AfterFunc(h.peerLifetimeCtx, func() {
cancel(&ErrRemoteCall{Msg: "peer was closed, likely removed from the peer pool"})
})
defer stop()
select {
case <-h.peerLifetimeCtx.Done():
return &ErrRemoteCall{Msg: "peer was closed, likely removed from the peer pool"}
default:
}

req, err := http.NewRequestWithContext(ctx, m, u, b)
if err != nil {
return err
Expand Down
Loading