Skip to content

Commit

Permalink
apacheGH-35095: Prevent write after close
Browse files Browse the repository at this point in the history
This addresses apacheGH-35095 by adding a flag to IpcFormatWriter
to track when a writer has been closed, and check this flag
before writes.
  • Loading branch information
chrisjordansquire committed Sep 20, 2023
1 parent 008d277 commit 9e69c38
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
20 changes: 20 additions & 0 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1519,6 +1519,23 @@ class ReaderWriterMixin : public ExtensionTypesMixin {
}
}

void TestWriteAfterClose() {
// Part of GH-35095.
std::shared_ptr<RecordBatch> batch_ints;
ASSERT_OK(MakeIntRecordBatch(&batch_ints));

std::shared_ptr<Schema> schema = batch_ints->schema();

WriterHelper writer_helper;
ASSERT_OK(writer_helper.Init(schema, IpcWriteOptions::Defaults()));
ASSERT_OK(writer_helper.WriteBatch(batch_ints));
ASSERT_OK(writer_helper.Finish());

// Write after close raises status
auto foo = writer_helper.WriteBatch(batch_ints);
// ASSERT_RAISES(Invalid, writer_helper.WriteBatch(batch_ints));
}

void TestWriteDifferentSchema() {
// Test writing batches with a different schema than the RecordBatchWriter
// was initialized with.
Expand Down Expand Up @@ -1991,6 +2008,9 @@ TEST_F(TestFileFormatGenerator, DictionaryRoundTrip) { TestDictionaryRoundtrip()
TEST_F(TestFileFormatGeneratorCoalesced, DictionaryRoundTrip) {
TestDictionaryRoundtrip();
}
TEST_F(TestFileFormat, WriteAfterClose) { TestWriteAfterClose(); }

TEST_F(TestStreamFormat, WriteAfterClose) { TestWriteAfterClose(); }

TEST_F(TestStreamFormat, DifferentSchema) { TestWriteDifferentSchema(); }

Expand Down
8 changes: 7 additions & 1 deletion cpp/src/arrow/ipc/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,9 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
Status WriteRecordBatch(
const RecordBatch& batch,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata) override {
if (closed_) {
return Status::Invalid("Destination already closed");
}
if (!batch.schema()->Equals(schema_, false /* check_metadata */)) {
return Status::Invalid("Tried to write record batch with different schema");
}
Expand Down Expand Up @@ -1101,7 +1104,9 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {

Status Close() override {
RETURN_NOT_OK(CheckStarted());
return payload_writer_->Close();
RETURN_NOT_OK(payload_writer_->Close());
closed_ = true;
return Status::OK();
}

Status Start() {
Expand Down Expand Up @@ -1213,6 +1218,7 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
std::unordered_map<int64_t, std::shared_ptr<Array>> last_dictionaries_;

bool started_ = false;
bool closed_ = false;
IpcWriteOptions options_;
WriteStats stats_;
};
Expand Down

0 comments on commit 9e69c38

Please sign in to comment.