From b2f3bc41bc991e8deb22fb89822f28c75d94c8f7 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Wed, 22 Feb 2017 22:18:50 +0100 Subject: [PATCH 1/4] [FLINK-5897] [checkpoints] Make checkpoint externalization not depend strictly on FileSystems That is the first step towards checkpoints that can be externalized to other stores as well, like k/v stores and databases, if supported by the state backend. --- .../checkpoint/CheckpointCoordinator.java | 20 ++- .../checkpoint/CompletedCheckpoint.java | 135 ++++++++++++++---- .../checkpoint/CompletedCheckpointStore.java | 1 + .../runtime/checkpoint/PendingCheckpoint.java | 92 +++++++----- .../StandaloneCompletedCheckpointStore.java | 4 + .../ZooKeeperCompletedCheckpointStore.java | 5 + .../checkpoint/savepoint/SavepointLoader.java | 19 ++- .../checkpoint/savepoint/SavepointStore.java | 93 +++++++++--- .../apache/flink/runtime/state/StateUtil.java | 17 +-- .../flink/runtime/jobmanager/JobManager.scala | 14 +- .../CheckpointCoordinatorFailureTest.java | 5 + .../CompletedCheckpointStoreTest.java | 2 +- .../checkpoint/CompletedCheckpointTest.java | 17 ++- .../checkpoint/PendingCheckpointTest.java | 25 ++-- .../jobmanager/JobManagerHARecoveryTest.java | 4 + .../runtime/jobmanager/JobManagerITCase.scala | 2 +- .../JobManagerHACheckpointRecoveryITCase.java | 2 +- 17 files changed, 337 insertions(+), 120 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index c1c65b5309592..6da6f7d1dcf6b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.TaskStateHandles; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -758,13 +759,19 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) thro CompletedCheckpoint completedCheckpoint = null; try { - completedCheckpoint = pendingCheckpoint.finalizeCheckpoint(); + // externalize the checkpoint if required + if (pendingCheckpoint.getProps().externalizeCheckpoint()) { + completedCheckpoint = pendingCheckpoint.finalizeCheckpointExternalized(); + } else { + completedCheckpoint = pendingCheckpoint.finalizeCheckpointNonExternalized(); + } completedCheckpointStore.addCheckpoint(completedCheckpoint); rememberRecentCheckpointId(checkpointId); dropSubsumedCheckpoints(checkpointId); - } catch (Exception exception) { + } + catch (Exception exception) { // abort the current pending checkpoint if it has not been discarded yet if (!pendingCheckpoint.isDiscarded()) { pendingCheckpoint.abortError(exception); @@ -779,8 +786,8 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) thro public void run() { try { cc.discard(); - } catch (Exception nestedException) { - LOG.warn("Could not properly discard completed checkpoint {}.", cc.getCheckpointID(), nestedException); + } catch (Throwable t) { + LOG.warn("Could not properly discard completed checkpoint {}.", cc.getCheckpointID(), t); } } }); @@ -808,11 +815,12 @@ public void run() { builder.append(", "); } // Remove last two chars ", " - builder.delete(builder.length() - 2, builder.length()); + builder.setLength(builder.length() - 2); LOG.debug(builder.toString()); } + // send the "notify complete" call to all vertices final long timestamp = completedCheckpoint.getTimestamp(); for (ExecutionVertex ev : tasksToCommitTo) { @@ -934,7 +942,7 @@ public boolean restoreLatestCheckpointedState( latest.getCheckpointID(), latest.getProperties(), restoreTimestamp, - latest.getExternalPath()); + latest.getExternalPointer()); statsTracker.reportRestoredCheckpoint(restored); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index db86484650058..17ce4d51b8bab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -18,11 +18,14 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats.DiscardCallback; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,8 +38,36 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state) - * and that is considered completed. + * A CompletedCheckpoint describes a checkpoint after all required tasks acknowledged it (with their state) + * and that is considered successful. The CompletedCheckpoint class contains all the metadata of the + * checkpoint, i.e., checkpoint ID, timestamps, and the handles to all states that are part of the + * checkpoint. + * + *

Size the CompletedCheckpoint Instances

+ * + * In most cases, the CompletedCheckpoint objects are very small, because the handles to the checkpoint + * states are only pointers (such as file paths). However, the some state backend implementations may + * choose to store some payload data directly with the metadata (for example to avoid many small files). + * If those thresholds are increased to large values, the memory consumption of the CompletedCheckpoint + * objects can be significant. + * + *

Externalized Metadata

+ * + * The metadata of the CompletedCheckpoint is optionally also persisted in an external storage + * system. In that case, the checkpoint is called externalized. + * + *

Externalized checkpoints have an external pointer, which points to the metadata. For example + * when externalizing to a file system, that pointer is the file path to the checkpoint's folder + * or the metadata file. For a state backend that stores metadata in database tables, the pointer + * could be the table name and row key. The pointer is encoded as a String. + * + *

Externalized Metadata and High-availability

+ * + * For high availability setups, the checkpoint metadata must be stored persistent and available + * as well. The high-availability services that stores the checkpoint ground-truth (meaning what are + * the latest completed checkpoints in what order) often rely on checkpoints being externalized. That + * way, those services only store pointers to the externalized metadata, rather than the complete + * metadata itself (for example ZooKeeper's ZNode payload should ideally be less than megabytes). */ public class CompletedCheckpoint implements Serializable { @@ -44,8 +75,12 @@ public class CompletedCheckpoint implements Serializable { private static final long serialVersionUID = -8360248179615702014L; + // ------------------------------------------------------------------------ + + /** The ID of the job that the checkpoint belongs to */ private final JobID job; + /** The ID (logical timestamp) of the checkpoint */ private final long checkpointID; /** The timestamp when the checkpoint was triggered. */ @@ -60,23 +95,41 @@ public class CompletedCheckpoint implements Serializable { /** Properties for this checkpoint. */ private final CheckpointProperties props; - /** External path if persisted checkpoint; null otherwise. */ - private final String externalPath; + /** The state handle to the externalized meta data, if the metadata has been externalized */ + @Nullable + private final StreamStateHandle externalizedMetadata; + + /** External pointer to the completed checkpoint (for example file path) if externalized; null otherwise. */ + @Nullable + private final String externalPointer; /** Optional stats tracker callback for discard. */ @Nullable - private transient CompletedCheckpointStats.DiscardCallback discardCallback; + private transient volatile DiscardCallback discardCallback; // ------------------------------------------------------------------------ - public CompletedCheckpoint( + @VisibleForTesting + CompletedCheckpoint( JobID job, long checkpointID, long timestamp, long completionTimestamp, Map taskStates) { - this(job, checkpointID, timestamp, completionTimestamp, taskStates, CheckpointProperties.forStandardCheckpoint(), null); + this(job, checkpointID, timestamp, completionTimestamp, taskStates, + CheckpointProperties.forStandardCheckpoint()); + } + + public CompletedCheckpoint( + JobID job, + long checkpointID, + long timestamp, + long completionTimestamp, + Map taskStates, + CheckpointProperties props) { + + this(job, checkpointID, timestamp, completionTimestamp, taskStates, props, null, null); } public CompletedCheckpoint( @@ -86,24 +139,27 @@ public CompletedCheckpoint( long completionTimestamp, Map taskStates, CheckpointProperties props, - String externalPath) { + @Nullable StreamStateHandle externalizedMetadata, + @Nullable String externalPointer) { checkArgument(checkpointID >= 0); checkArgument(timestamp >= 0); checkArgument(completionTimestamp >= 0); + checkArgument((externalPointer == null) == (externalizedMetadata == null), + "external pointer without externalized metadata must be both null or both non-null"); + + checkArgument(!props.externalizeCheckpoint() || externalPointer != null, + "Checkpoint properties require externalized checkpoint, but checkpoint is not externalized"); + this.job = checkNotNull(job); this.checkpointID = checkpointID; this.timestamp = timestamp; this.duration = completionTimestamp - timestamp; this.taskStates = checkNotNull(taskStates); this.props = checkNotNull(props); - this.externalPath = externalPath; - - if (props.externalizeCheckpoint() && externalPath == null) { - throw new NullPointerException("Checkpoint properties say that the checkpoint " + - "should have been persisted, but missing external path."); - } + this.externalizedMetadata = externalizedMetadata; + this.externalPointer = externalPointer; } // ------------------------------------------------------------------------ @@ -146,10 +202,9 @@ public boolean discard(JobStatus jobStatus) throws Exception { discard(); return true; } else { - if (externalPath != null) { + if (externalPointer != null) { LOG.info("Persistent checkpoint with ID {} at '{}' not discarded.", - checkpointID, - externalPath); + checkpointID, externalPointer); } return false; @@ -158,14 +213,36 @@ public boolean discard(JobStatus jobStatus) throws Exception { void discard() throws Exception { try { - if (externalPath != null) { - SavepointStore.removeSavepointFile(externalPath); + // collect exceptions and continue cleanup + Exception exception = null; + + // drop the metadata, if we have some + if (externalizedMetadata != null) { + try { + externalizedMetadata.discardState(); + } + catch (Exception e) { + exception = e; + } } - StateUtil.bestEffortDiscardAllStateObjects(taskStates.values()); - } finally { + // drop the actual state + try { + StateUtil.bestEffortDiscardAllStateObjects(taskStates.values()); + } + catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + if (exception != null) { + throw exception; + } + } + finally { taskStates.clear(); + // to be null-pointer safe, copy reference to stack + DiscardCallback discardCallback = this.discardCallback; if (discardCallback != null) { discardCallback.notifyDiscardedCheckpoint(); } @@ -190,8 +267,18 @@ public TaskState getTaskState(JobVertexID jobVertexID) { return taskStates.get(jobVertexID); } - public String getExternalPath() { - return externalPath; + public boolean isExternalized() { + return externalizedMetadata != null; + } + + @Nullable + public StreamStateHandle getExternalizedMetadata() { + return externalizedMetadata; + } + + @Nullable + public String getExternalPointer() { + return externalPointer; } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java index d2c0f6cba3475..676276eee3b9e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java @@ -72,4 +72,5 @@ public interface CompletedCheckpointStore { */ int getNumberOfRetainedCheckpoints(); + boolean requiresExternalizedCheckpoints(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 908ff7fd58ee6..a321c824f0588 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -20,6 +20,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; import java.io.IOException; import java.util.HashMap; @@ -28,6 +29,8 @@ import java.util.Set; import java.util.concurrent.Executor; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; @@ -41,7 +44,9 @@ import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +79,7 @@ public class PendingCheckpoint { /** * The checkpoint properties. If the checkpoint should be persisted - * externally, it happens in {@link #finalizeCheckpoint()}. + * externally, it happens in {@link #finalizeCheckpointExternalized()}. */ private final CheckpointProperties props; @@ -203,48 +208,67 @@ public Future getCompletionFuture() { return onCompletionPromise; } - public CompletedCheckpoint finalizeCheckpoint() { + public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException { + synchronized (lock) { - Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet."); - - // Persist if required - String externalPath = null; - if (props.externalizeCheckpoint()) { - try { - Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values()); - externalPath = SavepointStore.storeSavepoint( - targetDirectory, - savepoint - ); - } catch (IOException e) { - LOG.error("Failed to persist checkpoint {}.",checkpointId, e); - } - } + checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet."); - CompletedCheckpoint completed = new CompletedCheckpoint( - jobId, - checkpointId, - checkpointTimestamp, - System.currentTimeMillis(), - new HashMap<>(taskStates), - props, - externalPath); + // externalize the metadata + final Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values()); - onCompletionPromise.complete(completed); + // TEMP FIX - The savepoint store is strictly typed to file systems currently + // but the checkpoints think more generic. we need to work with file handles + // here until the savepoint serializer accepts a generic stream factory - if (statsCallback != null) { - // Finalize the statsCallback and give the completed checkpoint a - // callback for discards. - CompletedCheckpointStats.DiscardCallback discardCallback = statsCallback.reportCompletedCheckpoint(externalPath); - completed.setDiscardCallback(discardCallback); - } + final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint); + final String externalPointer = metadataHandle.getFilePath().getParent().toString(); - dispose(false); + return finalizeInternal(metadataHandle, externalPointer); + } + } + + public CompletedCheckpoint finalizeCheckpointNonExternalized() { + synchronized (lock) { + checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet."); - return completed; + // finalize without external metadata + return finalizeInternal(null, null); } } + @GuardedBy("lock") + private CompletedCheckpoint finalizeInternal( + @Nullable StreamStateHandle externalMetadata, + @Nullable String externalPointer) { + + assert(Thread.holdsLock(lock)); + + CompletedCheckpoint completed = new CompletedCheckpoint( + jobId, + checkpointId, + checkpointTimestamp, + System.currentTimeMillis(), + new HashMap<>(taskStates), + props, + externalMetadata, + externalPointer); + + onCompletionPromise.complete(completed); + + if (statsCallback != null) { + // Finalize the statsCallback and give the completed checkpoint a + // callback for discards. + CompletedCheckpointStats.DiscardCallback discardCallback = + statsCallback.reportCompletedCheckpoint(externalPointer); + completed.setDiscardCallback(discardCallback); + } + + // mark this pending checkpoint as disposed, but do NOT drop the state + dispose(false); + + return completed; + } + /** * Acknowledges the task with the given execution attempt id and the given subtask state. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java index 082bca9f262b4..a0248b276ada0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -96,4 +96,8 @@ public void shutdown(JobStatus jobStatus) throws Exception { } } + @Override + public boolean requiresExternalizedCheckpoints() { + return false; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index fdd0d409643f8..4b03ceab018b2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -125,6 +125,11 @@ public ZooKeeperCompletedCheckpointStore( LOG.info("Initialized in '{}'.", checkpointsPath); } + @Override + public boolean requiresExternalizedCheckpoints() { + return true; + } + /** * Gets the latest checkpoint from ZooKeeper and removes all others. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java index 950a9a0f553bd..60f02878532b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java @@ -19,11 +19,13 @@ package org.apache.flink.runtime.checkpoint.savepoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.checkpoint.CheckpointProperties; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.TaskState; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.StreamStateHandle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,22 +48,27 @@ public class SavepointLoader { * @param jobId The JobID of the job to load the savepoint for. * @param tasks Tasks that will possibly be reset * @param savepointPath The path of the savepoint to rollback to - * @param userClassLoader The user code classloader + * @param classLoader The class loader to resolve serialized classes in legacy savepoint versions. * @param allowNonRestoredState Allow to skip checkpoint state that cannot be mapped * to any job vertex in tasks. * * @throws IllegalStateException If mismatch between program and savepoint state - * @throws Exception If savepoint store failure + * @throws IOException If savepoint store failure */ public static CompletedCheckpoint loadAndValidateSavepoint( JobID jobId, Map tasks, String savepointPath, - ClassLoader userClassLoader, + ClassLoader classLoader, boolean allowNonRestoredState) throws IOException { // (1) load the savepoint - Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath, userClassLoader); + final Tuple2 savepointAndHandle = + SavepointStore.loadSavepointWithHandle(savepointPath, classLoader); + + final Savepoint savepoint = savepointAndHandle.f0; + final StreamStateHandle metadataHandle = savepointAndHandle.f1; + final Map taskStates = new HashMap<>(savepoint.getTaskStates().size()); boolean expandedToLegacyIds = false; @@ -114,10 +121,12 @@ public static CompletedCheckpoint loadAndValidateSavepoint( // (3) convert to checkpoint so the system can fall back to it CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); - return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L, taskStates, props, savepointPath); + return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L, + taskStates, props, metadataHandle, savepointPath); } // ------------------------------------------------------------------------ + /** This class is not meant to be instantiated */ private SavepointLoader() {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java index 95370a5a4c226..5c8ac6bce9853 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java @@ -19,12 +19,15 @@ package org.apache.flink.runtime.checkpoint.savepoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.util.FileUtils; import org.apache.flink.util.Preconditions; @@ -118,6 +121,28 @@ public static void deleteSavepointDirectory(@Nonnull String savepointDirectory) * @throws IOException Failures during store are forwarded */ public static String storeSavepoint(String directory, T savepoint) throws IOException { + // write and create the file handle + FileStateHandle metadataFileHandle = storeSavepointToHandle(directory, savepoint); + + // we return the savepoint directory path here! + // The directory path also works to resume from and is more elegant than the direct + // metadata file pointer + return metadataFileHandle.getFilePath().getParent().toString(); + } + + /** + * Stores the savepoint metadata file to a state handle. + * + * @param directory Target directory to store savepoint in + * @param savepoint Savepoint to be stored + * + * @return State handle to the checkpoint metadata + * @throws IOException Failures during store are forwarded + */ + public static FileStateHandle storeSavepointToHandle( + String directory, + T savepoint) throws IOException { + checkNotNull(directory, "Target directory"); checkNotNull(savepoint, "Savepoint"); @@ -127,10 +152,9 @@ public static String storeSavepoint(String directory, T sa final FileSystem fs = FileSystem.get(basePath.toUri()); boolean success = false; - try (FSDataOutputStream fdos = fs.create(metadataFilePath, WriteMode.NO_OVERWRITE); + try (FSDataOutputStream fdos = fs.create(metadataFilePath, WriteMode.NO_OVERWRITE); DataOutputStream dos = new DataOutputStream(fdos)) { - // Write header dos.writeInt(MAGIC_NUMBER); dos.writeInt(savepoint.getVersion()); @@ -138,7 +162,13 @@ public static String storeSavepoint(String directory, T sa // Write savepoint SavepointSerializer serializer = SavepointSerializers.getSerializer(savepoint); serializer.serialize(savepoint, dos); + + // construct result handle + FileStateHandle handle = new FileStateHandle(metadataFilePath, dos.size()); + + // all good! success = true; + return handle; } finally { if (!success && fs.exists(metadataFilePath)) { @@ -147,22 +177,37 @@ public static String storeSavepoint(String directory, T sa } } } - - // we return the savepoint directory path here! - // The directory path also works to resume from and is more elegant than the direct - // metadata file pointer - return basePath.toString(); } /** * Loads the savepoint at the specified path. * * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file. + * @param classLoader The class loader used to resolve serialized classes from legacy savepoint formats. * @return The loaded savepoint + * * @throws IOException Failures during load are forwarded */ - public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader userClassLoader) throws IOException { - Preconditions.checkNotNull(savepointFileOrDirectory, "Path"); + public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader classLoader) throws IOException { + return loadSavepointWithHandle(savepointFileOrDirectory, classLoader).f0; + } + + /** + * Loads the savepoint at the specified path. This methods returns the savepoint, as well as the + * handle to the metadata. + * + * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file. + * @param classLoader The class loader used to resolve serialized classes from legacy savepoint formats. + * @return The loaded savepoint + * + * @throws IOException Failures during load are forwarded + */ + public static Tuple2 loadSavepointWithHandle( + String savepointFileOrDirectory, + ClassLoader classLoader) throws IOException { + + checkNotNull(savepointFileOrDirectory, "savepointFileOrDirectory"); + checkNotNull(classLoader, "classLoader"); Path path = new Path(savepointFileOrDirectory); @@ -180,11 +225,13 @@ public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoad LOG.info("Using savepoint file in {}", path); } else { throw new IOException("Cannot find meta data file in directory " + path - + ". Please try to load the savepoint directly from the meta data file " - + "instead of the directory."); + + ". Please try to load the savepoint directly from the meta data file " + + "instead of the directory."); } } + // load the savepoint + final Savepoint savepoint; try (DataInputStream dis = new DataInputViewStreamWrapper(fs.open(path))) { int magicNumber = dis.readInt(); @@ -192,15 +239,27 @@ public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoad int version = dis.readInt(); SavepointSerializer serializer = SavepointSerializers.getSerializer(version); - return serializer.deserialize(dis, userClassLoader); + savepoint = serializer.deserialize(dis, classLoader); } else { - throw new RuntimeException("Unexpected magic number. This is most likely " + - "caused by trying to load a Flink 1.0 savepoint. You cannot load a " + - "savepoint triggered by Flink 1.0 with this version of Flink. If it is " + - "_not_ a Flink 1.0 savepoint, this error indicates that the specified " + - "file is not a proper savepoint or the file has been corrupted."); + throw new RuntimeException("Unexpected magic number. This can have multiple reasons: " + + "(1) You are trying to load a Flink 1.0 savepoint, which is not supported by this " + + "version of Flink. (2) The file you were pointing to is not a savepoint at all. " + + "(3) The savepoint file has been corrupted."); } } + + // construct the stream handle to the metadata file + // we get the size best-effort + long size = 0; + try { + size = fs.getFileStatus(path).getLen(); + } + catch (Exception ignored) { + // we don't know the size, but we don't want to fail the savepoint loading for that + } + StreamStateHandle metadataHandle = new FileStateHandle(path, size); + + return new Tuple2<>(savepoint, metadataHandle); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java index c6f5c8698f3d5..b2508310e8e61 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FutureUtil; import java.util.concurrent.RunnableFuture; @@ -42,26 +43,22 @@ public static void bestEffortDiscardAllStateObjects( Iterable handlesToDiscard) throws Exception { if (handlesToDiscard != null) { - - Exception suppressedExceptions = null; + Exception exception = null; for (StateObject state : handlesToDiscard) { if (state != null) { try { state.discardState(); - } catch (Exception ex) { - //best effort to still cleanup other states and deliver exceptions in the end - if (suppressedExceptions == null) { - suppressedExceptions = new Exception(ex); - } - suppressedExceptions.addSuppressed(ex); + } + catch (Exception ex) { + exception = ExceptionUtils.firstOrSuppressed(ex, exception); } } } - if (suppressedExceptions != null) { - throw suppressedExceptions; + if (exception != null) { + throw exception; } } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 21749cb5f61f6..87cd4acd8b2c3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -18,7 +18,7 @@ package org.apache.flink.runtime.jobmanager -import java.io.{File, IOException} +import java.io.IOException import java.net._ import java.util.UUID import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _} @@ -50,7 +50,7 @@ import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.executiongraph._ import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager} -import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID} +import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus} import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway @@ -77,7 +77,7 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util._ import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} -import org.apache.flink.runtime.{FlinkActor, JobException, LeaderSessionMessageFilter, LogMessages} +import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils} import org.jboss.netty.channel.ChannelException @@ -611,7 +611,7 @@ class JobManager( new BiFunction[CompletedCheckpoint, Throwable, Void] { override def apply(success: CompletedCheckpoint, cause: Throwable): Void = { if (success != null) { - val path = success.getExternalPath() + val path = success.getExternalPointer() log.info(s"Savepoint stored in $path. Now cancelling $jobId.") executionGraph.cancel() senderRef ! decorateMessage(CancellationSuccess(jobId, path)) @@ -787,11 +787,11 @@ class JobManager( new BiFunction[CompletedCheckpoint, Throwable, Void] { override def apply(success: CompletedCheckpoint, cause: Throwable): Void = { if (success != null) { - if (success.getExternalPath != null) { + if (success.getExternalPointer != null) { senderRef ! TriggerSavepointSuccess( jobId, success.getCheckpointID, - success.getExternalPath, + success.getExternalPointer, success.getTimestamp ) } else { @@ -1784,7 +1784,7 @@ class JobManager( case t: Throwable => log.error(s"Could not properly unregister job $jobID form the library cache.", t) } - jobManagerMetricGroup.map(_.removeJob(jobID)) + jobManagerMetricGroup.foreach(_.removeJob(jobID)) futureOption } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index d4c3a2d2d7501..9517257a2af42 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -134,5 +134,10 @@ public List getAllCheckpoints() throws Exception { public int getNumberOfRetainedCheckpoints() { return -1; } + + @Override + public boolean requiresExternalizedCheckpoints() { + return false; + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java index 725b85f77233d..f77c755797827 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java @@ -237,7 +237,7 @@ public TestCompletedCheckpoint( Map taskGroupStates, CheckpointProperties props) { - super(jobId, checkpointId, timestamp, Long.MAX_VALUE, taskGroupStates, props, null); + super(jobId, checkpointId, timestamp, Long.MAX_VALUE, taskGroupStates, props); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java index 0d933ff53627c..b34e9a6ac26f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java @@ -19,9 +19,11 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -55,7 +57,9 @@ public void testDiscard() throws Exception { // Verify discard call is forwarded to state CompletedCheckpoint checkpoint = new CompletedCheckpoint( - new JobID(), 0, 0, 1, taskStates, CheckpointProperties.forStandardCheckpoint(), file.getAbsolutePath()); + new JobID(), 0, 0, 1, taskStates, CheckpointProperties.forStandardCheckpoint(), + new FileStateHandle(new Path(file.toURI()), file.length()), + file.getAbsolutePath()); checkpoint.discard(JobStatus.FAILED); @@ -74,7 +78,7 @@ public void testCleanUpOnSubsume() throws Exception { boolean discardSubsumed = true; CheckpointProperties props = new CheckpointProperties(false, false, discardSubsumed, true, true, true, true); CompletedCheckpoint checkpoint = new CompletedCheckpoint( - new JobID(), 0, 0, 1, taskStates, props, null); + new JobID(), 0, 0, 1, taskStates, props); // Subsume checkpoint.subsume(); @@ -104,7 +108,9 @@ public void testCleanUpOnShutdown() throws Exception { // Keep CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false); CompletedCheckpoint checkpoint = new CompletedCheckpoint( - new JobID(), 0, 0, 1, new HashMap<>(taskStates), props, externalPath); + new JobID(), 0, 0, 1, new HashMap<>(taskStates), props, + new FileStateHandle(new Path(file.toURI()), file.length()), + externalPath); checkpoint.discard(status); verify(state, times(0)).discardState(); @@ -113,7 +119,7 @@ public void testCleanUpOnShutdown() throws Exception { // Discard props = new CheckpointProperties(false, false, true, true, true, true, true); checkpoint = new CompletedCheckpoint( - new JobID(), 0, 0, 1, new HashMap<>(taskStates), props, null); + new JobID(), 0, 0, 1, new HashMap<>(taskStates), props); checkpoint.discard(status); verify(state, times(1)).discardState(); @@ -135,8 +141,7 @@ public void testCompletedCheckpointStatsCallbacks() throws Exception { 0, 1, new HashMap<>(taskStates), - CheckpointProperties.forStandardCheckpoint(), - null); + CheckpointProperties.forStandardCheckpoint()); CompletedCheckpointStats.DiscardCallback callback = mock(CompletedCheckpointStats.DiscardCallback.class); completed.setDiscardCallback(callback); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 3a85c4c0ea030..6f04f39b37fa8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -24,9 +24,11 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; + import org.mockito.Mockito; import java.io.File; @@ -49,9 +51,6 @@ public class PendingCheckpointTest { - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); - private static final Map ACK_TASKS = new HashMap<>(); private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID(); @@ -59,6 +58,9 @@ public class PendingCheckpointTest { ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class)); } + @Rule + public final TemporaryFolder tmpFolder = new TemporaryFolder(); + /** * Tests that pending checkpoints can be subsumed iff they are forced. */ @@ -96,7 +98,7 @@ public void testPersistExternally() throws Exception { PendingCheckpoint pending = createPendingCheckpoint(persisted, tmp.getAbsolutePath()); pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); assertEquals(0, tmp.listFiles().length); - pending.finalizeCheckpoint(); + pending.finalizeCheckpointExternalized(); assertEquals(1, tmp.listFiles().length); // Ephemeral checkpoint @@ -105,7 +107,7 @@ public void testPersistExternally() throws Exception { pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); assertEquals(1, tmp.listFiles().length); - pending.finalizeCheckpoint(); + pending.finalizeCheckpointNonExternalized(); assertEquals(1, tmp.listFiles().length); } @@ -148,7 +150,8 @@ public void testCompletionFuture() throws Exception { assertFalse(future.isDone()); pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); - pending.finalizeCheckpoint(); + assertTrue(pending.isFullyAcknowledged()); + pending.finalizeCheckpointExternalized(); assertTrue(future.isDone()); // Finalize (missing ACKs) @@ -157,7 +160,13 @@ public void testCompletionFuture() throws Exception { assertFalse(future.isDone()); try { - pending.finalizeCheckpoint(); + pending.finalizeCheckpointNonExternalized(); + fail("Did not throw expected Exception"); + } catch (IllegalStateException ignored) { + // Expected + } + try { + pending.finalizeCheckpointExternalized(); fail("Did not throw expected Exception"); } catch (IllegalStateException ignored) { // Expected @@ -233,7 +242,7 @@ public void testPendingCheckpointStatsCallbacks() throws Exception { pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); verify(callback, times(1)).reportSubtaskStats(any(JobVertexID.class), any(SubtaskStateStats.class)); - pending.finalizeCheckpoint(); + pending.finalizeCheckpointNonExternalized(); verify(callback, times(1)).reportCompletedCheckpoint(any(String.class)); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 5a38be2860aea..cbb077ce2a053 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -483,6 +483,10 @@ public int getNumberOfRetainedCheckpoints() { return checkpoints.size(); } + @Override + public boolean requiresExternalizedCheckpoints() { + return false; + } } static class MyCheckpointRecoveryFactory implements CheckpointRecoveryFactory { diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index 60b12d2295ed6..75f1fd4f4973c 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -979,7 +979,7 @@ class JobManagerITCase(_system: ActorSystem) jobManager.tell(TriggerSavepoint(jobGraph.getJobID(), Option.apply("any")), testActor) val checkpoint = Mockito.mock(classOf[CompletedCheckpoint]) - when(checkpoint.getExternalPath).thenReturn("Expected test savepoint path") + when(checkpoint.getExternalPointer).thenReturn("Expected test savepoint path") // Succeed the promise savepointPromise.complete(checkpoint) diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index 60a3a62bf4b30..f910e498d5e18 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -167,7 +167,7 @@ public void testCheckpointedStreamingSumProgram() throws Exception { config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Parallelism); ActorSystem testSystem = null; - JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2]; + final JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2]; LeaderRetrievalService leaderRetrievalService = null; ActorSystem taskManagerSystem = null; From a3d2405f690e08d3de4f641428887ab04ba2ca2d Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Fri, 17 Feb 2017 17:51:00 +0100 Subject: [PATCH 2/4] [FLINK-5822] [state backends] Make JobManager / Checkpoint Coordinator aware of the root state backend --- .../streaming/state/RocksDBStateBackend.java | 12 +- .../state/RocksDBStateBackendFactory.java | 19 +- .../jobmanager/JMXJobManagerMetricTest.java | 2 +- .../CheckpointConfigHandlerTest.java | 3 + .../checkpoint/CheckpointCoordinator.java | 21 ++- .../executiongraph/ExecutionGraph.java | 4 +- .../executiongraph/ExecutionGraphBuilder.java | 32 +++- .../tasks/JobSnapshottingSettings.java | 15 +- .../runtime/state/AbstractStateBackend.java | 173 +++++++++++++++++- .../runtime/state/StateBackendFactory.java | 16 +- .../state/filesystem/FsStateBackend.java | 31 +++- .../filesystem/FsStateBackendFactory.java | 22 +-- .../runtime/state/heap/package-info.java | 23 +++ .../runtime/state/internal/package-info.java | 52 ++++++ .../state/memory/MemoryStateBackend.java | 2 +- .../CheckpointStatsTrackerTest.java | 1 + .../checkpoint/CoordinatorShutdownTest.java | 5 +- ...ecutionGraphCheckpointCoordinatorTest.java | 3 +- .../ArchivedExecutionGraphTest.java | 3 +- .../tasks/JobSnapshottingSettingsTest.java | 6 + .../jobmanager/JobManagerHARecoveryTest.java | 1 + .../runtime/jobmanager/JobManagerTest.java | 5 + .../runtime/jobmanager/JobSubmitTest.java | 2 +- .../state/StateBackendLoadingTest.java | 164 +++++++++++++++++ .../runtime/jobmanager/JobManagerITCase.scala | 3 + .../api/graph/StreamGraphGenerator.java | 2 +- .../api/graph/StreamingJobGraphGenerator.java | 1 + .../streaming/runtime/tasks/StreamTask.java | 72 ++------ .../tasks/BlockingCheckpointsTest.java | 2 +- .../runtime/tasks/StreamTaskTest.java | 56 +++--- .../streaming/runtime/StateBackendITCase.java | 2 +- 31 files changed, 609 insertions(+), 146 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/package-info.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/package-info.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 3fd5d0f5c505b..dd0e2f72e3840 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -29,10 +29,12 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.util.AbstractID; + import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; import org.rocksdb.NativeLibraryLoader; import org.rocksdb.RocksDB; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -160,7 +162,7 @@ public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) { private void lazyInitializeForJob( Environment env, - String operatorIdentifier) throws Exception { + String operatorIdentifier) throws IOException { if (isInitialized) { return; @@ -193,7 +195,7 @@ private void lazyInitializeForJob( } if (dirs.isEmpty()) { - throw new Exception("No local storage directories available. " + errorMessage); + throw new IOException("No local storage directories available. " + errorMessage); } else { initializedDbBasePaths = dirs.toArray(new File[dirs.size()]); } @@ -235,7 +237,7 @@ public AbstractKeyedStateBackend createKeyedStateBackend( TypeSerializer keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws Exception { + TaskKvStateRegistry kvStateRegistry) throws IOException { // first, make sure that the RocksDB JNI library is loaded // we do this explicitly here to have better error handling @@ -437,7 +439,7 @@ public String toString() { // static library loading utilities // ------------------------------------------------------------------------ - private void ensureRocksDBIsLoaded(String tempDirectory) throws Exception { + private void ensureRocksDBIsLoaded(String tempDirectory) throws IOException { synchronized (RocksDBStateBackend.class) { if (!rocksDbInitialized) { @@ -488,7 +490,7 @@ private void ensureRocksDBIsLoaded(String tempDirectory) throws Exception { } } - throw new Exception("Could not load the native RocksDB library", lastException); + throw new IOException("Could not load the native RocksDB library", lastException); } } } diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java index 5002272c34767..bd9bcaa241be9 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java @@ -15,24 +15,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.contrib.streaming.state; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateBackendFactory; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; /** * A factory that creates an {@link org.apache.flink.contrib.streaming.state.RocksDBStateBackend} * from a configuration. */ -public class RocksDBStateBackendFactory implements StateBackendFactory { +public class RocksDBStateBackendFactory implements StateBackendFactory { protected static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackendFactory.class); @@ -44,9 +45,11 @@ public class RocksDBStateBackendFactory implements StateBackendFactoryemptyList(), Collections.emptyList(), Collections.emptyList(), - 500, 500, 50, 5, ExternalizedCheckpointSettings.none(), true)); + 500, 500, 50, 5, ExternalizedCheckpointSettings.none(), null, true)); flink.waitForActorsToBeAlive(); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java index e517c3c84b56f..95ced0a2cbade 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java @@ -56,6 +56,7 @@ public void testSimpleConfig() throws Exception { minPause, maxConcurrent, externalized, + null, true); AccessExecutionGraph graph = mock(AccessExecutionGraph.class); @@ -92,6 +93,7 @@ public void testAtLeastOnce() throws Exception { 1212L, 12, ExternalizedCheckpointSettings.none(), + null, false); // at least once AccessExecutionGraph graph = mock(AccessExecutionGraph.class); @@ -122,6 +124,7 @@ public void testEnabledExternalizedCheckpointSettings() throws Exception { 1212L, 12, externalizedSettings, + null, false); // at least once AccessExecutionGraph graph = mock(AccessExecutionGraph.class); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 6da6f7d1dcf6b..0592e3d9aeac9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -85,6 +85,12 @@ public class CheckpointCoordinator { /** The job whose checkpoint this coordinator coordinates */ private final JobID job; + /** Default checkpoint properties **/ + private final CheckpointProperties checkpointProperties; + + /** The executor used for asynchronous calls, like potentially blocking I/O */ + private final Executor executor; + /** Tasks who need to be sent a message when a checkpoint is started */ private final ExecutionVertex[] tasksToTrigger; @@ -101,7 +107,9 @@ public class CheckpointCoordinator { * accessing this don't block the job manager actor and run asynchronously. */ private final CompletedCheckpointStore completedCheckpointStore; - /** Default directory for persistent checkpoints; null if none configured. */ + /** Default directory for persistent checkpoints; null if none configured. + * THIS WILL BE REPLACED BY PROPER STATE-BACKEND METADATA WRITING */ + @Nullable private final String checkpointDirectory; /** A list of recent checkpoint IDs, to identify late messages (vs invalid ones) */ @@ -154,11 +162,6 @@ public class CheckpointCoordinator { @Nullable private CheckpointStatsTracker statsTracker; - /** Default checkpoint properties **/ - private final CheckpointProperties checkpointProperties; - - private final Executor executor; - // -------------------------------------------------------------------------------------------- public CheckpointCoordinator( @@ -173,7 +176,7 @@ public CheckpointCoordinator( ExecutionVertex[] tasksToCommitTo, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, - String checkpointDirectory, + @Nullable String checkpointDirectory, Executor executor) { // sanity checks @@ -211,6 +214,8 @@ public CheckpointCoordinator( this.checkpointIdCounter = checkNotNull(checkpointIDCounter); this.completedCheckpointStore = checkNotNull(completedCheckpointStore); this.checkpointDirectory = checkpointDirectory; + this.executor = checkNotNull(executor); + this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS); this.timer = new Timer("Checkpoint Timer", true); @@ -229,8 +234,6 @@ public CheckpointCoordinator( } catch (Throwable t) { throw new RuntimeException("Failed to start checkpoint ID counter: " + t.getMessage(), t); } - - this.executor = checkNotNull(executor); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index ad4347d0d9692..a76a421622854 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -61,6 +61,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.query.KvStateLocationRegistry; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.runtime.util.SerializedThrowable; @@ -348,7 +349,7 @@ public boolean isArchived() { return false; } - public void enableSnapshotCheckpointing( + public void enableCheckpointing( long interval, long checkpointTimeout, long minPauseBetweenCheckpoints, @@ -360,6 +361,7 @@ public void enableSnapshotCheckpointing( CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore checkpointStore, String checkpointDir, + StateBackend metadataStore, CheckpointStatsTracker statsTracker) { // simple sanity checks diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index c558e43587969..2a793028ca80d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; @@ -37,6 +38,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.util.DynamicCodeLoadingException; import org.slf4j.Logger; import javax.annotation.Nullable; @@ -71,8 +75,8 @@ public static ExecutionGraph buildGraph( MetricGroup metrics, int parallelismForAutoMax, Logger log) - throws JobExecutionException, JobException - { + throws JobExecutionException, JobException { + checkNotNull(jobGraph, "job graph cannot be null"); final String jobName = jobGraph.getName(); @@ -191,7 +195,28 @@ public static ExecutionGraph buildGraph( String externalizedCheckpointsDir = jobManagerConfig.getString( ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, null); - executionGraph.enableSnapshotCheckpointing( + // load the state backend for checkpoint metadata. + // if specified in the application, use from there, otherwise load from configuration + final StateBackend metadataBackend; + + final StateBackend applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend(); + if (applicationConfiguredBackend != null) { + metadataBackend = applicationConfiguredBackend; + + log.info("Using application-defined state backend for checkpoint/savepoint metadata: {}.", + applicationConfiguredBackend); + } + else { + try { + metadataBackend = AbstractStateBackend + .loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log); + } + catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) { + throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e); + } + } + + executionGraph.enableCheckpointing( snapshotSettings.getCheckpointInterval(), snapshotSettings.getCheckpointTimeout(), snapshotSettings.getMinPauseBetweenCheckpoints(), @@ -203,6 +228,7 @@ public static ExecutionGraph buildGraph( checkpointIdCounter, completedCheckpoints, externalizedCheckpointsDir, + metadataBackend, checkpointStatsTracker); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java index 561ba89cf0b97..233aa8887de5f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java @@ -19,7 +19,9 @@ package org.apache.flink.runtime.jobgraph.tasks; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.StateBackend; +import javax.annotation.Nullable; import java.util.List; import static java.util.Objects.requireNonNull; @@ -50,6 +52,10 @@ public class JobSnapshottingSettings implements java.io.Serializable { /** Settings for externalized checkpoints. */ private final ExternalizedCheckpointSettings externalizedCheckpointSettings; + /** The default state backend, if configured by the user in the job */ + @Nullable + private final StateBackend defaultStateBackend; + /** * Flag indicating whether exactly once checkpoint mode has been configured. * If false, at least once mode has been configured. This is @@ -58,7 +64,7 @@ public class JobSnapshottingSettings implements java.io.Serializable { * UI. */ private final boolean isExactlyOnce; - + public JobSnapshottingSettings( List verticesToTrigger, List verticesToAcknowledge, @@ -68,6 +74,7 @@ public JobSnapshottingSettings( long minPauseBetweenCheckpoints, int maxConcurrentCheckpoints, ExternalizedCheckpointSettings externalizedCheckpointSettings, + @Nullable StateBackend defaultStateBackend, boolean isExactlyOnce) { // sanity checks @@ -84,6 +91,7 @@ public JobSnapshottingSettings( this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints; this.maxConcurrentCheckpoints = maxConcurrentCheckpoints; this.externalizedCheckpointSettings = requireNonNull(externalizedCheckpointSettings); + this.defaultStateBackend = defaultStateBackend; this.isExactlyOnce = isExactlyOnce; } @@ -121,6 +129,11 @@ public ExternalizedCheckpointSettings getExternalizedCheckpointSettings() { return externalizedCheckpointSettings; } + @Nullable + public StateBackend getDefaultStateBackend() { + return defaultStateBackend; + } + public boolean isExactlyOnce() { return isExactlyOnce; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java index a335e45eb620e..2cf20a1cdf9f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -21,20 +21,50 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.slf4j.Logger; import javax.annotation.Nullable; import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * An abstract base implementation of the {@link StateBackend} interface. + * + *

*/ @PublicEvolving public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable { private static final long serialVersionUID = 4620415814639230247L; + // ------------------------------------------------------------------------ + // Configuration shortcut names + // ------------------------------------------------------------------------ + + /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */ + public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager"; + + /** The shortcut configuration name for the FileSystem State backend */ + public static final String FS_STATE_BACKEND_NAME = "filesystem"; + + /** The shortcut configuration name for the RocksDB State Backend */ + public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb"; + + // ------------------------------------------------------------------------ + // State Backend - Persisting Byte Storage + // ------------------------------------------------------------------------ + @Override public abstract CheckpointStreamFactory createStreamFactory( JobID jobId, @@ -46,6 +76,10 @@ public abstract CheckpointStreamFactory createSavepointStreamFactory( String operatorIdentifier, @Nullable String targetLocation) throws IOException; + // ------------------------------------------------------------------------ + // State Backend - State-Holding Backends + // ------------------------------------------------------------------------ + @Override public abstract AbstractKeyedStateBackend createKeyedStateBackend( Environment env, @@ -54,7 +88,7 @@ public abstract AbstractKeyedStateBackend createKeyedStateBackend( TypeSerializer keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws Exception; + TaskKvStateRegistry kvStateRegistry) throws IOException; @Override public OperatorStateBackend createOperatorStateBackend( @@ -63,4 +97,141 @@ public OperatorStateBackend createOperatorStateBackend( return new DefaultOperatorStateBackend(env.getUserClassLoader()); } + + // ------------------------------------------------------------------------ + // Loading the state backend from a configuration + // ------------------------------------------------------------------------ + + /** + * Loads the state backend from the configuration, from the parameter 'state.backend', as defined + * in {@link CoreOptions#STATE_BACKEND}. + * + *

The state backends can be specified either via their shortcut name, or via the class name + * of a {@link StateBackendFactory}. If a StateBackendFactory class name is specified, the factory + * is instantiated (via its zero-argument constructor) and its + * {@link StateBackendFactory#createFromConfig(Configuration)} method is called. + * + *

Recognized shortcut names are '{@value AbstractStateBackend#MEMORY_STATE_BACKEND_NAME}', + * '{@value AbstractStateBackend#FS_STATE_BACKEND_NAME}', and + * '{@value AbstractStateBackend#ROCKSDB_STATE_BACKEND_NAME}'. + * + * @param config The configuration to load the state backend from + * @param classLoader The class loader that should be used to load the state backend + * @param logger Optionally, a logger to log actions to (may be null) + * + * @return The instantiated state backend. + * + * @throws DynamicCodeLoadingException + * Thrown if a state backend factory is configured and the factory class was not + * found or the factory could not be instantiated + * @throws IllegalConfigurationException + * May be thrown by the StateBackendFactory when creating / configuring the state + * backend in the factory + * @throws IOException + * May be thrown by the StateBackendFactory when instantiating the state backend + */ + public static StateBackend loadStateBackendFromConfig( + Configuration config, + ClassLoader classLoader, + @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException { + + checkNotNull(config, "config"); + checkNotNull(classLoader, "classLoader"); + + final String backendName = config.getString(CoreOptions.STATE_BACKEND); + if (backendName == null) { + return null; + } + + // by default the factory class is the backend name + String factoryClassName = backendName; + + switch (backendName.toLowerCase()) { + case MEMORY_STATE_BACKEND_NAME: + if (logger != null) { + logger.info("State backend is set to heap memory (checkpoint to JobManager)"); + } + return new MemoryStateBackend(); + + case FS_STATE_BACKEND_NAME: + FsStateBackend fsBackend = new FsStateBackendFactory().createFromConfig(config); + if (logger != null) { + logger.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")", + fsBackend.getBasePath()); + } + return fsBackend; + + case ROCKSDB_STATE_BACKEND_NAME: + factoryClassName = "org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory"; + // fall through to the 'default' case that uses reflection to load the backend + // that way we can keep RocksDB in a separate module + + default: + if (logger != null) { + logger.info("Loading state backend via factory {}", factoryClassName); + } + + StateBackendFactory factory; + try { + @SuppressWarnings("rawtypes") + Class clazz = + Class.forName(factoryClassName, false, classLoader) + .asSubclass(StateBackendFactory.class); + + factory = clazz.newInstance(); + } + catch (ClassNotFoundException e) { + throw new DynamicCodeLoadingException( + "Cannot find configured state backend factory class: " + backendName, e); + } + catch (ClassCastException | InstantiationException | IllegalAccessException e) { + throw new DynamicCodeLoadingException("The class configured under '" + + CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" + + backendName + ')', e); + } + + return factory.createFromConfig(config); + } + } + + /** + * Loads the state backend from the configuration, from the parameter 'state.backend', as defined + * in {@link CoreOptions#STATE_BACKEND}. If no state backend is configures, this instantiates the + * default state backend (the {@link MemoryStateBackend}). + * + *

Refer to {@link #loadStateBackendFromConfig(Configuration, ClassLoader, Logger)} for details on + * how the state backend is loaded from the configuration. + * + * @param config The configuration to load the state backend from + * @param classLoader The class loader that should be used to load the state backend + * @param logger Optionally, a logger to log actions to (may be null) + * + * @return The instantiated state backend. + * + * @throws DynamicCodeLoadingException + * Thrown if a state backend factory is configured and the factory class was not + * found or the factory could not be instantiated + * @throws IllegalConfigurationException + * May be thrown by the StateBackendFactory when creating / configuring the state + * backend in the factory + * @throws IOException + * May be thrown by the StateBackendFactory when instantiating the state backend + */ + public static StateBackend loadStateBackendFromConfigOrCreateDefault( + Configuration config, + ClassLoader classLoader, + @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException { + + final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, logger); + + if (fromConfig != null) { + return fromConfig; + } + else { + if (logger != null) { + logger.info("No state backend has been configured, using default state backend (Memory / JobManager)"); + } + return new MemoryStateBackend(); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java index 39e7ed25fc74b..78c976a17a94d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java @@ -18,17 +18,24 @@ package org.apache.flink.runtime.state; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import java.io.IOException; import java.io.Serializable; /** * A factory to create a specific state backend. The state backend creation gets a Configuration * object that can be used to read further config values. * + *

The state backend factory is typically specified in the configuration to produce a + * configured state backend. + * * @param The type of the state backend created. */ -public interface StateBackendFactory extends Serializable { +@PublicEvolving +public interface StateBackendFactory extends Serializable { /** * Creates the state backend, optionally using the given configuration. @@ -36,7 +43,10 @@ public interface StateBackendFactory extends Ser * @param config The Flink configuration (loaded by the TaskManager). * @return The created state backend. * - * @throws Exception Exceptions during instantiation can be forwarded. + * @throws IllegalConfigurationException + * If the configuration misses critical values, or specifies invalid values + * @throws IOException + * If the state backend initialization failed due to an I/O exception */ - AbstractStateBackend createFromConfig(Configuration config) throws Exception; + T createFromConfig(Configuration config) throws IllegalConfigurationException, IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index b614d9816ae65..5e8a15ddd59ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -36,6 +36,8 @@ import java.net.URI; import java.net.URISyntaxException; +import static org.apache.flink.util.Preconditions.checkArgument; + /** * The file state backend is a state backend that stores the state of streaming jobs in a file system. * @@ -139,17 +141,14 @@ public FsStateBackend(URI checkpointDataUri) throws IOException { * rather than in files * * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + * @throws IllegalArgumentException Thrown, if the {@code fileStateSizeThreshold} is out of bounds. */ public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException { - if (fileStateSizeThreshold < 0) { - throw new IllegalArgumentException("The threshold for file state size must be zero or larger."); - } - if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) { - throw new IllegalArgumentException("The threshold for file state size cannot be larger than " + - MAX_FILE_STATE_THRESHOLD); - } + checkArgument(fileStateSizeThreshold >= 0, "The threshold for file state size must be zero or larger."); + checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD, + "The threshold for file state size cannot be larger than %s", MAX_FILE_STATE_THRESHOLD); + this.fileStateThreshold = fileStateSizeThreshold; - this.basePath = validateAndNormalizeUri(checkpointDataUri); } @@ -163,6 +162,19 @@ public Path getBasePath() { return basePath; } + /** + * Gets the threshold below which state is stored as part of the metadata, rather than in files. + * This threshold ensures that the backend does not create a large amount of very small files, + * where potentially the file pointers are larger than the state itself. + * + *

By default, this threshold is {@value #DEFAULT_FILE_STATE_THRESHOLD}. + * + * @return The file size threshold, in bytes. + */ + public int getMinFileSizeThreshold() { + return fileStateThreshold; + } + // ------------------------------------------------------------------------ // initialization and cleanup // ------------------------------------------------------------------------ @@ -189,7 +201,8 @@ public AbstractKeyedStateBackend createKeyedStateBackend( TypeSerializer keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws Exception { + TaskKvStateRegistry kvStateRegistry) throws IOException { + return new HeapKeyedStateBackend<>( kvStateRegistry, keySerializer, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java index 042700c051190..4c933ef954375 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java @@ -23,6 +23,8 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateBackendFactory; +import java.io.IOException; + /** * A factory that creates an {@link org.apache.flink.runtime.state.filesystem.FsStateBackend} * from a configuration. @@ -35,28 +37,26 @@ public class FsStateBackendFactory implements StateBackendFactoryinternal state type hierarchy. + * + *

The internal state classes give access to the namespace getters and setters and access to + * additional functionality, like raw value access or state merging. + * + *

The public API state hierarchy is intended to be programmed against by Flink applications. + * The internal state hierarchy holds all the auxiliary methods that are used by the runtime and not + * intended to be used by user applications. These internal methods are considered of limited use to users and + * only confusing, and are usually not regarded as stable across releases. + * + *

Each specific type in the internal state hierarchy extends the type from the public + * state hierarchy. The following illustrates the relationship between the public- and the internal + * hierarchy at the example of a subset of the classes: + * + *

+ *             State
+ *               |
+ *               +-------------------InternalKvState
+ *               |                         |
+ *          MergingState                   |
+ *               |                         |
+ *               +-----------------InternalMergingState
+ *               |                         |
+ *      +--------+------+                  |
+ *      |               |                  |
+ * ReducingState    ListState        +-----+-----------------+
+ *      |               |            |                       |
+ *      |               +-----------   -----------------InternalListState
+ *      |                            |
+ *      +------------------InternalReducingState
+ * 
+ */ +package org.apache.flink.runtime.state.internal; \ No newline at end of file diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java index 2cc116427554b..6e6b034ac1de6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java @@ -90,7 +90,7 @@ public AbstractKeyedStateBackend createKeyedStateBackend( TypeSerializer keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws IOException { + TaskKvStateRegistry kvStateRegistry) { return new HeapKeyedStateBackend<>( kvStateRegistry, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java index 9a39182b56289..7ab71cb9502cb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java @@ -62,6 +62,7 @@ public void testGetSnapshottingSettings() throws Exception { 191929L, 123, ExternalizedCheckpointSettings.none(), + null, false); CheckpointStatsTracker tracker = new CheckpointStatsTracker( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java index 7949ef0985004..976da48b2b94b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java @@ -32,7 +32,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.junit.Test; @@ -67,7 +66,7 @@ public void testCoordinatorShutsDownOnFailure() { JobGraph testGraph = new JobGraph("test job", vertex); testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, - 5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), true)); + 5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true)); ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); @@ -126,7 +125,7 @@ public void testCoordinatorShutsDownOnSuccess() { JobGraph testGraph = new JobGraph("test job", vertex); testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, - 5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), true)); + 5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true)); ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index 47e68267f2ce0..8f565dd0327f4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -106,7 +106,7 @@ private ExecutionGraph createExecutionGraphAndEnableCheckpointing( ClassLoader.getSystemClassLoader(), new UnregisteredMetricsGroup()); - executionGraph.enableSnapshotCheckpointing( + executionGraph.enableCheckpointing( 100, 100, 100, @@ -118,6 +118,7 @@ private ExecutionGraph createExecutionGraphAndEnableCheckpointing( counter, store, null, + null, CheckpointStatsTrackerTest.createTestTracker()); JobVertex jobVertex = new JobVertex("MockVertex"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 46ce3f43c09a8..3090172c3fbf4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -112,7 +112,7 @@ public static void setupExecutionGraph() throws Exception { mock(JobSnapshottingSettings.class), new UnregisteredMetricsGroup()); - runtimeGraph.enableSnapshotCheckpointing( + runtimeGraph.enableCheckpointing( 100, 100, 100, @@ -124,6 +124,7 @@ public static void setupExecutionGraph() throws Exception { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, + null, statsTracker); Map> userAccumulators = new HashMap<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java index 667dbcadd0265..2508d5ce62a7f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java @@ -20,11 +20,14 @@ import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.junit.Test; import java.util.Arrays; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class JobSnapshottingSettingsTest { @@ -42,6 +45,7 @@ public void testIsJavaSerializable() throws Exception { 112, 12, ExternalizedCheckpointSettings.externalizeCheckpoints(true), + new MemoryStateBackend(), false); JobSnapshottingSettings copy = CommonTestUtils.createCopySerializable(settings); @@ -55,5 +59,7 @@ public void testIsJavaSerializable() throws Exception { assertEquals(settings.getExternalizedCheckpointSettings().externalizeCheckpoints(), copy.getExternalizedCheckpointSettings().externalizeCheckpoints()); assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(), copy.getExternalizedCheckpointSettings().deleteOnCancellation()); assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce()); + assertNotNull(copy.getDefaultStateBackend()); + assertTrue(copy.getDefaultStateBackend().getClass() == MemoryStateBackend.class); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index cbb077ce2a053..115b06cb20245 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -225,6 +225,7 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception { 0, 1, ExternalizedCheckpointSettings.none(), + null, true)); BlockingStatefulInvokable.initializeStaticHelpers(slots); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index c5f6d99dcea83..727fc6507a4c6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -829,6 +829,7 @@ public void testCancelWithSavepoint() throws Exception { 0, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), + null, true); jobGraph.setSnapshotSettings(snapshottingSettings); @@ -954,6 +955,7 @@ public void testCancelWithSavepointNoDirectoriesConfigured() throws Exception { 0, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), + null, true); jobGraph.setSnapshotSettings(snapshottingSettings); @@ -1059,6 +1061,7 @@ public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception 0, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), + null, true); jobGraph.setSnapshotSettings(snapshottingSettings); @@ -1161,6 +1164,7 @@ public void testSavepointRestoreSettings() throws Exception { 0, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), + null, true); jobGraph.setSnapshotSettings(snapshottingSettings); @@ -1207,6 +1211,7 @@ public void testSavepointRestoreSettings() throws Exception { 0, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), + null, true); newJobGraph.setSnapshotSettings(newSnapshottingSettings); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index feb3d4d6ea075..529c10085d1e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -229,7 +229,7 @@ private JobGraph createSimpleJobGraph() { JobGraph jg = new JobGraph("test job", jobVertex); jg.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, - 5000, 5000, 0L, 10, ExternalizedCheckpointSettings.none(), true)); + 5000, 5000, 0L, 10, ExternalizedCheckpointSettings.none(), null, true)); return jg; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java new file mode 100644 index 0000000000000..a64faf13dd26b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * This test validates that state backends are properly loaded from configuration. + */ +public class StateBackendLoadingTest { + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + private final ClassLoader cl = getClass().getClassLoader(); + + private final String backendKey = CoreOptions.STATE_BACKEND.key(); + + // ------------------------------------------------------------------------ + + @Test + public void testNoStateBackendDefined() throws Exception { + assertNull(AbstractStateBackend.loadStateBackendFromConfig(new Configuration(), cl, null)); + } + + @Test + public void testInstantiateMemoryBackendByDefault() throws Exception { + StateBackend backend = AbstractStateBackend + .loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null); + + assertTrue(backend instanceof MemoryStateBackend); + } + + @Test + public void testLoadMemoryStateBackend() throws Exception { + // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME) + // to guard against config-breaking changes of the name + final Configuration config = new Configuration(); + config.setString(backendKey, "jobmanager"); + + StateBackend backend = AbstractStateBackend + .loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null); + + assertTrue(backend instanceof MemoryStateBackend); + } + + @Test + public void testLoadFileSystemStateBackend() throws Exception { + final String checkpointDir = new Path(tmp.getRoot().toURI()).toString(); + final Path expectedPath = new Path(checkpointDir); + final int threshold = 1000000; + + // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME) + // to guard against config-breaking changes of the name + final Configuration config1 = new Configuration(); + config1.setString(backendKey, "filesystem"); + config1.setString("state.checkpoints.dir", checkpointDir); + config1.setString("state.backend.fs.checkpointdir", checkpointDir); + config1.setInteger("state.backend.fs.memory-threshold", threshold); + + final Configuration config2 = new Configuration(); + config2.setString(backendKey, FsStateBackendFactory.class.getName()); + config2.setString("state.checkpoints.dir", checkpointDir); + config2.setString("state.backend.fs.checkpointdir", checkpointDir); + config2.setInteger("state.backend.fs.memory-threshold", threshold); + + StateBackend backend1 = AbstractStateBackend + .loadStateBackendFromConfigOrCreateDefault(config1, cl, null); + + StateBackend backend2 = AbstractStateBackend + .loadStateBackendFromConfigOrCreateDefault(config2, cl, null); + + assertTrue(backend1 instanceof FsStateBackend); + assertTrue(backend2 instanceof FsStateBackend); + + FsStateBackend fs1 = (FsStateBackend) backend1; + FsStateBackend fs2 = (FsStateBackend) backend2; + + assertEquals(expectedPath, fs1.getBasePath()); + assertEquals(expectedPath, fs2.getBasePath()); + assertEquals(threshold, fs1.getMinFileSizeThreshold()); + assertEquals(threshold, fs2.getMinFileSizeThreshold()); + } + + /** + * This test makes sure that failures properly manifest when the state backend could not be loaded. + */ + @Test + public void testLoadingFails() throws Exception { + final Configuration config = new Configuration(); + + // try a value that is neither recognized as a name, nor corresponds to a class + config.setString(backendKey, "does.not.exist"); + try { + AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null); + fail("should fail with an exception"); + } catch (DynamicCodeLoadingException ignored) { + // expected + } + + // try a class that is not a factory + config.setString(backendKey, java.io.File.class.getName()); + try { + AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null); + fail("should fail with an exception"); + } catch (DynamicCodeLoadingException ignored) { + // expected + } + + // a factory that fails + config.setString(backendKey, FailingFactory.class.getName()); + try { + AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null); + fail("should fail with an exception"); + } catch (IOException ignored) { + // expected + } + } + + // ------------------------------------------------------------------------ + + static final class FailingFactory implements StateBackendFactory { + private static final long serialVersionUID = 1L; + + @Override + public StateBackend createFromConfig(Configuration config) throws IllegalConfigurationException, IOException { + throw new IOException("fail!"); + } + } +} diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index 75f1fd4f4973c..31e72ddacdaab 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -822,6 +822,7 @@ class JobManagerITCase(_system: ActorSystem) 60000, 1, ExternalizedCheckpointSettings.none, + null, true)) // Submit job... @@ -881,6 +882,7 @@ class JobManagerITCase(_system: ActorSystem) 60000, 1, ExternalizedCheckpointSettings.none, + null, true)) // Submit job... @@ -948,6 +950,7 @@ class JobManagerITCase(_system: ActorSystem) 60000, 1, ExternalizedCheckpointSettings.none, + null, true)) // Submit job... diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index ddd05154d1006..9f0d65633d739 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -82,7 +82,7 @@ public class StreamGraphGenerator { public static final int UPPER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; // The StreamGraph that is being built, this is initialized at the beginning. - private StreamGraph streamGraph; + private final StreamGraph streamGraph; private final StreamExecutionEnvironment env; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index a4bb1657be058..003eff909142c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -539,6 +539,7 @@ private void configureCheckpointing() { cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(), cfg.getMaxConcurrentCheckpoints(), externalizedCheckpointSettings, + streamGraph.getStateBackend(), isExactlyOnce); jobGraph.setSnapshotSettings(settings); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 938ffd280182d..1e208eefd7b48 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -20,9 +20,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; @@ -43,13 +40,10 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.StateBackendFactory; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -63,6 +57,7 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FutureUtil; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -147,7 +142,7 @@ public abstract class StreamTask> private StreamConfig configuration; /** Our state backend. We use this to create checkpoint streams and a keyed state backend. */ - private AbstractStateBackend stateBackend; + private StateBackend stateBackend; /** Keyed state backend for the head operator, if it is keyed. There can only ever be one. */ private AbstractKeyedStateBackend keyedStateBackend; @@ -713,61 +708,20 @@ private void checkRestorePreconditions(int operatorChainLength) { // State backend // ------------------------------------------------------------------------ - private AbstractStateBackend createStateBackend() throws Exception { - AbstractStateBackend stateBackend = configuration.getStateBackend(getUserCodeClassLoader()); + private StateBackend createStateBackend() throws Exception { + final StateBackend fromJob = configuration.getStateBackend(getUserCodeClassLoader()); - if (stateBackend != null) { + if (fromJob != null) { // backend has been configured on the environment LOG.info("Using user-defined state backend: {}.", stateBackend); - } else { - // see if we have a backend specified in the configuration - Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration(); - String backendName = flinkConfig.getString(CoreOptions.STATE_BACKEND, null); - - if (backendName == null) { - LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)"); - backendName = "jobmanager"; - } - - switch (backendName.toLowerCase()) { - case "jobmanager": - LOG.info("State backend is set to heap memory (checkpoint to jobmanager)"); - stateBackend = new MemoryStateBackend(); - break; - - case "filesystem": - FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig); - LOG.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")", - backend.getBasePath()); - stateBackend = backend; - break; - - case "rocksdb": - backendName = "org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory"; - // fall through to the 'default' case that uses reflection to load the backend - // that way we can keep RocksDB in a separate module - - default: - try { - @SuppressWarnings("rawtypes") - Class clazz = - Class.forName(backendName, false, getUserCodeClassLoader()). - asSubclass(StateBackendFactory.class); - - stateBackend = clazz.newInstance().createFromConfig(flinkConfig); - } catch (ClassNotFoundException e) { - throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName); - } catch (ClassCastException e) { - throw new IllegalConfigurationException("The class configured under '" + - CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" + - backendName + ')'); - } catch (Throwable t) { - throw new IllegalConfigurationException("Cannot create configured state backend", t); - } - } + return fromJob; + } + else { + return AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault( + getEnvironment().getTaskManagerInfo().getConfiguration(), + getUserCodeClassLoader(), + LOG); } - - return stateBackend; } public OperatorStateBackend createOperatorStateBackend( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java index 51294ce0cf1aa..e266ea1d053f6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java @@ -183,7 +183,7 @@ public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId, String public AbstractKeyedStateBackend createKeyedStateBackend( Environment env, JobID jobID, String operatorIdentifier, TypeSerializer keySerializer, int numberOfKeyGroups, - KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) throws Exception { + KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) { throw new UnsupportedOperationException(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 3d01fdd47f26c..382605189797c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -807,33 +807,39 @@ public static final class MockStateBackend implements StateBackendFactory() { - @Override - public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable { - return Mockito.mock(OperatorStateBackend.class); - } - }); - - Mockito.when(stateBackendMock.createKeyedStateBackend( - Mockito.any(Environment.class), - Mockito.any(JobID.class), - Mockito.any(String.class), - Mockito.any(TypeSerializer.class), - Mockito.any(int.class), - Mockito.any(KeyGroupRange.class), - Mockito.any(TaskKvStateRegistry.class))) - .thenAnswer(new Answer() { - @Override - public AbstractKeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable { - return Mockito.mock(AbstractKeyedStateBackend.class); - } - }); + try { + Mockito.when(stateBackendMock.createOperatorStateBackend( + Mockito.any(Environment.class), + Mockito.any(String.class))) + .thenAnswer(new Answer() { + @Override + public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable { + return Mockito.mock(OperatorStateBackend.class); + } + }); + + Mockito.when(stateBackendMock.createKeyedStateBackend( + Mockito.any(Environment.class), + Mockito.any(JobID.class), + Mockito.any(String.class), + Mockito.any(TypeSerializer.class), + Mockito.any(int.class), + Mockito.any(KeyGroupRange.class), + Mockito.any(TaskKvStateRegistry.class))) + .thenAnswer(new Answer() { + @Override + public AbstractKeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable { + return Mockito.mock(AbstractKeyedStateBackend.class); + } + }); + } + catch (Exception e) { + // this is needed, because the signatures of the mocked methods throw 'Exception' + throw new RuntimeException(e); + } return stateBackendMock; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java index 79665dda6e912..4677242bb67c8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java @@ -109,7 +109,7 @@ public AbstractKeyedStateBackend createKeyedStateBackend( TypeSerializer keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws Exception { + TaskKvStateRegistry kvStateRegistry) throws IOException { throw new SuccessException(); } } From 537be203dab0614383645e859bbafb6ebfeb3161 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Mon, 27 Feb 2017 16:12:37 +0100 Subject: [PATCH 3/4] [FLINK-5928] [checkpoints] Add CheckpointCoordinatorExternalizedCheckpointsTest Problem: there were only unit tests for the checkpoint instances available that don't test the behaviour of the checkpoint coordinator with respect to externalized checkpoints. --- ...oordinatorExternalizedCheckpointsTest.java | 197 ++++++++++++++++++ .../checkpoint/CheckpointCoordinatorTest.java | 2 +- 2 files changed, 198 insertions(+), 1 deletion(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java new file mode 100644 index 0000000000000..9f94f2fc795be --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * CheckpointCoordinator tests for externalized checkpoints. + * + *

This is separate from {@link CheckpointCoordinatorTest}, because that + * test is already huge and covers many different configurations. + */ +public class CheckpointCoordinatorExternalizedCheckpointsTest { + + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + + /** + * Triggers multiple externalized checkpoints and verifies that the metadata + * files have been created. + */ + @Test + public void testTriggerAndConfirmSimpleExternalizedCheckpoint() + throws Exception { + final JobID jid = new JobID(); + + final ExternalizedCheckpointSettings externalizedCheckpointSettings = + ExternalizedCheckpointSettings.externalizeCheckpoints(false); + + final File checkpointDir = tmp.newFolder(); + + // create some mock Execution vertices that receive the checkpoint trigger messages + final ExecutionAttemptID attemptID1 = new ExecutionAttemptID(); + final ExecutionAttemptID attemptID2 = new ExecutionAttemptID(); + ExecutionVertex vertex1 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID1); + ExecutionVertex vertex2 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID2); + + Map jobVertices = new HashMap<>(); + jobVertices.put(vertex1.getJobvertexId(), vertex1.getJobVertex()); + jobVertices.put(vertex2.getJobvertexId(), vertex2.getJobVertex()); + + // set up the coordinator and validate the initial state + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + externalizedCheckpointSettings, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + checkpointDir.getAbsolutePath(), + Executors.directExecutor()); + + assertEquals(0, coord.getNumberOfPendingCheckpoints()); + assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + + // --------------- + // trigger checkpoint 1 + // --------------- + + { + final long timestamp1 = System.currentTimeMillis(); + + coord.triggerCheckpoint(timestamp1, false); + + long checkpointId1 = coord.getPendingCheckpoints().entrySet().iterator().next() + .getKey(); + + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId1)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId1)); + + CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint(); + + verifyExternalizedCheckpoint(latest, jid, checkpointId1, timestamp1); + verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2); + } + + // --------------- + // trigger checkpoint 2 + // --------------- + + { + final long timestamp2 = System.currentTimeMillis() + 7; + coord.triggerCheckpoint(timestamp2, false); + + long checkpointId2 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); + + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2)); + + CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint(); + verifyExternalizedCheckpoint(latest, jid, checkpointId2, timestamp2); + verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2); + } + + // --------------- + // trigger checkpoint 3 + // --------------- + + { + final long timestamp3 = System.currentTimeMillis() + 146; + coord.triggerCheckpoint(timestamp3, false); + + long checkpointId3 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); + + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId3)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId3)); + + CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint(); + verifyExternalizedCheckpoint(latest, jid, checkpointId3, timestamp3); + verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2); + } + + coord.shutdown(JobStatus.FINISHED); + } + + /** + * Verifies an externalized completed checkpoint instance. + * + *

The provided JobID, checkpoint ID, timestamp need to match. Also, the + * external pointer and external metadata need to be notNull and exist (currently + * assuming that they are file system based). + * + * @param checkpoint Completed checkpoint to check. + * @param jid JobID of the job the checkpoint belongs to. + * @param checkpointId Checkpoint ID of the checkpoint to check. + * @param timestamp Timestamp of the checkpoint to check. + */ + private static void verifyExternalizedCheckpoint(CompletedCheckpoint checkpoint, JobID jid, long checkpointId, long timestamp) { + assertEquals(jid, checkpoint.getJobId()); + assertEquals(checkpointId, checkpoint.getCheckpointID()); + assertEquals(timestamp, checkpoint.getTimestamp()); + assertNotNull(checkpoint.getExternalPointer()); + assertNotNull(checkpoint.getExternalizedMetadata()); + FileStateHandle fsHandle = (FileStateHandle) checkpoint.getExternalizedMetadata(); + assertTrue(new File(fsHandle.getFilePath().getPath()).exists()); + } + + private static void verifyExternalizedCheckpointRestore( + CompletedCheckpoint checkpoint, + Map jobVertices, + ExecutionVertex... vertices) throws IOException { + + CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint( + checkpoint.getJobId(), + jobVertices, + checkpoint.getExternalPointer(), + Thread.currentThread().getContextClassLoader(), + false); + + for (ExecutionVertex vertex : vertices) { + assertEquals(checkpoint.getTaskState(vertex.getJobvertexId()), loaded.getTaskState(vertex.getJobvertexId())); + } + } + +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index d8e46faddd916..16913705efe0c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -2586,7 +2586,7 @@ private static ChainedStateHandle generateChainedPartitiona return ChainedStateHandle.wrapSingleHandle(operatorStateHandle); } - private static ExecutionJobVertex mockExecutionJobVertex( + static ExecutionJobVertex mockExecutionJobVertex( JobVertexID jobVertexID, int parallelism, int maxParallelism) { From 88e4700cce630f8ae869abff22acfd46ab999aa0 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Mon, 27 Feb 2017 16:58:14 +0100 Subject: [PATCH 4/4] [FLINK-5928] [checkpoints] Use custom metadata file for externalized checkpoints --- .../runtime/checkpoint/PendingCheckpoint.java | 25 ++++++++-- .../checkpoint/savepoint/SavepointStore.java | 47 ++++++++++++++++--- .../savepoint/SavepointStoreTest.java | 23 ++++++++- 3 files changed, 83 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index a321c824f0588..b9e952ccda97e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -30,7 +30,6 @@ import java.util.concurrent.Executor; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; - import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; @@ -46,7 +45,6 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.util.Preconditions; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -220,10 +218,27 @@ public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException { // but the checkpoints think more generic. we need to work with file handles // here until the savepoint serializer accepts a generic stream factory - final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint); - final String externalPointer = metadataHandle.getFilePath().getParent().toString(); + // We have this branch here, because savepoints and externalized checkpoints + // currently behave differently. + // Savepoints: + // - Metadata file in unique directory + // - External pointer can be the directory + // Externalized checkpoints: + // - Multiple metadata files per directory possible (need to be unique) + // - External pointer needs to be the file itself + // + // This should be unified as part of the JobManager metadata stream factories. + if (props.isSavepoint()) { + final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint); + final String externalPointer = metadataHandle.getFilePath().getParent().toString(); + + return finalizeInternal(metadataHandle, externalPointer); + } else { + final FileStateHandle metadataHandle = SavepointStore.storeExternalizedCheckpointToHandle(targetDirectory, savepoint); + final String externalPointer = metadataHandle.getFilePath().toString(); - return finalizeInternal(metadataHandle, externalPointer); + return finalizeInternal(metadataHandle, externalPointer); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java index 5c8ac6bce9853..7beb1b8dd6ace 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java @@ -60,7 +60,13 @@ public class SavepointStore { /** Magic number for sanity checks against stored savepoints. */ public static final int MAGIC_NUMBER = 0x4960672d; - private static final String META_DATA_FILE = "_metadata "; + private static final String SAVEPOINT_METADATA_FILE = "_metadata"; + + /** + * Metadata file for an externalized checkpoint, random suffix added + * during store, because the parent directory is not unique. + */ + static final String EXTERNALIZED_CHECKPOINT_METADATA_FILE = "checkpoint_metadata-"; /** * Creates a savepoint directory. @@ -122,7 +128,8 @@ public static void deleteSavepointDirectory(@Nonnull String savepointDirectory) */ public static String storeSavepoint(String directory, T savepoint) throws IOException { // write and create the file handle - FileStateHandle metadataFileHandle = storeSavepointToHandle(directory, savepoint); + FileStateHandle metadataFileHandle = storeSavepointToHandle(directory, + SAVEPOINT_METADATA_FILE, savepoint); // we return the savepoint directory path here! // The directory path also works to resume from and is more elegant than the direct @@ -135,19 +142,47 @@ public static String storeSavepoint(String directory, T sa * * @param directory Target directory to store savepoint in * @param savepoint Savepoint to be stored - * + * + * @return State handle to the checkpoint metadata + * @throws IOException Failures during store are forwarded + */ + public static FileStateHandle storeSavepointToHandle(String directory, T savepoint) throws IOException { + return storeSavepointToHandle(directory, SAVEPOINT_METADATA_FILE, savepoint); + } + + /** + * Stores the externalized checkpoint metadata file to a state handle. + * + * @param directory Target directory to store savepoint in + * @param savepoint Savepoint to be stored + * + * @return State handle to the checkpoint metadata + * @throws IOException Failures during store are forwarded + */ + public static FileStateHandle storeExternalizedCheckpointToHandle(String directory, T savepoint) throws IOException { + String fileName = FileUtils.getRandomFilename(EXTERNALIZED_CHECKPOINT_METADATA_FILE); + return storeSavepointToHandle(directory, fileName, savepoint); + } + + /** + * Stores the savepoint metadata file to a state handle. + * + * @param directory Target directory to store savepoint in + * @param savepoint Savepoint to be stored + * * @return State handle to the checkpoint metadata * @throws IOException Failures during store are forwarded */ - public static FileStateHandle storeSavepointToHandle( + static FileStateHandle storeSavepointToHandle( String directory, + String filename, T savepoint) throws IOException { checkNotNull(directory, "Target directory"); checkNotNull(savepoint, "Savepoint"); final Path basePath = new Path(directory); - final Path metadataFilePath = new Path(basePath, META_DATA_FILE); + final Path metadataFilePath = new Path(basePath, filename); final FileSystem fs = FileSystem.get(basePath.toUri()); @@ -219,7 +254,7 @@ public static Tuple2 loadSavepointWithHandle( // If this is a directory, we need to find the meta data file if (status.isDir()) { - Path candidatePath = new Path(path, META_DATA_FILE); + Path candidatePath = new Path(path, SAVEPOINT_METADATA_FILE); if (fs.exists(candidatePath)) { path = candidatePath; LOG.info("Using savepoint file in {}", path); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java index dc19e474c255c..1eb805599967c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java @@ -19,12 +19,12 @@ package org.apache.flink.runtime.checkpoint.savepoint; import java.io.File; -import java.util.Arrays; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.TaskState; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -189,6 +189,27 @@ public void testCleanupOnStoreFailure() throws Exception { assertEquals("Savepoint file not cleaned up on failure", 0, tmp.getRoot().listFiles().length); } + /** + * Tests that multiple externalized checkpoints can be stored to the same + * directory. + */ + @Test + public void testStoreExternalizedCheckpointsToSameDirectory() throws Exception { + String root = tmp.newFolder().getAbsolutePath(); + FileSystem fs = FileSystem.get(new Path(root).toUri()); + + // Store + SavepointV1 savepoint = new SavepointV1(1929292, SavepointV1Test.createTaskStates(4, 24)); + + FileStateHandle store1 = SavepointStore.storeExternalizedCheckpointToHandle(root, savepoint); + fs.exists(store1.getFilePath()); + assertTrue(store1.getFilePath().getPath().contains(SavepointStore.EXTERNALIZED_CHECKPOINT_METADATA_FILE)); + + FileStateHandle store2 = SavepointStore.storeExternalizedCheckpointToHandle(root, savepoint); + fs.exists(store2.getFilePath()); + assertTrue(store2.getFilePath().getPath().contains(SavepointStore.EXTERNALIZED_CHECKPOINT_METADATA_FILE)); + } + private static class NewSavepointSerializer implements SavepointSerializer { private static final NewSavepointSerializer INSTANCE = new NewSavepointSerializer();