ARROW-8127: [C++] [Parquet] Incorrect column chunk metadata for multi-page batch writes

For buffered column writers and non-dictionary-encoded columns, Parquet column chunks that span more than one page get the wrong data_page_offset recorded in the column chunk metadata. This makes the file unreadable and unscannable. (More information and a test case are in [ARROW-8127](https://issues.apache.org/jira/browse/ARROW-8127).)
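To make the trigger conditions concrete, here is a minimal write-side sketch: a buffered row group, dictionary encoding disabled, and a data page size small enough that one WriteBatch call spills into several data pages. It condenses the test added by this commit (see file_serialize_test.cc below); `schema` and `values` are placeholders for the schema node and input vector built there, so treat it as an illustration rather than a standalone program.

// Condensed from the new MultiPageDisabledDictionary test below; `schema`
// and `values` stand in for the schema node and input vector built there.
auto sink = CreateOutputStream();
auto props = parquet::WriterProperties::Builder()
                 .disable_dictionary()     // non-dictionary encoding
                 ->data_pagesize(16384)    // small pages => multi-page chunk
                 ->build();
auto file_writer = parquet::ParquetFileWriter::Open(sink, schema, props);
auto rg_writer = file_writer->AppendBufferedRowGroup();  // buffered writer
auto col_writer = static_cast<Int32Writer*>(rg_writer->column(0));
col_writer->WriteBatch(10000, nullptr, nullptr, values.data());
rg_writer->Close();
file_writer->Close();
// Before this fix, the column chunk metadata recorded a data_page_offset
// that did not point at the first data page, so the file could not be read.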

This patch fixes the error by setting the metadata values from information in the underlying ("final") stream sink. It reorders, but otherwise retains, the dictionary page offset logic introduced in [PARQUET-1706](#5922).
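The code change itself is the one-line condition swap in column_writer.cc shown in the diff below. The annotated sketch that follows is an interpretation of why the old check misfires for buffered writers (where pages are first written to an in-memory sink whose initial position is 0); it paraphrases the page-writer bookkeeping rather than reproducing it verbatim.

// Paraphrase of the first-data-page bookkeeping around the changed line.
PARQUET_ASSIGN_OR_THROW(int64_t start_pos, sink_->Tell());

// Old sentinel: if the sink starts at position 0 (as an in-memory buffered
// sink does), the first page leaves data_page_offset_ at 0, so the check
// fires again on the *second* page and records that page's offset instead.
//   if (data_page_offset_ == 0) { data_page_offset_ = start_pos; }

// New check: only the first page of the chunk (page ordinal 0) may record
// the offset, no matter where the underlying sink happens to start.
if (page_ordinal_ == 0) {
  data_page_offset_ = start_pos;
}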

Closes #6637 from tpboudreau/ARROW-8127

Authored-by: TP Boudreau <tpboudreau@gmail.com>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
tpboudreau authored and wesm committed Mar 18, 2020
1 parent 76fd44c commit 774a9a4
Showing 2 changed files with 54 additions and 1 deletion.
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_writer.cc
@@ -299,7 +299,7 @@ class SerializedPageWriter : public PageWriter {
    // TODO(PARQUET-594) crc checksum

    PARQUET_ASSIGN_OR_THROW(int64_t start_pos, sink_->Tell());
-   if (data_page_offset_ == 0) {
+   if (page_ordinal_ == 0) {
      data_page_offset_ = start_pos;
    }

53 changes: 53 additions & 0 deletions cpp/src/parquet/file_serialize_test.cc
@@ -345,6 +345,59 @@ TEST(TestBufferedRowGroupWriter, DisabledDictionary) {
  ASSERT_FALSE(rg_reader->metadata()->ColumnChunk(0)->has_dictionary_page());
}

TEST(TestBufferedRowGroupWriter, MultiPageDisabledDictionary) {
  const int VALUE_COUNT = 10000;
  const int PAGE_SIZE = 16384;
  auto sink = CreateOutputStream();
  // Disable dictionary encoding and use a small data page size so that the
  // single buffered column chunk written below spans multiple data pages.
  auto writer_props = parquet::WriterProperties::Builder()
                          .disable_dictionary()
                          ->data_pagesize(PAGE_SIZE)
                          ->build();
  schema::NodeVector fields;
  fields.push_back(
      PrimitiveNode::Make("col", parquet::Repetition::REQUIRED, parquet::Type::INT32));
  auto schema = std::static_pointer_cast<GroupNode>(
      GroupNode::Make("schema", Repetition::REQUIRED, fields));
  auto file_writer = parquet::ParquetFileWriter::Open(sink, schema, writer_props);
  auto rg_writer = file_writer->AppendBufferedRowGroup();
  auto col_writer = static_cast<Int32Writer*>(rg_writer->column(0));
  std::vector<int32_t> values_in;
  for (int i = 0; i < VALUE_COUNT; ++i) {
    values_in.push_back((i % 100) + 1);
  }
  col_writer->WriteBatch(VALUE_COUNT, nullptr, nullptr, values_in.data());
  rg_writer->Close();
  file_writer->Close();
  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());

  // Read the file back and check that every value written above is recovered;
  // this fails if data_page_offset does not point at the chunk's first page.
  auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
  auto file_reader = ParquetFileReader::Open(source);
  auto file_metadata = file_reader->metadata();
  ASSERT_EQ(1, file_reader->metadata()->num_row_groups());
  std::vector<int32_t> values_out(VALUE_COUNT);
  for (int r = 0; r < file_metadata->num_row_groups(); ++r) {
    auto rg_reader = file_reader->RowGroup(r);
    ASSERT_EQ(1, rg_reader->metadata()->num_columns());
    ASSERT_EQ(VALUE_COUNT, rg_reader->metadata()->num_rows());
    int64_t total_values_read = 0;
    std::shared_ptr<parquet::ColumnReader> col_reader;
    ASSERT_NO_THROW(col_reader = rg_reader->Column(0));
    parquet::Int32Reader* int32_reader =
        static_cast<parquet::Int32Reader*>(col_reader.get());
    int64_t vn = VALUE_COUNT;
    int32_t* vx = values_out.data();
    while (int32_reader->HasNext()) {
      int64_t values_read;
      int32_reader->ReadBatch(vn, nullptr, nullptr, vx, &values_read);
      vn -= values_read;
      vx += values_read;
      total_values_read += values_read;
    }
    ASSERT_EQ(VALUE_COUNT, total_values_read);
    ASSERT_EQ(values_in, values_out);
  }
}

} // namespace test

} // namespace parquet
