Skip to content

Commit

Permalink
Introduce support for per-call memory allocator for caching
Browse files Browse the repository at this point in the history
This change adds the ability to pass a memory allocator to caching
methods via their context argument. This allows the underlying client
to use memory allocators better suited to their workloads than GC.
This is only used by the Memcached client at the moment.

See grafana/mimir#3772

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>
  • Loading branch information
56quarters committed Jan 5, 2023
1 parent 356662f commit 829f347
Show file tree
Hide file tree
Showing 10 changed files with 199 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
* [ENHANCEMENT] Add the ability to define custom gRPC health checks. #227
* [ENHANCEMENT] Import Bytes type, DeleteAll function and DNS package from Thanos. #228
* [ENHANCEMENT] Execute health checks in ring client pool concurrently. #237
* [ENHANCEMENT] Cache: Add the ability to use a custom memory allocator for cache results. #249
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
Expand Down
37 changes: 32 additions & 5 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

var (
_ RemoteCacheClient = (*memcachedClient)(nil)
_ RemoteCacheClient = (*redisClient)(nil)
type contextKey int

_ Cache = (*MemcachedCache)(nil)
_ Cache = (*RedisCache)(nil)
const (
contextKeyAllocator contextKey = 0
)

// RemoteCacheClient is a high level client to interact with remote cache.
Expand Down Expand Up @@ -52,6 +50,35 @@ type Cache interface {
Name() string
}

// WithAllocator returns a Context that makes use of a specific memory Allocator
// for result values loaded by a cache.
func WithAllocator(ctx context.Context, alloc Allocator) context.Context {
return context.WithValue(ctx, contextKeyAllocator, alloc)
}

// GetAllocator returns the Allocator set for this particular context, if any.
func GetAllocator(ctx context.Context) Allocator {
val := ctx.Value(contextKeyAllocator)
if val != nil {
return val.(Allocator)
}

return nil
}

// Allocator allows memory for cache result values to be managed by callers instead of by
// a cache client itself. For example, this can be used by callers to implement arena-style
// memory management if a workload tends to be request-centric.
type Allocator interface {
// Get returns a byte slice with at least sz capacity. Length of the slice is
// not guaranteed and so must be asserted by callers (cache clients).
Get(sz int) *[]byte
// Put returns the byte slice to the underlying allocator. The cache clients
// will only call this method during error handling when allocated values are
// not returned to the caller as cache results.
Put(b *[]byte)
}

const (
BackendMemcached = "memcached"
BackendRedis = "redis"
Expand Down
18 changes: 18 additions & 0 deletions cache/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cache
import (
"context"
"net"
"sync"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -146,6 +147,23 @@ func (c *baseClient) setAsync(ctx context.Context, key string, value []byte, ttl
return err
}

// wait submits an async task and blocks until it completes. This can be used during tests
// to ensure that async "sets" have completed before attempting to read them.
func (c *baseClient) wait() error {
var wg sync.WaitGroup

wg.Add(1)
err := c.asyncQueue.submit(func() {
wg.Done()
})
if err != nil {
return err
}

wg.Wait()
return nil
}

func (c *baseClient) delete(ctx context.Context, key string, f func(ctx context.Context, key string) error) error {
errCh := make(chan error, 1)

Expand Down
20 changes: 14 additions & 6 deletions cache/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var (
errMemcachedConfigNoAddrs = errors.New("no memcached addresses provided")
errMemcachedDNSUpdateIntervalNotPositive = errors.New("DNS provider update interval must be positive")
errMemcachedMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive")

_ RemoteCacheClient = (*memcachedClient)(nil)
)

// MemcachedClient for compatible.
Expand Down Expand Up @@ -249,7 +251,13 @@ func (c *memcachedClient) GetMulti(ctx context.Context, keys []string) map[strin
return nil
}

batches, err := c.getMultiBatched(ctx, keys)
alloc := GetAllocator(ctx)
var opts []memcache.Option
if alloc != nil {
opts = append(opts, memcache.WithAllocator(alloc))
}

batches, err := c.getMultiBatched(ctx, keys, opts...)
if err != nil {
level.Warn(c.logger).Log("msg", "failed to fetch items from memcached", "numKeys", len(keys), "firstKey", keys[0], "err", err)

Expand Down Expand Up @@ -285,7 +293,7 @@ func (c *memcachedClient) Delete(ctx context.Context, key string) error {
})
}

func (c *memcachedClient) getMultiBatched(ctx context.Context, keys []string) ([]map[string]*memcache.Item, error) {
func (c *memcachedClient) getMultiBatched(ctx context.Context, keys []string, opts ...memcache.Option) ([]map[string]*memcache.Item, error) {
// Do not batch if the input keys are less than the max batch size.
if (c.config.MaxGetMultiBatchSize <= 0) || (len(keys) <= c.config.MaxGetMultiBatchSize) {
// Even if we're not splitting the input into batches, make sure that our single request
Expand All @@ -298,7 +306,7 @@ func (c *memcachedClient) getMultiBatched(ctx context.Context, keys []string) ([
defer c.getMultiGate.Done()
}

items, err := c.getMultiSingle(ctx, keys)
items, err := c.getMultiSingle(ctx, keys, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -340,7 +348,7 @@ func (c *memcachedClient) getMultiBatched(ctx context.Context, keys []string) ([
batchKeys := sortedKeys[startIndex:endIndex]

res := &memcachedGetMultiResult{}
res.items, res.err = c.getMultiSingle(ctx, batchKeys)
res.items, res.err = c.getMultiSingle(ctx, batchKeys, opts...)

results <- res
return nil
Expand All @@ -364,7 +372,7 @@ func (c *memcachedClient) getMultiBatched(ctx context.Context, keys []string) ([
return items, lastErr
}

func (c *memcachedClient) getMultiSingle(ctx context.Context, keys []string) (items map[string]*memcache.Item, err error) {
func (c *memcachedClient) getMultiSingle(ctx context.Context, keys []string, opts ...memcache.Option) (items map[string]*memcache.Item, err error) {
start := time.Now()
c.metrics.operations.WithLabelValues(opGetMulti).Inc()

Expand All @@ -374,7 +382,7 @@ func (c *memcachedClient) getMultiSingle(ctx context.Context, keys []string) (it
// cache client backend.
return nil, ctx.Err()
default:
items, err = c.client.GetMulti(keys)
items, err = c.client.GetMulti(keys, opts...)
}

if err != nil {
Expand Down
119 changes: 119 additions & 0 deletions cache/memcached_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package cache

import (
"context"
"errors"
"net"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/gomemcache/memcache"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

func TestMemcachedClient_GetMulti(t *testing.T) {
setup := func() (*memcachedClient, *mockMemcachedClientBackend, error) {
backend := newMockMemcachedClientBackend()
client, err := newMemcachedClient(
log.NewNopLogger(),
backend,
&mockServerSelector{},
MemcachedClientConfig{
Addresses: []string{"localhost"},
MaxAsyncConcurrency: 1,
MaxAsyncBufferSize: 10,
DNSProviderUpdateInterval: 10 * time.Second,
},
prometheus.NewPedanticRegistry(),
"test",
)

return client, backend, err
}

t.Run("no allocator", func(t *testing.T) {
client, backend, err := setup()
require.NoError(t, err)
require.NoError(t, client.SetAsync(context.Background(), "foo", []byte("bar"), 10*time.Second))
require.NoError(t, client.wait())

ctx := context.Background()
res := client.GetMulti(ctx, []string{"foo"})
require.Equal(t, map[string][]byte{"foo": []byte("bar")}, res)
require.Equal(t, 0, backend.allocations)
})

t.Run("with allocator", func(t *testing.T) {
client, backend, err := setup()
require.NoError(t, err)
require.NoError(t, client.SetAsync(context.Background(), "foo", []byte("bar"), 10*time.Second))
require.NoError(t, client.wait())

ctx := WithAllocator(context.Background(), &nopAllocator{})
res := client.GetMulti(ctx, []string{"foo"})
require.Equal(t, map[string][]byte{"foo": []byte("bar")}, res)
require.Equal(t, 1, backend.allocations)
})
}

type mockMemcachedClientBackend struct {
allocations int
values map[string]*memcache.Item
}

func newMockMemcachedClientBackend() *mockMemcachedClientBackend {
return &mockMemcachedClientBackend{
values: make(map[string]*memcache.Item),
}
}

func (m *mockMemcachedClientBackend) GetMulti(keys []string, opts ...memcache.Option) (map[string]*memcache.Item, error) {
options := &memcache.Options{}
for _, opt := range opts {
opt(options)
}

if options.Alloc != nil {
m.allocations += 1
}

out := make(map[string]*memcache.Item)
for _, k := range keys {
if v, ok := m.values[k]; ok {
out[k] = v
}
}

return out, nil
}

func (m *mockMemcachedClientBackend) Set(item *memcache.Item) error {
m.values[item.Key] = item
return nil
}

func (m *mockMemcachedClientBackend) Delete(key string) error {
delete(m.values, key)
return nil
}

type mockServerSelector struct{}

func (m mockServerSelector) SetServers(_ ...string) error {
return nil
}

func (m mockServerSelector) PickServer(key string) (net.Addr, error) {
return nil, errors.New("mock server selector")
}

func (m mockServerSelector) Each(f func(net.Addr) error) error {
return nil
}

type nopAllocator struct{}

func (n nopAllocator) Get(sz int) *[]byte { return nil }
func (n nopAllocator) Put(b *[]byte) {}
5 changes: 5 additions & 0 deletions cache/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
"go.uber.org/atomic"
)

var (
_ Cache = (*MockCache)(nil)
_ Cache = (*InstrumentedMockCache)(nil)
)

type MockCache struct {
mu sync.Mutex
cache map[string]Item
Expand Down
2 changes: 2 additions & 0 deletions cache/redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
var (
errRedisConfigNoEndpoint = errors.New("no redis endpoint provided")
errRedisMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive")

_ RemoteCacheClient = (*redisClient)(nil)
)

// RedisClientConfig is the config accepted by RedisClient.
Expand Down
5 changes: 5 additions & 0 deletions cache/remote_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
_ Cache = (*MemcachedCache)(nil)
_ Cache = (*RedisCache)(nil)
)

// MemcachedCache is a memcached-based cache.
type MemcachedCache struct {
*remoteCache
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/gogo/status v1.1.0
github.com/golang/snappy v0.0.4
github.com/grafana/gomemcache v0.0.0-20230104170548-c1cca813817a
github.com/grafana/gomemcache v0.0.0-20230105173749-11f792309e1f
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/hashicorp/consul/api v1.15.3
github.com/hashicorp/go-cleanhttp v0.5.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/grafana/gomemcache v0.0.0-20230104170548-c1cca813817a h1:q7s1Se3EataEmdPWUtP/PE+UVRxaS0MlprU0jUiYn8A=
github.com/grafana/gomemcache v0.0.0-20230104170548-c1cca813817a/go.mod h1:6fkC8bkriadatJOc7Pvjcvqr2xh9C79BYRRfE3WWoo0=
github.com/grafana/gomemcache v0.0.0-20230105173749-11f792309e1f h1:ANwIMe7kOiMNTK88tusoNDb840pWVskI4rCrdoMv5i0=
github.com/grafana/gomemcache v0.0.0-20230105173749-11f792309e1f/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU=
github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91 h1:/NipyHnOmvRsVzj81j2qE0VxsvsqhOB0f4vJIhk2qCQ=
github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 h1:THDBEeQ9xZ8JEaCLyLQqXMMdRqNr0QAUJTIkQAUtFjg=
Expand Down

0 comments on commit 829f347

Please sign in to comment.