Skip to content

Commit

Permalink
GH-39527: [C++][Parquet] Validate page sizes before truncating to int…
Browse files Browse the repository at this point in the history
…32 (#39528)

Be defensive instead of writing invalid data.

### Rationale for this change

Users can provide this API with pages that are too large to be validly written, and we silently truncate their lengths before writing.

### What changes are included in this PR?

Add validations and throw an exception if sizes are too large (previously this was checked only when page indexes were being built).

### Are these changes tested?

Unit tested

### Are there any user-facing changes?

This might start raising exceptions instead of writing out invalid parquet files.

Closes #39527 

**This PR contains a "Critical Fix".** 
* Closes: #39527

Lead-authored-by: emkornfield <emkornfield@gmail.com>
Co-authored-by: Micah Kornfield <micahk@google.com>
Co-authored-by: mwish <maplewish117@gmail.com>
Co-authored-by: Antoine Pitrou <pitrou@free.fr>
Co-authored-by: Gang Wu <ustcwg@gmail.com>
Signed-off-by: mwish <maplewish117@gmail.com>
  • Loading branch information
5 people committed Jan 27, 2024
1 parent 13b2234 commit 5d7f661
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 4 deletions.
29 changes: 25 additions & 4 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,12 @@ class SerializedPageWriter : public PageWriter {
}

int64_t WriteDictionaryPage(const DictionaryPage& page) override {
int64_t uncompressed_size = page.size();
int64_t uncompressed_size = page.buffer()->size();
if (uncompressed_size > std::numeric_limits<int32_t>::max()) {
throw ParquetException(
"Uncompressed dictionary page size overflows INT32_MAX. Size:",
uncompressed_size);
}
std::shared_ptr<Buffer> compressed_data;
if (has_compressor()) {
auto buffer = std::static_pointer_cast<ResizableBuffer>(
Expand All @@ -288,6 +293,11 @@ class SerializedPageWriter : public PageWriter {
dict_page_header.__set_is_sorted(page.is_sorted());

const uint8_t* output_data_buffer = compressed_data->data();
if (compressed_data->size() > std::numeric_limits<int32_t>::max()) {
throw ParquetException(
"Compressed dictionary page size overflows INT32_MAX. Size: ",
uncompressed_size);
}
int32_t output_data_len = static_cast<int32_t>(compressed_data->size());

if (data_encryptor_.get()) {
Expand Down Expand Up @@ -371,18 +381,29 @@ class SerializedPageWriter : public PageWriter {
const int64_t uncompressed_size = page.uncompressed_size();
std::shared_ptr<Buffer> compressed_data = page.buffer();
const uint8_t* output_data_buffer = compressed_data->data();
int32_t output_data_len = static_cast<int32_t>(compressed_data->size());
int64_t output_data_len = compressed_data->size();

if (output_data_len > std::numeric_limits<int32_t>::max()) {
throw ParquetException("Compressed data page size overflows INT32_MAX. Size:",
output_data_len);
}

if (data_encryptor_.get()) {
PARQUET_THROW_NOT_OK(encryption_buffer_->Resize(
data_encryptor_->CiphertextSizeDelta() + output_data_len, false));
UpdateEncryption(encryption::kDataPage);
output_data_len = data_encryptor_->Encrypt(compressed_data->data(), output_data_len,
output_data_len = data_encryptor_->Encrypt(compressed_data->data(),
static_cast<int32_t>(output_data_len),
encryption_buffer_->mutable_data());
output_data_buffer = encryption_buffer_->data();
}

format::PageHeader page_header;

if (uncompressed_size > std::numeric_limits<int32_t>::max()) {
throw ParquetException("Uncompressed data page size overflows INT32_MAX. Size:",
uncompressed_size);
}
page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
page_header.__set_compressed_page_size(static_cast<int32_t>(output_data_len));

Expand Down Expand Up @@ -421,7 +442,7 @@ class SerializedPageWriter : public PageWriter {
if (offset_index_builder_ != nullptr) {
const int64_t compressed_size = output_data_len + header_size;
if (compressed_size > std::numeric_limits<int32_t>::max()) {
throw ParquetException("Compressed page size overflows to INT32_MAX.");
throw ParquetException("Compressed page size overflows INT32_MAX.");
}
if (!page.first_row_index().has_value()) {
throw ParquetException("First row index is not set in data page.");
Expand Down
45 changes: 45 additions & 0 deletions cpp/src/parquet/column_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@
// specific language governing permissions and limitations
// under the License.

#include <memory>
#include <utility>
#include <vector>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "arrow/io/buffered.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_builders.h"

#include "parquet/column_page.h"
#include "parquet/column_reader.h"
#include "parquet/column_writer.h"
#include "parquet/file_reader.h"
Expand Down Expand Up @@ -479,6 +482,9 @@ using TestValuesWriterInt64Type = TestPrimitiveWriter<Int64Type>;
using TestByteArrayValuesWriter = TestPrimitiveWriter<ByteArrayType>;
using TestFixedLengthByteArrayValuesWriter = TestPrimitiveWriter<FLBAType>;

using ::testing::HasSubstr;
using ::testing::ThrowsMessage;

TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
this->TestRequiredWithEncoding(Encoding::PLAIN);
}
Expand Down Expand Up @@ -889,6 +895,45 @@ TEST_F(TestByteArrayValuesWriter, CheckDefaultStats) {
ASSERT_TRUE(this->metadata_is_stats_set());
}

// Verify that SerializedPageWriter rejects pages whose compressed or declared
// uncompressed size does not fit in the Thrift PageHeader's int32 fields,
// instead of silently truncating and writing a corrupt file (GH-39527).
TEST(TestPageWriter, ThrowsOnPagesTooLarge) {
  NodePtr item = schema::Int32("item");  // optional item
  NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, ConvertedType::LIST));
  NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list}));  // optional list
  std::vector<NodePtr> fields = {bag};
  NodePtr root = GroupNode::Make("schema", Repetition::REPEATED, fields);

  SchemaDescriptor schema;
  schema.Init(root);

  auto sink = CreateOutputStream();
  auto props = WriterProperties::Builder().build();

  auto metadata = ColumnChunkMetaDataBuilder::Make(props, schema.Column(0));
  std::unique_ptr<PageWriter> pager =
      PageWriter::Open(sink, Compression::UNCOMPRESSED, metadata.get());

  // Buffer wraps an existing pointer with an over-large length; no multi-GB
  // allocation actually happens, we only need size() to exceed INT32_MAX.
  uint8_t data;
  std::shared_ptr<Buffer> buffer =
      std::make_shared<Buffer>(&data, std::numeric_limits<int32_t>::max() + int64_t{1});

  // Case 1: compressed (on-wire) size of a data page overflows int32.
  DataPageV1 over_compressed_limit(buffer, /*num_values=*/100, Encoding::BIT_PACKED,
                                   Encoding::BIT_PACKED, Encoding::BIT_PACKED,
                                   /*uncompressed_size=*/100);
  EXPECT_THAT([&]() { pager->WriteDataPage(over_compressed_limit); },
              ThrowsMessage<ParquetException>(HasSubstr("overflows INT32_MAX")));

  // Case 2: compressed size of a dictionary page overflows int32.
  DictionaryPage dictionary_over_compressed_limit(buffer, /*num_values=*/100,
                                                  Encoding::PLAIN);
  EXPECT_THAT([&]() { pager->WriteDictionaryPage(dictionary_over_compressed_limit); },
              ThrowsMessage<ParquetException>(HasSubstr("overflows INT32_MAX")));

  // Case 3: declared *uncompressed* size overflows int32 while the actual
  // buffer is tiny.
  buffer = std::make_shared<Buffer>(&data, 1);
  DataPageV1 over_uncompressed_limit(
      buffer, /*num_values=*/100, Encoding::BIT_PACKED, Encoding::BIT_PACKED,
      Encoding::BIT_PACKED,
      /*uncompressed_size=*/std::numeric_limits<int32_t>::max() + int64_t{1});
  // Fixed: the original accidentally re-tested over_compressed_limit here,
  // leaving over_uncompressed_limit unused and the uncompressed-size check
  // untested.
  EXPECT_THAT([&]() { pager->WriteDataPage(over_uncompressed_limit); },
              ThrowsMessage<ParquetException>(HasSubstr("overflows INT32_MAX")));
}

TEST(TestColumnWriter, RepeatedListsUpdateSpacedBug) {
// In ARROW-3930 we discovered a bug when writing from Arrow when we had data
// that looks like this:
Expand Down

0 comments on commit 5d7f661

Please sign in to comment.