Skip to content

Commit

Permalink
apacheGH-36318: [Go] only decode lengths for the number of existing v…
Browse files Browse the repository at this point in the history
…alues, not for all nvalues. (apache#36322)

### Rationale for this change

Fixes issue 36318.
DeltaLengthBinaryArray Encoding fails to handle null values.

### What changes are included in this PR?

Instead of decoding lengths for "all" values (even the undefined ones), we only decode the lengths for the actually set values.
The Go Version of arrow was unable to read parquet files it produced itself if the mentioned encoding was used but the values contain nulls.

### Are these changes tested?

Tests are included.

### Are there any user-facing changes?

No.

**This PR contains a "Critical Fix".**
* Closes: apache#36318

Authored-by: Justin Heesemann <jheesemann@argo.ai>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
  • Loading branch information
convoi authored and chelseajonesr committed Jul 20, 2023
1 parent 71ce494 commit 93d7a2c
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 3 deletions.
52 changes: 52 additions & 0 deletions go/parquet/file/file_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,3 +333,55 @@ func TestIncompleteMetadata(t *testing.T) {
_, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
assert.Error(t, err)
}

func TestDeltaLengthByteArrayPackingWithNulls(t *testing.T) {
// produce file with DeltaLengthByteArray Encoding with mostly null values but one actual value.
root, _ := schema.NewGroupNode("schema", parquet.Repetitions.Repeated, schema.FieldList{
schema.NewByteArrayNode("byte_array_col", parquet.Repetitions.Optional, -1),
}, -1)
props := parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_LATEST),
parquet.WithEncoding(parquet.Encodings.DeltaLengthByteArray), parquet.WithDictionaryDefault(false))
sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)

writer := file.NewParquetWriter(sink, root, file.WithWriterProps(props))
rgw := writer.AppendRowGroup()
ccw, err := rgw.NextColumn()
assert.NoError(t, err)
const elements = 500
data := make([]parquet.ByteArray, elements)
data[0] = parquet.ByteArray{1, 2, 3, 4, 5, 6, 7, 8}

defLvls := make([]int16, elements)
repLvls := make([]int16, elements)
defLvls[0] = 1

_, err = ccw.(*file.ByteArrayColumnChunkWriter).WriteBatch(data, defLvls, repLvls)
assert.NoError(t, err)
assert.NoError(t, ccw.Close())
assert.NoError(t, rgw.Close())
assert.NoError(t, writer.Close())
buf := sink.Finish()
defer buf.Release()

// read file back in
reader, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
assert.NoError(t, err)
defer reader.Close()
ccr, err := reader.RowGroup(0).Column(0)
assert.NoError(t, err)
const batchSize = 500

for ccr.HasNext() {
readData := make([]parquet.ByteArray, batchSize)
readdevLvls := make([]int16, batchSize)
readrepLvls := make([]int16, batchSize)
cr := ccr.(*file.ByteArrayColumnChunkReader)

total, read, err := cr.ReadBatch(batchSize, readData, readdevLvls, readrepLvls)
assert.NoError(t, err)
assert.Equal(t, int64(batchSize), total)
assert.Equal(t, 1, read)
assert.Equal(t, data[0], readData[0])
assert.NotNil(t, readData[0])
}
}
4 changes: 2 additions & 2 deletions go/parquet/internal/encoding/delta_bit_packing.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (d *DeltaBitPackInt32Decoder) unpackNextMini() error {
// Decode retrieves min(remaining values, len(out)) values from the data and returns the number
// of values actually decoded and any errors encountered.
func (d *DeltaBitPackInt32Decoder) Decode(out []int32) (int, error) {
max := shared_utils.MinInt(len(out), d.nvals)
max := shared_utils.MinInt(len(out), int(d.totalValues))
if max == 0 {
return 0, nil
}
Expand Down Expand Up @@ -315,7 +315,7 @@ const (
// Consists of a header followed by blocks of delta encoded values binary packed.
//
// Format
// [header] [block 1] [block 2] ... [block N]
// [header] [block 1] [block 2] ... [block N]
//
// Header
// [block size] [number of mini blocks per block] [total value count] [first value]
Expand Down
47 changes: 47 additions & 0 deletions go/parquet/internal/encoding/delta_byte_array_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package encoding

import (
"fmt"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/apache/arrow/go/v13/parquet"
"github.com/stretchr/testify/assert"
"testing"
)

func TestDeltaByteArrayDecoder_SetData(t *testing.T) {
tests := []struct {
name string
nvalues int
data []byte
wantErr assert.ErrorAssertionFunc
}{
{
name: "null only page",
nvalues: 126609,
data: []byte{128, 1, 4, 0, 0},
wantErr: assert.NoError,
},
}
for _, tt := range tests {
d := NewDecoder(parquet.Types.ByteArray, parquet.Encodings.DeltaLengthByteArray, nil, memory.DefaultAllocator)
t.Run(tt.name, func(t *testing.T) {
tt.wantErr(t, d.SetData(tt.nvalues, tt.data), fmt.Sprintf("SetData(%v, %v)", tt.nvalues, tt.data))
})
}
}
2 changes: 1 addition & 1 deletion go/parquet/internal/encoding/delta_length_byte_array.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (d *DeltaLengthByteArrayDecoder) SetData(nvalues int, data []byte) error {
if err := dec.SetData(nvalues, data); err != nil {
return err
}
d.lengths = make([]int32, nvalues)
d.lengths = make([]int32, dec.totalValues)
dec.Decode(d.lengths)

return d.decoder.SetData(nvalues, data[int(dec.bytesRead()):])
Expand Down

0 comments on commit 93d7a2c

Please sign in to comment.