pqarrow.FileWriter.Close() leaks per-column-chunk buffers when the underlying io.Writer fails mid-flush #794

@AaronReboot

Description

Describe the bug, including details regarding any error messages, version, and platform.

When pqarrow.FileWriter.Close() runs against an io.Writer that has started failing, it returns the first column-chunk close error and strands every subsequent column's allocator-tracked per-chunk state. The leak is strictly on the error path: the same code against a working sink ends with CheckedAllocator.CurrentAlloc() == 0. Close() is the writer's only chance to release that state; there is no separate Release() API.

The root cause is in parquet/file/row_group_writer.go::(*rowGroupWriter).Close (v18.5.2 lines 232-254):

for _, wr := range rg.columnWriters {
    if wr != nil {
        if err := wr.Close(); err != nil {
            return err   // ← strands every column past this one
        }
        rg.bytesWritten += wr.TotalBytesWritten()
        rg.compressedBytesWritten += wr.TotalCompressedBytes()
    }
}

When wr.Close() errors on column N, columns N+1..end never get Close() called, so their currentEncoder and (in the buffered-row-group path) their bufferedPageWriter.inMemSink are never released. The bufferedPageWriter.Close body that does release the in-memory sink (page_writer.go:485) is reached via the per-column pager.Close call inside columnWriter.Close — so skipping a column's Close() also skips releasing its accumulated page bytes.

Versions:

  • github.com/apache/arrow-go/v18 v18.5.2
  • Go 1.26.2
  • Verified on darwin/arm64 with the reproducer below. Originally observed on linux/amd64 in production with a different failing writer (a cloud-storage authentication failure on a long-lived multi-GiB write).

Reproducer

go.mod pins github.com/apache/arrow-go/v18 v18.5.2. Two-column schema, a WriteBuffered loop, and an io.Writer that starts returning io.ErrShortWrite after accepting a configurable number of bytes (the remaining field):

package main

import (
	"fmt"
	"io"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/parquet"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"
)

// failAfter returns io.ErrShortWrite once it has accepted `remaining`
// bytes — models any sink that breaks mid-flush (token expiry, broken
// pipe, HTTP 412 precondition failure, ctx cancellation).
type failAfter struct{ remaining int }

func (w *failAfter) Write(p []byte) (int, error) {
	if w.remaining <= 0 {
		return 0, io.ErrShortWrite
	}
	if len(p) <= w.remaining {
		w.remaining -= len(p)
		return len(p), nil
	}
	n := w.remaining
	w.remaining = 0
	return n, io.ErrShortWrite
}

func main() {
	alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "s", Type: arrow.BinaryTypes.String, Nullable: false},
		{Name: "i", Type: arrow.PrimitiveTypes.Int32, Nullable: false},
	}, nil)

	props := parquet.NewWriterProperties(
		parquet.WithAllocator(alloc),
		parquet.WithMaxRowGroupLength(20_000),
	)
	fw, err := pqarrow.NewFileWriter(
		schema, &failAfter{remaining: 512 << 10}, props, pqarrow.DefaultWriterProps())
	if err != nil {
		panic(err)
	}

	for b := range 32 {
		bld := array.NewRecordBuilder(alloc, schema)
		for r := range 25_000 {
			bld.Field(0).(*array.StringBuilder).Append(fmt.Sprintf("v-%d-%d", b, r%50))
			bld.Field(1).(*array.Int32Builder).Append(int32(b*25_000 + r))
		}
		rec := bld.NewRecordBatch()
		werr := fw.WriteBuffered(rec)
		rec.Release()
		bld.Release()
		if werr != nil {
			break
		}
	}

	fmt.Printf("Close error: %v\n", fw.Close())
	fmt.Printf("Leaked: %d bytes\n", alloc.CurrentAlloc())
	alloc.AssertSize(printT{}, 0)
}

type printT struct{}

func (printT) Helper()                      {}
func (printT) Errorf(f string, args ...any) { fmt.Printf(f+"\n", args...) }

Run with:

mkdir pqarrow-leak-repro && cd pqarrow-leak-repro
go mod init pqarrow-leak-repro
go get github.com/apache/arrow-go/v18@v18.5.2
# (paste main.go above)
ARROW_CHECKED_MAX_RETAINED_FRAMES=20 go run .

ARROW_CHECKED_MAX_RETAINED_FRAMES=20 is optional but strongly recommended: without it, the retained AssertSize frame for each leak is just (*Buffer).ResizeNoShrink, which doesn't reveal where the un-released state came from. With it, the call site is obvious.

Replacing the failAfter sink with io.Discard prints Leaked: 0 bytes: same code, working sink, no leak.
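For reference, the control variant changes a single line of main and leaves everything else identical:

// Control: a sink that cannot fail. The run then ends with
// "Leaked: 0 bytes", confirming the leak is confined to the error path.
fw, err := pqarrow.NewFileWriter(
	schema, io.Discard, props, pqarrow.DefaultWriterProps())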

Expected behavior

CurrentAlloc() == 0 after FileWriter.Close() regardless of whether the underlying writer succeeded or failed. An error from Close() should not strand allocator-tracked buffers.

Actual behavior

Close error: short write
Leaked: 4718592 bytes
LEAK of 131072 bytes FROM
	github.com/apache/arrow-go/v18/arrow/memory.(*Buffer).ResizeNoShrink+4f
		arrow/memory/buffer.go:143
	github.com/apache/arrow-go/v18/parquet/internal/encoding.(*dictEncoder).expandBuffer+2c
		parquet/internal/encoding/encoder.go:164
	github.com/apache/arrow-go/v18/parquet/internal/encoding.(*dictEncoder).addIndex
		parquet/internal/encoding/encoder.go:226
	github.com/apache/arrow-go/v18/parquet/internal/encoding.(*dictEncoder).Put+9c
		parquet/internal/encoding/encoder.go:301
	github.com/apache/arrow-go/v18/parquet/internal/encoding.(*typedDictEncoder[...]).Put
		parquet/internal/encoding/typed_encoder.go:145
	github.com/apache/arrow-go/v18/parquet/file.(*Int32ColumnChunkWriter).writeValues+ab
		parquet/file/column_writer_types.gen.go:183
	... (writeBatch / pqarrow / WriteBuffered)
[36 identical leak sites; total 36 × 131072 = 4718592 bytes]

The 36 leaks correspond to row groups whose rowGroupWriter.Close was started, errored on column 0's Close(), and never reached column 1's Close(), leaving column 1's dictEncoder.idxBuffer un-released. Production has also seen the same root cause surface as bufferedPageWriter.inMemSink BufferWriter leaks (rooted at serializedPageWriter.WriteDataPage ← bufferedPageWriter.WriteDataPage ← columnWriter.WriteDataPage), which fires when the un-closed column had triggered FallbackToPlain mid-write and accumulated post-fallback page bytes in its inMemSink. The reproducer above shows the dict-encoder surface; both surfaces share the same row-group early-return root cause.

Conditions

The leak fires when, in one row group's close:

  1. Rows are written via pqarrow.FileWriter.WriteBuffered (the buffered-row-group path, i.e. any path where all columns of a row group are buffered and flushed together at row-group close).
  2. The schema has multiple columns and the failing column's Close is not the last in iteration order.
  3. The underlying io.Writer errors during the failing column's bufferedPageWriter.Close write to the sink — early enough that rowGroupWriter.Close returns before reaching subsequent columns.

Defaults are otherwise sufficient: the only WriterProperties knob the reproducer sets is WithMaxRowGroupLength(20_000), to amplify the number of row groups closed during the run. The bug fires equally with the default MaxRowGroupLength (math.MaxInt64) on a larger payload.
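A regression test capturing these conditions can reuse the reproducer almost verbatim. A sketch (the test name is mine, and it assumes the reproducer's imports plus the failAfter type are in scope; the sink fails after 1 KiB so the very first row-group flush errors):

func TestCloseReleasesBuffersOnSinkError(t *testing.T) {
	alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "s", Type: arrow.BinaryTypes.String, Nullable: false},
		{Name: "i", Type: arrow.PrimitiveTypes.Int32, Nullable: false},
	}, nil)
	props := parquet.NewWriterProperties(
		parquet.WithAllocator(alloc),
		parquet.WithMaxRowGroupLength(20_000),
	)
	fw, err := pqarrow.NewFileWriter(
		schema, &failAfter{remaining: 1 << 10}, props, pqarrow.DefaultWriterProps())
	if err != nil {
		t.Fatal(err)
	}

	// 25_000 rows exceed MaxRowGroupLength, forcing a row-group
	// close (and thus a flush into the broken sink) during the write.
	bld := array.NewRecordBuilder(alloc, schema)
	for r := range 25_000 {
		bld.Field(0).(*array.StringBuilder).Append(fmt.Sprintf("v-%d", r%50))
		bld.Field(1).(*array.Int32Builder).Append(int32(r))
	}
	rec := bld.NewRecordBatch()
	werr := fw.WriteBuffered(rec)
	rec.Release()
	bld.Release()

	cerr := fw.Close() // expected to error; buffers must still be released
	if werr == nil && cerr == nil {
		t.Fatal("expected a sink error from WriteBuffered or Close")
	}
	alloc.AssertSize(t, 0) // fails on v18.5.2, passes with the fix below
}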

Suggested fix

Primary — rowGroupWriter.Close: drain every column unconditionally, capture the first error, return it after the loop.

func (rg *rowGroupWriter) Close() error {
    if !rg.closed {
        rg.closed = true
        if err := rg.checkRowsWritten(); err != nil {
            return err
        }

        var firstErr error
        for _, wr := range rg.columnWriters {
            if wr == nil {
                continue
            }
            if err := wr.Close(); err != nil {
                if firstErr == nil {
                    firstErr = err
                }
                continue                       // keep draining the rest
            }
            rg.bytesWritten += wr.TotalBytesWritten()
            rg.compressedBytesWritten += wr.TotalCompressedBytes()
        }
        rg.columnWriters = nil
        if firstErr != nil {
            return firstErr
        }
        rg.metadata.SetNumRows(rg.nrows)
        rg.metadata.Finish(rg.bytesWritten, rg.ordinal)
    }
    return nil
}

This is sufficient to close both leak surfaces in the buffered-row-group path: every column gets Close() called, which calls pager.Close() → bufferedPageWriter.Close, whose own defer buf.Release() then releases the in-memory sink bytes; and the column's own defer (registered during its Close) releases currentEncoder.
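A possible variation on the same drain loop (a sketch, not part of the patch above, and assuming errors is added to the file's imports): errors.Join from Go 1.20+ surfaces every failed column rather than only the first, while errors.Is still matches each underlying sink error through the joined wrapper:

var errs []error
for _, wr := range rg.columnWriters {
    if wr == nil {
        continue
    }
    if err := wr.Close(); err != nil {
        errs = append(errs, err)           // collect, keep draining
        continue
    }
    rg.bytesWritten += wr.TotalBytesWritten()
    rg.compressedBytesWritten += wr.TotalCompressedBytes()
}
rg.columnWriters = nil
if err := errors.Join(errs...); err != nil {
    return err
}

Whether to return one error or all of them is a judgment call; the essential property is only that the loop never returns before draining every column.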

Secondary — columnWriter.Close: register the cleanup defer before any fallible call (v18.5.2 lines 579-621).

func (w *columnWriter) Close() (err error) {
    if !w.closed {
        w.closed = true
        defer func() {
            w.defLevelSink.Reset(0)
            w.repLevelSink.Reset(0)
            if w.bitsBuffer != nil {
                w.bitsBuffer.Release()
                w.bitsBuffer = nil
            }
            if w.currentEncoder != nil {
                w.currentEncoder.Release()
                w.currentEncoder = nil
            }
        }()
        if w.hasDict && !w.fallbackToNonDict {
            if err = w.WriteDictionaryPage(); err != nil {
                return err
            }
        }
        if err = w.FlushBufferedDataPages(); err != nil {
            return err
        }
        // ... unchanged ...
    }
    return err
}

Currently the cleanup defer is registered after WriteDictionaryPage and FlushBufferedDataPages, both of which can return errors. In the buffered-row-group path those calls only mutate in-memory state and don't fail, so this contributes nothing to the reproducer above — but in the SerialRowGroupWriter path (where each WriteDataPage writes straight to the sink), the same defer-after-fallible-call pattern would strand currentEncoder directly. The nil-check on w.currentEncoder becomes load-bearing because the defer can now fire on paths where currentEncoder was never set.
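A standalone illustration of that last point (plain Go, not arrow-go internals; all names here are made up for the sketch):

package main

import (
	"errors"
	"fmt"
)

type encoder struct{}

func (e *encoder) Release() { fmt.Println("encoder released") }

type columnLike struct {
	enc *encoder // may never be assigned on some paths
}

// Close registers cleanup before any fallible work, mirroring the
// suggested change: whether the fallible step errors early or enc
// was never set, the deferred release must tolerate enc == nil.
func (c *columnLike) Close(fallible func() error) (err error) {
	defer func() {
		if c.enc != nil { // load-bearing nil-check
			c.enc.Release()
			c.enc = nil
		}
	}()
	return fallible()
}

func main() {
	c := &columnLike{} // enc never set
	err := c.Close(func() error { return errors.New("sink broke") })
	fmt.Println("close error:", err) // cleanup ran safely despite nil enc
}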

Component(s)

Parquet
