Skip to content

Commit

Permalink
[FLINK-5628] [webfrontend] Fix serializability of checkpoint stats tracker
Browse files Browse the repository at this point in the history

This closes #3215.
  • Loading branch information
uce committed Jan 30, 2017
1 parent 126fb17 commit dcfa3fb
Show file tree
Hide file tree
Showing 30 changed files with 490 additions and 125 deletions.
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.webmonitor.handlers.checkpoints;

import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
Expand All @@ -42,10 +41,13 @@ public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder) {
@Override
public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
StringWriter writer = new StringWriter();

JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
JobSnapshottingSettings settings = graph.getJobSnapshottingSettings();

CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
JobSnapshottingSettings settings = tracker.getSnapshottingSettings();
if (settings == null) {
return "{}";
}

gen.writeStartObject();
{
Expand Down
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
Expand Down Expand Up @@ -54,8 +53,10 @@ public String handleRequest(AccessExecutionGraph graph, Map<String, String> para
return "{}";
}

CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
if (snapshot == null) {
return "{}";
}

AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);

Expand Down
Expand Up @@ -21,7 +21,6 @@
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
Expand Down Expand Up @@ -72,8 +71,10 @@ public String handleRequest(AccessExecutionGraph graph, Map<String, String> para
return "{}";
}

CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
if (snapshot == null) {
return "{}";
}

AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);

Expand Down
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
Expand Down Expand Up @@ -54,8 +53,10 @@ public String handleRequest(AccessExecutionGraph graph, Map<String, String> para
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
if (snapshot == null) {
return "{}";
}

gen.writeStartObject();

Expand Down
Expand Up @@ -20,7 +20,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
Expand Down Expand Up @@ -60,9 +59,7 @@ public void testSimpleConfig() throws Exception {
true);

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.getSnapshottingSettings()).thenReturn(settings);
when(graph.getJobSnapshottingSettings()).thenReturn(settings);

CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
Expand Down Expand Up @@ -98,9 +95,7 @@ public void testAtLeastOnce() throws Exception {
false); // at least once

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.getSnapshottingSettings()).thenReturn(settings);
when(graph.getJobSnapshottingSettings()).thenReturn(settings);

CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
Expand Down Expand Up @@ -130,9 +125,7 @@ public void testEnabledExternalizedCheckpointSettings() throws Exception {
false); // at least once

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.getSnapshottingSettings()).thenReturn(settings);
when(graph.getJobSnapshottingSettings()).thenReturn(settings);

CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
Expand Down
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
Expand Down Expand Up @@ -89,9 +88,7 @@ public void testCheckpointNotFound() throws Exception {
when(snapshot.getHistory()).thenReturn(history);

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.createSnapshot()).thenReturn(snapshot);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);

CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
Expand Down Expand Up @@ -238,16 +235,14 @@ public void testCheckpointDetailsRequestFailedCheckpoint() throws Exception {

// ------------------------------------------------------------------------

static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception {
private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception {
CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
when(history.getCheckpointById(anyLong())).thenReturn(checkpoint);
CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
when(snapshot.getHistory()).thenReturn(history);

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.createSnapshot()).thenReturn(snapshot);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);

CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
Expand All @@ -258,7 +253,7 @@ static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Except
return mapper.readTree(json);
}

static void verifyTaskNode(TaskStateStats task, JsonNode parentNode) {
private static void verifyTaskNode(TaskStateStats task, JsonNode parentNode) {
long duration = ThreadLocalRandom.current().nextInt(128);

JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString());
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
Expand Down Expand Up @@ -179,9 +178,7 @@ public void testCheckpointStatsRequest() throws Exception {
when(snapshot.getLatestRestoredCheckpoint()).thenReturn(latestRestored);

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.createSnapshot()).thenReturn(snapshot);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);

CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
Expand Down
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
Expand Down Expand Up @@ -129,9 +128,7 @@ public void testCheckpointNotFound() throws Exception {
when(snapshot.getHistory()).thenReturn(history);

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.createSnapshot()).thenReturn(snapshot);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);

CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
Expand Down Expand Up @@ -186,9 +183,7 @@ public void testJobVertexNotFound() throws Exception {
when(snapshot.getHistory()).thenReturn(history);

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.createSnapshot()).thenReturn(snapshot);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);

CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
Expand All @@ -209,9 +204,7 @@ private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throw
when(snapshot.getHistory()).thenReturn(history);

AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
when(tracker.createSnapshot()).thenReturn(snapshot);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);

CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;

import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;

