diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index e4120069784a3..7cb391623f7b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -108,7 +108,7 @@ public void discardState() throws Exception { } @Override - public long getStateSize() throws Exception { + public long getStateSize() throws IOException { long result = 0L; for (TaskState taskState : taskStates.values()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java index 9025090eab4af..657dd609e2308 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java @@ -152,7 +152,7 @@ public void discardState() throws Exception { @Override - public long getStateSize() throws Exception { + public long getStateSize() throws IOException { long result = 0L; for (int i = 0; i < parallelism; i++) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java index 8e05b816fcb12..f07f44fe2ab1b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java @@ -197,6 +197,7 @@ public static void serializeStreamStateHandle(StreamStateHandle stateHandle, Dat } else if (stateHandle instanceof FileStateHandle) { dos.writeByte(FILE_STREAM_STATE_HANDLE); FileStateHandle fileStateHandle = (FileStateHandle) stateHandle; + dos.writeLong(stateHandle.getStateSize()); dos.writeUTF(fileStateHandle.getFilePath().toString()); } else if (stateHandle instanceof ByteStreamStateHandle) { @@ -218,12 +219,13 @@ public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis if (NULL_HANDLE == type) { return null; } else if (FILE_STREAM_STATE_HANDLE == type) { + long size = dis.readLong(); String pathString = dis.readUTF(); - return new FileStateHandle(new Path(pathString)); + return new FileStateHandle(new Path(pathString), size); } else if (BYTE_STREAM_STATE_HANDLE == type) { int numBytes = dis.readInt(); byte[] data = new byte[numBytes]; - dis.read(data); + dis.readFully(data); return new ByteStreamStateHandle(data); } else { throw new IOException("Unknown implementation of StreamStateHandle, code: " + type); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java index 9b308a355ba81..74057ee0b4172 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java @@ -85,7 +85,7 @@ public void discardState() throws Exception { } @Override - public long getStateSize() throws Exception { + public long getStateSize() throws IOException { long sumStateSize = 0; if (operatorStateHandles != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java index 0a36f926273de..7f87e866c3ced 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java @@ -118,7 +118,7 @@ public void discardState() throws Exception { } @Override - public long getStateSize() throws Exception { + public long getStateSize() throws IOException { return stateHandle.getStateSize(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java index c6fd02c98fff1..9ecc4c9d2aca0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java @@ -20,8 +20,6 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.RetrievableStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; @@ -41,6 +39,7 @@ public class RetrievableStreamStateHandle implements StreamStateHandle, RetrievableStateHandle, Closeable { private static final long serialVersionUID = 314567453677355L; + /** wrapped inner stream state handle from which we deserialize on retrieval */ private final StreamStateHandle wrappedStreamStateHandle; @@ -48,9 +47,9 @@ public RetrievableStreamStateHandle(StreamStateHandle streamStateHandle) { this.wrappedStreamStateHandle = Preconditions.checkNotNull(streamStateHandle); } - public RetrievableStreamStateHandle(Path filePath) { + public RetrievableStreamStateHandle(Path filePath, long stateSize) { Preconditions.checkNotNull(filePath); - this.wrappedStreamStateHandle = new FileStateHandle(filePath); + this.wrappedStreamStateHandle = new FileStateHandle(filePath, stateSize); } @Override @@ -71,7 +70,7 @@ public void discardState() throws Exception { } @Override - public long getStateSize() throws Exception { + public long getStateSize() throws IOException { return wrappedStreamStateHandle.getStateSize(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java index 47103c19e3981..4c6531820e19e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.state; +import java.io.IOException; + /** * Base of all types that represent checkpointed state. Specializations are for * example {@link StateHandle StateHandles} (directly resolve to state). @@ -47,7 +49,7 @@ public interface StateObject extends java.io.Closeable, java.io.Serializable { *

If the the size is not known, return {@code 0}. * * @return Size of the state in bytes. - * @throws Exception If the operation fails during size retrieval. + * @throws IOException If the operation fails during size retrieval. */ - long getStateSize() throws Exception; + long getStateSize() throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java index 5ae751bf8ec15..f361263ce7888 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java @@ -26,7 +26,9 @@ import java.io.IOException; -import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * {@link StreamStateHandle} for state that was written to a file stream. The written data is @@ -36,14 +38,13 @@ public class FileStateHandle extends AbstractCloseableHandle implements StreamSt private static final long serialVersionUID = 350284443258002355L; - /** - * The path to the file in the filesystem, fully describing the file system - */ + /** The path to the file in the filesystem, fully describing the file system */ private final Path filePath; - /** - * Cached file system handle - */ + /** The size of the state in the file */ + private final long stateSize; + + /** Cached file system handle */ private transient FileSystem fs; /** @@ -51,8 +52,10 @@ public class FileStateHandle extends AbstractCloseableHandle implements StreamSt * * @param filePath The path to the file that stores the state. */ - public FileStateHandle(Path filePath) { - this.filePath = requireNonNull(filePath); + public FileStateHandle(Path filePath, long stateSize) { + checkArgument(stateSize >= -1); + this.filePath = checkNotNull(filePath); + this.stateSize = stateSize; } /** @@ -86,8 +89,7 @@ public void discardState() throws Exception { // fail (and be ignored) when some files still exist try { getFileSystem().delete(filePath.getParent(), false); - } catch (IOException ignored) { - } + } catch (IOException ignored) {} } /** @@ -98,7 +100,7 @@ public void discardState() throws Exception { */ @Override public long getStateSize() throws IOException { - return getFileSystem().getFileStatus(filePath).getLen(); + return stateSize; } /** @@ -114,6 +116,7 @@ private FileSystem getFileSystem() throws IOException { return fs; } + // ------------------------------------------------------------------------ @Override public boolean equals(Object o) { @@ -133,4 +136,9 @@ public boolean equals(Object o) { public int hashCode() { return filePath.hashCode(); } + + @Override + public String toString() { + return String.format("File State: %s [%d bytes]", filePath, stateSize); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java index c027558b66205..e4f7ebac2369a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java @@ -301,9 +301,16 @@ public StreamStateHandle closeAndGetHandle() throws IOException { } else { flush(); + + long size = -1; + // make a best effort attempt to figure out the size + try { + size = outStream.getPos(); + } catch (Exception ignored) {} + outStream.close(); closed = true; - return new FileStateHandle(statePath); + return new FileStateHandle(statePath, size); } } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java index a534b4064f377..7658afb89b67f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java @@ -25,10 +25,10 @@ import org.apache.flink.runtime.state.RetrievableStreamStateHandle; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.util.FileUtils; +import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import java.io.IOException; -import java.io.ObjectOutputStream; import java.io.Serializable; /** @@ -61,19 +61,14 @@ public RetrievableStateHandle store(T state) throws Exception { for (int attempt = 0; attempt < 10; attempt++) { Path filePath = getNewFilePath(); - FSDataOutputStream outStream; - try { - outStream = fs.create(filePath, false); + + try (FSDataOutputStream outStream = fs.create(filePath, false)) { + InstantiationUtil.serializeObject(outStream, state); + return new RetrievableStreamStateHandle(filePath, outStream.getPos()); } catch (Exception e) { latestException = e; - continue; - } - - try(ObjectOutputStream os = new ObjectOutputStream(outStream)) { - os.writeObject(state); } - return new RetrievableStreamStateHandle(filePath); } throw new Exception("Could not open output stream for state backend", latestException); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index f2737970462c6..6a8d072cdf16f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -183,7 +183,7 @@ public void discardState() throws Exception { } @Override - public long getStateSize() throws Exception { + public long getStateSize() throws IOException { return 0; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java index c513e26dd4eca..504143bce41f0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java @@ -334,7 +334,7 @@ private static CompletedCheckpoint[] generateRandomCheckpoints( StreamStateHandle proxy = new StateHandleProxy(new Path(), proxySize); SubtaskState subtaskState = new SubtaskState( - new ChainedStateHandle<>(Arrays.asList(proxy)), + new ChainedStateHandle<>(Collections.singletonList(proxy)), duration); taskState.putState(subtaskIndex, subtaskState); @@ -371,21 +371,11 @@ private static class StateHandleProxy extends FileStateHandle { private static final long serialVersionUID = 35356735683568L; - public StateHandleProxy(Path filePath, long proxySize) { - super(filePath); - this.proxySize = proxySize; - } - - private long proxySize; - - @Override - public void discardState() throws Exception { - + public StateHandleProxy(Path filePath, long size) { + super(filePath, size); } @Override - public long getStateSize() { - return proxySize; - } + public void discardState() {} } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java index 40e1852e15c2d..e6131056bdbd7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java @@ -87,12 +87,10 @@ private static final class CloseableHandle extends AbstractCloseableHandle { private static final long serialVersionUID = 1L; @Override - public void discardState() throws Exception { - - } + public void discardState() {} @Override - public long getStateSize() throws Exception { + public long getStateSize() { return 0; } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index 47f1bd541f648..9f52e9c2c965f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -198,6 +198,7 @@ private void block() { // an interrupt on a waiting object leads to an infinite loop try { synchronized (this) { + //noinspection WaitNotInLoop wait(); } } @@ -216,7 +217,7 @@ private void block() { public void discardState() throws Exception {} @Override - public long getStateSize() throws Exception { + public long getStateSize() throws IOException { return 0; } }