Skip to content

Commit

Permalink
PARQUET-2323: [C++] Use bitmap to store pre-buffered column chunks (#…
Browse files Browse the repository at this point in the history
…36649)

### Rationale for this change

In https://issues.apache.org/jira/browse/PARQUET-2316 we allowed partial pre-buffering in the Parquet file reader by storing the indices of prebuffered column chunks in a hash set, and making a copy of this hash set for each row group reader.

In extreme conditions where numerous columns are prebuffered and multiple row group readers are created for the same row group, the hash set would incur significant overhead.

Using a bitmap instead (with one bit per column chunk indicating whether it's prebuffered or not) would be a reasonable mitigation, taking 4KB for 32K columns.

### What changes are included in this PR?

Switch from a hash set to a bitmap buffer.

### Are these changes tested?

Yes, passed unit tests on partial prebuffer.

### Are there any user-facing changes?

No.

Lead-authored-by: jp0317 <zjpzlz@gmail.com>
Co-authored-by: Jinpeng <zjpzlz@163.com>
Co-authored-by: Gang Wu <ustcwg@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
2 people authored and raulcd committed Jul 28, 2023
1 parent f0f27d7 commit 7ea78a9
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
4 changes: 2 additions & 2 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2413,9 +2413,9 @@ TEST(TestArrowReadWrite, CoalescedReadsAndNonCoalescedReads) {

ASSERT_EQ(2, reader->num_row_groups());

// Pre-buffer 3 columns in the 2nd row group.
// Pre-buffer column 0 and column 3 in the 2nd row group.
const std::vector<int> row_groups = {1};
const std::vector<int> column_indices = {0, 1, 4};
const std::vector<int> column_indices = {0, 3};
reader->parquet_reader()->PreBuffer(row_groups, column_indices,
::arrow::io::IOContext(),
::arrow::io::CacheOptions::Defaults());
Expand Down
33 changes: 19 additions & 14 deletions cpp/src/parquet/file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "arrow/io/caching.h"
#include "arrow/io/file.h"
#include "arrow/io/memory.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
#include "arrow/util/int_util_overflow.h"
Expand Down Expand Up @@ -179,15 +180,15 @@ class SerializedRowGroup : public RowGroupReader::Contents {
std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source,
int64_t source_size, FileMetaData* file_metadata,
int row_group_number, const ReaderProperties& props,
std::unordered_set<int> prebuffered_column_chunks,
std::shared_ptr<Buffer> prebuffered_column_chunks_bitmap,
std::shared_ptr<InternalFileDecryptor> file_decryptor = nullptr)
: source_(std::move(source)),
cached_source_(std::move(cached_source)),
source_size_(source_size),
file_metadata_(file_metadata),
properties_(props),
row_group_ordinal_(row_group_number),
prebuffered_column_chunks_(std::move(prebuffered_column_chunks)),
prebuffered_column_chunks_bitmap_(std::move(prebuffered_column_chunks_bitmap)),
file_decryptor_(file_decryptor) {
row_group_metadata_ = file_metadata->RowGroup(row_group_number);
}
Expand All @@ -203,8 +204,8 @@ class SerializedRowGroup : public RowGroupReader::Contents {
::arrow::io::ReadRange col_range =
ComputeColumnChunkRange(file_metadata_, source_size_, row_group_ordinal_, i);
std::shared_ptr<ArrowInputStream> stream;
if (cached_source_ &&
prebuffered_column_chunks_.find(i) != prebuffered_column_chunks_.end()) {
if (cached_source_ && prebuffered_column_chunks_bitmap_ != nullptr &&
::arrow::bit_util::GetBit(prebuffered_column_chunks_bitmap_->data(), i)) {
// PARQUET-1698: if read coalescing is enabled, read from pre-buffered
// segments.
PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range));
Expand Down Expand Up @@ -272,7 +273,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
std::unique_ptr<RowGroupMetaData> row_group_metadata_;
ReaderProperties properties_;
int row_group_ordinal_;
const std::unordered_set<int> prebuffered_column_chunks_;
const std::shared_ptr<Buffer> prebuffered_column_chunks_bitmap_;
std::shared_ptr<InternalFileDecryptor> file_decryptor_;
};

Expand Down Expand Up @@ -302,17 +303,17 @@ class SerializedFile : public ParquetFileReader::Contents {
}

std::shared_ptr<RowGroupReader> GetRowGroup(int i) override {
std::unordered_set<int> prebuffered_column_chunks;
// Avoid updating the map as this function can be called concurrently. The map can
// only be updated within Prebuffer().
std::shared_ptr<Buffer> prebuffered_column_chunks_bitmap;
// Avoid updating the bitmap as this function can be called concurrently. The bitmap
// can only be updated within Prebuffer().
auto prebuffered_column_chunks_iter = prebuffered_column_chunks_.find(i);
if (prebuffered_column_chunks_iter != prebuffered_column_chunks_.end()) {
prebuffered_column_chunks = prebuffered_column_chunks_iter->second;
prebuffered_column_chunks_bitmap = prebuffered_column_chunks_iter->second;
}

std::unique_ptr<SerializedRowGroup> contents = std::make_unique<SerializedRowGroup>(
source_, cached_source_, source_size_, file_metadata_.get(), i, properties_,
std::move(prebuffered_column_chunks), file_decryptor_);
std::move(prebuffered_column_chunks_bitmap), file_decryptor_);
return std::make_shared<RowGroupReader>(std::move(contents));
}

Expand Down Expand Up @@ -366,9 +367,12 @@ class SerializedFile : public ParquetFileReader::Contents {
std::vector<::arrow::io::ReadRange> ranges;
prebuffered_column_chunks_.clear();
for (int row : row_groups) {
std::unordered_set<int>& prebuffered = prebuffered_column_chunks_[row];
std::shared_ptr<Buffer>& col_bitmap = prebuffered_column_chunks_[row];
int num_cols = file_metadata_->num_columns();
PARQUET_THROW_NOT_OK(
AllocateEmptyBitmap(num_cols, properties_.memory_pool()).Value(&col_bitmap));
for (int col : column_indices) {
prebuffered.insert(col);
::arrow::bit_util::SetBit(col_bitmap->mutable_data(), col);
ranges.push_back(
ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col));
}
Expand Down Expand Up @@ -578,8 +582,9 @@ class SerializedFile : public ParquetFileReader::Contents {
ReaderProperties properties_;
std::shared_ptr<PageIndexReader> page_index_reader_;
std::unique_ptr<BloomFilterReader> bloom_filter_reader_;
// Maps a row group to its column chunks that are cached via Prebuffer().
std::unordered_map<int, std::unordered_set<int>> prebuffered_column_chunks_;
// Maps a row group ordinal to the prebuffer status of its column chunks, stored in
// the form of a bitmap buffer (one bit per column chunk).
std::unordered_map<int, std::shared_ptr<Buffer>> prebuffered_column_chunks_;
std::shared_ptr<InternalFileDecryptor> file_decryptor_;

// \return The true length of the metadata in bytes
Expand Down

0 comments on commit 7ea78a9

Please sign in to comment.