Permalink
Browse files

[FLINK-4410] [runtime] Rework checkpoint stats tracking

  • Loading branch information...
1 parent c1fee3b commit 579bc96446d598a2cfe8237b4ebd62d8c9df3483 @uce uce committed Dec 23, 2016
Showing with 4,679 additions and 405 deletions.
  1. +8 −2 flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
  2. +1 −1 .../flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
  3. +192 −0 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
  4. +34 −9 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
  5. +53 −6 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
  6. +184 −0 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
  7. +386 −0 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
  8. +102 −0 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java
  9. +62 −0 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsStatus.java
  10. +447 −0 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
  11. +18 −0 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
  12. +174 −0 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
  13. +107 −0 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummary.java
  14. +153 −0 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
  15. +130 −0 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MinMaxAvgStats.java
  16. +73 −24 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
  17. +190 −0 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
  18. +103 −0 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
  19. +2 −35 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
  20. +176 −0 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
  21. +277 −0 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
  22. +4 −6 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
  23. +1 −1 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
  24. +1 −9 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
  25. +3 −4 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
  26. +1 −13 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
  27. +6 −26 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
  28. +11 −19 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
  29. +0 −13 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
  30. +16 −1 flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
  31. +2 −15 flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
  32. +114 −68 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
  33. +27 −0 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
  34. +1 −5 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
  35. +153 −0 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
  36. +196 −0 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
  37. +48 −0 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsStatusTest.java
  38. +327 −0 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
  39. +105 −0 ...untime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java
  40. +1 −1 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
  41. +25 −0 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
  42. +2 −2 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
  43. +1 −4 ...me/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
  44. +60 −0 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
  45. +96 −0 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/MinMaxAvgStatsTest.java
  46. +256 −0 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
  47. +65 −5 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
  48. +49 −0 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
  49. +57 −0 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java
  50. +93 −0 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java
  51. +1 −2 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
  52. +31 −115 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
  53. +3 −2 flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
  54. +8 −4 flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
  55. +8 −7 flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
  56. +1 −2 flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
  57. +18 −3 flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
  58. +16 −1 ...streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -588,7 +588,12 @@
