GH-34106: [C++][Parquet] Fix updating page stats for WriteArrowDictionary (apache#34107)

### Rationale for this change

`ColumnWriter::WriteArrowDictionary` tries to update page statistics, but it does so incorrectly when a single write is split into batches and more than one page is written.

### What changes are included in this PR?

Make sure that every batch write updates the page statistics.

### Are these changes tested?

Add a test case that fails without the fix.

### Are there any user-facing changes?

No.
* Closes: apache#34106
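
As a hedged illustration of the failure mode (a simplified stand-in modeled on the `DoInBatches`/`WriteIndicesChunk` structure in `column_writer.cc`, not the actual implementation): one logical write is carved into fixed-size batches, and each batch may close and start a new data page, so statistics must be updated once per batch rather than once per write.

```cpp
#include <algorithm>
#include <cstdint>
#include <functional>
#include <iostream>

// Simplified stand-in for the chunking loop in column_writer.cc: one logical
// write is split into batches of at most `batch_size` values, each of which
// may land in (and flush) a different data page.
void DoInBatches(int64_t total, int64_t batch_size,
                 const std::function<void(int64_t, int64_t)>& action) {
  for (int64_t offset = 0; offset < total; offset += batch_size) {
    action(offset, std::min(batch_size, total - offset));
  }
}

int main() {
  // A 10-value write with a batch size of 3 produces four chunks. Before this
  // fix, only the page holding the first chunk had its statistics updated.
  DoInBatches(/*total=*/10, /*batch_size=*/3,
              [](int64_t offset, int64_t size) {
                std::cout << "write chunk at offset " << offset << ", size "
                          << size << "\n";
              });
  return 0;
}
```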

Authored-by: Gang Wu <ustcwg@gmail.com>
Signed-off-by: Will Jones <willjones127@gmail.com>
wgtmac authored and fatemehp committed Feb 24, 2023
1 parent 8afecfa commit 3eebcfc
Showing 2 changed files with 70 additions and 45 deletions.
34 changes: 34 additions & 0 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -4243,6 +4243,40 @@ TEST_P(TestArrowWriteDictionary, StatisticsUnifiedDictionary) {
ASSERT_EQ(stats1->EncodeMin(), "b");
ASSERT_EQ(stats0->EncodeMax(), "b");
ASSERT_EQ(stats1->EncodeMax(), "c");

// Check page statistics
const auto expected_page_type =
GetParquetDataPageVersion() == ParquetDataPageVersion::V1 ? PageType::DATA_PAGE
: PageType::DATA_PAGE_V2;
auto rg0_page_reader = parquet_reader->RowGroup(0)->GetColumnPageReader(0);
ASSERT_EQ(PageType::DICTIONARY_PAGE, rg0_page_reader->NextPage()->type());
const std::vector<std::string> rg0_min_values = {"a", "a", "a"};
const std::vector<std::string> rg0_max_values = {"a", "a", "b"};
for (int i = 0; i < 3; ++i) {
auto page = rg0_page_reader->NextPage();
ASSERT_EQ(expected_page_type, page->type());
auto data_page = std::static_pointer_cast<DataPage>(page);
ASSERT_EQ(3, data_page->num_values());
const auto& stats = data_page->statistics();
EXPECT_EQ(1, stats.null_count);
EXPECT_EQ(rg0_min_values[i], stats.min());
EXPECT_EQ(rg0_max_values[i], stats.max());
}
ASSERT_EQ(rg0_page_reader->NextPage(), nullptr);

auto rg1_page_reader = parquet_reader->RowGroup(1)->GetColumnPageReader(0);
ASSERT_EQ(PageType::DICTIONARY_PAGE, rg1_page_reader->NextPage()->type());
{
auto page = rg1_page_reader->NextPage();
ASSERT_EQ(expected_page_type, page->type());
auto data_page = std::static_pointer_cast<DataPage>(page);
ASSERT_EQ(3, data_page->num_values());
const auto& stats = data_page->statistics();
EXPECT_EQ(1, stats.null_count);
EXPECT_EQ("b", stats.min());
EXPECT_EQ("c", stats.max());
}
ASSERT_EQ(rg1_page_reader->NextPage(), nullptr);
}

// ----------------------------------------------------------------------
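The test above expects three data pages of three values each in row group 0. As a hedged sketch (the values here are assumptions, not copied from the test's actual setup), writer properties along these lines force a single Arrow write to span several small dictionary-encoded pages:

```cpp
#include <memory>

#include "parquet/properties.h"

// Hypothetical configuration: small batch and page sizes make one Arrow write
// split across several data pages, which is the scenario the fix addresses.
std::shared_ptr<parquet::WriterProperties> MakeMultiPageProps() {
  parquet::WriterProperties::Builder builder;
  builder.enable_dictionary()      // write dictionary-encoded data pages
      ->write_batch_size(3)        // split each write into 3-value batches
      ->data_pagesize(1)           // flush a data page after nearly every batch
      ->max_row_group_length(9);   // keep row groups small, too
  return builder.build();
}
```

`MakeMultiPageProps` is an illustrative name; the builder methods (`enable_dictionary`, `write_batch_size`, `data_pagesize`, `max_row_group_length`) are real `parquet::WriterProperties::Builder` options.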
81 changes: 36 additions & 45 deletions cpp/src/parquet/column_writer.cc
@@ -1484,6 +1484,39 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
std::shared_ptr<::arrow::Array> dictionary = data.dictionary();
std::shared_ptr<::arrow::Array> indices = data.indices();

auto update_stats = [&](int64_t num_chunk_levels,
const std::shared_ptr<Array>& chunk_indices) {
// TODO(PARQUET-2068) This approach may make two copies. First, a copy of the
// indices array to a (hopefully smaller) referenced indices array. Second, a copy
// of the values array to a (probably not smaller) referenced values array.
//
// Once the MinMax kernel supports all data types we should use that kernel instead
// as it does not make any copies.
::arrow::compute::ExecContext exec_ctx(ctx->memory_pool);
exec_ctx.set_use_threads(false);

std::shared_ptr<::arrow::Array> referenced_dictionary;
PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices,
::arrow::compute::Unique(*chunk_indices, &exec_ctx));

// On first run, we might be able to re-use the existing dictionary
if (referenced_indices.length() == dictionary->length()) {
referenced_dictionary = dictionary;
} else {
PARQUET_ASSIGN_OR_THROW(
::arrow::Datum referenced_dictionary_datum,
::arrow::compute::Take(dictionary, referenced_indices,
::arrow::compute::TakeOptions(/*boundscheck=*/false),
&exec_ctx));
referenced_dictionary = referenced_dictionary_datum.make_array();
}

int64_t non_null_count = chunk_indices->length() - chunk_indices->null_count();
page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count);
page_statistics_->IncrementNumValues(non_null_count);
page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
};

int64_t value_offset = 0;
auto WriteIndicesChunk = [&](int64_t offset, int64_t batch_size) {
int64_t batch_num_values = 0;
@@ -1498,6 +1531,9 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
AddIfNotNull(rep_levels, offset));
std::shared_ptr<Array> writeable_indices =
indices->Slice(value_offset, batch_num_spaced_values);
if (page_statistics_) {
update_stats(/*num_chunk_levels=*/batch_size, writeable_indices);
}
PARQUET_ASSIGN_OR_THROW(
writeable_indices,
MaybeReplaceValidity(writeable_indices, null_count, ctx->memory_pool));
@@ -1506,43 +1542,6 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
value_offset += batch_num_spaced_values;
};

auto update_stats = [&]() {
// TODO(PARQUET-2068) This approach may make two copies. First, a copy of the
// indices array to a (hopefully smaller) referenced indices array. Second, a copy
// of the values array to a (probably not smaller) referenced values array.
//
// Once the MinMax kernel supports all data types we should use that kernel instead
// as it does not make any copies.
::arrow::compute::ExecContext exec_ctx(ctx->memory_pool);
exec_ctx.set_use_threads(false);

std::shared_ptr<::arrow::Array> referenced_dictionary;
// If dictionary is the same dictionary we already have, just use that
if (preserved_dictionary_ && preserved_dictionary_ == dictionary) {
referenced_dictionary = preserved_dictionary_;
} else {
PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices,
::arrow::compute::Unique(*indices, &exec_ctx));

// On first run, we might be able to re-use the existing dictionary
if (referenced_indices.length() == dictionary->length()) {
referenced_dictionary = dictionary;
} else {
PARQUET_ASSIGN_OR_THROW(
::arrow::Datum referenced_dictionary_datum,
::arrow::compute::Take(dictionary, referenced_indices,
::arrow::compute::TakeOptions(/*boundscheck=*/false),
&exec_ctx));
referenced_dictionary = referenced_dictionary_datum.make_array();
}
}

int64_t non_null_count = indices->length() - indices->null_count();
page_statistics_->IncrementNullCount(num_levels - non_null_count);
page_statistics_->IncrementNumValues(non_null_count);
page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
};

// Handle seeing dictionary for the first time
if (!preserved_dictionary_) {
// It's a new dictionary. Call PutDictionary and keep track of it
@@ -1556,19 +1555,11 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
return WriteDense();
}

if (page_statistics_ != nullptr) {
update_stats();
}
preserved_dictionary_ = dictionary;
} else if (!dictionary->Equals(*preserved_dictionary_)) {
// Dictionary has changed
PARQUET_CATCH_NOT_OK(FallbackToPlainEncoding());
return WriteDense();
} else {
// Dictionary is same, but we need to update stats
if (page_statistics_ != nullptr) {
update_stats();
}
}

PARQUET_CATCH_NOT_OK(
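For readers unfamiliar with the compute calls in the relocated `update_stats` lambda, here is a self-contained sketch of the same `Unique` + `Take` pattern: it narrows the dictionary to just the values a chunk's indices actually reference, so only those values feed the min/max statistics. `ReferencedValues` is an illustrative name, not a function in this codebase; `arrow::compute::Unique` and `arrow::compute::Take` are the calls the diff itself uses.

```cpp
#include <memory>

#include <arrow/api.h>
#include <arrow/compute/api.h>

// Given a dictionary and the indices used by one chunk, return only the
// dictionary values that the chunk actually references.
arrow::Result<std::shared_ptr<arrow::Array>> ReferencedValues(
    const std::shared_ptr<arrow::Array>& dictionary,
    const std::shared_ptr<arrow::Array>& chunk_indices,
    arrow::MemoryPool* pool) {
  arrow::compute::ExecContext exec_ctx(pool);
  exec_ctx.set_use_threads(false);

  // Distinct dictionary positions referenced by this chunk.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> referenced_indices,
                        arrow::compute::Unique(*chunk_indices, &exec_ctx));

  // If every dictionary entry is referenced, reuse the dictionary as-is.
  if (referenced_indices->length() == dictionary->length()) {
    return dictionary;
  }

  // Otherwise gather just the referenced values; the indices are known-valid
  // dictionary positions, so the bounds check can be skipped.
  ARROW_ASSIGN_OR_RAISE(
      arrow::Datum referenced,
      arrow::compute::Take(dictionary, referenced_indices,
                           arrow::compute::TakeOptions(/*boundscheck=*/false),
                           &exec_ctx));
  return referenced.make_array();
}
```

As in the diff, the null count and value count come from the chunk's indices, while `Update` on the referenced values only refreshes min/max (`/*update_counts=*/false`).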
