diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index 0cc05d6d75..2a6c23c66e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -148,6 +148,10 @@ public void close() throws IOException, InterruptedException {
   }
 
   public void write(T value) throws IOException, InterruptedException {
+    if (aborted) {
+      throw new IOException("Writer has been aborted due to a previous error and is in an undefined state. "
+          + "No further writes are allowed. Please create a new writer.");
+    }
     try {
       writeSupport.write(value);
       ++recordCount;
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
index 51f8a7dd67..5ab8f89b5c 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
@@ -29,14 +29,18 @@
 import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.bytes.TrackingByteBufferAllocator;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
 import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
 import org.apache.parquet.hadoop.codec.CleanUtil;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.io.LocalOutputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -58,6 +62,49 @@ public class TestParquetWriterError {
 
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
+  @Test
+  public void testWriteAfterAbortShouldThrow() throws Exception {
+    java.nio.file.Path outputFile = tmpFolder.newFile("abort_test.parquet").toPath();
+    MessageType schema =
+        MessageTypeParser.parseMessageType("message test { required binary name; required int32 age; }");
+    SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
+
+    try (TrackingByteBufferAllocator allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) {
+      ParquetWriter<Group> writer = ExampleParquetWriter.builder(new LocalOutputFile(outputFile))
+          .withType(schema)
+          .withAllocator(allocator)
+          .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+          .build();
+
+      // Write one valid record
+      writer.write(groupFactory.newGroup().append("name", "Alice").append("age", 30));
+
+      // Simulate an aborted state by reflectively setting the aborted flag
+      // on the internal writer. This mirrors what happens when a write fails
+      // with an exception (e.g. OOM during page flush).
+      java.lang.reflect.Field internalWriterField = ParquetWriter.class.getDeclaredField("writer");
+      internalWriterField.setAccessible(true);
+      InternalParquetRecordWriter internalWriter =
+          (InternalParquetRecordWriter) internalWriterField.get(writer);
+      java.lang.reflect.Field abortedField = InternalParquetRecordWriter.class.getDeclaredField("aborted");
+      abortedField.setAccessible(true);
+      abortedField.setBoolean(internalWriter, true);
+
+      // Now try to write again - this should throw IOException
+      try {
+        writer.write(groupFactory.newGroup().append("name", "Charlie").append("age", 25));
+        Assert.fail("Expected IOException when writing to an aborted writer");
+      } catch (IOException e) {
+        Assert.assertTrue(
+            "Error message should mention aborted state",
+            e.getMessage().contains("aborted"));
+      }
+
+      // Close should not throw (it should silently skip flushing due to aborted state)
+      writer.close();
+    }
+  }
+
   @Test
   public void testInSeparateProcess() throws IOException, InterruptedException {
     String outputFile = tmpFolder.newFile("out.parquet").toString();