Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-15052: [C++][Parquet] Fix DELTA_BINARY_PACKED decoder when reading only one value #15124

Merged
merged 12 commits into from Jan 4, 2023
19 changes: 17 additions & 2 deletions cpp/src/parquet/encoding.cc
Expand Up @@ -2459,7 +2459,9 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
ParquetException::EofException();
}
if (bit_width_data[i] > kMaxDeltaBitWidth) {
throw ParquetException("delta bit width larger than integer bit width");
throw ParquetException("delta bit width " + std::to_string(bit_width_data[i]) +
" larger than integer bit width " +
std::to_string(kMaxDeltaBitWidth));
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
}
}
mini_block_idx_ = 0;
Expand All @@ -2479,7 +2481,20 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) {
if (ARROW_PREDICT_FALSE(!block_initialized_)) {
buffer[i++] = last_value_;
if (ARROW_PREDICT_FALSE(i == max_values)) break;
DCHECK_EQ(i, 1); // we're at the beginning of the page
if (ARROW_PREDICT_FALSE(i == max_values)) {
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
// When block is uninitialized and i reaches max_values we have two
// different possibilities:
// 1. total_value_count_ == 1, which means that the page may have only
// one value (encoded in the header), and we should not initialize
// any block.
// 2. total_value_count_ != 1, which means we should initialize the
// incoming block for subsequent reads.
if (total_value_count_ != 1) {
InitBlock();
}
break;
}
InitBlock();
} else {
++mini_block_idx_;
Expand Down
30 changes: 20 additions & 10 deletions cpp/src/parquet/encoding_test.cc
Expand Up @@ -1290,7 +1290,8 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
public:
using c_type = typename Type::c_type;
static constexpr int TYPE = Type::type_num;
static constexpr size_t ROUND_TRIP_TIMES = 3;
static constexpr size_t kNumRoundTrips = 3;
const std::vector<int> kReadBatchSizes = {1, 11};

void InitBoundData(int nvalues, int repeats, c_type half_range) {
num_values_ = nvalues * repeats;
Expand Down Expand Up @@ -1328,16 +1329,25 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
auto encoder =
MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());

for (size_t i = 0; i < ROUND_TRIP_TIMES; ++i) {
auto read_batch_sizes = kReadBatchSizes;
read_batch_sizes.push_back(num_values_);
// Encode a number of times to exercise the flush logic
for (size_t i = 0; i < kNumRoundTrips; ++i) {
encoder->Put(draws_, num_values_);
encode_buffer_ = encoder->FlushValues();

decoder->SetData(num_values_, encode_buffer_->data(),
static_cast<int>(encode_buffer_->size()));
int values_decoded = decoder->Decode(decode_buf_, num_values_);
ASSERT_EQ(num_values_, values_decoded);
ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
// Exercise different batch sizes
for (const int read_batch_size : read_batch_sizes) {
decoder->SetData(num_values_, encode_buffer_->data(),
static_cast<int>(encode_buffer_->size()));

int values_decoded = 0;
while (values_decoded < num_values_) {
values_decoded +=
decoder->Decode(decode_buf_ + values_decoded, read_batch_size);
}
ASSERT_EQ(num_values_, values_decoded);
ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
}
}
}

Expand All @@ -1353,7 +1363,7 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
}
}

for (size_t i = 0; i < ROUND_TRIP_TIMES; ++i) {
for (size_t i = 0; i < kNumRoundTrips; ++i) {
encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
encode_buffer_ = encoder->FlushValues();
decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
Expand Down