Skip to content

Commit

Permalink
apacheGH-39122: [C++][Parquet] Optimize FLBA record reader (apache#39124
Browse files Browse the repository at this point in the history
)

### Rationale for this change

The FLBA implementation of RecordReader is suboptimal:
* it doesn't preallocate the output array
* it reads the decoded validity bitmap one bit at a time and recreates it, one bit at a time

### What changes are included in this PR?

Optimize the FLBA implementation of RecordReader so as to avoid the aforementioned inefficiencies.
I did a quick-and-dirty benchmark on a Parquet file with two columns:
* column 1: uncompressed, PLAIN-encoded, FLBA<3> with no nulls
* column 2: uncompressed, PLAIN-encoded, FLBA<3> with 25% nulls

With git main, the file can be read at 465 MB/s. With this PR, the file can be read at 700 MB/s.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

No.
* Closes: apache#39122

Lead-authored-by: Antoine Pitrou <antoine@python.org>
Co-authored-by: Antoine Pitrou <pitrou@free.fr>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
pitrou authored and dgreiss committed Feb 17, 2024
1 parent 7366f8f commit 4b730da
Showing 1 changed file with 50 additions and 20 deletions.
70 changes: 50 additions & 20 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <vector>

#include "arrow/array.h"
#include "arrow/array/array_binary.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_dict.h"
#include "arrow/array/builder_primitive.h"
Expand Down Expand Up @@ -2040,23 +2041,29 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
LevelInfo leaf_info_;
};

class FLBARecordReader : public TypedRecordReader<FLBAType>,
virtual public BinaryRecordReader {
class FLBARecordReader final : public TypedRecordReader<FLBAType>,
virtual public BinaryRecordReader {
public:
FLBARecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool, bool read_dense_for_nullable)
: TypedRecordReader<FLBAType>(descr, leaf_info, pool, read_dense_for_nullable),
builder_(nullptr) {
byte_width_(descr_->type_length()),
empty_(byte_width_, 0),
type_(::arrow::fixed_size_binary(byte_width_)),
null_bitmap_builder_(pool),
data_builder_(pool) {
ARROW_DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY);
int byte_width = descr_->type_length();
std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width);
builder_ = std::make_unique<::arrow::FixedSizeBinaryBuilder>(type, this->pool_);
}

::arrow::ArrayVector GetBuilderChunks() override {
std::shared_ptr<::arrow::Array> chunk;
PARQUET_THROW_NOT_OK(builder_->Finish(&chunk));
return ::arrow::ArrayVector({chunk});
const int64_t null_count = null_bitmap_builder_.false_count();
const int64_t length = null_bitmap_builder_.length();
ARROW_DCHECK_EQ(length * byte_width_, data_builder_.length());
PARQUET_ASSIGN_OR_THROW(auto data_buffer, data_builder_.Finish());
PARQUET_ASSIGN_OR_THROW(auto null_bitmap, null_bitmap_builder_.Finish());
auto chunk = std::make_shared<::arrow::FixedSizeBinaryArray>(
type_, length, data_buffer, null_bitmap, null_count);
return ::arrow::ArrayVector({std::move(chunk)});
}

void ReadValuesDense(int64_t values_to_read) override {
Expand All @@ -2065,9 +2072,9 @@ class FLBARecordReader : public TypedRecordReader<FLBAType>,
this->current_decoder_->Decode(values, static_cast<int>(values_to_read));
CheckNumberDecoded(num_decoded, values_to_read);

for (int64_t i = 0; i < num_decoded; i++) {
PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
}
PARQUET_THROW_NOT_OK(null_bitmap_builder_.Reserve(num_decoded));
PARQUET_THROW_NOT_OK(data_builder_.Reserve(num_decoded * byte_width_));
UnsafeAppendDense(values, num_decoded);
ResetValues();
}

Expand All @@ -2081,22 +2088,45 @@ class FLBARecordReader : public TypedRecordReader<FLBAType>,
valid_bits, valid_bits_offset);
ARROW_DCHECK_EQ(num_decoded, values_to_read);

PARQUET_THROW_NOT_OK(null_bitmap_builder_.Reserve(num_decoded));
PARQUET_THROW_NOT_OK(data_builder_.Reserve(num_decoded * byte_width_));
if (null_count == 0) {
UnsafeAppendDense(values, num_decoded);
} else {
UnsafeAppendSpaced(values, num_decoded, valid_bits, valid_bits_offset);
}
ResetValues();
}

void UnsafeAppendDense(const FLBA* values, int64_t num_decoded) {
null_bitmap_builder_.UnsafeAppend(num_decoded, /*value=*/true);
for (int64_t i = 0; i < num_decoded; i++) {
data_builder_.UnsafeAppend(values[i].ptr, byte_width_);
}
}

void UnsafeAppendSpaced(const FLBA* values, int64_t num_decoded,
const uint8_t* valid_bits, int64_t valid_bits_offset) {
null_bitmap_builder_.UnsafeAppend(valid_bits, valid_bits_offset, num_decoded);
for (int64_t i = 0; i < num_decoded; i++) {
if (::arrow::bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
data_builder_.UnsafeAppend(values[i].ptr, byte_width_);
} else {
PARQUET_THROW_NOT_OK(builder_->AppendNull());
data_builder_.UnsafeAppend(empty_.data(), byte_width_);
}
}
ResetValues();
}

private:
std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_;
const int byte_width_;
const std::vector<uint8_t> empty_;
std::shared_ptr<::arrow::DataType> type_;
::arrow::TypedBufferBuilder<bool> null_bitmap_builder_;
::arrow::BufferBuilder data_builder_;
};

class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
virtual public BinaryRecordReader {
class ByteArrayChunkedRecordReader final : public TypedRecordReader<ByteArrayType>,
virtual public BinaryRecordReader {
public:
ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool, bool read_dense_for_nullable)
Expand Down Expand Up @@ -2137,8 +2167,8 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
};

class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
virtual public DictionaryRecordReader {
class ByteArrayDictionaryRecordReader final : public TypedRecordReader<ByteArrayType>,
virtual public DictionaryRecordReader {
public:
ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool, bool read_dense_for_nullable)
Expand Down

0 comments on commit 4b730da

Please sign in to comment.