From ca63cff4045cb60ed377b2727f01a457579d057c Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Wed, 22 Feb 2017 06:26:38 +0200 Subject: [PATCH 1/2] [BEAM-1465] No natural place to flush/close resources in FileBasedWriter --- .../main/java/org/apache/beam/sdk/io/AvroIO.java | 8 ++++---- .../java/org/apache/beam/sdk/io/FileBasedSink.java | 14 +++++++++++--- .../main/java/org/apache/beam/sdk/io/TextIO.java | 7 +++++-- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 01a4cba00d4f..9e217c0cce48 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -1027,13 +1027,13 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception { } @Override - public void write(T value) throws Exception { - dataFileWriter.append(value); + protected void finishWrite() throws Exception { + dataFileWriter.flush(); } @Override - protected void writeFooter() throws Exception { - dataFileWriter.flush(); + public void write(T value) throws Exception { + dataFileWriter.append(value); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 32b8b4f11a5e..512e8cb5979f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -579,6 +579,12 @@ public FileBasedWriter(FileBasedWriteOperation writeOperation) { */ protected abstract void prepareWrite(WritableByteChannel channel) throws Exception; + /** + * Called after all calls to writeHeader, writeFooter, and write. + * If any resources opened in the write processes need to be closed, close them here. + */ + protected void finishWrite() throws Exception {} + /** * Writes header at the beginning of output files. Nothing by default; subclasses may override. */ @@ -627,9 +633,11 @@ public final void open(String uId) throws Exception { */ @Override public final FileResult close() throws Exception { - try (WritableByteChannel theChannel = channel) { - LOG.debug("Writing footer to {}.", filename); - writeFooter(); + LOG.debug("Writing footer to {}.", filename); + writeFooter(); + finishWrite(); + if (channel.isOpen()) { + channel.close(); } FileResult result = new FileResult(filename); LOG.debug("Result for bundle {}: {}", this.id, filename); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 726411c78260..fd8922e81c04 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -1095,6 +1095,11 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception { out = new OutputStreamWriter(Channels.newOutputStream(channel), StandardCharsets.UTF_8); } + @Override + protected void finishWrite() throws Exception { + out.close(); + } + @Override protected void writeHeader() throws Exception { writeIfNotNull(header); @@ -1103,8 +1108,6 @@ protected void writeHeader() throws Exception { @Override protected void writeFooter() throws Exception { writeIfNotNull(footer); - // Flush here because there is currently no other natural place to do this. [BEAM-1465] - out.flush(); } @Override From 1146c31a88b88f5d2a3711f8e6dc429266a1dce8 Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Wed, 22 Feb 2017 21:51:07 +0200 Subject: [PATCH 2/2] Changes after review --- .../java/org/apache/beam/sdk/io/AvroIO.java | 8 +++--- .../org/apache/beam/sdk/io/FileBasedSink.java | 25 +++++++++++-------- .../java/org/apache/beam/sdk/io/TextIO.java | 12 ++++----- 3 files changed, 24 insertions(+), 21 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 9e217c0cce48..388d9f0b0d2c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -1027,13 +1027,13 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception { } @Override - protected void finishWrite() throws Exception { - dataFileWriter.flush(); + public void write(T value) throws Exception { + dataFileWriter.append(value); } @Override - public void write(T value) throws Exception { - dataFileWriter.append(value); + protected void finishWrite() throws Exception { + dataFileWriter.flush(); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 512e8cb5979f..e14ba59029eb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -579,12 +579,6 @@ public FileBasedWriter(FileBasedWriteOperation writeOperation) { */ protected abstract void prepareWrite(WritableByteChannel channel) throws Exception; - /** - * Called after all calls to writeHeader, writeFooter, and write. - * If any resources opened in the write processes need to be closed, close them here. - */ - protected void finishWrite() throws Exception {} - /** * Writes header at the beginning of output files. Nothing by default; subclasses may override. */ @@ -595,6 +589,12 @@ protected void writeHeader() throws Exception {} */ protected void writeFooter() throws Exception {} + /** + * Called after all calls to {@link #writeHeader}, {@link #write} and {@link #writeFooter}. + * If any resources opened in the write processes need to be flushed, flush them here. + */ + protected void finishWrite() throws Exception {} + /** * Opens the channel. */ @@ -633,11 +633,14 @@ public final void open(String uId) throws Exception { */ @Override public final FileResult close() throws Exception { - LOG.debug("Writing footer to {}.", filename); - writeFooter(); - finishWrite(); - if (channel.isOpen()) { - channel.close(); + try (WritableByteChannel theChannel = channel) { + LOG.debug("Writing footer to {}.", filename); + writeFooter(); + LOG.debug("Finishing write to {}.", filename); + finishWrite(); + if (!channel.isOpen()) { + throw new IllegalStateException("Channel should only be closed by its owner: " + channel); + } } FileResult result = new FileResult(filename); LOG.debug("Result for bundle {}: {}", this.id, filename); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index fd8922e81c04..86e698908111 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -1096,13 +1096,13 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception { } @Override - protected void finishWrite() throws Exception { - out.close(); + protected void writeHeader() throws Exception { + writeIfNotNull(header); } @Override - protected void writeHeader() throws Exception { - writeIfNotNull(header); + public void write(String value) throws Exception { + writeLine(value); } @Override @@ -1111,8 +1111,8 @@ protected void writeFooter() throws Exception { } @Override - public void write(String value) throws Exception { - writeLine(value); + protected void finishWrite() throws Exception { + out.flush(); } } }