Allow result memory to be allocated from per-call Allocator (#8)
Remove the existing Client.Pool for pooling memory used by cache
results and replace it with an optional per-call Allocator. The
per-call allocator allows callers to determine the correct lifecycle
for anything allocated.

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>
56quarters committed Jan 4, 2023
1 parent 8ab22a7 commit c1cca81
Showing 4 changed files with 106 additions and 51 deletions.
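
Before the diffs, a minimal sketch (not part of this commit) of how a caller might use the new per-call option. The fetchWithAlloc helper is hypothetical, the alloc argument stands in for any caller-provided Allocator implementation, and the import path is assumed from the fork this commit belongs to:

package main

import (
	"fmt"

	"github.com/grafana/gomemcache/memcache"
)

// fetchWithAlloc is a hypothetical helper showing the per-call Allocator option.
func fetchWithAlloc(client *memcache.Client, alloc memcache.Allocator, key string) error {
	// The caller-supplied allocator provides the memory backing item.Value.
	item, err := client.Get(key, memcache.WithAllocator(alloc))
	if err != nil {
		return err
	}
	fmt.Printf("fetched %d bytes for %q\n", len(item.Value), key)
	// Because the allocator is per-call, the caller controls the buffer's
	// lifecycle and hands it back once the value is no longer needed.
	alloc.Put(&item.Value)
	return nil
}

GetMulti accepts the same options, so a single allocator can back every Item returned by a multi-key lookup.
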
5 changes: 3 additions & 2 deletions .github/workflows/test.yml
@@ -13,9 +13,10 @@ jobs:

- name: Check out code
uses: actions/checkout@v3

- name: Install Memcached
run: sudo apt-get install -y memcached
- name: Run Tests
run: go test -race ./...
run: go test -v -race ./...

lint:
name: Lint
47 changes: 14 additions & 33 deletions memcache/memcache.go
@@ -146,8 +146,6 @@ type Client struct {
// be set to a number higher than your peak parallel requests.
MaxIdleConns int

Pool BytesPool

selector ServerSelector

lk sync.Mutex
@@ -183,15 +181,6 @@ type conn struct {
c *Client
}

// BytesPool is a pool of bytes that can be reused.
type BytesPool interface {
// Get returns a new byte slice that has a capacity at least the same as the
// requested size.
Get(sz int) (*[]byte, error)
// Put returns a byte slice to the pool.
Put(b *[]byte)
}

// release returns this connection back to the client's free pool
func (cn *conn) release() {
cn.c.putFreeConn(cn.addr, cn)
@@ -326,9 +315,10 @@ func (c *Client) FlushAll() error {

// Get gets the item for the given key. ErrCacheMiss is returned for a
// memcache cache miss. The key must be at most 250 bytes in length.
func (c *Client) Get(key string) (item *Item, err error) {
func (c *Client) Get(key string, opts ...Option) (item *Item, err error) {
options := newOptions(opts...)
err = c.withKeyAddr(key, func(addr net.Addr) error {
return c.getFromAddr(addr, []string{key}, func(it *Item) { item = it })
return c.getFromAddr(addr, []string{key}, options, func(it *Item) { item = it })
})
if err == nil && item == nil {
err = ErrCacheMiss
@@ -373,15 +363,15 @@ func (c *Client) withKeyRw(key string, fn func(*bufio.ReadWriter) error) error {
})
}

func (c *Client) getFromAddr(addr net.Addr, keys []string, cb func(*Item)) error {
func (c *Client) getFromAddr(addr net.Addr, keys []string, opts *Options, cb func(*Item)) error {
return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error {
if _, err := fmt.Fprintf(rw, "gets %s\r\n", strings.Join(keys, " ")); err != nil {
return err
}
if err := rw.Flush(); err != nil {
return err
}
if err := c.parseGetResponse(rw.Reader, cb); err != nil {
if err := c.parseGetResponse(rw.Reader, opts, cb); err != nil {
return err
}
return nil
@@ -465,7 +455,9 @@ func (c *Client) touchFromAddr(addr net.Addr, keys []string, expiration int32) e
// items may have fewer elements than the input slice, due to memcache
// cache misses. Each key must be at most 250 bytes in length.
// If no error is returned, the returned map will also be non-nil.
func (c *Client) GetMulti(keys []string) (map[string]*Item, error) {
func (c *Client) GetMulti(keys []string, opts ...Option) (map[string]*Item, error) {
options := newOptions(opts...)

var lk sync.Mutex
m := make(map[string]*Item)
addItemToMap := func(it *Item) {
@@ -489,7 +481,7 @@ func (c *Client) GetMulti(keys []string) (map[string]*Item, error) {
ch := make(chan error, buffered)
for addr, keys := range keyMap {
go func(addr net.Addr, keys []string) {
ch <- c.getFromAddr(addr, keys, addItemToMap)
ch <- c.getFromAddr(addr, keys, options, addItemToMap)
}(addr, keys)
}

@@ -504,7 +496,7 @@ func (c *Client) GetMulti(keys []string) (map[string]*Item, error) {

// parseGetResponse reads a GET response from r and calls cb for each
// read and allocated Item
func (c *Client) parseGetResponse(r *bufio.Reader, cb func(*Item)) error {
func (c *Client) parseGetResponse(r *bufio.Reader, opts *Options, cb func(*Item)) error {
for {
line, err := r.ReadSlice('\n')
if err != nil {
@@ -519,26 +511,15 @@ func (c *Client) parseGetResponse(r *bufio.Reader, cb func(*Item)) error {
return err
}
buffSize := size + 2
if c.Pool != nil {
v, err := c.Pool.Get(buffSize)
if err != nil {
return err
}
it.Value = (*v)[:buffSize]
} else {
it.Value = make([]byte, buffSize)
}
buff := opts.Alloc.Get(buffSize)
it.Value = (*buff)[:buffSize]
_, err = io.ReadFull(r, it.Value)
if err != nil {
if c.Pool != nil {
c.Pool.Put(&it.Value)
}
opts.Alloc.Put(buff)
return err
}
if !bytes.HasSuffix(it.Value, crlf) {
if c.Pool != nil {
c.Pool.Put(&it.Value)
}
opts.Alloc.Put(buff)
return fmt.Errorf("memcache: corrupt get result read")
}
it.Value = it.Value[:size]
46 changes: 30 additions & 16 deletions memcache/memcache_test.go
@@ -308,42 +308,56 @@ func BenchmarkScanGetResponseLine(b *testing.B) {
func BenchmarkParseGetResponse(b *testing.B) {
valueSize := 500
response := strings.NewReader(fmt.Sprintf("VALUE foobar1234 0 %v 1234\r\n%s\r\nEND\r\n", valueSize, strings.Repeat("a", valueSize)))
c := &Client{
Pool: newTestPool(valueSize + 2),
}
var reader = bufio.NewReader(response)
var err error

opts := newOptions(WithAllocator(newTestAllocator(valueSize + 2)))
c := &Client{}
reader := bufio.NewReader(response)

for i := 0; i < b.N; i++ {
err = c.parseGetResponse(reader, func(it *Item) {
c.Pool.Put(&it.Value)
err := c.parseGetResponse(reader, opts, func(it *Item) {
opts.Alloc.Put(&it.Value)
})
if err != nil {
b.Fatal(err)
}
response.Seek(0, 0)
_, err = response.Seek(0, 0)
if err != nil {
b.Fatal(err)
}
reader.Reset(response)

}
}

type testPool struct {
pool sync.Pool
type testAllocator struct {
pool sync.Pool
expectedSize int
}

func newTestPool(dataSize int) BytesPool {
return &testPool{
func newTestAllocator(dataSize int) Allocator {
return &testAllocator{
expectedSize: dataSize,
pool: sync.Pool{
New: func() interface{} {
b := make([]byte, 0, dataSize)
b := make([]byte, dataSize)
return &b
},
},
}
}

func (p *testPool) Get(sz int) (*[]byte, error) {
return p.pool.Get().(*[]byte), nil
func (p *testAllocator) Get(sz int) *[]byte {
// NOTE: This assumes all entries in the pool are the same, correct size. This
// is fine because we are only using these values to benchmark the same data over
// and over again.
if p.expectedSize != sz {
panic("unexpected allocation size in test allocator")
}

bufPtr := p.pool.Get().(*[]byte)
return bufPtr
}

func (p *testPool) Put(b *[]byte) {
func (p *testAllocator) Put(b *[]byte) {
p.pool.Put(b)
}
59 changes: 59 additions & 0 deletions memcache/options.go
@@ -0,0 +1,59 @@
package memcache

var nopAllocator = &defaultAllocator{}

func newOptions(opts ...Option) *Options {
o := &Options{
Alloc: nopAllocator,
}

for _, opt := range opts {
opt(o)
}

return o
}

// Options are used to modify the behavior of an individual Get or GetMulti
// call made by the Client. They are constructed by applying Option callbacks
// passed to a Client method to a default Options instance.
type Options struct {
Alloc Allocator
}

// Option is a callback used to modify the Options that a particular Client
// method uses.
type Option func(opts *Options)

// WithAllocator creates a new Option that makes use of a specific memory Allocator
// for result values (Item.Value) loaded from memcached.
func WithAllocator(alloc Allocator) Option {
return func(opts *Options) {
opts.Alloc = alloc
}
}

// Allocator allows memory for memcached result values (Item.Value) to be managed by
// callers of the Client instead of by the Client itself. For example, this can be
// used by callers to implement arena-style memory management. The default implementation,
// used when not otherwise overridden, uses `make` and relies on GC for cleanup.
type Allocator interface {
// Get returns a byte slice with at least sz capacity. Length of the slice is
// not guaranteed and so must be asserted by callers (the Client).
Get(sz int) *[]byte
// Put returns the byte slice to the underlying allocator. The Client will
// only call this method during error handling when allocated values are not
// returned to the caller as cache results.
Put(b *[]byte)
}

type defaultAllocator struct{}

func (d defaultAllocator) Get(sz int) *[]byte {
b := make([]byte, sz)
return &b
}

func (d defaultAllocator) Put(_ *[]byte) {
// no-op
}
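
For illustration, one way a caller might implement this interface with a sync.Pool, similar in spirit to the test allocator in memcache_test.go. This sketch is an assumption rather than part of the commit, and it only depends on the standard library sync package:

// pooledAllocator is a hypothetical Allocator that reuses byte slices via
// sync.Pool so repeated Get/GetMulti calls avoid fresh heap allocations.
type pooledAllocator struct {
	pool sync.Pool
}

func newPooledAllocator(defaultCap int) *pooledAllocator {
	return &pooledAllocator{
		pool: sync.Pool{
			New: func() interface{} {
				b := make([]byte, 0, defaultCap)
				return &b
			},
		},
	}
}

// Get returns a slice with at least sz capacity; the Client only relies on
// capacity, so the pooled buffer is grown when it is too small.
func (p *pooledAllocator) Get(sz int) *[]byte {
	b := p.pool.Get().(*[]byte)
	if cap(*b) < sz {
		*b = make([]byte, 0, sz)
	}
	return b
}

// Put hands a slice back to the pool so later Gets can reuse its memory.
func (p *pooledAllocator) Put(b *[]byte) {
	p.pool.Put(b)
}

Note that, per the interface contract above, the Client only calls Put on error paths, so callers wanting full reuse must return item.Value buffers themselves, as in the usage sketch after the commit header.
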
