Skip to content

Commit

Permalink
Store gateway mmap removal: use a shared pool of buffers across all D…
Browse files Browse the repository at this point in the history
…ecbuf instances (#3691)

* Use a shared pool of buffers across all DecbufFactory instances.

This results in a small improvement in performance and heap
consumption:

name                                         old time/op    new time/op    delta
NewStreamBinaryReader/20Names100Values-10       758µs ± 7%     726µs ± 1%    ~     (p=0.111 n=5+4)
NewStreamBinaryReader/20Names500Values-10      2.12ms ± 4%    2.15ms ± 4%    ~     (p=0.548 n=5+5)
NewStreamBinaryReader/20Names1000Values-10     3.49ms ± 2%    3.44ms ± 2%    ~     (p=0.222 n=5+5)
NewStreamBinaryReader/50Names100Values-10      1.54ms ± 4%    1.51ms ± 5%    ~     (p=0.421 n=5+5)
NewStreamBinaryReader/50Names500Values-10      4.33ms ± 1%    4.39ms ± 8%    ~     (p=1.000 n=5+5)
NewStreamBinaryReader/50Names1000Values-10     7.88ms ± 3%    7.84ms ± 3%    ~     (p=0.690 n=5+5)
NewStreamBinaryReader/100Names100Values-10     2.73ms ± 5%    2.57ms ± 5%  -6.15%  (p=0.032 n=5+5)
NewStreamBinaryReader/100Names500Values-10     8.12ms ± 3%    8.02ms ± 2%    ~     (p=0.548 n=5+5)
NewStreamBinaryReader/100Names1000Values-10    15.3ms ± 3%    14.8ms ± 1%  -3.43%  (p=0.016 n=5+5)
NewStreamBinaryReader/200Names100Values-10     4.87ms ± 7%    4.84ms ± 4%    ~     (p=0.841 n=5+5)
NewStreamBinaryReader/200Names500Values-10     15.9ms ± 2%    15.9ms ± 1%    ~     (p=1.000 n=5+5)
NewStreamBinaryReader/200Names1000Values-10    30.0ms ± 2%    30.2ms ± 3%    ~     (p=1.000 n=5+5)

name                                         old alloc/op   new alloc/op   delta
NewStreamBinaryReader/20Names100Values-10      3.36MB ± 0%    3.35MB ± 0%  -0.29%  (p=0.000 n=5+4)
NewStreamBinaryReader/20Names500Values-10      4.00MB ± 0%    3.99MB ± 0%  -0.25%  (p=0.016 n=5+4)
NewStreamBinaryReader/20Names1000Values-10     4.80MB ± 0%    4.79MB ± 0%  -0.21%  (p=0.008 n=5+5)
NewStreamBinaryReader/50Names100Values-10      3.63MB ± 0%    3.62MB ± 0%  -0.27%  (p=0.016 n=4+5)
NewStreamBinaryReader/50Names500Values-10      5.21MB ± 0%    5.20MB ± 0%  -0.19%  (p=0.008 n=5+5)
NewStreamBinaryReader/50Names1000Values-10     7.23MB ± 0%    7.22MB ± 0%  -0.14%  (p=0.008 n=5+5)
NewStreamBinaryReader/100Names100Values-10     4.08MB ± 0%    4.07MB ± 0%  -0.24%  (p=0.008 n=5+5)
NewStreamBinaryReader/100Names500Values-10     7.24MB ± 0%    7.23MB ± 0%  -0.14%  (p=0.008 n=5+5)
NewStreamBinaryReader/100Names1000Values-10    11.3MB ± 0%    11.3MB ± 0%  -0.09%  (p=0.029 n=4+4)
NewStreamBinaryReader/200Names100Values-10     4.97MB ± 0%    4.96MB ± 0%  -0.20%  (p=0.008 n=5+5)
NewStreamBinaryReader/200Names500Values-10     11.6MB ± 0%    11.6MB ± 0%  -0.08%  (p=0.008 n=5+5)
NewStreamBinaryReader/200Names1000Values-10    20.1MB ± 0%    20.1MB ± 0%  -0.05%  (p=0.016 n=4+5)

name                                         old allocs/op  new allocs/op  delta
NewStreamBinaryReader/20Names100Values-10       6.23k ± 0%     6.22k ± 0%  -0.11%  (p=0.029 n=4+4)
NewStreamBinaryReader/20Names500Values-10       30.3k ± 0%     30.3k ± 0%  -0.02%  (p=0.029 n=4+4)
NewStreamBinaryReader/20Names1000Values-10      60.3k ± 0%     60.3k ± 0%  -0.01%  (p=0.029 n=4+4)
NewStreamBinaryReader/50Names100Values-10       15.5k ± 0%     15.5k ± 0%  -0.05%  (p=0.029 n=4+4)
NewStreamBinaryReader/50Names500Values-10       75.6k ± 0%     75.6k ± 0%    ~     (p=0.079 n=4+5)
NewStreamBinaryReader/50Names1000Values-10       151k ± 0%      151k ± 0%  -0.00%  (p=0.029 n=4+4)
NewStreamBinaryReader/100Names100Values-10      31.0k ± 0%     31.0k ± 0%  -0.02%  (p=0.000 n=4+5)
NewStreamBinaryReader/100Names500Values-10       151k ± 0%      151k ± 0%  -0.00%  (p=0.000 n=5+4)
NewStreamBinaryReader/100Names1000Values-10      301k ± 0%      301k ± 0%  -0.00%  (p=0.016 n=4+5)
NewStreamBinaryReader/200Names100Values-10      61.9k ± 0%     61.9k ± 0%  -0.01%  (p=0.008 n=5+5)
NewStreamBinaryReader/200Names500Values-10       302k ± 0%      302k ± 0%  -0.00%  (p=0.008 n=5+5)
NewStreamBinaryReader/200Names1000Values-10      602k ± 0%      602k ± 0%  -0.00%  (p=0.016 n=5+4)

* Move buffer pool management into FileReader.

* Add note about how pooled buffers are cleaned up.
  • Loading branch information
charleskorn committed Dec 13, 2022
1 parent 5b65335 commit a0c2de3
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 22 deletions.
18 changes: 2 additions & 16 deletions pkg/storegateway/indexheader/encoding/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
package encoding

import (
"bufio"
"encoding/binary"
"hash/crc32"
"os"
"sync"

"github.com/grafana/dskit/multierror"
"github.com/pkg/errors"
Expand All @@ -19,17 +17,11 @@ const readerBufferSize = 4096

// DecbufFactory creates new file-backed decoding buffer instances for a specific index-header file.
type DecbufFactory struct {
pool sync.Pool
path string
}

func NewDecbufFactory(path string) *DecbufFactory {
return &DecbufFactory{
pool: sync.Pool{
New: func() any {
return bufio.NewReaderSize(nil, readerBufferSize)
},
},
path: path,
}
}
Expand Down Expand Up @@ -65,10 +57,9 @@ func (df *DecbufFactory) NewDecbufAtChecked(offset int, table *crc32.Table) Decb
return Decbuf{E: errors.Wrapf(ErrInvalidSize, "insufficient bytes read for size (got %d, wanted %d)", n, 4)}
}

bufReader := df.pool.Get().(*bufio.Reader)
contentLength := int(binary.BigEndian.Uint32(lengthBytes))
bufferLength := len(lengthBytes) + contentLength + crc32.Size
r, err := NewFileReader(f, offset, bufferLength, bufReader)
r, err := NewFileReader(f, offset, bufferLength)
if err != nil {
return Decbuf{E: errors.Wrap(err, "create file reader")}
}
Expand Down Expand Up @@ -124,9 +115,8 @@ func (df *DecbufFactory) NewRawDecbuf() Decbuf {
return Decbuf{E: errors.Wrap(err, "stat file for decbuf")}
}

bufReader := df.pool.Get().(*bufio.Reader)
fileSize := stat.Size()
reader, err := NewFileReader(f, 0, int(fileSize), bufReader)
reader, err := NewFileReader(f, 0, int(fileSize))
if err != nil {
return Decbuf{E: errors.Wrap(err, "file reader for decbuf")}
}
Expand All @@ -137,10 +127,6 @@ func (df *DecbufFactory) NewRawDecbuf() Decbuf {

// Close cleans up any resources associated with the Decbuf
func (df *DecbufFactory) Close(d Decbuf) error {
if d.r != nil {
df.pool.Put(d.r.buf)
}

return d.close()
}

Expand Down
15 changes: 13 additions & 2 deletions pkg/storegateway/indexheader/encoding/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"os"
"sync"
)

type FileReader struct {
Expand All @@ -18,12 +19,18 @@ type FileReader struct {
pos int
}

var bufferPool = sync.Pool{
New: func() any {
return bufio.NewReaderSize(nil, readerBufferSize)
},
}

// NewFileReader creates a new FileReader for the segment of file beginning at base bytes
// extending length bytes using the supplied buffered reader.
func NewFileReader(file *os.File, base, length int, buf *bufio.Reader) (*FileReader, error) {
func NewFileReader(file *os.File, base, length int) (*FileReader, error) {
f := &FileReader{
file: file,
buf: buf,
buf: bufferPool.Get().(*bufio.Reader),
base: base,
length: length,
}
Expand Down Expand Up @@ -143,5 +150,9 @@ func (f *FileReader) Len() int {
// is unexported to ensure that all resource management is handled by DecbufFactory
// which pools resources.
func (f *FileReader) close() error {
// Note that we don't do anything to clean up the buffer before returning it to the pool here:
// we reset the buffer when we retrieve it from the pool instead.
bufferPool.Put(f.buf)

return f.file.Close()
}
7 changes: 3 additions & 4 deletions pkg/storegateway/indexheader/encoding/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package encoding

import (
"bufio"
"os"
"path"
"testing"
Expand Down Expand Up @@ -185,7 +184,7 @@ func TestReaders_CreationWithEmptyContents(t *testing.T) {
require.NoError(t, f.Close())
})

r, err := NewFileReader(f, 0, 0, bufio.NewReader(f))
r, err := NewFileReader(f, 0, 0)
require.NoError(t, err)
require.ErrorIs(t, r.Skip(1), ErrInvalidSize)
require.ErrorIs(t, r.ResetAt(1), ErrInvalidSize)
Expand All @@ -206,7 +205,7 @@ func testReaders(t *testing.T, test func(t *testing.T, r *FileReader)) {
require.NoError(t, f.Close())
})

r, err := NewFileReader(f, 0, len(testReaderContents), bufio.NewReader(f))
r, err := NewFileReader(f, 0, len(testReaderContents))
require.NoError(t, err)

test(t, r)
Expand All @@ -226,7 +225,7 @@ func testReaders(t *testing.T, test func(t *testing.T, r *FileReader)) {
require.NoError(t, f.Close())
})

r, err := NewFileReader(f, len(offsetBytes), len(testReaderContents), bufio.NewReader(f))
r, err := NewFileReader(f, len(offsetBytes), len(testReaderContents))
require.NoError(t, err)

test(t, r)
Expand Down

0 comments on commit a0c2de3

Please sign in to comment.