GH-39527: [C++][Parquet] Validate page sizes before truncating to int32

Be defensive instead of writing invalid data.
emkornfield committed Jan 9, 2024
1 parent 5acf67c commit fc89382
Showing 2 changed files with 55 additions and 2 deletions.
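Background for this change: the Parquet Thrift PageHeader stores uncompressed_page_size and compressed_page_size as int32 fields, while the C++ writer tracks buffer sizes as int64_t. An unchecked static_cast silently truncates any page larger than INT32_MAX bytes, so an oversized page would be written with a corrupt header instead of failing. The patch adds a range check before each narrowing cast. A minimal standalone sketch of the pattern, assuming nothing beyond the standard library (the helper name CheckedPageSizeCast is illustrative, not part of the Parquet API):

#include <cstdint>
#include <limits>
#include <stdexcept>

// Illustrative helper: refuse to narrow a 64-bit size that cannot be
// represented by the int32 page-size fields of the Thrift page header.
int32_t CheckedPageSizeCast(int64_t size) {
  if (size > std::numeric_limits<int32_t>::max()) {
    throw std::runtime_error("page size overflows int32");
  }
  return static_cast<int32_t>(size);
}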
17 changes: 15 additions & 2 deletions cpp/src/parquet/column_writer.cc
@@ -271,7 +271,10 @@ class SerializedPageWriter : public PageWriter {
   }
 
   int64_t WriteDictionaryPage(const DictionaryPage& page) override {
-    int64_t uncompressed_size = page.size();
+    int64_t uncompressed_size = page.buffer()->size();
+    if (uncompressed_size > std::numeric_limits<int32_t>::max()) {
+      throw ParquetException("Uncompressed page size overflows to INT32_MAX.");
+    }
     std::shared_ptr<Buffer> compressed_data;
     if (has_compressor()) {
       auto buffer = std::static_pointer_cast<ResizableBuffer>(
@@ -288,6 +291,9 @@
     dict_page_header.__set_is_sorted(page.is_sorted());
 
     const uint8_t* output_data_buffer = compressed_data->data();
+    if (compressed_data->size() > std::numeric_limits<int32_t>::max()) {
+      throw ParquetException("Compressed page size overflows to INT32_MAX.");
+    }
     int32_t output_data_len = static_cast<int32_t>(compressed_data->size());
 
     if (data_encryptor_.get()) {
@@ -371,7 +377,7 @@
     const int64_t uncompressed_size = page.uncompressed_size();
     std::shared_ptr<Buffer> compressed_data = page.buffer();
     const uint8_t* output_data_buffer = compressed_data->data();
-    int32_t output_data_len = static_cast<int32_t>(compressed_data->size());
+    int64_t output_data_len = compressed_data->size();
 
     if (data_encryptor_.get()) {
       PARQUET_THROW_NOT_OK(encryption_buffer_->Resize(
@@ -383,7 +389,14 @@
     }
 
     format::PageHeader page_header;
+
+    if (uncompressed_size > std::numeric_limits<int32_t>::max()) {
+      throw ParquetException("Uncompressed page size overflows to INT32_MAX.");
+    }
     page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
+    if (output_data_len > std::numeric_limits<int32_t>::max()) {
+      throw ParquetException("Compressed page size overflows to INT32_MAX.");
+    }
     page_header.__set_compressed_page_size(static_cast<int32_t>(output_data_len));
 
     if (page_checksum_verification_) {
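Why the removed cast in WriteDataPage was dangerous: static_cast<int32_t> of an int64_t does not fail on overflow, it wraps modulo 2^32, so a page just over the limit would have been recorded with a negative header size rather than raising an error. A standalone illustration (not code from the patch):

#include <cstdint>
#include <iostream>
#include <limits>

int main() {
  int64_t size = std::numeric_limits<int32_t>::max() + int64_t{1};  // 2147483648
  // Narrowing does not trap; on two's-complement targets it wraps around.
  int32_t truncated = static_cast<int32_t>(size);
  std::cout << truncated << "\n";  // prints -2147483648
  return 0;
}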
40 changes: 40 additions & 0 deletions cpp/src/parquet/column_writer_test.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <memory>
 #include <utility>
 #include <vector>
 
@@ -36,6 +37,7 @@
 #include "parquet/test_util.h"
 #include "parquet/thrift_internal.h"
 #include "parquet/types.h"
+#include "parquet/column_page.h"
 
 namespace bit_util = arrow::bit_util;
 
@@ -889,6 +891,44 @@ TEST_F(TestByteArrayValuesWriter, CheckDefaultStats) {
   ASSERT_TRUE(this->metadata_is_stats_set());
 }
 
+TEST(TestPageWriter, ThrowsOnPagesToLarge) {
+  NodePtr item = schema::Int32("item");  // optional item
+  NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, ConvertedType::LIST));
+  NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list}));  // optional list
+  std::vector<NodePtr> fields = {bag};
+  NodePtr root = GroupNode::Make("schema", Repetition::REPEATED, fields);
+
+  SchemaDescriptor schema;
+  schema.Init(root);
+
+  auto sink = CreateOutputStream();
+  auto props = WriterProperties::Builder().build();
+
+  auto metadata = ColumnChunkMetaDataBuilder::Make(props, schema.Column(0));
+  std::unique_ptr<PageWriter> pager =
+      PageWriter::Open(sink, Compression::UNCOMPRESSED,
+                       Codec::UseDefaultCompressionLevel(), metadata.get());
+
+  uint8_t data;
+  std::shared_ptr<Buffer> buffer =
+      std::make_shared<Buffer>(&data, std::numeric_limits<int32_t>::max() + int64_t{1});
+  DataPageV1 over_compressed_limit(buffer, /*num_values=*/100, Encoding::BIT_PACKED,
+                                   Encoding::BIT_PACKED, Encoding::BIT_PACKED,
+                                   /*uncompressed_size=*/100);
+  EXPECT_THROW(pager->WriteDataPage(over_compressed_limit), ParquetException);
+  DictionaryPage dictionary_over_compressed_limit(buffer, /*num_values=*/100,
+                                                  Encoding::PLAIN);
+  EXPECT_THROW(pager->WriteDictionaryPage(dictionary_over_compressed_limit),
+               ParquetException);
+
+  buffer = std::make_shared<Buffer>(&data, 1);
+  DataPageV1 over_uncompressed_limit(
+      buffer, /*num_values=*/100, Encoding::BIT_PACKED, Encoding::BIT_PACKED,
+      Encoding::BIT_PACKED,
+      /*uncompressed_size=*/std::numeric_limits<int32_t>::max() + int64_t{1});
+  EXPECT_THROW(pager->WriteDataPage(over_uncompressed_limit), ParquetException);
+}
+
 TEST(TestColumnWriter, RepeatedListsUpdateSpacedBug) {
   // In ARROW-3930 we discovered a bug when writing from Arrow when we had data
   // that looks like this:
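A note on the test technique: arrow::Buffer is a non-owning view that records a pointer and a claimed length without copying or validating, so the test can describe a buffer of INT32_MAX + 1 bytes backed by a single stack byte, and the new size checks throw before any buffer contents are read. A standalone sketch of the same trick, assuming only the public Buffer(const uint8_t*, int64_t) constructor:

#include <cstdint>
#include <limits>
#include <memory>

#include "arrow/buffer.h"

int main() {
  uint8_t byte = 0;
  // Non-owning wrap: no copy, no validation, and no 2 GiB allocation.
  auto huge = std::make_shared<arrow::Buffer>(
      &byte, std::numeric_limits<int32_t>::max() + int64_t{1});
  // size() reports the claimed length even though only one byte is backed.
  return huge->size() == (int64_t{1} << 31) ? 0 : 1;
}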
