GH-35015: [Go] Fix parquet memleak (#35973)
### Rationale for this change

Several memory leaks meant the memory checks in the pqarrow package had to be partially skipped.
This PR fixes the leaks and brings the checks back.

### What changes are included in this PR?

Missing `Release` calls added in the proper places.

### Are these changes tested?

Yes, the tests from #35015 are fully enabled now.
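
For context, these checks work by wrapping the test allocator so that any allocation that is never released fails the test. A minimal sketch of the pattern (assuming the arrow/go v13 `memory` package; the test name is hypothetical):

package pqarrow_test

import (
    "testing"

    "github.com/apache/arrow/go/v13/arrow/memory"
)

// TestNoLeaks sketches the leak check the re-enabled tests rely on: every
// byte allocated through mem must be released by the end of the test, or
// AssertSize fails and reports the leaked allocation sites.
func TestNoLeaks(t *testing.T) {
    mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
    defer mem.AssertSize(t, 0)

    buf := memory.NewResizableBuffer(mem)
    buf.Resize(64)
    buf.Release() // forgetting this Release is exactly the kind of leak AssertSize catches
}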

### Are there any user-facing changes?

No.

* Closes: #35015

Lead-authored-by: candiduslynx <candiduslynx@gmail.com>
Co-authored-by: Alex Shcherbakov <candiduslynx@users.noreply.github.com>
Co-authored-by: Matt Topol <zotthewizard@gmail.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
candiduslynx and zeroshade committed Jun 8, 2023
1 parent 8b5919d commit 17311b6
Showing 8 changed files with 139 additions and 91 deletions.
22 changes: 3 additions & 19 deletions go/arrow/array/extension.go
@@ -74,28 +74,12 @@ func NewExtensionArrayWithStorage(dt arrow.ExtensionType, storage arrow.Array) a
 		panic(fmt.Errorf("arrow/array: storage type %s for extension type %s, does not match expected type %s", storage.DataType(), dt.ExtensionName(), dt.StorageType()))
 	}
 
-	base := ExtensionArrayBase{}
-	base.refCount = 1
-	base.storage = storage
-	storage.Retain()
-
 	storageData := storage.Data().(*Data)
 	// create a new data instance with the ExtensionType as the datatype but referencing the
 	// same underlying buffers to share them with the storage array.
-	baseData := NewData(dt, storageData.length, storageData.buffers, storageData.childData, storageData.nulls, storageData.offset)
-	defer baseData.Release()
-	base.array.setData(baseData)
-
-	// use the ExtensionType's ArrayType to construct the correctly typed object
-	// to use as the ExtensionArray interface. reflect.New returns a pointer to
-	// the newly created object.
-	arr := reflect.New(base.ExtensionType().ArrayType())
-	// set the embedded ExtensionArrayBase to the value we created above. We know
-	// that this field will exist because the interface requires embedding ExtensionArrayBase
-	// so we don't have to separately check, this will panic if called on an ArrayType
-	// that doesn't embed ExtensionArrayBase which is what we want.
-	arr.Elem().FieldByName("ExtensionArrayBase").Set(reflect.ValueOf(base))
-	return arr.Interface().(ExtensionArray)
+	data := NewData(dt, storageData.length, storageData.buffers, storageData.childData, storageData.nulls, storageData.offset)
+	defer data.Release()
+	return NewExtensionData(data)
 }
 
 // NewExtensionData expects a data with a datatype of arrow.ExtensionType and
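
The rewrite drops the hand-rolled reflection in favor of NewExtensionData, which already performs that instantiation; the temporary Data exists only to re-label the storage buffers under the extension type before being released by the defer. A sketch of the same buffer-sharing pattern using only the exported API (a hedged illustration, not the patch itself; the helper name wrapAsExtension is hypothetical):

import (
    "github.com/apache/arrow/go/v13/arrow"
    "github.com/apache/arrow/go/v13/arrow/array"
)

// wrapAsExtension re-labels existing storage under an extension type without
// copying: NewData and NewExtensionData each retain what they are handed, so
// the local handle can be dropped right away.
func wrapAsExtension(dt arrow.ExtensionType, storage arrow.Array) array.ExtensionArray {
    sd := storage.Data()
    data := array.NewData(dt, sd.Len(), sd.Buffers(), sd.Children(), sd.NullN(), sd.Offset())
    defer data.Release() // NewExtensionData took its own reference
    return array.NewExtensionData(data)
}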
24 changes: 21 additions & 3 deletions go/arrow/memory/checked_allocator.go
@@ -160,14 +160,32 @@ func (a *CheckedAllocator) AssertSize(t TestingT, sz int) {
 				break
 			}
 			callersMsg.WriteString("\t")
-			callersMsg.WriteString(frame.Function)
-			callersMsg.WriteString(fmt.Sprintf(" line %d", frame.Line))
+			// frame.Func is a useful source of information if it's present.
+			// It may be nil for non-Go code or fully inlined functions.
+			if fn := frame.Func; fn != nil {
+				// format as func name + the offset in bytes from func entrypoint
+				callersMsg.WriteString(fmt.Sprintf("%s+%x", fn.Name(), frame.PC-fn.Entry()))
+			} else {
+				// fallback to outer func name + file line
+				callersMsg.WriteString(fmt.Sprintf("%s, line %d", frame.Function, frame.Line))
+			}
+
+			// Write a proper file name + line, so it's really easy to find the leak
+			callersMsg.WriteString("\n\t\t")
+			callersMsg.WriteString(frame.File + ":" + strconv.Itoa(frame.Line))
 			callersMsg.WriteString("\n")
 			if !more {
 				break
 			}
 		}
-		t.Errorf("LEAK of %d bytes FROM %s line %d\n%v", info.sz, f.Name(), info.line, callersMsg.String())
+
+		file, line := f.FileLine(info.pc)
+		t.Errorf("LEAK of %d bytes FROM\n\t%s+%x\n\t\t%s:%d\n%v",
+			info.sz,
+			f.Name(), info.pc-f.Entry(), // func name + offset in bytes between frame & entrypoint to func
+			file, line, // a proper file name + line, so it's really easy to find the leak
+			callersMsg.String(),
+		)
 		return true
 	})
 
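
The leak report now prints each frame as symbol+offset plus a file:line pair instead of just a function name. A standalone sketch of that resolution logic (mirroring the formatting above, not the allocator internals; describePC is a hypothetical name):

package main

import (
    "fmt"
    "runtime"
)

// describePC resolves a program counter the way the new leak report does,
// printing "func+offset" plus "file:line" so the allocation site is easy
// to locate in both the binary and the source tree.
func describePC(pc uintptr) string {
    frames := runtime.CallersFrames([]uintptr{pc})
    frame, _ := frames.Next()
    if fn := frame.Func; fn != nil {
        return fmt.Sprintf("%s+%x\n\t%s:%d", fn.Name(), frame.PC-fn.Entry(), frame.File, frame.Line)
    }
    // fall back to the symbolic name when Func is nil (inlined or non-Go code)
    return fmt.Sprintf("%s, line %d\n\t%s:%d", frame.Function, frame.Line, frame.File, frame.Line)
}

func main() {
    pcs := make([]uintptr, 1)
    runtime.Callers(1, pcs) // skip=1 captures our own frame
    fmt.Println(describePC(pcs[0]))
}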
18 changes: 9 additions & 9 deletions go/parquet/file/record_reader.go
@@ -783,19 +783,19 @@ func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, dtype arrow.
 	}}
 }
 
-func (fr *byteArrayRecordReader) ReserveValues(extra int64, hasNullable bool) error {
-	fr.bldr.Reserve(int(extra))
-	return fr.primitiveRecordReader.ReserveValues(extra, hasNullable)
+func (br *byteArrayRecordReader) ReserveValues(extra int64, hasNullable bool) error {
+	br.bldr.Reserve(int(extra))
+	return br.primitiveRecordReader.ReserveValues(extra, hasNullable)
 }
 
-func (fr *byteArrayRecordReader) Retain() {
-	fr.bldr.Retain()
-	fr.primitiveRecordReader.Retain()
+func (br *byteArrayRecordReader) Retain() {
+	br.bldr.Retain()
+	br.primitiveRecordReader.Retain()
 }
 
-func (fr *byteArrayRecordReader) Release() {
-	fr.bldr.Release()
-	fr.primitiveRecordReader.Release()
+func (br *byteArrayRecordReader) Release() {
+	br.bldr.Release()
+	br.primitiveRecordReader.Release()
 }
 
 func (br *byteArrayRecordReader) ReadValuesDense(toRead int64) error {
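
Beyond the receiver rename (fr to br, matching the reader's other methods), this hunk shows the pattern the leak fixes depend on: a composite reader forwards Retain and Release to every refcounted resource it owns. A minimal sketch of that idiom (type and field names are illustrative):

import (
    "github.com/apache/arrow/go/v13/arrow/array"
    "github.com/apache/arrow/go/v13/arrow/memory"
)

// compositeReader owns two refcounted resources; forwarding Retain/Release
// to both makes the composite behave like a single refcounted object.
type compositeReader struct {
    bldr   array.Builder
    values *memory.Buffer
}

func (cr *compositeReader) Retain() {
    cr.bldr.Retain()
    cr.values.Retain()
}

func (cr *compositeReader) Release() {
    cr.bldr.Release()
    cr.values.Release()
}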
86 changes: 46 additions & 40 deletions go/parquet/pqarrow/column_readers.go
@@ -71,10 +71,7 @@ func (lr *leafReader) Retain() {
 
 func (lr *leafReader) Release() {
 	if atomic.AddInt64(&lr.refCount, -1) == 0 {
-		if lr.out != nil {
-			lr.out.Release()
-			lr.out = nil
-		}
+		lr.releaseOut()
 		if lr.recordRdr != nil {
 			lr.recordRdr.Release()
 			lr.recordRdr = nil
@@ -93,10 +90,7 @@ func (lr *leafReader) GetRepLevels() ([]int16, error) {
 func (lr *leafReader) IsOrHasRepeatedChild() bool { return false }
 
 func (lr *leafReader) LoadBatch(nrecords int64) (err error) {
-	if lr.out != nil {
-		lr.out.Release()
-		lr.out = nil
-	}
+	lr.releaseOut()
 	lr.recordRdr.Reset()
 
 	if err := lr.recordRdr.Reserve(nrecords); err != nil {
@@ -117,12 +111,25 @@ func (lr *leafReader) LoadBatch(nrecords int64) (err error) {
 			}
 		}
 	}
-	lr.out, err = transferColumnData(lr.recordRdr, lr.field.Type, lr.descr, lr.rctx.mem)
+	lr.out, err = transferColumnData(lr.recordRdr, lr.field.Type, lr.descr)
 	return
 }
 
-func (lr *leafReader) BuildArray(_ int64) (*arrow.Chunked, error) {
-	return lr.out, nil
+func (lr *leafReader) BuildArray(int64) (*arrow.Chunked, error) {
+	return lr.clearOut(), nil
+}
+
+// releaseOut will clear lr.out as well as release it if it wasn't nil
+func (lr *leafReader) releaseOut() {
+	if out := lr.clearOut(); out != nil {
+		out.Release()
+	}
+}
+
+// clearOut will clear lr.out and return the old value
+func (lr *leafReader) clearOut() (out *arrow.Chunked) {
+	out, lr.out = lr.out, nil
+	return out
 }
 
 func (lr *leafReader) Field() *arrow.Field { return lr.field }
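
The new clearOut/releaseOut pair is the heart of the leaf-reader fix: BuildArray hands ownership of lr.out to the caller by swapping the field to nil, so the reader's own Release can no longer double-free or leak the chunked array. The idiom in isolation (illustrative names):

import "github.com/apache/arrow/go/v13/arrow"

// holder stands in for leafReader: it owns at most one reference to out.
type holder struct{ out *arrow.Chunked }

// take transfers ownership to the caller and forgets the value, so exactly
// one party is ever responsible for releasing it.
func (h *holder) take() *arrow.Chunked {
    out := h.out
    h.out = nil
    return out
}

// drop releases the value only if the holder still owns it.
func (h *holder) drop() {
    if out := h.take(); out != nil {
        out.Release()
    }
}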
@@ -251,6 +258,7 @@ func (sr *structReader) BuildArray(lenBound int64) (*arrow.Chunked, error) {
 	if lenBound > 0 && (sr.hasRepeatedChild || sr.filtered.Nullable) {
 		nullBitmap = memory.NewResizableBuffer(sr.rctx.mem)
 		nullBitmap.Resize(int(bitutil.BytesForBits(lenBound)))
+		defer nullBitmap.Release()
 		validityIO.ValidBits = nullBitmap.Bytes()
 		defLevels, err := sr.GetDefLevels()
 		if err != nil {
@@ -275,18 +283,20 @@ func (sr *structReader) BuildArray(lenBound int64) (*arrow.Chunked, error) {
 		nullBitmap.Resize(int(bitutil.BytesForBits(validityIO.Read)))
 	}
 
-	childArrData := make([]arrow.ArrayData, 0)
+	childArrData := make([]arrow.ArrayData, len(sr.children))
+	defer releaseArrayData(childArrData)
 	// gather children arrays and def levels
-	for _, child := range sr.children {
+	for i, child := range sr.children {
 		field, err := child.BuildArray(lenBound)
 		if err != nil {
 			return nil, err
 		}
-		arrdata, err := chunksToSingle(field)
+
+		childArrData[i], err = chunksToSingle(field)
+		field.Release() // release field before checking
 		if err != nil {
 			return nil, err
 		}
-		childArrData = append(childArrData, arrdata)
 	}
 
 	if !sr.filtered.Nullable && !sr.hasRepeatedChild {
@@ -300,7 +310,7 @@ func (sr *structReader) BuildArray(lenBound int64) (*arrow.Chunked, error) {
 
 	data := array.NewData(sr.filtered.Type, int(validityIO.Read), buffers, childArrData, int(validityIO.NullCount), 0)
 	defer data.Release()
-	arr := array.MakeFromData(data)
+	arr := array.NewStructData(data)
 	defer arr.Release()
 	return arrow.NewChunked(sr.filtered.Type, []arrow.Array{arr}), nil
 }
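
Pre-sizing childArrData and deferring one bulk release guarantees that children built before an early error return are still released. releaseArrayData is a helper introduced by this PR; a plausible shape (an assumption, not the verbatim source):

import "github.com/apache/arrow/go/v13/arrow"

// releaseArrayData releases every non-nil entry; nil slots are skipped, so
// it is safe to run over a slice that an error left partially filled.
func releaseArrayData(data []arrow.ArrayData) {
    for _, d := range data {
        if d != nil {
            d.Release()
        }
    }
}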
@@ -389,6 +399,7 @@ func (lr *listReader) BuildArray(lenBound int64) (*arrow.Chunked, error) {
 	if err != nil {
 		return nil, err
 	}
+	defer arr.Release()
 
 	// resize to actual number of elems returned
 	offsetsBuffer.Resize(arrow.Int32Traits.BytesRequired(int(validityIO.Read) + 1))
@@ -443,14 +454,16 @@ func chunksToSingle(chunked *arrow.Chunked) (arrow.ArrayData, error) {
 	case 0:
 		return array.NewData(chunked.DataType(), 0, []*memory.Buffer{nil, nil}, nil, 0, 0), nil
 	case 1:
-		return chunked.Chunk(0).Data(), nil
+		data := chunked.Chunk(0).Data()
+		data.Retain() // we pass control to the caller
+		return data, nil
 	default: // if an item reader yields a chunked array, this is not yet implemented
 		return nil, arrow.ErrNotImplemented
 	}
 }
 
 // create a chunked arrow array from the raw record data
-func transferColumnData(rdr file.RecordReader, valueType arrow.DataType, descr *schema.Column, mem memory.Allocator) (*arrow.Chunked, error) {
+func transferColumnData(rdr file.RecordReader, valueType arrow.DataType, descr *schema.Column) (*arrow.Chunked, error) {
 	dt := valueType
 	if valueType.ID() == arrow.EXTENSION {
 		dt = valueType.(arrow.ExtensionType).StorageType()
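
The Retain in chunksToSingle settles the ownership convention: a function that returns refcounted data returns its own reference, and the caller releases unconditionally. Sketched from both sides (hypothetical function names):

import "github.com/apache/arrow/go/v13/arrow"

// singleChunk returns a reference the caller owns: it retains before
// returning, whether the data was freshly made or borrowed from a chunk.
func singleChunk(chunked *arrow.Chunked) arrow.ArrayData {
    data := chunked.Chunk(0).Data()
    data.Retain()
    return data
}

// the caller releases exactly once, with no special cases
func useSingleChunk(chunked *arrow.Chunked) int {
    data := singleChunk(chunked)
    defer data.Release()
    return data.Len()
}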
@@ -525,8 +538,9 @@ func transferZeroCopy(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData
 		}
 	}()
 
-	return array.NewData(dt, rdr.ValuesWritten(), []*memory.Buffer{
-		bitmap, values}, nil, int(rdr.NullCount()), 0)
+	return array.NewData(dt, rdr.ValuesWritten(),
+		[]*memory.Buffer{bitmap, values},
+		nil, int(rdr.NullCount()), 0)
 }
 
 func transferBinary(rdr file.RecordReader, dt arrow.DataType) *arrow.Chunked {
@@ -535,24 +549,18 @@ func transferBinary(rdr file.RecordReader, dt arrow.DataType) *arrow.Chunked {
 		return transferDictionary(brdr, &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: dt})
 	}
 	chunks := brdr.GetBuilderChunks()
-	switch {
-	case dt.ID() == arrow.EXTENSION:
-		etype := dt.(arrow.ExtensionType)
-		for idx, chk := range chunks {
-			chunks[idx] = array.NewExtensionArrayWithStorage(etype, chk)
-			chk.Release() // NewExtensionArrayWithStorage will call retain on chk, so it still needs to be released
-			defer chunks[idx].Release()
-		}
-	case dt == arrow.BinaryTypes.String || dt == arrow.BinaryTypes.LargeString:
-		for idx := range chunks {
-			prev := chunks[idx]
-			chunks[idx] = array.MakeFromData(chunks[idx].Data())
-			prev.Release()
-			defer chunks[idx].Release()
-		}
-	default:
-		for idx := range chunks {
-			defer chunks[idx].Release()
-		}
-	}
+	defer releaseArrays(chunks)
+
+	switch dt := dt.(type) {
+	case arrow.ExtensionType:
+		for idx, chunk := range chunks {
+			chunks[idx] = array.NewExtensionArrayWithStorage(dt, chunk)
+			chunk.Release()
+		}
+	case *arrow.StringType, *arrow.LargeStringType:
+		for idx, chunk := range chunks {
+			chunks[idx] = array.MakeFromData(chunk.Data())
+			chunk.Release()
+		}
+	}
 	return arrow.NewChunked(dt, chunks)
@@ -846,8 +854,6 @@ func transferDecimalBytes(rdr file.BinaryRecordReader, dt arrow.DataType) (*arro
 func transferDictionary(rdr file.RecordReader, logicalValueType arrow.DataType) *arrow.Chunked {
 	brdr := rdr.(file.BinaryRecordReader)
 	chunks := brdr.GetBuilderChunks()
-	for _, chunk := range chunks {
-		defer chunk.Release()
-	}
+	defer releaseArrays(chunks)
 	return arrow.NewChunked(logicalValueType, chunks)
 }
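
transferBinary and transferDictionary now defer a single releaseArrays(chunks) instead of one defer per chunk. The point is that arrow.NewChunked retains every chunk it is given, so the transfer function must drop its own references on the way out. A sketch of the helper and the pattern (the body of releaseArrays is assumed, mirroring the releaseArrayData sketch above; toChunked is hypothetical):

import "github.com/apache/arrow/go/v13/arrow"

// releaseArrays drops this function's references to each non-nil chunk.
func releaseArrays(arrs []arrow.Array) {
    for _, a := range arrs {
        if a != nil {
            a.Release()
        }
    }
}

// toChunked shows the ownership flow: NewChunked retains each chunk, the
// deferred releaseArrays drops the local references, and the returned
// chunked array is left as the sole owner.
func toChunked(dt arrow.DataType, chunks []arrow.Array) *arrow.Chunked {
    defer releaseArrays(chunks)
    return arrow.NewChunked(dt, chunks)
}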
11 changes: 6 additions & 5 deletions go/parquet/pqarrow/encode_arrow_test.go
@@ -320,6 +320,7 @@ func writeTableToBuffer(t *testing.T, mem memory.Allocator, tbl arrow.Table, row
 }
 
 func simpleRoundTrip(t *testing.T, tbl arrow.Table, rowGroupSize int64) {
+	t.Helper()
 	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
 	defer mem.AssertSize(t, 0)
 
@@ -338,6 +339,7 @@ func simpleRoundTrip(t *testing.T, tbl arrow.Table, rowGroupSize int64) {
 
 	chunked, err := crdr.NextBatch(tbl.NumRows())
 	require.NoError(t, err)
+	defer chunked.Release()
 
 	require.EqualValues(t, tbl.NumRows(), chunked.Len())
 
@@ -724,6 +726,7 @@ func (ps *ParquetIOTestSuite) checkSingleColumnRead(mem memory.Allocator, typ ar
 
 	chunked, err := cr.NextBatch(smallSize)
 	ps.NoError(err)
+	defer chunked.Release()
 
 	ps.Len(chunked.Chunks(), 1)
 	ps.True(array.Equal(values, chunked.Chunk(0)))
@@ -993,6 +996,7 @@ func (ps *ParquetIOTestSuite) readAndCheckSingleColumnFile(mem memory.Allocator,
 
 	chunked, err := cr.NextBatch(smallSize)
 	ps.NoError(err)
+	defer chunked.Release()
 
 	ps.Len(chunked.Chunks(), 1)
 	ps.NotNil(chunked.Chunk(0))
@@ -1040,10 +1044,7 @@ func (ps *ParquetIOTestSuite) TestSingleColumnRequiredWrite() {
 	}
 }
 
-func (ps *ParquetIOTestSuite) roundTripTable(_ memory.Allocator, expected arrow.Table, storeSchema bool) {
-	mem := memory.NewCheckedAllocator(memory.DefaultAllocator) // FIXME: currently overriding allocator to isolate leaks between roundTripTable and caller
-	//defer mem.AssertSize(ps.T(), 0) // FIXME: known leak
-
+func (ps *ParquetIOTestSuite) roundTripTable(mem memory.Allocator, expected arrow.Table, storeSchema bool) {
 	var buf bytes.Buffer
 	var props pqarrow.ArrowWriterProperties
 	if storeSchema {
@@ -1180,7 +1181,7 @@ func prepareListOfListTable(dt arrow.DataType, size, nullCount int, nullablePare
 
 func (ps *ParquetIOTestSuite) TestSingleEmptyListsColumnReadWrite() {
 	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
-	//defer mem.AssertSize(ps.T(), 0) // FIXME: known leak
+	defer mem.AssertSize(ps.T(), 0)
 
 	expected := prepareEmptyListsTable(smallSize)
 	defer expected.Release()
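
Two test-hygiene changes round this out: simpleRoundTrip calls t.Helper() so failures are attributed to the calling test, and roundTripTable again honors the allocator each test passes in, removing the FIXME workaround that had isolated and hidden the leaks. A short sketch of the t.Helper effect (hypothetical test and helper names):

import "testing"

// requirePositive is a shared helper; t.Helper() makes a failure inside it
// point at the caller's line rather than at the helper itself, which matters
// once many tests funnel through helpers like simpleRoundTrip.
func requirePositive(t *testing.T, v int) {
    t.Helper()
    if v <= 0 {
        t.Fatalf("expected positive value, got %d", v)
    }
}

func TestExample(t *testing.T) {
    requirePositive(t, 1) // a failure would be reported at this line
}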
3 changes: 2 additions & 1 deletion go/parquet/pqarrow/encode_dictionary_test.go
@@ -400,7 +400,7 @@ func (ar *ArrowReadDictSuite) writeSimple() {
 		pqarrow.DefaultWriterProps()))
 }
 
-func (ArrowReadDictSuite) NullProbabilities() []float64 {
+func (*ArrowReadDictSuite) NullProbabilities() []float64 {
 	return []float64{0.0, 0.5, 1}
 }
 
@@ -543,6 +543,7 @@ func (ar *ArrowReadDictSuite) TestIncrementalReads() {
 	for i := 0; i < numReads; i++ {
 		chunk, err := col.NextBatch(int64(batchSize))
 		ar.Require().NoError(err)
+		defer chunk.Release()
 		// no need to manually release chunk, like other record readers
 		// the col reader holds onto the current record and will release it
 		// when the next is requested or when the reader is released
