From 844c9893045a4257630ed1ca564f67db0dee0b4a Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Mon, 15 Aug 2016 16:28:36 -0500 Subject: [PATCH 1/3] FLINK-4278: Added close method calls to FSDataOutputStream and FsStateBackend --- .../main/java/org/apache/flink/core/fs/FSDataOutputStream.java | 2 ++ .../apache/flink/runtime/state/filesystem/FsStateBackend.java | 3 +++ 2 files changed, 5 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java index a6becf7965b28..222a41733f66f 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java @@ -32,4 +32,6 @@ public abstract class FSDataOutputStream extends OutputStream { public abstract void flush() throws IOException; public abstract void sync() throws IOException; + + public abstract void close() throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index 61cf7410b92c9..d1d07d9fc55e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.apache.flink.util.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -484,6 +485,7 @@ public void write(byte[] b, int off, int len) throws IOException { flush(); // write the bytes directly outStream.write(b, off, len); + outStream.close(); } } @@ -500,6 +502,7 @@ public void flush() throws IOException { try { statePath = new Path(basePath, UUID.randomUUID().toString()); outStream = fs.create(statePath, false); + outStream.close(); break; } catch (Exception e) { From d7d1273d7fee1fa728f97aa1fca360bad96c804d Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Mon, 15 Aug 2016 22:42:25 -0500 Subject: [PATCH 2/3] FLINK-4278: Added close method to the FileSystemStateStorageHelper and StringWriter --- .../zookeeper/filesystem/FileSystemStateStorageHelper.java | 3 +++ .../org/apache/flink/streaming/connectors/fs/StringWriter.java | 1 + 2 files changed, 4 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java index 6692ef01fb598..7c592461c986c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle; import org.apache.flink.util.FileUtils; +import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.runtime.zookeeper.StateStorageHelper; @@ -64,6 +65,7 @@ public StateHandle store(T state) throws Exception { FSDataOutputStream outStream; try { outStream = fs.create(filePath, false); + outStream.close(); } catch (Exception e) { latestException = e; @@ -72,6 +74,7 @@ public StateHandle store(T state) throws Exception { try(ObjectOutputStream os = new ObjectOutputStream(outStream)) { os.writeObject(state); + os.close(); } return new FileSerializableStateHandle<>(filePath); diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java index 6568a86c7f091..7b39bc8005a91 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java @@ -77,6 +77,7 @@ public void write(T element) throws IOException { FSDataOutputStream outputStream = getStream(); outputStream.write(element.toString().getBytes(charset)); outputStream.write('\n'); + outputStream.close(); } @Override From a677710a30b21915f1779b52806793db41bddb11 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Wed, 24 Aug 2016 06:05:48 -0700 Subject: [PATCH 3/3] FLINK-4278: Removed the close() from the FSDataOutputStream and removed unused imports that were failing stylechecks --- .../main/java/org/apache/flink/core/fs/FSDataOutputStream.java | 1 - .../apache/flink/runtime/state/filesystem/FsStateBackend.java | 1 - .../zookeeper/filesystem/FileSystemStateStorageHelper.java | 1 - 3 files changed, 3 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java index 222a41733f66f..cda29af4f6098 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java @@ -33,5 +33,4 @@ public abstract class FSDataOutputStream extends OutputStream { public abstract void sync() throws IOException; - public abstract void close() throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index d1d07d9fc55e3..c5318de5027ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -35,7 +35,6 @@ import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; -import org.apache.flink.util.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java index 7c592461c986c..e1f0ea0612457 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle; import org.apache.flink.util.FileUtils; -import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.runtime.zookeeper.StateStorageHelper;