Skip to content

Commit

Permalink
ARROW-17453: [Go][C++][Parquet] Inconsistent Data with Repetition Levels (#13982)
Browse files Browse the repository at this point in the history

Both the C++ and Go parquet implementations assumed that if the max repetition level was 0, there were no bytes to be skipped when initializing decoders for `DataPageV2`, but the Parquet files produced by Athena in this case had repetition bytes to be skipped before getting the definition level bytes. Since the byte wasn't skipped, the wrong values were decoded for definition levels.

On top of the same bug, the Go implementation made additional assumptions that proved to be incorrect.

This fixes both of them to properly respect the repetition level byte length reported in the DataPageV2 header.

Authored-by: Matt Topol <zotthewizard@gmail.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
  • Loading branch information
zeroshade committed Aug 26, 2022
1 parent ae8ceb6 commit 1b9c57e
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 18 deletions.
5 changes: 4 additions & 1 deletion cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -752,8 +752,11 @@ class ColumnReaderImplBase {
repetition_level_decoder_.SetDataV2(page.repetition_levels_byte_length(),
max_rep_level_,
static_cast<int>(num_buffered_values_), buffer);
buffer += page.repetition_levels_byte_length();
}
// ARROW-17453: Even if max_rep_level_ is 0, there may still be
// repetition level bytes written and/or reported in the header by
// some writers (e.g. Athena)
buffer += page.repetition_levels_byte_length();

if (max_def_level_ > 0) {
definition_level_decoder_.SetDataV2(page.definition_levels_byte_length(),
Expand Down
33 changes: 33 additions & 0 deletions cpp/src/parquet/column_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,39 @@ TEST_F(TestPrimitiveReader, TestReadValuesMissing) {
ParquetException);
}

// ARROW-17453: the DataPageV2 header may report a non-zero repetition
// level byte length even though the column's max repetition level is 0
// (observed in files written by Athena). The reader must still skip
// those bytes before decoding definition levels.
TEST_F(TestPrimitiveReader, TestRepetitionLvlBytesWithMaxRepetitionZero) {
  constexpr int batch_size = 4;
  max_def_level_ = 1;
  max_rep_level_ = 0;
  NodePtr type = schema::Int32("a", Repetition::OPTIONAL);
  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
  // Bytes here came from the example parquet file in ARROW-17453's int32
  // column which was delta bit-packed. The key part is the first three
  // bytes: the page header reports 1 byte for repetition levels even
  // though the max rep level is 0. If that byte isn't skipped then
  // we get def levels of [1, 1, 0, 0] instead of the correct [1, 1, 1, 0].
  const std::vector<uint8_t> page_data{0x3, 0x3, 0x7, 0x80, 0x1, 0x4, 0x3,
                                       0x18, 0x1, 0x2, 0x0, 0x0, 0x0, 0xc,
                                       0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0};

  // Page of 4 values with 2 definition-level bytes, 1 repetition-level
  // byte and an uncompressed size of 21 — presumably (num_values=4,
  // num_nulls=1, num_rows=4); confirm against the DataPageV2 ctor.
  std::shared_ptr<DataPageV2> data_page =
      std::make_shared<DataPageV2>(Buffer::Wrap(page_data.data(), page_data.size()), 4, 1,
                                   4, Encoding::DELTA_BINARY_PACKED, 2, 1, 21);

  pages_.push_back(data_page);
  InitReader(&descr);
  auto reader = static_cast<Int32Reader*>(reader_.get());
  int16_t def_levels_out[batch_size];
  int32_t values[batch_size];
  int64_t values_read;
  ASSERT_TRUE(reader->HasNext());
  EXPECT_EQ(4, reader->ReadBatch(batch_size, def_levels_out, /*replevels=*/nullptr,
                                 values, &values_read));
  EXPECT_EQ(3, values_read);
  // Verify the decoded levels/values themselves — this is the actual
  // regression: without skipping the repetition byte the def levels
  // come out as [1, 1, 0, 0]. Only the first 3 values are defined.
  const int16_t expected_def_levels[batch_size] = {1, 1, 1, 0};
  for (int i = 0; i < batch_size; ++i) {
    EXPECT_EQ(expected_def_levels[i], def_levels_out[i]) << "def level index " << i;
  }
  const int32_t expected_values[] = {12, 11, 13};
  for (int i = 0; i < 3; ++i) {
    EXPECT_EQ(expected_values[i], values[i]) << "value index " << i;
  }
}

// Page claims to have two values but only 1 is present.
TEST_F(TestPrimitiveReader, TestReadValuesMissingWithDictionary) {
constexpr int batch_size = 1;
Expand Down
6 changes: 5 additions & 1 deletion go/parquet/file/column_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,12 @@ func (c *columnChunkReader) initLevelDecodersV2(page *DataPageV2) (int64, error)

if c.descr.MaxRepetitionLevel() > 0 {
c.repetitionDecoder.SetDataV2(page.repLvlByteLen, c.descr.MaxRepetitionLevel(), int(c.numBuffered), buf)
buf = buf[page.repLvlByteLen:]
}
// ARROW-17453: Some writers will write repetition levels even when
// the max repetition level is 0, so we should respect the value
// in the page header regardless of whether MaxRepetitionLevel is 0
// or not.
buf = buf[page.repLvlByteLen:]

if c.descr.MaxDefinitionLevel() > 0 {
c.definitionDecoder.SetDataV2(page.defLvlByteLen, c.descr.MaxDefinitionLevel(), int(c.numBuffered), buf)
Expand Down
34 changes: 34 additions & 0 deletions go/parquet/file/column_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,40 @@ func (p *PrimitiveReaderSuite) TestInt32FlatRequiredSkip() {
})
}

// TestRepetitionLvlBytesWithMaxRepZero exercises ARROW-17453: a DataPageV2
// header that reports a non-zero repetition-level byte length even though
// the column's max repetition level is zero (as produced by Athena). The
// reader must skip that byte before decoding definition levels.
func (p *PrimitiveReaderSuite) TestRepetitionLvlBytesWithMaxRepZero() {
	const batchSize = 4
	p.maxDefLvl = 1
	p.maxRepLvl = 0
	node := schema.NewInt32Node("a", parquet.Repetitions.Optional, -1)
	col := schema.NewColumn(node, p.maxDefLvl, p.maxRepLvl)
	// Raw page bytes from the ARROW-17453 example file's int32 column
	// (delta bit-packed). The leading byte is a repetition-level byte
	// that must be skipped despite max rep level being 0; otherwise the
	// def levels decode as [1, 1, 0, 0] instead of the correct
	// [1, 1, 1, 0].
	rawPage := []byte{
		0x3, 0x3, 0x7, 0x80, 0x1, 0x4, 0x3,
		0x18, 0x1, 0x2, 0x0, 0x0, 0x0, 0xc,
		0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
	}

	page := file.NewDataPageV2(memory.NewBufferBytes(rawPage), batchSize, 1, batchSize,
		parquet.Encodings.DeltaBinaryPacked, 2, 1, int32(len(rawPage)), false)
	p.pages = append(p.pages, page)

	p.initReader(col)
	p.NotPanics(func() { p.reader.HasNext() })

	var (
		out  [batchSize]int32
		defs [batchSize]int16
	)
	rdr := p.reader.(*file.Int32ColumnChunkReader)
	total, read, err := rdr.ReadBatch(batchSize, out[:], defs[:], nil)
	p.NoError(err)
	p.EqualValues(batchSize, total)
	p.EqualValues(3, read)
	p.Equal([]int16{1, 1, 1, 0}, defs[:])
	p.Equal([]int32{12, 11, 13, 0}, out[:])
}

func (p *PrimitiveReaderSuite) TestDictionaryEncodedPages() {
p.maxDefLvl = 0
p.maxRepLvl = 0
Expand Down
15 changes: 6 additions & 9 deletions go/parquet/file/page_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,30 +573,27 @@ func (p *serializedPageReader) Next() bool {
return false
}

var pagebuf *memory.Buffer
if compressed {
if levelsBytelen > 0 {
io.ReadFull(p.r, buf.Bytes()[:levelsBytelen])
}
var data []byte
if data, p.err = p.decompress(lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
if _, p.err = p.decompress(lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
return false
}
pagebuf = memory.NewBufferBytes(data)
} else {
io.ReadFull(p.r, buf.Bytes())
pagebuf = buf
pagebuf.Retain()
}
if pagebuf.Len() != lenUncompressed {
p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed data page, got %d bytes", lenUncompressed, pagebuf.Len())
buf.Retain()

if buf.Len() != lenUncompressed {
p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed data page, got %d bytes", lenUncompressed, buf.Len())
return false
}

// make datapage v2
p.curPage = &DataPageV2{
page: page{
buf: pagebuf,
buf: buf,
typ: p.curPageHdr.Type,
nvals: dataHeader.GetNumValues(),
encoding: dataHeader.GetEncoding(),
Expand Down
10 changes: 3 additions & 7 deletions go/parquet/internal/encoding/delta_bit_packing.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type deltaBitPackDecoder struct {
deltaBitWidths *memory.Buffer
deltaBitWidth byte

lastVal int64
totalValues uint64
lastVal int64
}

// returns the number of bytes read so far
Expand Down Expand Up @@ -85,15 +86,10 @@ func (d *deltaBitPackDecoder) SetData(nvalues int, data []byte) error {
return xerrors.New("parquet: eof exception")
}

var totalValues uint64
if totalValues, ok = d.bitdecoder.GetVlqInt(); !ok {
if d.totalValues, ok = d.bitdecoder.GetVlqInt(); !ok {
return xerrors.New("parquet: eof exception")
}

if int(totalValues) != d.nvals {
return xerrors.New("parquet: mismatch between number of values and count in data header")
}

if d.lastVal, ok = d.bitdecoder.GetZigZagVlqInt(); !ok {
return xerrors.New("parquet: eof exception")
}
Expand Down

0 comments on commit 1b9c57e

Please sign in to comment.