Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions parquet/pqarrow/column_readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,8 +904,8 @@ func bigEndianToDecimal256(buf []byte) (decimal256.Num, error) {
result := initWord
if len(buf) > 0 {
// incorporate the actual values if present
// shift left enough bits to make room for the incoming int64
result = result << uint64(wordLen)
// shift left enough bits to make room for the incoming bytes
result = result << uint64(wordLen*8)
// preserve the upper bits by inplace OR-ing the int64
result |= uint64FromBigEndianShifted(word)
}
Expand Down
91 changes: 91 additions & 0 deletions parquet/pqarrow/encode_arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package pqarrow_test
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math"
"math/big"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -1229,6 +1231,95 @@ func (ps *ParquetIOTestSuite) TestReadDecimal256() {
ps.Truef(array.Equal(expected, chunked.Chunk(0)), "expected: %s\ngot: %s", expected, chunked.Chunk(0))
}

// TestReadDecimal256PartialWord verifies that bigEndianToDecimal256 correctly
// handles byte arrays whose length is not a multiple of 8, exercising the
// partial-word sign-extension path. This is a regression test for a bug where
// the shift was by wordLen (bytes) instead of wordLen*8 (bits).
func (ps *ParquetIOTestSuite) TestReadDecimal256PartialWord() {
	// decimal256ToBigEndian converts a big.Int to big-endian two's complement
	// bytes truncated to byteWidth, simulating what the Parquet encoder writes.
	decimal256ToBigEndian := func(bi *big.Int, byteWidth int) []byte {
		num := decimal256.FromBigInt(bi)
		vals := num.Array()
		var full [32]byte
		// num.Array() yields little-endian words; write them high-word-first
		// to produce a big-endian 32-byte two's-complement representation.
		binary.BigEndian.PutUint64(full[0:], vals[3])
		binary.BigEndian.PutUint64(full[8:], vals[2])
		binary.BigEndian.PutUint64(full[16:], vals[1])
		binary.BigEndian.PutUint64(full[24:], vals[0])
		// Keep only the low byteWidth bytes (the value is known to fit).
		return append([]byte{}, full[32-byteWidth:]...)
	}

	// maxForPrecision returns 10^precision - 1 as a *big.Int.
	maxForPrecision := func(precision int) *big.Int {
		v := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(precision)), nil)
		return v.Sub(v, big.NewInt(1))
	}

	// One precision per partial-word remainder (1-7), plus remainder 0 as sanity.
	// precision : DecimalSize : byteWidth % 8
	// 39 : 17 : 1   41 : 18 : 2   45 : 19 : 3   47 : 20 : 4
	// 49 : 21 : 5   51 : 22 : 6   53 : 23 : 7   76 : 32 : 0
	precisions := []int32{39, 41, 45, 47, 49, 51, 53, 76}

	for _, precision := range precisions {
		byteWidth := int(pqarrow.DecimalSize(precision))
		maxVal := maxForPrecision(int(precision))
		minVal := new(big.Int).Neg(maxVal)

		tests := []struct {
			name string
			val  *big.Int
		}{
			{"max_positive", maxVal},
			{"max_negative", minVal},
			{"zero", big.NewInt(0)},
			{"minus_one", big.NewInt(-1)},
		}

		for _, tt := range tests {
			ps.Run(fmt.Sprintf("p%d_bw%d_r%d/%s", precision, byteWidth, byteWidth%8, tt.name), func() {
				mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
				defer mem.AssertSize(ps.T(), 0)

				bigEndian := []parquet.ByteArray{decimal256ToBigEndian(tt.val, byteWidth)}

				// Build the expected Decimal256 array directly from the value.
				bldr := array.NewDecimal256Builder(mem, &arrow.Decimal256Type{Precision: precision, Scale: 0})
				defer bldr.Release()
				bldr.Append(decimal256.FromBigInt(tt.val))
				expected := bldr.NewDecimal256Array()
				defer expected.Release()

				sc := schema.MustGroup(schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{
					schema.Must(schema.NewPrimitiveNodeLogical("decimals", parquet.Repetitions.Required,
						schema.NewDecimalLogicalType(precision, 0), parquet.Types.ByteArray, -1, -1)),
				}, -1))

				// Write a single-row, single-column file containing the raw
				// big-endian bytes. Check every error so a write failure is
				// reported here rather than surfacing as a confusing read
				// mismatch below.
				sink := encoding.NewBufferWriter(0, mem)
				defer sink.Release()
				writer := file.NewParquetWriter(sink, sc)
				rgw := writer.AppendRowGroup()
				cw, err := rgw.NextColumn()
				ps.Require().NoError(err)
				_, err = cw.(*file.ByteArrayColumnChunkWriter).WriteBatch(bigEndian, nil, nil)
				ps.Require().NoError(err)
				ps.Require().NoError(cw.Close())
				ps.Require().NoError(rgw.Close())
				ps.Require().NoError(writer.Close())

				// Read the column back and compare against the expected array.
				rdr := ps.createReader(mem, sink.Bytes())
				cr, err := rdr.GetColumn(context.TODO(), 0)
				ps.Require().NoError(err)

				chunked, err := cr.NextBatch(smallSize)
				ps.Require().NoError(err)
				defer chunked.Release()

				ps.Require().Len(chunked.Chunks(), 1)
				ps.Truef(array.Equal(expected, chunked.Chunk(0)),
					"expected: %s\ngot: %s", expected, chunked.Chunk(0))
			})
		}
	}
}

func (ps *ParquetIOTestSuite) TestReadNestedStruct() {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(ps.T(), 0)
Expand Down
Loading