Expand All @@ -30,7 +31,9 @@
/**
* Base class for checkpoint statistics.
*/
public abstract class AbstractCheckpointStats {
public abstract class AbstractCheckpointStats implements Serializable {

private static final long serialVersionUID = 1041218202028265151L;

/** ID of this checkpoint. */
final long checkpointId;
Expand Down
Expand Up @@ -307,7 +307,9 @@ boolean replacePendingCheckpointById(AbstractCheckpointStats completedOrFailed)
*
* <p>The iteration order is in reverse insertion order.
*/
private static class CheckpointsStatsHistoryIterable implements Iterable<AbstractCheckpointStats> {
private static class CheckpointsStatsHistoryIterable implements Iterable<AbstractCheckpointStats>, Serializable {

private static final long serialVersionUID = 726376482426055490L;

/** Copy of the checkpointsArray array at the point when this iterable was created. */
private final AbstractCheckpointStats[] checkpointsArray;
Expand Down
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;

import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -52,9 +51,7 @@
* <p>The statistics are accessed via {@link #createSnapshot()} and exposed via
* both the web frontend and the {@link Metric} system.
*/
public class CheckpointStatsTracker implements Serializable {

private static final long serialVersionUID = 1694085244807339288L;
public class CheckpointStatsTracker {

/**
* Lock used for updating stats and creating snapshots. Updates always happen
Expand All @@ -67,9 +64,6 @@ public class CheckpointStatsTracker implements Serializable {
*/
private final ReentrantLock statsReadWriteLock = new ReentrantLock();

/** The job vertices taking part in the checkpoints. */
private final List<ExecutionJobVertex> jobVertices;

/** Total number of subtasks to checkpoint. */
private final int totalSubtaskCount;

Expand All @@ -85,6 +79,9 @@ public class CheckpointStatsTracker implements Serializable {
/** History of checkpoints. */
private final CheckpointStatsHistory history;

/** The job vertices taking part in the checkpoints. */
private final transient List<ExecutionJobVertex> jobVertices;

/** The latest restored checkpoint. */
@Nullable
private RestoredCheckpointStats latestRestoredCheckpoint;
Expand Down Expand Up @@ -217,6 +214,11 @@ PendingCheckpointStats reportPendingCheckpoint(
return pending;
}

/**
* Callback when a checkpoint is restored.
*
* @param restored The restored checkpoint stats.
*/
void reportRestoredCheckpoint(RestoredCheckpointStats restored) {
checkNotNull(restored, "Restored checkpoint");

Expand Down
Expand Up @@ -35,8 +35,7 @@
*/
public class CompletedCheckpointStats extends AbstractCheckpointStats {

/** Callback for the {@link CompletedCheckpoint} instance to notify about discard. */
private final DiscardCallback discardCallback;
private static final long serialVersionUID = 138833868551861343L;

/** Total checkpoint state size over all subtasks. */
private final long stateSize;
Expand Down Expand Up @@ -69,16 +68,16 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
* @param externalPath Optional external path if persisted externally.
*/
CompletedCheckpointStats(
long checkpointId,
long triggerTimestamp,
CheckpointProperties props,
int totalSubtaskCount,
Map<JobVertexID, TaskStateStats> taskStats,
int numAcknowledgedSubtasks,
long stateSize,
long alignmentBuffered,
SubtaskStateStats latestAcknowledgedSubtask,
@Nullable String externalPath) {
long checkpointId,
long triggerTimestamp,
CheckpointProperties props,
int totalSubtaskCount,
Map<JobVertexID, TaskStateStats> taskStats,
int numAcknowledgedSubtasks,
long stateSize,
long alignmentBuffered,
SubtaskStateStats latestAcknowledgedSubtask,
@Nullable String externalPath) {

super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
checkArgument(numAcknowledgedSubtasks == totalSubtaskCount, "Did not acknowledge all subtasks.");
Expand All @@ -87,7 +86,6 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
this.alignmentBuffered = alignmentBuffered;
this.latestAcknowledgedSubtask = checkNotNull(latestAcknowledgedSubtask);
this.externalPath = externalPath;
this.discardCallback = new DiscardCallback();
}

@Override
Expand Down Expand Up @@ -145,7 +143,7 @@ public boolean isDiscarded() {
* @return Callback for the {@link CompletedCheckpoint}.
*/
DiscardCallback getDiscardCallback() {
return discardCallback;
return new DiscardCallback();
}

/**
Expand Down

0 comments on commit dcfa3fb

Please sign in to comment.