Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-37969: [C++][Parquet] add more closed file checks for ParquetFileWriter #38390

Merged
merged 11 commits into from
Nov 16, 2023
27 changes: 27 additions & 0 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5206,6 +5206,33 @@ TEST(TestArrowReadWrite, FuzzReader) {
}
}

// Regression test for GH-37969: calling write/row-group methods on an
// already-closed FileWriter must return Status::Invalid rather than
// crash with a segfault.
TEST(TestArrowReadWrite, OperationsOnClosedWriter) {
  // Minimal one-column table; the concrete schema/data are irrelevant here.
  auto test_schema = ::arrow::schema({::arrow::field("letter", ::arrow::utf8())});
  auto test_table = ::arrow::Table::Make(
      test_schema, {::arrow::ArrayFromJSON(::arrow::utf8(), R"(["a", "b", "c"])")});

  auto out_stream = CreateOutputStream();
  ASSERT_OK_AND_ASSIGN(auto file_writer, parquet::arrow::FileWriter::Open(
                                             *test_schema, ::arrow::default_memory_pool(),
                                             out_stream, parquet::default_writer_properties(),
                                             parquet::default_arrow_writer_properties()));

  // Writing before Close() succeeds.
  ASSERT_OK(file_writer->WriteTable(*test_table, 1));

  // Close the writer, then verify every mutating entry point rejects the call.
  ASSERT_OK(file_writer->Close());

  ASSERT_RAISES(Invalid, file_writer->NewRowGroup(1));
  ASSERT_RAISES(Invalid, file_writer->WriteColumnChunk(test_table->column(0), 0, 1));
  ASSERT_RAISES(Invalid, file_writer->NewBufferedRowGroup());
  ASSERT_OK_AND_ASSIGN(auto batch, test_table->CombineChunksToBatch());
  ASSERT_RAISES(Invalid, file_writer->WriteRecordBatch(*batch));
  ASSERT_RAISES(Invalid, file_writer->WriteTable(*test_table, 1));
}

namespace {

struct ColumnIndexObject {
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ class FileWriterImpl : public FileWriter {
}

Status NewRowGroup(int64_t chunk_size) override {
RETURN_NOT_OK(CheckClosed());
if (row_group_writer_ != nullptr) {
PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
}
Expand All @@ -325,6 +326,13 @@ class FileWriterImpl : public FileWriter {
return Status::OK();
}

// Returns Status::Invalid when this writer has already been closed;
// otherwise OK. Used as a guard at the top of every mutating method.
Status CheckClosed() const {
  return closed_ ? Status::Invalid("Operation on closed file") : Status::OK();
}

Status WriteColumnChunk(const Array& data) override {
// A bit awkward here since cannot instantiate ChunkedArray from const Array&
auto chunk = ::arrow::MakeArray(data.data());
Expand All @@ -334,6 +342,7 @@ class FileWriterImpl : public FileWriter {

Status WriteColumnChunk(const std::shared_ptr<ChunkedArray>& data, int64_t offset,
int64_t size) override {
RETURN_NOT_OK(CheckClosed());
if (arrow_properties_->engine_version() == ArrowWriterProperties::V2 ||
arrow_properties_->engine_version() == ArrowWriterProperties::V1) {
if (row_group_writer_->buffered()) {
Expand All @@ -356,6 +365,7 @@ class FileWriterImpl : public FileWriter {
std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }

Status WriteTable(const Table& table, int64_t chunk_size) override {
RETURN_NOT_OK(CheckClosed());
RETURN_NOT_OK(table.Validate());

if (chunk_size <= 0 && table.num_rows() > 0) {
Expand Down Expand Up @@ -392,6 +402,7 @@ class FileWriterImpl : public FileWriter {
}

Status NewBufferedRowGroup() override {
RETURN_NOT_OK(CheckClosed());
if (row_group_writer_ != nullptr) {
PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
}
Expand All @@ -400,6 +411,7 @@ class FileWriterImpl : public FileWriter {
}

Status WriteRecordBatch(const RecordBatch& batch) override {
RETURN_NOT_OK(CheckClosed());
if (batch.num_rows() == 0) {
return Status::OK();
}
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/parquet/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,11 @@ void ParquetFileWriter::AddKeyValueMetadata(
}

const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
  // Close() releases contents_; accessing properties afterwards is an error.
  if (!contents_) {
    throw ParquetException("Cannot get properties from closed file");
  }
  return contents_->properties();
}

} // namespace parquet
Loading