/** Config parameter indicating whether jobs can be uploaded and run from the web-frontend. */
public static final String JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY = "jobmanager.web.submit.enable";
- /** Flag to disable checkpoint stats. */
+ /**
+ * Flag to disable checkpoint stats.
+ *
+ * @deprecated Not possible to disable any longer. Use history size of 0.
+ */
+ @Deprecated
public static final String JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = "jobmanager.web.checkpoints.disable";
/** Config parameter defining the number of checkpoints to remember for recent history. */
@@ -1226,7 +1231,8 @@
/** By default, submitting jobs from the web-frontend is allowed. */
public static final boolean DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED = true;
- /** Default flag to disable checkpoint stats. */
+ /** Config key has been deprecated. Therefore, no default value required. */
+ @Deprecated
public static final boolean DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = false;
/** Default number of checkpoints to remember for recent history. */
@@ -74,7 +74,7 @@ public void testJobManagerJMXMetricAccess() throws Exception {
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
- 500, 500, 50, 5, ExternalizedCheckpointSettings.none()));
+ 500, 500, 50, 5, ExternalizedCheckpointSettings.none(), true));
flink.waitForActorsToBeAlive();
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for checkpoint statistics.
+ */
+public abstract class AbstractCheckpointStats {
+
+ /** ID of this checkpoint. */
+ final long checkpointId;
+
+ /** Timestamp when the checkpoint was triggered at the coordinator. */
+ final long triggerTimestamp;
+
+ /** {@link TaskStateStats} accessible by their ID. */
+ final Map<JobVertexID, TaskStateStats> taskStats;
+
+ /** Total number of subtasks over all tasks. */
+ final int numberOfSubtasks;
+
+ /** Properties of the checkpoint. */
+ final CheckpointProperties props;
+
+ AbstractCheckpointStats(
+ long checkpointId,
+ long triggerTimestamp,
+ CheckpointProperties props,
+ int numberOfSubtasks,
+ Map<JobVertexID, TaskStateStats> taskStats) {
+
+ this.checkpointId = checkpointId;
+ this.triggerTimestamp = triggerTimestamp;
+ this.taskStats = checkNotNull(taskStats);
+ checkArgument(taskStats.size() > 0, "Empty task stats");
+ checkArgument(numberOfSubtasks > 0, "Non-positive number of subtasks");
+ this.numberOfSubtasks = numberOfSubtasks;
+ this.props = checkNotNull(props);
+ }
+
+ /**
+ * Returns the status of this checkpoint.
+ *
+ * @return Status of this checkpoint
+ */
+ public abstract CheckpointStatsStatus getStatus();
+
+ /**
+ * Returns the number of acknowledged subtasks.
+ *
+ * @return The number of acknowledged subtasks.
+ */
+ public abstract int getNumberOfAcknowledgedSubtasks();
+
+ /**
+ * Returns the total checkpoint state size over all subtasks.
+ *
+ * @return Total checkpoint state size over all subtasks.
+ */
+ public abstract long getStateSize();
+
+ /**
+ * Returns the total buffered bytes during alignment over all subtasks.
+ *
+ * <p>Can return <code>-1</code> if the runtime did not report this.
+ *
+ * @return Total buffered bytes during alignment over all subtasks.
+ */
+ public abstract long getAlignmentBuffered();
+
+ /**
+ * Returns the latest acknowledged subtask stats or <code>null</code> if
+ * none was acknowledged yet.
+ *
+ * @return Latest acknowledged subtask stats or <code>null</code>
+ */
+ @Nullable
+ public abstract SubtaskStateStats getLatestAcknowledgedSubtaskStats();
+
+ /**
+ * Returns the ID of this checkpoint.
+ *
+ * @return ID of this checkpoint.
+ */
+ public long getCheckpointId() {
+ return checkpointId;
+ }
+
+ /**
+ * Returns the timestamp when the checkpoint was triggered.
+ *
+ * @return Timestamp when the checkpoint was triggered.
+ */
+ public long getTriggerTimestamp() {
+ return triggerTimestamp;
+ }
+
+ /**
+ * Returns the properties of this checkpoint.
+ *
+ * @return Properties of this checkpoint.
+ */
+ public CheckpointProperties getProperties() {
+ return props;
+ }
+
+ /**
+ * Returns the total number of subtasks involved in this checkpoint.
+ *
+ * @return Total number of subtasks involved in this checkpoint.
+ */
+ public int getNumberOfSubtasks() {
+ return numberOfSubtasks;
+ }
+
+ /**
+ * Returns the task state stats for the given job vertex ID or
+ * <code>null</code> if no task with such an ID is available.
+ *
+ * @param jobVertexId Job vertex ID of the task stats to look up.
+ * @return The task state stats instance for the given ID or <code>null</code>.
+ */
+ public TaskStateStats getTaskStateStats(JobVertexID jobVertexId) {
+ return taskStats.get(jobVertexId);
+ }
+
+ /**
+ * Returns all task state stats instances.
+ *
+ * @return All task state stats instances.
+ */
+ public Collection<TaskStateStats> getAllTaskStateStats() {
+ return taskStats.values();
+ }
+
+ /**
+ * Returns the ack timestamp of the latest acknowledged subtask or
+ * <code>-1</code> if none was acknowledged yet.
+ *
+ * @return Ack timestamp of the latest acknowledged subtask or <code>-1</code>.
+ */
+ public long getLatestAckTimestamp() {
+ SubtaskStateStats subtask = getLatestAcknowledgedSubtaskStats();
+ if (subtask != null) {
+ return subtask.getAckTimestamp();
+ } else {
+ return -1;
+ }
+ }
+
+ /**
+ * Returns the duration of this checkpoint calculated as the time since
+ * triggering until the latest acknowledged subtask or <code>-1</code> if
+ * no subtask was acknowledged yet.
+ *
+ * @return Duration of this checkpoint or <code>-1</code> if no subtask was acknowledged yet.
+ */
+ public long getEndToEndDuration() {
+ SubtaskStateStats subtask = getLatestAcknowledgedSubtaskStats();
+ if (subtask != null) {
+ return Math.max(0, subtask.getAckTimestamp() - triggerTimestamp);
+ } else {
+ return -1;
+ }
+ }
+
+}
@@ -21,7 +21,6 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -40,6 +39,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
@@ -147,8 +147,9 @@
/** Flag marking the coordinator as shut down (not accepting any messages any more) */
private volatile boolean shutdown;
- /** Helper for tracking checkpoint statistics */
- private final CheckpointStatsTracker statsTracker;
+ /** Optional tracker for checkpoint statistics. */
+ @Nullable
+ private CheckpointStatsTracker statsTracker;
/** Default checkpoint properties **/
private final CheckpointProperties checkpointProperties;
@@ -170,7 +171,6 @@ public CheckpointCoordinator(
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
String checkpointDirectory,
- CheckpointStatsTracker statsTracker,
Executor executor) {
// sanity checks
@@ -209,7 +209,6 @@ public CheckpointCoordinator(
this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
this.checkpointDirectory = checkpointDirectory;
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
- this.statsTracker = checkNotNull(statsTracker);
this.timer = new Timer("Checkpoint Timer", true);
@@ -231,6 +230,15 @@ public CheckpointCoordinator(
this.executor = checkNotNull(executor);
}
+ /**
+ * Sets the checkpoint stats tracker.
+ *
+ * @param statsTracker The checkpoint stats tracker.
+ */
+ public void setCheckpointStatsTracker(@Nullable CheckpointStatsTracker statsTracker) {
+ this.statsTracker = statsTracker;
+ }
+
// --------------------------------------------------------------------------------------------
// Clean shutdown
// --------------------------------------------------------------------------------------------
@@ -428,11 +436,19 @@ CheckpointTriggerResult triggerCheckpoint(
checkpointID,
timestamp,
ackTasks,
- isPeriodic,
props,
targetDirectory,
executor);
+ if (statsTracker != null) {
+ PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
+ checkpointID,
+ timestamp,
+ props);
+
+ checkpoint.setStatsCallback(callback);
+ }
+
// schedule the timer that will clean up the expired checkpoints
TimerTask canceller = new TimerTask() {
@Override
@@ -632,7 +648,7 @@ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws C
if (checkpoint != null && !checkpoint.isDiscarded()) {
- switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) {
+ switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetaData())) {
case SUCCESS:
LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
checkpointId, message.getTaskExecutionId(), message.getJob());
@@ -769,8 +785,6 @@ public void run() {
ee.notifyCheckpointComplete(checkpointId, timestamp);
}
}
-
- statsTracker.onCompletedCheckpoint(completedCheckpoint);
}
private void rememberRecentCheckpointId(long id) {
@@ -876,6 +890,17 @@ public boolean restoreLatestCheckpointedState(
stateAssignmentOperation.assignStates();
+ if (statsTracker != null) {
+ long restoreTimestamp = System.currentTimeMillis();
+ RestoredCheckpointStats restored = new RestoredCheckpointStats(
+ latest.getCheckpointID(),
+ latest.getProperties(),
+ restoreTimestamp,
+ latest.getExternalPath());
+
+ statsTracker.reportRestoredCheckpoint(restored);
+ }
+
return true;
}
}
Oops, something went wrong.

0 comments on commit 579bc96

Please sign in to comment.