Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions be/src/format/parquet/byte_array_dict_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,24 @@ Status ByteArrayDictDecoder::set_dict(DorisUniqueBufferPtr<uint8_t>& dict, int32
return Status::Corruption("Wrong dictionary data for byte array type, dict is null.");
}
_dict_items.reserve(num_values);
uint32_t offset_cursor = 0;
if (UNLIKELY(length < 0)) {
return Status::Corruption("Wrong data length in dictionary");
}
const size_t dict_length = cast_set<size_t>(length);
size_t offset_cursor = 0;
char* dict_item_address = reinterpret_cast<char*>(_dict.get());

size_t total_length = 0;
for (int i = 0; i < num_values; ++i) {
if (UNLIKELY(offset_cursor > dict_length ||
dict_length - offset_cursor < sizeof(uint32_t))) {
return Status::Corruption("Wrong data length in dictionary");
}
uint32_t l = decode_fixed32_le(_dict.get() + offset_cursor);
offset_cursor += 4;
offset_cursor += sizeof(uint32_t);
if (UNLIKELY(l > dict_length - offset_cursor)) {
return Status::Corruption("Wrong data length in dictionary");
}
offset_cursor += l;
total_length += l;
}
Expand All @@ -53,20 +64,24 @@ Status ByteArrayDictDecoder::set_dict(DorisUniqueBufferPtr<uint8_t>& dict, int32
size_t offset = 0;
offset_cursor = 0;
for (int i = 0; i < num_values; ++i) {
if (UNLIKELY(offset_cursor > dict_length ||
dict_length - offset_cursor < sizeof(uint32_t))) {
return Status::Corruption("Wrong data length in dictionary");
}
uint32_t l = decode_fixed32_le(_dict.get() + offset_cursor);
offset_cursor += 4;
offset_cursor += sizeof(uint32_t);
if (UNLIKELY(l > dict_length - offset_cursor)) {
return Status::Corruption("Wrong data length in dictionary");
}
memcpy(&_dict_data[offset], dict_item_address + offset_cursor, l);
_dict_items.emplace_back(&_dict_data[offset], l);
offset_cursor += l;
offset += l;
if (offset_cursor > length) {
return Status::Corruption("Wrong data length in dictionary");
}
if (l > _max_value_length) {
_max_value_length = l;
}
}
if (offset_cursor != length) {
if (offset_cursor != dict_length) {
return Status::Corruption("Wrong dictionary data for byte array type");
}
return Status::OK();
Expand Down
41 changes: 20 additions & 21 deletions be/src/format/parquet/byte_array_plain_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,22 @@
#include "core/string_ref.h"

namespace doris {
namespace {
Status read_length(const Slice* data, uint32_t* offset, uint32_t* length) {
if (UNLIKELY(*offset > data->size || data->size - *offset < sizeof(uint32_t))) {
return Status::IOError("Can't read byte array length from plain decoder");
}
*length = decode_fixed32_le(reinterpret_cast<const uint8_t*>(data->data) + *offset);
*offset += sizeof(uint32_t);
return Status::OK();
}
} // namespace

Status ByteArrayPlainDecoder::skip_values(size_t num_values) {
for (int i = 0; i < num_values; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain decoder");
}
uint32_t length =
decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
_offset += 4;
if (UNLIKELY(_offset + length) > _data->size) {
uint32_t length = 0;
RETURN_IF_ERROR(read_length(_data, &_offset, &length));
if (UNLIKELY(_offset > _data->size || length > _data->size - _offset)) {
return Status::IOError("Can't skip enough bytes in plain decoder");
}
_offset += length;
Expand Down Expand Up @@ -62,13 +69,9 @@ Status ByteArrayPlainDecoder::_decode_values(MutableColumnPtr& doris_column, Dat
std::vector<StringRef> string_values;
string_values.reserve(run_length);
for (size_t i = 0; i < run_length; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain decoder");
}
uint32_t length =
decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
_offset += 4;
if (UNLIKELY(_offset + length) > _data->size) {
uint32_t length = 0;
RETURN_IF_ERROR(read_length(_data, &_offset, &length));
if (UNLIKELY(_offset > _data->size || length > _data->size - _offset)) {
return Status::IOError("Can't read enough bytes in plain decoder");
}
string_values.emplace_back(_data->data + _offset, length);
Expand All @@ -83,13 +86,9 @@ Status ByteArrayPlainDecoder::_decode_values(MutableColumnPtr& doris_column, Dat
}
case ColumnSelectVector::FILTERED_CONTENT: {
for (int i = 0; i < run_length; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain decoder");
}
uint32_t length =
decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
_offset += 4;
if (UNLIKELY(_offset + length) > _data->size) {
uint32_t length = 0;
RETURN_IF_ERROR(read_length(_data, &_offset, &length));
if (UNLIKELY(_offset > _data->size || length > _data->size - _offset)) {
return Status::IOError("Can't read enough bytes in plain decoder");
}
_offset += length;
Expand Down
17 changes: 17 additions & 0 deletions be/test/format/parquet/byte_array_dict_decoder_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,4 +511,21 @@ TEST_F(ByteArrayDictDecoderTest, test_skip_value) {
}
}

TEST_F(ByteArrayDictDecoderTest, test_set_dict_rejects_truncated_length_prefix) {
auto dict_data = make_unique_buffer<uint8_t>(2);
dict_data[0] = 1;
dict_data[1] = 0;

ByteArrayDictDecoder decoder;
ASSERT_FALSE(decoder.set_dict(dict_data, 2, 1).ok());
}

TEST_F(ByteArrayDictDecoderTest, test_set_dict_rejects_truncated_payload_after_prefix) {
auto dict_data = make_unique_buffer<uint8_t>(4);
encode_fixed32_le(dict_data.get(), 1);

ByteArrayDictDecoder decoder;
ASSERT_FALSE(decoder.set_dict(dict_data, 4, 1).ok());
}

} // namespace doris
43 changes: 43 additions & 0 deletions be/test/format/parquet/byte_array_plain_decoder_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,47 @@ TEST_F(ByteArrayPlainDecoderTest, test_skip_value) {
EXPECT_EQ(result_column->get_data_at(0).to_string(), "cherry");
}

TEST_F(ByteArrayPlainDecoderTest, test_decode_truncated_length_prefix) {
uint8_t data[] = {0x01, 0x00};
_data_slice = Slice(data, sizeof(data));

ByteArrayPlainDecoder decoder;
ASSERT_TRUE(decoder.set_data(&_data_slice).ok());

MutableColumnPtr column = ColumnString::create();
DataTypePtr data_type = std::make_shared<DataTypeString>();
size_t num_values = 1;
std::vector<uint16_t> run_length_null_map(1, num_values);
std::vector<uint8_t> filter_data(num_values, 1);
FilterMap filter_map;
ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok());
ColumnSelectVector select_vector;
ASSERT_TRUE(select_vector.init(run_length_null_map, num_values, nullptr, &filter_map, 0).ok());

ASSERT_FALSE(decoder.decode_values(column, data_type, select_vector, false).ok());
}

TEST_F(ByteArrayPlainDecoderTest, test_decode_rejects_overflow_length) {
uint8_t data[8] = {};
encode_fixed32_le(data, 0);
encode_fixed32_le(data + 4, UINT32_MAX);
_data_slice = Slice(data, sizeof(data));

ByteArrayPlainDecoder decoder;
ASSERT_TRUE(decoder.set_data(&_data_slice).ok());
ASSERT_TRUE(decoder.skip_values(1).ok());

MutableColumnPtr column = ColumnString::create();
DataTypePtr data_type = std::make_shared<DataTypeString>();
size_t num_values = 1;
std::vector<uint16_t> run_length_null_map(1, num_values);
std::vector<uint8_t> filter_data(num_values, 1);
FilterMap filter_map;
ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok());
ColumnSelectVector select_vector;
ASSERT_TRUE(select_vector.init(run_length_null_map, num_values, nullptr, &filter_map, 0).ok());

ASSERT_FALSE(decoder.decode_values(column, data_type, select_vector, false).ok());
}

} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,8 @@ suite("test_hdfs_parquet_group6", "p0,external") {
sql """ select * from HDFS(
"uri" = "${uri}",
"hadoop.username" = "${hdfsUserName}",
"format" = "parquet") limit 10; """
exception "Can't read byte array length from plain decoder"
"format" = "parquet"); """
exception "Can't read enough bytes in plain decode"
}


Expand Down
Loading