Skip to content

Commit

Permalink
[FLINK-7710] [flip6] Add CheckpointStatisticsHandler for the new REST…
Browse files Browse the repository at this point in the history
… endpoint

This commit also makes the CheckpointStatsHistory object serializable by removing the
CheckpointStatsHistoryIterable and replacing it with a static ArrayList.

This closes #4750.
  • Loading branch information
tillrohrmann committed Oct 2, 2017
1 parent b41f5a6 commit ac82bec
Show file tree
Hide file tree
Showing 8 changed files with 1,200 additions and 148 deletions.
Expand Up @@ -19,13 +19,13 @@
package org.apache.flink.runtime.checkpoint;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -38,7 +38,7 @@
* the maximum number of elements. At this point, the elements wrap around
* and the least recently added entry is overwritten.
*
* <p>Access happens via an checkpointsIterable over the statistics and a map that
* <p>Access happens via checkpointsHistory over the statistics and a map that
* exposes the checkpoint by their ID. Both of these are only guaranteed
* to reflect the latest state after a call to {@link #createSnapshot()}.
*
Expand All @@ -49,8 +49,8 @@ public class CheckpointStatsHistory implements Serializable {

private static final long serialVersionUID = 7090320677606528415L;

/** Iterable over all available stats. Only updated on {@link #createSnapshot()}. */
private final Iterable<AbstractCheckpointStats> checkpointsIterable;
/** List over all available stats. Only updated on {@link #createSnapshot()}. */
private final List<AbstractCheckpointStats> checkpointsHistory;

/** Map of all available stats keyed by their ID. Only updated on {@link #createSnapshot()}. */
private final Map<Long, AbstractCheckpointStats> checkpointsById;
Expand All @@ -61,7 +61,7 @@ public class CheckpointStatsHistory implements Serializable {
/** Flag indicating whether the history is read-only. */
private final boolean readOnly;

/** Array of checkpointsArray. Writes go aginst this array. */
/** Array of checkpointsArray. Writes go against this array. */
private transient AbstractCheckpointStats[] checkpointsArray;

/** Next position in {@link #checkpointsArray} to write to. */
Expand Down Expand Up @@ -107,14 +107,14 @@ public class CheckpointStatsHistory implements Serializable {
*
* @param readOnly Flag indicating whether the history is read-only.
* @param maxSize Maximum history size.
* @param checkpointsIterable Checkpoints iterable.
* @param checkpointsHistory Checkpoints history list.
* @param checkpointsById Checkpoints by ID.
*/
private CheckpointStatsHistory(
boolean readOnly,
int maxSize,
AbstractCheckpointStats[] checkpointArray,
Iterable<AbstractCheckpointStats> checkpointsIterable,
List<AbstractCheckpointStats> checkpointsHistory,
Map<Long, AbstractCheckpointStats> checkpointsById,
CompletedCheckpointStats latestCompletedCheckpoint,
FailedCheckpointStats latestFailedCheckpoint,
Expand All @@ -124,15 +124,15 @@ private CheckpointStatsHistory(
checkArgument(maxSize >= 0, "Negative maximum size");
this.maxSize = maxSize;
this.checkpointsArray = checkpointArray;
this.checkpointsIterable = checkNotNull(checkpointsIterable);
this.checkpointsHistory = checkNotNull(checkpointsHistory);
this.checkpointsById = checkNotNull(checkpointsById);
this.latestCompletedCheckpoint = latestCompletedCheckpoint;
this.latestFailedCheckpoint = latestFailedCheckpoint;
this.latestSavepoint = latestSavepoint;
}

public Iterable<AbstractCheckpointStats> getCheckpoints() {
return checkpointsIterable;
public List<AbstractCheckpointStats> getCheckpoints() {
return checkpointsHistory;
}

public AbstractCheckpointStats getCheckpointById(long checkpointId) {
Expand Down Expand Up @@ -164,18 +164,25 @@ CheckpointStatsHistory createSnapshot() {
throw new UnsupportedOperationException("Can't create a snapshot of a read-only history.");
}

Iterable<AbstractCheckpointStats> checkpointsIterable;
List<AbstractCheckpointStats> checkpointsHistory;
Map<Long, AbstractCheckpointStats> checkpointsById;

checkpointsById = new HashMap<>(checkpointsArray.length);

if (maxSize == 0) {
checkpointsIterable = Collections.emptyList();
checkpointsHistory = Collections.emptyList();
} else {
// Create the snapshot list (copies the array)
checkpointsIterable = new CheckpointsStatsHistoryIterable(checkpointsArray, nextPos);
AbstractCheckpointStats[] newCheckpointsArray = new AbstractCheckpointStats[checkpointsArray.length];

System.arraycopy(checkpointsArray, nextPos, newCheckpointsArray, 0, checkpointsArray.length - nextPos);
System.arraycopy(checkpointsArray, 0, newCheckpointsArray, checkpointsArray.length - nextPos, nextPos);

checkpointsHistory = Arrays.asList(newCheckpointsArray);

for (AbstractCheckpointStats checkpoint : checkpointsIterable) {
// reverse the order such that we start with the youngest checkpoint
Collections.reverse(checkpointsHistory);

for (AbstractCheckpointStats checkpoint : checkpointsHistory) {
checkpointsById.put(checkpoint.getCheckpointId(), checkpoint);
}
}
Expand All @@ -196,7 +203,7 @@ CheckpointStatsHistory createSnapshot() {
true,
maxSize,
null,
checkpointsIterable,
checkpointsHistory,
checkpointsById,
latestCompletedCheckpoint,
latestFailedCheckpoint,
Expand Down Expand Up @@ -301,88 +308,4 @@ boolean replacePendingCheckpointById(AbstractCheckpointStats completedOrFailed)

return false;
}

/**
 * Iterable over a snapshot of the checkpoint history.
 *
 * <p>The iteration order is reverse insertion order: the element written
 * directly before the next write position comes first, and iteration walks
 * backwards through the array, wrapping around at index 0.
 */
private static class CheckpointsStatsHistoryIterable implements Iterable<AbstractCheckpointStats>, Serializable {

	private static final long serialVersionUID = 726376482426055490L;

	/** Copy of the checkpoints array taken when this iterable was created. */
	private final AbstractCheckpointStats[] checkpointsArray;

	/** Index of the most recently written element; every iterator starts here. */
	private final int startPos;

	/**
	 * Creates the iterable by creating a copy of the checkpoints array.
	 *
	 * @param checkpointsArray Checkpoints to iterate over. This array is copied.
	 * @param nextPos The next write position for the array. Expected to be in
	 *                the range {@code [0, checkpointsArray.length]}.
	 */
	CheckpointsStatsHistoryIterable(AbstractCheckpointStats[] checkpointsArray, int nextPos) {
		final int length = checkpointsArray.length;

		// Copy the array so that later writes to the source do not affect this snapshot
		this.checkpointsArray = Arrays.copyOf(checkpointsArray, length);

		// Start at the slot directly before the next write position, because
		// that is where the most recently written element is. The modulo wraps
		// nextPos == 0 to the last index instead of producing -1, which would
		// make the first next() call fail on a non-empty array. For an empty
		// array the start position is never read (the iterator has no elements).
		this.startPos = length == 0 ? -1 : (nextPos + length - 1) % length;
	}

	@Override
	public Iterator<AbstractCheckpointStats> iterator() {
		return new CheckpointsSnapshotIterator();
	}

	/**
	 * Iterator over the copied checkpoints array, walking backwards from the
	 * start position and visiting every slot exactly once.
	 */
	private class CheckpointsSnapshotIterator implements Iterator<AbstractCheckpointStats> {

		/** The current position. */
		private int currentPos;

		/** The remaining number of elements to iterate over. */
		private int remaining;

		/**
		 * Creates the iterator positioned at the start position of the enclosing iterable.
		 */
		CheckpointsSnapshotIterator() {
			this.currentPos = startPos;
			this.remaining = checkpointsArray.length;
		}

		@Override
		public boolean hasNext() {
			return remaining > 0;
		}

		/**
		 * Returns the element at the current position and steps one slot backwards.
		 *
		 * @throws NoSuchElementException if all elements have been returned already.
		 */
		@Override
		public AbstractCheckpointStats next() {
			if (!hasNext()) {
				throw new NoSuchElementException();
			}

			final AbstractCheckpointStats stats = checkpointsArray[currentPos];

			// Step backwards and wrap around to the end of the array if needed
			currentPos = currentPos == 0 ? checkpointsArray.length - 1 : currentPos - 1;
			remaining--;

			return stats;
		}

		/**
		 * Not supported; the snapshot is read-only.
		 *
		 * @throws UnsupportedOperationException always.
		 */
		@Override
		public void remove() {
			throw new UnsupportedOperationException();
		}
	}
}

}
Expand Up @@ -96,6 +96,7 @@ public CheckpointStatsHistory getHistory() {
*
* @return Latest restored checkpoint or <code>null</code>.
*/
@Nullable
public RestoredCheckpointStats getLatestRestoredCheckpoint() {
// The result may be null (hence @Nullable); callers must null-check it before use.
return latestRestoredCheckpoint;
}
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
Expand All @@ -41,6 +42,7 @@
import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.CheckpointConfigHeaders;
import org.apache.flink.runtime.rest.messages.CheckpointStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
Expand Down Expand Up @@ -158,6 +160,14 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
executionGraphCache,
executor);

CheckpointStatisticsHandler checkpointStatisticsHandler = new CheckpointStatisticsHandler(
restAddressFuture,
leaderRetriever,
timeout,
CheckpointStatisticsHeaders.getInstance(),
executionGraphCache,
executor);

final File tmpDir = restConfiguration.getTmpDir();

Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
Expand All @@ -180,6 +190,7 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
handlers.add(Tuple2.of(CheckpointStatisticsHeaders.getInstance(), checkpointStatisticsHandler));

// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
optWebContent.ifPresent(
Expand Down

0 comments on commit ac82bec

Please sign in to comment.