From f98cca7f91edd778add227d4d50839048aeb7826 Mon Sep 17 00:00:00 2001 From: Chris Jordan-Squire <788080+chrisjordansquire@users.noreply.github.com> Date: Wed, 20 Sep 2023 23:48:54 -0400 Subject: [PATCH] GH-35095: [C++] Prevent write after close in arrow::ipc::IpcFormatWriter (#37783) This addresses GH-35095 by adding a flag to IpcFormatWriter to track when a writer has been closed, and check this flag before writes. ### Rationale for this change This addresses #35095 , preventing stream and file IPC writers from writing record batches once the IPC writer has been closed. ### What changes are included in this PR? Adding a flag so that an IpcFormatWriter to track when it's been closed, a check before writes in IpcFormatWriter, and two tests to confirm it works as expected. ### Are these changes tested? Yes, the changes are tested. The two tests were added, and the C++ test suite ran. No unexpected failures appeared. ### Are there any user-facing changes? Other than newly returning an invalid status when writing after close, no, there should not be any user-facing changes. * Closes: #35095 Lead-authored-by: Chris Jordan-Squire Co-authored-by: Sutou Kouhei Signed-off-by: Sutou Kouhei --- cpp/src/arrow/ipc/read_write_test.cc | 19 +++++++++++++++++++ cpp/src/arrow/ipc/writer.cc | 8 +++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 69b827b8fe78d..3ae007c20efe7 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -1519,6 +1519,22 @@ class ReaderWriterMixin : public ExtensionTypesMixin { } } + void TestWriteAfterClose() { + // Part of GH-35095. + std::shared_ptr batch_ints; + ASSERT_OK(MakeIntRecordBatch(&batch_ints)); + + auto schema = batch_ints->schema(); + + WriterHelper writer_helper; + ASSERT_OK(writer_helper.Init(schema, IpcWriteOptions::Defaults())); + ASSERT_OK(writer_helper.WriteBatch(batch_ints)); + ASSERT_OK(writer_helper.Finish()); + + // Write after close raises status + ASSERT_RAISES(Invalid, writer_helper.WriteBatch(batch_ints)); + } + void TestWriteDifferentSchema() { // Test writing batches with a different schema than the RecordBatchWriter // was initialized with. @@ -1991,6 +2007,9 @@ TEST_F(TestFileFormatGenerator, DictionaryRoundTrip) { TestDictionaryRoundtrip() TEST_F(TestFileFormatGeneratorCoalesced, DictionaryRoundTrip) { TestDictionaryRoundtrip(); } +TEST_F(TestFileFormat, WriteAfterClose) { TestWriteAfterClose(); } + +TEST_F(TestStreamFormat, WriteAfterClose) { TestWriteAfterClose(); } TEST_F(TestStreamFormat, DifferentSchema) { TestWriteDifferentSchema(); } diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 1d230601566a0..e4b49ed56464e 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -1070,6 +1070,9 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter { Status WriteRecordBatch( const RecordBatch& batch, const std::shared_ptr& custom_metadata) override { + if (closed_) { + return Status::Invalid("Destination already closed"); + } if (!batch.schema()->Equals(schema_, false /* check_metadata */)) { return Status::Invalid("Tried to write record batch with different schema"); } @@ -1101,7 +1104,9 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter { Status Close() override { RETURN_NOT_OK(CheckStarted()); - return payload_writer_->Close(); + RETURN_NOT_OK(payload_writer_->Close()); + closed_ = true; + return Status::OK(); } Status Start() { @@ -1213,6 +1218,7 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter { std::unordered_map> last_dictionaries_; bool started_ = false; + bool closed_ = false; IpcWriteOptions options_; WriteStats stats_; };