Skip to content

Commit

Permalink
apacheGH-35775: [Go][Parquet] Allow key value file metadata to be wri…
Browse files Browse the repository at this point in the history
…tten after writing row groups (apache#37786)

### Rationale for this change

The key value file metadata may include information generated while writing row groups.  Currently, it is not possible to set the key value file metadata after creating a writer.  With the changes in this branch, key value pairs may be added any time before closing the writer.

### What changes are included in this PR?

This branch adds a `writer.AppendKeyValueMetadata(key, value)` method to the parquet `file.Writer` and to the `pqarrow.FileWriter`.

### Are these changes tested?

Tests are added for the new functionality.

### Are there any user-facing changes?

The `KeyValueMetadata` field on the parquet `file.Writer` has been renamed to `initialKeyValueMetadata`.  This is a breaking change.  Although the field was exported, setting it did not result in new key value metadata being written.  Instead, it represented the initial key value metadata if the writer was passed the `WithWriteMetadata` write option.

The `WithWriteMetadata` option can still be used to provide the initial key value metadata values.  In addition, the `AppendKeyValueMetadata` method can be called to add key value pairs after creating a writer.

The `FileMetadata` field on the parquet `file.Writer` has been removed.  Previously, setting this field value had no effect.

**This PR includes breaking changes to public APIs.** 

The `KeyValueMetadata` field is no longer exported from the parquet `file.Writer` struct.  Use the `WithWriteMetadata` writer option to set key value metadata when creating a writer or use the `AppendKeyValueMetadata` method to add key value metadata after creating a writer.

The `FileMetadata` field on the parquet `file.Writer` has been removed.
* Closes: apache#35775

Authored-by: Tim Schaub <tim@planet.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
  • Loading branch information
tschaub authored and Jeremy Aguilon committed Oct 23, 2023
1 parent 78124ba commit f7058d6
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 25 deletions.
62 changes: 37 additions & 25 deletions go/parquet/file/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,24 @@ type Writer struct {

// The Schema of this writer
Schema *schema.Schema
// The current FileMetadata to write
FileMetadata *metadata.FileMetaData
// The current keyvalue metadata
KeyValueMetadata metadata.KeyValueMetadata
}

type WriteOption func(*Writer)
type writerConfig struct {
props *parquet.WriterProperties
keyValueMetadata metadata.KeyValueMetadata
}

type WriteOption func(*writerConfig)

func WithWriterProps(props *parquet.WriterProperties) WriteOption {
return func(w *Writer) {
w.props = props
return func(c *writerConfig) {
c.props = props
}
}

func WithWriteMetadata(meta metadata.KeyValueMetadata) WriteOption {
return func(w *Writer) {
w.KeyValueMetadata = meta
return func(c *writerConfig) {
c.keyValueMetadata = meta
}
}

Expand All @@ -66,19 +67,23 @@ func WithWriteMetadata(meta metadata.KeyValueMetadata) WriteOption {
// If props is nil, then the default Writer Properties will be used. If the key value metadata is not nil,
// it will be added to the file.
func NewParquetWriter(w io.Writer, sc *schema.GroupNode, opts ...WriteOption) *Writer {
config := &writerConfig{}
for _, o := range opts {
o(config)
}
if config.props == nil {
config.props = parquet.NewWriterProperties()
}

fileSchema := schema.NewSchema(sc)
fw := &Writer{
props: config.props,
sink: &utils.TellWrapper{Writer: w},
open: true,
Schema: fileSchema,
}
for _, o := range opts {
o(fw)
}
if fw.props == nil {
fw.props = parquet.NewWriterProperties()
}
fw.metadata = *metadata.NewFileMetadataBuilder(fw.Schema, fw.props, fw.KeyValueMetadata)

fw.metadata = *metadata.NewFileMetadataBuilder(fw.Schema, fw.props, config.keyValueMetadata)
fw.startFile()
return fw
}
Expand Down Expand Up @@ -154,6 +159,11 @@ func (fw *Writer) startFile() {
}
}

// AppendKeyValueMetadata appends a key/value pair to the existing key/value metadata
func (fw *Writer) AppendKeyValueMetadata(key string, value string) error {
return fw.metadata.AppendKeyValueMetadata(key, value)
}

// Close closes any open row group writer and writes the file footer. Subsequent
// calls to close will have no effect.
func (fw *Writer) Close() (err error) {
Expand All @@ -180,11 +190,12 @@ func (fw *Writer) Close() (err error) {

fileEncryptProps := fw.props.FileEncryptionProperties()
if fileEncryptProps == nil { // non encrypted file
if fw.FileMetadata, err = fw.metadata.Finish(); err != nil {
fileMetadata, err := fw.metadata.Finish()
if err != nil {
return err
}

_, err = writeFileMetadata(fw.FileMetadata, fw.sink)
_, err = writeFileMetadata(fileMetadata, fw.sink)
return err
}

Expand All @@ -193,12 +204,12 @@ func (fw *Writer) Close() (err error) {
return nil
}

func (fw *Writer) closeEncryptedFile(props *parquet.FileEncryptionProperties) (err error) {
func (fw *Writer) closeEncryptedFile(props *parquet.FileEncryptionProperties) error {
// encrypted file with encrypted footer
if props.EncryptedFooter() {
fw.FileMetadata, err = fw.metadata.Finish()
fileMetadata, err := fw.metadata.Finish()
if err != nil {
return
return err
}

footerLen := int64(0)
Expand All @@ -211,7 +222,7 @@ func (fw *Writer) closeEncryptedFile(props *parquet.FileEncryptionProperties) (e

footerLen += n
footerEncryptor := fw.fileEncryptor.GetFooterEncryptor()
n, err = writeEncryptedFileMetadata(fw.FileMetadata, fw.sink, footerEncryptor, true)
n, err = writeEncryptedFileMetadata(fileMetadata, fw.sink, footerEncryptor, true)
if err != nil {
return err
}
Expand All @@ -224,11 +235,12 @@ func (fw *Writer) closeEncryptedFile(props *parquet.FileEncryptionProperties) (e
return err
}
} else {
if fw.FileMetadata, err = fw.metadata.Finish(); err != nil {
return
fileMetadata, err := fw.metadata.Finish()
if err != nil {
return err
}
footerSigningEncryptor := fw.fileEncryptor.GetFooterSigningEncryptor()
if _, err = writeEncryptedFileMetadata(fw.FileMetadata, fw.sink, footerSigningEncryptor, false); err != nil {
if _, err = writeEncryptedFileMetadata(fileMetadata, fw.sink, footerSigningEncryptor, false); err != nil {
return err
}
}
Expand Down
29 changes: 29 additions & 0 deletions go/parquet/file/file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/apache/arrow/go/v14/parquet/internal/testutils"
"github.com/apache/arrow/go/v14/parquet/schema"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

Expand Down Expand Up @@ -371,6 +372,34 @@ func TestAllNulls(t *testing.T) {
assert.Equal(t, []int16{0, 0, 0}, defLevels[:])
}

func TestKeyValueMetadata(t *testing.T) {
fields := schema.FieldList{
schema.NewInt32Node("unused", parquet.Repetitions.Optional, -1),
}
sc, _ := schema.NewGroupNode("root", parquet.Repetitions.Required, fields, -1)
sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)

writer := file.NewParquetWriter(sink, sc)

testKey := "testKey"
testValue := "testValue"
writer.AppendKeyValueMetadata(testKey, testValue)
writer.Close()

buffer := sink.Finish()
defer buffer.Release()
props := parquet.NewReaderProperties(memory.DefaultAllocator)
props.BufferedStreamEnabled = true

reader, err := file.NewParquetReader(bytes.NewReader(buffer.Bytes()), file.WithReadProps(props))
assert.NoError(t, err)

metadata := reader.MetaData()
got := metadata.KeyValueMetadata().FindValue(testKey)
require.NotNil(t, got)
assert.Equal(t, testValue, *got)
}

func createSerializeTestSuite(typ reflect.Type) suite.TestingSuite {
return &SerializeTestSuite{PrimitiveTypedTest: testutils.NewPrimitiveTypedTest(typ)}
}
Expand Down
5 changes: 5 additions & 0 deletions go/parquet/metadata/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func (f *FileMetaDataBuilder) AppendRowGroup() *RowGroupMetaDataBuilder {
return f.currentRgBldr
}

// AppendKeyValueMetadata appends a key/value pair to the existing key/value metadata
func (f *FileMetaDataBuilder) AppendKeyValueMetadata(key string, value string) error {
return f.kvmeta.Append(key, value)
}

// Finish will finalize the metadata of the number of rows, row groups,
// version etc. This will clear out this filemetadatabuilder so it can
// be re-used
Expand Down
35 changes: 35 additions & 0 deletions go/parquet/metadata/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,41 @@ func TestKeyValueMetadata(t *testing.T) {
assert.True(t, faccessor.KeyValueMetadata().Equals(kvmeta))
}

func TestKeyValueMetadataAppend(t *testing.T) {
props := parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0))

fields := schema.FieldList{
schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1),
schema.NewFloat32Node("float_col", parquet.Repetitions.Required, -1),
}
root, err := schema.NewGroupNode("schema", parquet.Repetitions.Repeated, fields, -1)
require.NoError(t, err)
schema := schema.NewSchema(root)

kvmeta := metadata.NewKeyValueMetadata()
key1 := "test_key1"
value1 := "test_value1"
require.NoError(t, kvmeta.Append(key1, value1))

fbuilder := metadata.NewFileMetadataBuilder(schema, props, kvmeta)

key2 := "test_key2"
value2 := "test_value2"
require.NoError(t, fbuilder.AppendKeyValueMetadata(key2, value2))
faccessor, err := fbuilder.Finish()
require.NoError(t, err)

kv := faccessor.KeyValueMetadata()

got1 := kv.FindValue(key1)
require.NotNil(t, got1)
assert.Equal(t, value1, *got1)

got2 := kv.FindValue(key2)
require.NotNil(t, got2)
assert.Equal(t, value2, *got2)
}

func TestApplicationVersion(t *testing.T) {
version := metadata.NewAppVersion("parquet-mr version 1.7.9")
version1 := metadata.NewAppVersion("parquet-mr version 1.8.0")
Expand Down
45 changes: 45 additions & 0 deletions go/parquet/pqarrow/encode_arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,51 @@ func simpleRoundTrip(t *testing.T, tbl arrow.Table, rowGroupSize int64) {
}
}

func TestWriteKeyValueMetadata(t *testing.T) {
kv := map[string]string{
"key1": "value1",
"key2": "value2",
"key3": "value3",
}

sc := arrow.NewSchema([]arrow.Field{
{Name: "int32", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
}, nil)
bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc)
defer bldr.Release()
for _, b := range bldr.Fields() {
b.AppendNull()
}

rec := bldr.NewRecord()
defer rec.Release()

props := parquet.NewWriterProperties(
parquet.WithVersion(parquet.V1_0),
)
var buf bytes.Buffer
fw, err := pqarrow.NewFileWriter(sc, &buf, props, pqarrow.DefaultWriterProps())
require.NoError(t, err)
err = fw.Write(rec)
require.NoError(t, err)

for key, value := range kv {
require.NoError(t, fw.AppendKeyValueMetadata(key, value))
}

err = fw.Close()
require.NoError(t, err)

reader, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
require.NoError(t, err)

for key, value := range kv {
got := reader.MetaData().KeyValueMetadata().FindValue(key)
require.NotNil(t, got)
assert.Equal(t, value, *got)
}
}

func TestWriteEmptyLists(t *testing.T) {
sc := arrow.NewSchema([]arrow.Field{
{Name: "f1", Type: arrow.ListOf(arrow.FixedWidthTypes.Date32)},
Expand Down
5 changes: 5 additions & 0 deletions go/parquet/pqarrow/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ func (fw *FileWriter) WriteTable(tbl arrow.Table, chunkSize int64) error {
return nil
}

// AppendKeyValueMetadata appends a key/value pair to the existing key/value metadata
func (fw *FileWriter) AppendKeyValueMetadata(key string, value string) error {
return fw.wr.AppendKeyValueMetadata(key, value)
}

// Close flushes out the data and closes the file. It can be called multiple times,
// subsequent calls after the first will have no effect.
func (fw *FileWriter) Close() error {
Expand Down

0 comments on commit f7058d6

Please sign in to comment.