Skip to content

Commit

Permalink
ARROW-14041: [C++] Replace uses of BitmapReader in Parquet decoders
Browse files Browse the repository at this point in the history
Use potentially more performant alternatives, though micro-benchmarks don't seem to show any significant improvement.

Closes #11769 from pitrou/ARROW-14041-parquet-bitmap-reader

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
pitrou committed Nov 30, 2021
1 parent 29f98dd commit 3e666f6
Showing 1 changed file with 53 additions and 47 deletions.
100 changes: 53 additions & 47 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "arrow/array/builder_dict.h"
#include "arrow/stl_allocator.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_block_counter.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_stream_utils.h"
#include "arrow/util/bit_util.h"
Expand Down Expand Up @@ -1544,13 +1545,12 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
ParquetException::EofException();
}

/// XXX(wesm): Cannot append "valid bits" directly to the builder
std::vector<uint8_t> valid_bytes(num_values);
::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
for (int64_t i = 0; i < num_values; ++i) {
valid_bytes[i] = static_cast<uint8_t>(bit_reader.IsSet());
bit_reader.Next();
}
// XXX(wesm): Cannot append "valid bits" directly to the builder
std::vector<uint8_t> valid_bytes(num_values, 0);
int64_t i = 0;
VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
[&]() { valid_bytes[i++] = 1; }, [&]() { ++i; });

auto binary_builder = checked_cast<::arrow::BinaryDictionary32Builder*>(builder);
PARQUET_THROW_NOT_OK(
Expand Down Expand Up @@ -1889,56 +1889,62 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,

ArrowBinaryHelper helper(out);

::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);

auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
int values_decoded = 0;
int num_appended = 0;
while (num_appended < num_values) {
bool is_valid = bit_reader.IsSet();
bit_reader.Next();

if (is_valid) {
int32_t batch_size =
std::min<int32_t>(kBufferSize, num_values - num_appended - null_count);
int num_indices = idx_decoder_.GetBatch(indices, batch_size);

int num_indices = 0;
int pos_indices = 0;

auto visit_valid = [&](int64_t position) -> Status {
if (num_indices == pos_indices) {
// Refill indices buffer
const auto batch_size =
std::min<int32_t>(kBufferSize, num_values - null_count - values_decoded);
num_indices = idx_decoder_.GetBatch(indices, batch_size);
if (ARROW_PREDICT_FALSE(num_indices < 1)) {
return Status::Invalid("Invalid number of indices '", num_indices, "'");
return Status::Invalid("Invalid number of indices: ", num_indices);
}
pos_indices = 0;
}
const auto index = indices[pos_indices++];
RETURN_NOT_OK(IndexInBounds(index));
const auto& val = dict_values[index];
if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
RETURN_NOT_OK(helper.PushChunk());
}
RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
++values_decoded;
return Status::OK();
};

int i = 0;
while (true) {
// Consume all indices
if (is_valid) {
auto idx = indices[i];
RETURN_NOT_OK(IndexInBounds(idx));
const auto& val = dict_values[idx];
if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
RETURN_NOT_OK(helper.PushChunk());
}
RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
++i;
++values_decoded;
auto visit_null = [&]() -> Status {
RETURN_NOT_OK(helper.AppendNull());
return Status::OK();
};

::arrow::internal::BitBlockCounter bit_blocks(valid_bits, valid_bits_offset,
num_values);
int64_t position = 0;
while (position < num_values) {
const auto block = bit_blocks.NextWord();
if (block.AllSet()) {
for (int64_t i = 0; i < block.length; ++i, ++position) {
ARROW_RETURN_NOT_OK(visit_valid(position));
}
} else if (block.NoneSet()) {
for (int64_t i = 0; i < block.length; ++i, ++position) {
ARROW_RETURN_NOT_OK(visit_null());
}
} else {
for (int64_t i = 0; i < block.length; ++i, ++position) {
if (BitUtil::GetBit(valid_bits, valid_bits_offset + position)) {
ARROW_RETURN_NOT_OK(visit_valid(position));
} else {
RETURN_NOT_OK(helper.AppendNull());
--null_count;
}
++num_appended;
if (i == num_indices) {
// Do not advance the bit_reader if we have fulfilled the decode
// request
break;
ARROW_RETURN_NOT_OK(visit_null());
}
is_valid = bit_reader.IsSet();
bit_reader.Next();
}
} else {
RETURN_NOT_OK(helper.AppendNull());
--null_count;
++num_appended;
}
}

*out_num_values = values_decoded;
return Status::OK();
}
Expand Down

0 comments on commit 3e666f6

Please sign in to comment.