Skip to content

Commit

Permalink
[FLINK-3051] [streaming] Add mechanisms to control the maximum number…
Browse files Browse the repository at this point in the history
… of concurrent checkpoints

This closes #1408
  • Loading branch information
StephanEwen committed Nov 26, 2015
1 parent 4097666 commit 55fd5f3
Show file tree
Hide file tree
Showing 15 changed files with 990 additions and 322 deletions.

Large diffs are not rendered by default.

Expand Up @@ -32,19 +32,15 @@
public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {

private final CheckpointCoordinator coordinator;
private final long interval;
private final UUID leaderSessionID;

public CheckpointCoordinatorDeActivator(
CheckpointCoordinator coordinator,
long interval,
UUID leaderSessionID) {

LOG.info("Create CheckpointCoordinatorDeActivator");

this.coordinator = Preconditions.checkNotNull(coordinator, "The checkpointCoordinator must not be null.");

this.interval = interval;
this.leaderSessionID = leaderSessionID;
}

Expand All @@ -55,11 +51,10 @@ public void handleMessage(Object message) {

if (status == JobStatus.RUNNING) {
// start the checkpoint scheduler
coordinator.startPeriodicCheckpointScheduler(interval);
}
else {
coordinator.startCheckpointScheduler();
} else {
// anything else should stop the trigger for now
coordinator.stopPeriodicCheckpointScheduler();
coordinator.stopCheckpointScheduler();
}
}

Expand Down
Expand Up @@ -117,7 +117,7 @@ public CompletedCheckpoint toCompletedCheckpoint() {
if (notYetAcknowledgedTasks.isEmpty()) {
CompletedCheckpoint completed = new CompletedCheckpoint(jobId, checkpointId,
checkpointTimestamp, new ArrayList<StateForTask>(collectedStates));
discard(null, false);
dispose(null, false);

return completed;
}
Expand Down Expand Up @@ -150,11 +150,15 @@ public boolean acknowledgeTask(ExecutionAttemptID attemptID, SerializedValue<Sta
/**
* Discards the pending checkpoint, releasing all held resources.
*/
public void discard(ClassLoader userClassLoader, boolean discardStateHandle) {
public void discard(ClassLoader userClassLoader) {
dispose(userClassLoader, true);
}

private void dispose(ClassLoader userClassLoader, boolean releaseState) {
synchronized (lock) {
discarded = true;
numAcknowledgedTasks = -1;
if (discardStateHandle) {
if (releaseState) {
for (StateForTask state : collectedStates) {
state.discard(userClassLoader);
}
Expand Down
Expand Up @@ -182,9 +182,6 @@ public class ExecutionGraph implements Serializable {
* from results that need to be materialized. */
private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;

/** Flag that indicate whether the executed dataflow should be periodically snapshotted */
private boolean snapshotCheckpointsEnabled;

/** Flag to indicate whether the Graph has been archived */
private boolean isArchived = false;

Expand Down Expand Up @@ -341,9 +338,12 @@ public ScheduleMode getScheduleMode() {
public boolean isArchived() {
return isArchived;
}

public void enableSnapshotCheckpointing(
long interval,
long checkpointTimeout,
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpoints,
List<ExecutionJobVertex> verticesToTrigger,
List<ExecutionJobVertex> verticesToWaitFor,
List<ExecutionJobVertex> verticesToCommitTo,
Expand All @@ -368,11 +368,13 @@ public void enableSnapshotCheckpointing(
// disable to make sure existing checkpoint coordinators are cleared
disableSnaphotCheckpointing();

// create the coordinator that triggers and commits checkpoints and holds the state
snapshotCheckpointsEnabled = true;
// create the coordinator that triggers and commits checkpoints and holds the state
checkpointCoordinator = new CheckpointCoordinator(
jobID,
interval,
checkpointTimeout,
minPauseBetweenCheckpoints,
maxConcurrentCheckpoints,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
Expand All @@ -384,10 +386,7 @@ public void enableSnapshotCheckpointing(
// the periodic checkpoint scheduler is activated and deactivated as a result of
// job status changes (running -> on, all other states -> off)
registerJobStatusListener(
checkpointCoordinator.createJobStatusListener(
actorSystem,
interval,
leaderSessionID));
checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID));
}

/**
Expand All @@ -401,16 +400,11 @@ public void disableSnaphotCheckpointing() throws Exception {
throw new IllegalStateException("Job must be in CREATED state");
}

snapshotCheckpointsEnabled = false;
if (checkpointCoordinator != null) {
checkpointCoordinator.shutdown();
checkpointCoordinator = null;
}
}

public boolean isSnapshotCheckpointsEnabled() {
return snapshotCheckpointsEnabled;
}

public CheckpointCoordinator getCheckpointCoordinator() {
return checkpointCoordinator;
Expand Down
Expand Up @@ -22,17 +22,17 @@

import java.util.List;

import static java.util.Objects.requireNonNull;

/**
* The JobSnapshottingSettings are attached to a JobGraph and describe the settings
* for the asynchronous snapshotting of the JobGraph, such as interval, and which vertices
* The JobSnapshottingSettings are attached to a JobGraph and describe the settings
* for the asynchronous checkpoints of the JobGraph, such as interval, and which vertices
* need to participate.
*/
public class JobSnapshottingSettings implements java.io.Serializable{

private static final long serialVersionUID = -2593319571078198180L;

/** The default time in which pending checkpoints need to be acknowledged before timing out */
public static final long DEFAULT_SNAPSHOT_TIMEOUT = 10 * 60 * 1000; // 10 minutes

private final List<JobVertexID> verticesToTrigger;

Expand All @@ -43,26 +43,32 @@ public class JobSnapshottingSettings implements java.io.Serializable{
private final long checkpointInterval;

private final long checkpointTimeout;


public JobSnapshottingSettings(List<JobVertexID> verticesToTrigger,
List<JobVertexID> verticesToAcknowledge,
List<JobVertexID> verticesToConfirm,
long checkpointInterval)
{
this(verticesToTrigger, verticesToAcknowledge, verticesToConfirm, checkpointInterval, DEFAULT_SNAPSHOT_TIMEOUT);
}

private final long minPauseBetweenCheckpoints;

private final int maxConcurrentCheckpoints;


public JobSnapshottingSettings(List<JobVertexID> verticesToTrigger,
List<JobVertexID> verticesToAcknowledge,
List<JobVertexID> verticesToConfirm,
long checkpointInterval, long checkpointTimeout)
long checkpointInterval, long checkpointTimeout,
long minPauseBetweenCheckpoints, int maxConcurrentCheckpoints)
{
this.verticesToTrigger = verticesToTrigger;
this.verticesToAcknowledge = verticesToAcknowledge;
this.verticesToConfirm = verticesToConfirm;
// sanity checks
if (checkpointInterval < 1 || checkpointTimeout < 1 ||
minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1)
{
throw new IllegalArgumentException();
}

this.verticesToTrigger = requireNonNull(verticesToTrigger);
this.verticesToAcknowledge = requireNonNull(verticesToAcknowledge);
this.verticesToConfirm = requireNonNull(verticesToConfirm);
this.checkpointInterval = checkpointInterval;
this.checkpointTimeout = checkpointTimeout;
this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
}

// --------------------------------------------------------------------------------------------
Expand All @@ -87,11 +93,22 @@ public long getCheckpointTimeout() {
return checkpointTimeout;
}

/**
 * Returns the minimum pause between checkpoints that was passed to the constructor.
 * The constructor rejects negative values, so the result is never negative.
 * NOTE(review): the unit is presumably milliseconds, like the checkpoint timeout; confirm.
 */
public long getMinPauseBetweenCheckpoints() {
return minPauseBetweenCheckpoints;
}

/**
 * Returns the maximum number of concurrent checkpoints that was passed to the constructor.
 * The constructor rejects values below 1, so the result is always at least 1.
 */
public int getMaxConcurrentCheckpoints() {
return maxConcurrentCheckpoints;
}

// --------------------------------------------------------------------------------------------

@Override
public String toString() {
return String.format("SnapshotSettings: interval=%d, timeout=%d, trigger=%s, ack=%s, commit=%s",
checkpointInterval, checkpointTimeout, verticesToTrigger, verticesToAcknowledge, verticesToConfirm);
return String.format("SnapshotSettings: interval=%d, timeout=%d, pause-between=%d, " +
"maxConcurrent=%d, trigger=%s, ack=%s, commit=%s",
checkpointInterval, checkpointTimeout,
minPauseBetweenCheckpoints, maxConcurrentCheckpoints,
verticesToTrigger, verticesToAcknowledge, verticesToConfirm);
}
}
Expand Up @@ -913,6 +913,8 @@ class JobManager(
executionGraph.enableSnapshotCheckpointing(
snapshotSettings.getCheckpointInterval,
snapshotSettings.getCheckpointTimeout,
snapshotSettings.getMinPauseBetweenCheckpoints,
snapshotSettings.getMaxConcurrentCheckpoints,
triggerVertices,
ackVertices,
confirmVertices,
Expand Down

0 comments on commit 55fd5f3

Please sign in to comment.