Skip to content

Commit

Permalink
Move buffer pool management into FileReader.
Browse files Browse the repository at this point in the history
  • Loading branch information
charleskorn committed Dec 12, 2022
1 parent af30987 commit 9f7d5c9
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 22 deletions.
18 changes: 2 additions & 16 deletions pkg/storegateway/indexheader/encoding/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
package encoding

import (
"bufio"
"encoding/binary"
"hash/crc32"
"os"
"sync"

"github.com/grafana/dskit/multierror"
"github.com/pkg/errors"
Expand All @@ -22,12 +20,6 @@ type DecbufFactory struct {
path string
}

var bufferPool = sync.Pool{
New: func() any {
return bufio.NewReaderSize(nil, readerBufferSize)
},
}

func NewDecbufFactory(path string) *DecbufFactory {
return &DecbufFactory{
path: path,
Expand Down Expand Up @@ -65,10 +57,9 @@ func (df *DecbufFactory) NewDecbufAtChecked(offset int, table *crc32.Table) Decb
return Decbuf{E: errors.Wrapf(ErrInvalidSize, "insufficient bytes read for size (got %d, wanted %d)", n, 4)}
}

bufReader := bufferPool.Get().(*bufio.Reader)
contentLength := int(binary.BigEndian.Uint32(lengthBytes))
bufferLength := len(lengthBytes) + contentLength + crc32.Size
r, err := NewFileReader(f, offset, bufferLength, bufReader)
r, err := NewFileReader(f, offset, bufferLength)
if err != nil {
return Decbuf{E: errors.Wrap(err, "create file reader")}
}
Expand Down Expand Up @@ -124,9 +115,8 @@ func (df *DecbufFactory) NewRawDecbuf() Decbuf {
return Decbuf{E: errors.Wrap(err, "stat file for decbuf")}
}

bufReader := bufferPool.Get().(*bufio.Reader)
fileSize := stat.Size()
reader, err := NewFileReader(f, 0, int(fileSize), bufReader)
reader, err := NewFileReader(f, 0, int(fileSize))
if err != nil {
return Decbuf{E: errors.Wrap(err, "file reader for decbuf")}
}
Expand All @@ -137,10 +127,6 @@ func (df *DecbufFactory) NewRawDecbuf() Decbuf {

// Close cleans up any resources associated with the Decbuf
func (df *DecbufFactory) Close(d Decbuf) error {
if d.r != nil {
bufferPool.Put(d.r.buf)
}

return d.close()
}

Expand Down
13 changes: 11 additions & 2 deletions pkg/storegateway/indexheader/encoding/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"os"
"sync"
)

type FileReader struct {
Expand All @@ -18,12 +19,18 @@ type FileReader struct {
pos int
}

var bufferPool = sync.Pool{
New: func() any {
return bufio.NewReaderSize(nil, readerBufferSize)
},
}

// NewFileReader creates a new FileReader for the segment of file beginning at base bytes
// extending length bytes using the supplied buffered reader.
func NewFileReader(file *os.File, base, length int, buf *bufio.Reader) (*FileReader, error) {
func NewFileReader(file *os.File, base, length int) (*FileReader, error) {
f := &FileReader{
file: file,
buf: buf,
buf: bufferPool.Get().(*bufio.Reader),
base: base,
length: length,
}
Expand Down Expand Up @@ -143,5 +150,7 @@ func (f *FileReader) Len() int {
// is unexported to ensure that all resource management is handled by DecbufFactory
// which pools resources.
func (f *FileReader) close() error {
bufferPool.Put(f.buf)

return f.file.Close()
}
7 changes: 3 additions & 4 deletions pkg/storegateway/indexheader/encoding/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package encoding

import (
"bufio"
"os"
"path"
"testing"
Expand Down Expand Up @@ -185,7 +184,7 @@ func TestReaders_CreationWithEmptyContents(t *testing.T) {
require.NoError(t, f.Close())
})

r, err := NewFileReader(f, 0, 0, bufio.NewReader(f))
r, err := NewFileReader(f, 0, 0)
require.NoError(t, err)
require.ErrorIs(t, r.Skip(1), ErrInvalidSize)
require.ErrorIs(t, r.ResetAt(1), ErrInvalidSize)
Expand All @@ -206,7 +205,7 @@ func testReaders(t *testing.T, test func(t *testing.T, r *FileReader)) {
require.NoError(t, f.Close())
})

r, err := NewFileReader(f, 0, len(testReaderContents), bufio.NewReader(f))
r, err := NewFileReader(f, 0, len(testReaderContents))
require.NoError(t, err)

test(t, r)
Expand All @@ -226,7 +225,7 @@ func testReaders(t *testing.T, test func(t *testing.T, r *FileReader)) {
require.NoError(t, f.Close())
})

r, err := NewFileReader(f, len(offsetBytes), len(testReaderContents), bufio.NewReader(f))
r, err := NewFileReader(f, len(offsetBytes), len(testReaderContents))
require.NoError(t, err)

test(t, r)
Expand Down

0 comments on commit 9f7d5c9

Please sign in to comment.