Skip to content

Commit

Permalink
apacheGH-34335: [C++][Parquet] Optimize Decoding DELTA_LENGTH_BYTE_AR…
Browse files Browse the repository at this point in the history
…RAY (apache#34955)

### Rationale for this change

According to apache#34323 . DELTA_LENGTH_BYTE_ARRAY is much more slower. So do some optimizations.

### What changes are included in this PR?

Some tiny changes

### Are these changes tested?

### Are there any user-facing changes?

* Closes: apache#34335

Authored-by: mwish <maplewish117@gmail.com>
Signed-off-by: Will Jones <willjones127@gmail.com>
  • Loading branch information
mapleFU authored and liujiacheng777 committed May 11, 2023
1 parent f524eaf commit 5f3d97d
Showing 1 changed file with 10 additions and 16 deletions.
26 changes: 10 additions & 16 deletions cpp/src/parquet/encoding.cc
Expand Up @@ -2718,21 +2718,14 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
MemoryPool* pool = ::arrow::default_memory_pool())
: DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
len_decoder_(nullptr, pool),
buffered_length_(AllocateBuffer(pool, 0)),
buffered_data_(AllocateBuffer(pool, 0)) {}
buffered_length_(AllocateBuffer(pool, 0)) {}

void SetData(int num_values, const uint8_t* data, int len) override {
num_values_ = num_values;
DecoderImpl::SetData(num_values, data, len);
decoder_ = std::make_shared<::arrow::bit_util::BitReader>(data, len);
DecodeLengths();
}

void SetDecoder(int num_values, std::shared_ptr<::arrow::bit_util::BitReader> decoder) {
num_values_ = num_values;
decoder_ = decoder;
DecodeLengths();
}

int Decode(ByteArray* buffer, int max_values) override {
// Decode up to `max_values` strings into an internal buffer
// and reference them into `buffer`.
Expand All @@ -2745,6 +2738,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
int32_t data_size = 0;
const int32_t* length_ptr =
reinterpret_cast<const int32_t*>(buffered_length_->data()) + length_idx_;
int bytes_offset = len_ - decoder_->bytes_left();
for (int i = 0; i < max_values; ++i) {
int32_t len = length_ptr[i];
if (ARROW_PREDICT_FALSE(len < 0)) {
Expand All @@ -2756,13 +2750,10 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
}
}
length_idx_ += max_values;

PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size));
if (decoder_->GetBatch(8, buffered_data_->mutable_data(), data_size) != data_size) {
if (ARROW_PREDICT_FALSE(!decoder_->Advance(8 * static_cast<int64_t>(data_size)))) {
ParquetException::EofException();
}
const uint8_t* data_ptr = buffered_data_->data();

const uint8_t* data_ptr = data_ + bytes_offset;
for (int i = 0; i < max_values; ++i) {
buffer[i].ptr = data_ptr;
data_ptr += buffer[i].len;
Expand Down Expand Up @@ -2850,7 +2841,6 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
int num_valid_values_;
uint32_t length_idx_;
std::shared_ptr<ResizableBuffer> buffered_length_;
std::shared_ptr<ResizableBuffer> buffered_data_;
};

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -3071,8 +3061,12 @@ class DeltaByteArrayDecoder : public DecoderImpl,
prefix_len_offset_ = 0;
num_valid_values_ = num_prefix;

int bytes_left = decoder_->bytes_left();
// If len < bytes_left, prefix_len_decoder.Decode will throw exception.
DCHECK_GE(len, bytes_left);
int suffix_begins = len - bytes_left;
// at this time, the decoder_ will be at the start of the encoded suffix data.
suffix_decoder_.SetDecoder(num_values, decoder_);
suffix_decoder_.SetData(num_values, data + suffix_begins, bytes_left);

// TODO: read corrupted files written with bug(PARQUET-246). last_value_ should be set
// to last_value_in_previous_page_ when decoding a new page(except the first page)
Expand Down

0 comments on commit 5f3d97d

Please sign in to comment.