ARROW-16790: [Go][Parquet] Avoid unnecessary memory allocations for skipping rows (#13887)

Authored-by: Matt Topol <zotthewizard@gmail.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
zeroshade authored Aug 16, 2022
1 parent f6ad4bf commit ee3a8d8
Showing 8 changed files with 96 additions and 25 deletions.
18 changes: 13 additions & 5 deletions go/parquet/file/column_reader.go
@@ -18,6 +18,7 @@ package file

import (
"fmt"
+ "sync"

"github.com/apache/arrow/go/v10/arrow/memory"
"github.com/apache/arrow/go/v10/internal/utils"
@@ -125,6 +126,7 @@ type columnChunkReader struct {
// the number of values we've decoded so far
numDecoded int64
mem memory.Allocator
+ bufferPool *sync.Pool

decoders map[format.Encoding]encoding.TypedDecoder
decoderTraits encoding.DecoderTraits
@@ -136,8 +138,12 @@ type columnChunkReader struct {

// NewColumnReader returns a column reader for the provided column initialized with the given pagereader that will
// provide the pages of data for this column. The type is determined from the column passed in.
- func NewColumnReader(descr *schema.Column, pageReader PageReader, mem memory.Allocator) ColumnChunkReader {
- base := columnChunkReader{descr: descr, rdr: pageReader, mem: mem, decoders: make(map[format.Encoding]encoding.TypedDecoder)}
+ //
+ // In addition to the page reader and allocator, a pointer to a shared sync.Pool is expected to provide buffers for temporary
+ // use to minimize allocations. The bufferPool should provide *memory.Buffer objects that can be resized as necessary; buffers
+ // should have `ResizeNoShrink(0)` called on them before being put back into the pool.
+ func NewColumnReader(descr *schema.Column, pageReader PageReader, mem memory.Allocator, bufferPool *sync.Pool) ColumnChunkReader {
+ base := columnChunkReader{descr: descr, rdr: pageReader, mem: mem, decoders: make(map[format.Encoding]encoding.TypedDecoder), bufferPool: bufferPool}
switch descr.PhysicalType() {
case parquet.Types.FixedLenByteArray:
base.decoderTraits = &encoding.FixedLenByteArrayDecoderTraits
@@ -435,15 +441,17 @@ func (c *columnChunkReader) skipValues(nvalues int64, readFn func(batch int64, b
valsRead int64 = 0
)

- // TODO(ARROW-16790): ideally we should re-use a shared pool of buffers to avoid unnecessary memory allocation for skips
- scratch := memory.NewResizableBuffer(c.mem)
+ scratch := c.bufferPool.Get().(*memory.Buffer)
+ defer func() {
+ scratch.ResizeNoShrink(0)
+ c.bufferPool.Put(scratch)
+ }()
bufMult := 1
if c.descr.PhysicalType() == parquet.Types.Boolean {
// for bools, BytesRequired returns 1 byte per 8 bool, but casting []byte to []bool requires 1 byte per 1 bool
bufMult = 8
}
scratch.Reserve(c.decoderTraits.BytesRequired(int(batchSize) * bufMult))
- defer scratch.Release()

for {
batchSize = utils.Min(batchSize, toskip)
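For reference, the pool contract documented on NewColumnReader matches the pool this commit wires into the tests and into file.Reader below. A minimal sketch of a conforming pool (illustrative only; newBufferPool is a hypothetical helper name, not part of this diff):

package example

import (
	"runtime"
	"sync"

	"github.com/apache/arrow/go/v10/arrow/memory"
)

// newBufferPool builds a sync.Pool satisfying NewColumnReader's contract:
// it hands out *memory.Buffer values, and callers are expected to call
// ResizeNoShrink(0) on a buffer before putting it back.
func newBufferPool(mem memory.Allocator) *sync.Pool {
	return &sync.Pool{
		New: func() interface{} {
			buf := memory.NewResizableBuffer(mem)
			// if the GC drops a pooled buffer, the finalizer still releases its memory
			runtime.SetFinalizer(buf, func(obj *memory.Buffer) {
				obj.Release()
			})
			return buf
		},
	}
}

A column reader built with file.NewColumnReader(descr, pageReader, mem, newBufferPool(mem)) then reuses pooled scratch space in skipValues instead of allocating a fresh buffer per skip.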
19 changes: 18 additions & 1 deletion go/parquet/file/column_reader_test.go
@@ -20,6 +20,8 @@ import (
"math"
"math/rand"
"reflect"
+ "runtime"
+ "sync"
"testing"

"github.com/apache/arrow/go/v10/arrow/memory"
@@ -173,10 +175,25 @@ type PrimitiveReaderSuite struct {
nvalues int
maxDefLvl int16
maxRepLvl int16
+
+ bufferPool sync.Pool
}

+ func (p *PrimitiveReaderSuite) SetupTest() {
+ p.bufferPool = sync.Pool{
+ New: func() interface{} {
+ buf := memory.NewResizableBuffer(mem)
+ runtime.SetFinalizer(buf, func(obj *memory.Buffer) {
+ obj.Release()
+ })
+ return buf
+ },
+ }
+ }
+
func (p *PrimitiveReaderSuite) TearDownTest() {
p.clear()
+ p.bufferPool = sync.Pool{}
}

func (p *PrimitiveReaderSuite) initReader(d *schema.Column) {
@@ -185,7 +202,7 @@ func (p *PrimitiveReaderSuite) initReader(d *schema.Column) {
m.TestData().Set("pages", p.pages)
m.On("Err").Return((error)(nil))
p.pager = m
- p.reader = file.NewColumnReader(d, m, mem)
+ p.reader = file.NewColumnReader(d, m, mem, &p.bufferPool)
}

func (p *PrimitiveReaderSuite) checkResults(typ reflect.Type) {
20 changes: 19 additions & 1 deletion go/parquet/file/column_writer_test.go
@@ -20,6 +20,8 @@ import (
"bytes"
"math"
"reflect"
+ "runtime"
+ "sync"
"testing"

"github.com/apache/arrow/go/v10/arrow/bitutil"
@@ -223,19 +225,35 @@ type PrimitiveWriterTestSuite struct {
metadata *metadata.ColumnChunkMetaDataBuilder
sink *encoding.BufferWriter
readbuffer *memory.Buffer
+
+ bufferPool sync.Pool
}

func (p *PrimitiveWriterTestSuite) SetupTest() {
p.SetupValuesOut(SmallSize)
p.props = parquet.NewWriterProperties()
p.SetupSchema(parquet.Repetitions.Required, 1)
p.descr = p.Schema.Column(0)
+
+ p.bufferPool = sync.Pool{
+ New: func() interface{} {
+ buf := memory.NewResizableBuffer(mem)
+ runtime.SetFinalizer(buf, func(obj *memory.Buffer) {
+ obj.Release()
+ })
+ return buf
+ },
+ }
}

+ func (p *PrimitiveWriterTestSuite) TearDownTest() {
+ p.bufferPool = sync.Pool{}
+ }
+
func (p *PrimitiveWriterTestSuite) buildReader(nrows int64, compression compress.Compression) file.ColumnChunkReader {
p.readbuffer = p.sink.Finish()
pagereader, _ := file.NewPageReader(arrutils.NewBufferedReader(bytes.NewReader(p.readbuffer.Bytes()), p.readbuffer.Len()), nrows, compression, mem, nil)
- return file.NewColumnReader(p.descr, pagereader, mem)
+ return file.NewColumnReader(p.descr, pagereader, mem, &p.bufferPool)
}

func (p *PrimitiveWriterTestSuite) buildWriter(_ int64, columnProps parquet.ColumnProperties, version parquet.Version) file.ColumnChunkWriter {
23 changes: 23 additions & 0 deletions go/parquet/file/file_reader.go
@@ -22,6 +22,8 @@ import (
"fmt"
"io"
"os"
+ "runtime"
+ "sync"

"github.com/apache/arrow/go/v10/arrow/memory"
"github.com/apache/arrow/go/v10/parquet"
@@ -47,6 +49,8 @@ type Reader struct {
metadata *metadata.FileMetaData
footerOffset int64
fileDecryptor encryption.FileDecryptor
+
+ bufferPool sync.Pool
}

type ReadOption func(*Reader)
@@ -113,13 +117,31 @@ func NewParquetReader(r parquet.ReaderAtSeeker, opts ...ReadOption) (*Reader, er
f.props = parquet.NewReaderProperties(memory.NewGoAllocator())
}

+ f.bufferPool = sync.Pool{
+ New: func() interface{} {
+ buf := memory.NewResizableBuffer(f.props.Allocator())
+ runtime.SetFinalizer(buf, func(obj *memory.Buffer) {
+ obj.Release()
+ })
+ return buf
+ },
+ }
+
if f.metadata == nil {
return f, f.parseMetaData()
}

return f, nil
}

+ // BufferPool returns the internal buffer pool being utilized by this reader.
+ // This is primarily for use by the pqarrow.FileReader, or anything that builds
+ // on top of the Reader and constructs its own ColumnReaders (like the
+ // RecordReader).
+ func (f *Reader) BufferPool() *sync.Pool {
+ return &f.bufferPool
+ }

// Close will close the current reader, and if the underlying reader being used
// is an `io.Closer` then Close will be called on it too.
func (f *Reader) Close() error {
@@ -290,5 +312,6 @@ func (f *Reader) RowGroup(i int) *RowGroupReader {
r: f.r,
sourceSz: f.footerOffset,
fileDecryptor: f.fileDecryptor,
+ bufferPool: &f.bufferPool,
}
}
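As a usage illustration (not part of the commit), code built on top of file.Reader can pass the reader's shared pool to column readers it constructs itself, which is exactly what RowGroup does above. A hedged sketch with minimal error handling; firstColumnReader is a hypothetical helper:

package example

import (
	"os"

	"github.com/apache/arrow/go/v10/arrow/memory"
	"github.com/apache/arrow/go/v10/parquet/file"
)

// firstColumnReader opens a parquet file and builds a ColumnChunkReader for
// row group 0, column 0, reusing the file reader's shared buffer pool.
func firstColumnReader(path string) (file.ColumnChunkReader, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	rdr, err := file.NewParquetReader(f)
	if err != nil {
		return nil, err
	}
	descr := rdr.MetaData().Schema.Column(0)
	pageRdr, err := rdr.RowGroup(0).GetColumnPageReader(0)
	if err != nil {
		return nil, err
	}
	// passing rdr.BufferPool() keeps per-skip scratch buffers out of the allocator
	return file.NewColumnReader(descr, pageRdr, memory.DefaultAllocator, rdr.BufferPool()), nil
}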
25 changes: 13 additions & 12 deletions go/parquet/file/record_reader.go
@@ -18,6 +18,7 @@ package file

import (
"fmt"
+ "sync"
"sync/atomic"
"unsafe"

@@ -127,9 +128,9 @@ type primitiveRecordReader struct {
useValues bool
}

- func createPrimitiveRecordReader(descr *schema.Column, mem memory.Allocator) primitiveRecordReader {
+ func createPrimitiveRecordReader(descr *schema.Column, mem memory.Allocator, bufferPool *sync.Pool) primitiveRecordReader {
return primitiveRecordReader{
- ColumnChunkReader: NewColumnReader(descr, nil, mem),
+ ColumnChunkReader: NewColumnReader(descr, nil, mem, bufferPool),
values: memory.NewResizableBuffer(mem),
validBits: memory.NewResizableBuffer(mem),
mem: mem,
@@ -326,12 +327,12 @@ func (b *binaryRecordReader) GetBuilderChunks() []arrow.Array {
return b.recordReaderImpl.(binaryRecordReaderImpl).GetBuilderChunks()
}

- func newRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader {
+ func newRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
if mem == nil {
mem = memory.DefaultAllocator
}

- pr := createPrimitiveRecordReader(descr, mem)
+ pr := createPrimitiveRecordReader(descr, mem, bufferPool)
return &recordReader{
refCount: 1,
recordReaderImpl: &pr,
@@ -722,7 +723,7 @@ func (fr *flbaRecordReader) GetBuilderChunks() []arrow.Array {
return []arrow.Array{fr.bldr.NewArray()}
}

- func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader {
+ func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
if mem == nil {
mem = memory.DefaultAllocator
}
@@ -731,7 +732,7 @@ func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem memory.Alloca

return &binaryRecordReader{&recordReader{
recordReaderImpl: &flbaRecordReader{
- createPrimitiveRecordReader(descr, mem),
+ createPrimitiveRecordReader(descr, mem, bufferPool),
array.NewFixedSizeBinaryBuilder(mem, &arrow.FixedSizeBinaryType{ByteWidth: byteWidth}),
nil,
},
@@ -750,7 +751,7 @@ type byteArrayRecordReader struct {
valueBuf []parquet.ByteArray
}

- func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader {
+ func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
if mem == nil {
mem = memory.DefaultAllocator
}
@@ -762,7 +763,7 @@ func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.A

return &binaryRecordReader{&recordReader{
recordReaderImpl: &byteArrayRecordReader{
- createPrimitiveRecordReader(descr, mem),
+ createPrimitiveRecordReader(descr, mem, bufferPool),
array.NewBinaryBuilder(mem, dt),
nil,
},
@@ -840,13 +841,13 @@ func (br *byteArrayRecordReader) GetBuilderChunks() []arrow.Array {

// TODO(mtopol): create optimized readers for dictionary types after ARROW-7286 is done

- func NewRecordReader(descr *schema.Column, info LevelInfo, readDict bool, mem memory.Allocator) RecordReader {
+ func NewRecordReader(descr *schema.Column, info LevelInfo, readDict bool, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
switch descr.PhysicalType() {
case parquet.Types.ByteArray:
- return newByteArrayRecordReader(descr, info, mem)
+ return newByteArrayRecordReader(descr, info, mem, bufferPool)
case parquet.Types.FixedLenByteArray:
- return newFLBARecordReader(descr, info, mem)
+ return newFLBARecordReader(descr, info, mem, bufferPool)
default:
- return newRecordReader(descr, info, mem)
+ return newRecordReader(descr, info, mem, bufferPool)
}
}
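For illustration (not part of the diff), the updated NewRecordReader call threads the same pool through regardless of which branch of the switch is taken; this sketch assumes you already hold the column descriptor, level info, and a pool such as (*file.Reader).BufferPool():

package example

import (
	"sync"

	"github.com/apache/arrow/go/v10/arrow/memory"
	"github.com/apache/arrow/go/v10/parquet/file"
	"github.com/apache/arrow/go/v10/parquet/schema"
)

// makeRecordReader shows the new final argument: the shared *sync.Pool.
func makeRecordReader(descr *schema.Column, info file.LevelInfo, pool *sync.Pool) file.RecordReader {
	// readDict=false requests plain, non-dictionary reads
	return file.NewRecordReader(descr, info, false, memory.DefaultAllocator, pool)
}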
5 changes: 4 additions & 1 deletion go/parquet/file/row_group_reader.go
@@ -18,6 +18,7 @@ package file

import (
"fmt"
+ "sync"

"github.com/apache/arrow/go/v10/internal/utils"
"github.com/apache/arrow/go/v10/parquet"
@@ -38,6 +39,8 @@ type RowGroupReader struct {
rgMetadata *metadata.RowGroupMetaData
props *parquet.ReaderProperties
fileDecryptor encryption.FileDecryptor
+
+ bufferPool *sync.Pool
}

// MetaData returns the metadata of the current Row Group
@@ -65,7 +68,7 @@ func (r *RowGroupReader) Column(i int) (ColumnChunkReader, error) {
if err != nil {
return nil, fmt.Errorf("parquet: unable to initialize page reader: %w", err)
}
- return NewColumnReader(descr, pageRdr, r.props.Allocator()), nil
+ return NewColumnReader(descr, pageRdr, r.props.Allocator(), r.bufferPool), nil
}

func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) {
5 changes: 3 additions & 2 deletions go/parquet/pqarrow/column_readers.go
@@ -20,6 +20,7 @@ import (
"encoding/binary"
"fmt"
"reflect"
+ "sync"
"sync/atomic"
"time"
"unsafe"
@@ -50,13 +51,13 @@ type leafReader struct {
refCount int64
}

- func newLeafReader(rctx *readerCtx, field *arrow.Field, input *columnIterator, leafInfo file.LevelInfo, props ArrowReadProperties) (*ColumnReader, error) {
+ func newLeafReader(rctx *readerCtx, field *arrow.Field, input *columnIterator, leafInfo file.LevelInfo, props ArrowReadProperties, bufferPool *sync.Pool) (*ColumnReader, error) {
ret := &leafReader{
rctx: rctx,
field: field,
input: input,
descr: input.Descr(),
- recordRdr: file.NewRecordReader(input.Descr(), leafInfo, field.Type.ID() == arrow.DICTIONARY, rctx.mem),
+ recordRdr: file.NewRecordReader(input.Descr(), leafInfo, field.Type.ID() == arrow.DICTIONARY, rctx.mem, bufferPool),
props: props,
refCount: 1,
}
6 changes: 3 additions & 3 deletions go/parquet/pqarrow/file_reader.go
@@ -210,7 +210,7 @@ func (fr *FileReader) GetFieldReaders(ctx context.Context, colIndices, rowGroups
// greatly improves performance.
// GetFieldReader causes read operations, when issued serially on large numbers of columns,
// this is super time consuming. Get field readers concurrently.
- g,gctx := errgroup.WithContext(ctx)
+ g, gctx := errgroup.WithContext(ctx)
if !fr.Props.Parallel {
g.SetLimit(1)
}
@@ -482,7 +482,7 @@ func (fr *FileReader) getReader(ctx context.Context, field *SchemaField, arrowFi
return nil, nil
}

- out, err = newLeafReader(&rctx, field.Field, rctx.colFactory(field.ColIndex, rctx.rdr), field.LevelInfo, fr.Props)
+ out, err = newLeafReader(&rctx, field.Field, rctx.colFactory(field.ColIndex, rctx.rdr), field.LevelInfo, fr.Props, fr.rdr.BufferPool())
return
}

@@ -499,7 +499,7 @@ func (fr *FileReader) getReader(ctx context.Context, field *SchemaField, arrowFi
// When reading structs with large numbers of columns, the serial load is very slow.
// This is especially true when reading Cloud Storage. Loading concurrently
// greatly improves performance.
- g,gctx := errgroup.WithContext(ctx)
+ g, gctx := errgroup.WithContext(ctx)
if !fr.Props.Parallel {
g.SetLimit(1)
}
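The errgroup pattern referenced in the comments above — fan out the loads, but cap the group at one goroutine when Parallel is disabled so the same code path runs serially — reduces to this standalone sketch (loadColumn is a hypothetical stand-in for the real per-column work):

package example

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// loadAll launches one goroutine per column index; with parallel=false the
// group's limit of 1 serializes the loads without a separate code path.
func loadAll(ctx context.Context, indices []int, parallel bool, loadColumn func(context.Context, int) error) error {
	g, gctx := errgroup.WithContext(ctx)
	if !parallel {
		g.SetLimit(1)
	}
	for _, idx := range indices {
		idx := idx // capture the loop variable for the closure
		g.Go(func() error {
			return loadColumn(gctx, idx)
		})
	}
	// Wait returns the first non-nil error from the loads, if any
	return g.Wait()
}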
