Skip to content

Commit

Permalink
[FLINK-5063] [checkpointing] Discard state handles of declined or exp…
Browse files Browse the repository at this point in the history
…ired state handles

Whenever the checkpoint coordinator receives an acknowledge checkpoint message which belongs
to the job maintained by the checkpoint coordinator, it should either record the state handles
for later processing or discard to free the resources. The latter case can happen if a
checkpoint has been expired and late acknowledge checkpoint messages arrive. Furthremore, it
can happen if a Task sent a decline checkpoint message while other Tasks where still drawing
a checkpoint. This PR changes the behaviour such that state handles belonging to the job of
the checkpoint coordinator are discarded if they could not be added to the PendingCheckpoint.

This closes #2812
  • Loading branch information
tillrohrmann authored and StephanEwen committed Nov 16, 2016
1 parent bf06a1c commit 72b295b
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 35 deletions.
Expand Up @@ -638,35 +638,62 @@ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E
if (checkpoint != null && !checkpoint.isDiscarded()) { if (checkpoint != null && !checkpoint.isDiscarded()) {
isPendingCheckpoint = true; isPendingCheckpoint = true;


if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) { switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) {
if (checkpoint.isFullyAcknowledged()) { case SUCCESS:
completed = checkpoint.finalizeCheckpoint(); if (checkpoint.isFullyAcknowledged()) {
completed = checkpoint.finalizeCheckpoint();


completedCheckpointStore.addCheckpoint(completed); completedCheckpointStore.addCheckpoint(completed);


LOG.info("Completed checkpoint " + checkpointId + " (in " + LOG.info("Completed checkpoint " + checkpointId + " (in " +
completed.getDuration() + " ms)"); completed.getDuration() + " ms)");


if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
for (Map.Entry<JobVertexID, TaskState> entry : completed.getTaskStates().entrySet()) { for (Map.Entry<JobVertexID, TaskState> entry : completed.getTaskStates().entrySet()) {
builder.append("JobVertexID: ").append(entry.getKey()).append(" {").append(entry.getValue()).append("}"); builder.append("JobVertexID: ").append(entry.getKey()).append(" {").append(entry.getValue()).append("}");
} }


LOG.debug(builder.toString()); LOG.debug(builder.toString());
} }


pendingCheckpoints.remove(checkpointId); pendingCheckpoints.remove(checkpointId);
rememberRecentCheckpointId(checkpointId); rememberRecentCheckpointId(checkpointId);


dropSubsumedCheckpoints(completed.getCheckpointID()); dropSubsumedCheckpoints(completed.getCheckpointID());


triggerQueuedRequests(); triggerQueuedRequests();
} }
} else { break;
// checkpoint did not accept message case DUPLICATE:
LOG.error("Received duplicate or invalid acknowledge message for checkpoint {} , task {}", LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
checkpointId, message.getTaskExecutionId()); message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
break;
case UNKNOWN:
LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
"because the task's execution attempt id was unknown. Discarding " +
"the state handle to avoid lingering state.", message.getCheckpointId(),
message.getTaskExecutionId(), message.getJob());

try {
message.getSubtaskState().discardState();
} catch (Exception e) {
LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
}
break;
case DISCARDED:
LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
"because the pending checkpoint had been discarded. Discarding the " +
"state handle tp avoid lingering state.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());

try {
message.getSubtaskState().discardState();
} catch (Exception e) {
LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
}
} }
} }
else if (checkpoint != null) { else if (checkpoint != null) {
Expand All @@ -678,11 +705,20 @@ else if (checkpoint != null) {
// message is for an unknown checkpoint, or comes too late (checkpoint disposed) // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
if (recentPendingCheckpoints.contains(checkpointId)) { if (recentPendingCheckpoints.contains(checkpointId)) {
isPendingCheckpoint = true; isPendingCheckpoint = true;
LOG.warn("Received late message for now expired checkpoint attempt " + checkpointId); LOG.warn("Received late message for now expired checkpoint attempt {}.", checkpointId);
} }
else { else {
LOG.debug("Received message for an unknown checkpoint {}.", checkpointId);
isPendingCheckpoint = false; isPendingCheckpoint = false;
} }

try {
// try to discard the state so that we don't have lingering state lying around
message.getSubtaskState().discardState();
} catch (Exception e) {
LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
}
} }
} }


Expand Down
Expand Up @@ -35,7 +35,9 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;


import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -64,6 +66,9 @@ public class PendingCheckpoint {


private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks; private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;


/** Set of acknowledged tasks */
private final Set<ExecutionAttemptID> acknowledgedTasks;

/** Flag indicating whether the checkpoint is triggered as part of periodic scheduling. */ /** Flag indicating whether the checkpoint is triggered as part of periodic scheduling. */
private final boolean isPeriodic; private final boolean isPeriodic;


Expand Down Expand Up @@ -109,6 +114,8 @@ public PendingCheckpoint(


checkArgument(verticesToConfirm.size() > 0, checkArgument(verticesToConfirm.size() > 0,
"Checkpoint needs at least one vertex that commits the checkpoint"); "Checkpoint needs at least one vertex that commits the checkpoint");

acknowledgedTasks = new HashSet<>(verticesToConfirm.size());
} }


// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -228,34 +235,47 @@ public CompletedCheckpoint finalizeCheckpoint() throws Exception {
} }
} }
} }


public boolean acknowledgeTask( /**
ExecutionAttemptID attemptID, * Acknowledges the task with the given execution attempt id and the given subtask state.
SubtaskState checkpointedSubtaskState) { *
* @param executionAttemptId of the acknowledged task
* @param subtaskState of the acknowledged task
* @return TaskAcknowledgeResult of the operation
*/
public TaskAcknowledgeResult acknowledgeTask(
ExecutionAttemptID executionAttemptId,
SubtaskState subtaskState) {


synchronized (lock) { synchronized (lock) {


if (discarded) { if (discarded) {
return false; return TaskAcknowledgeResult.DISCARDED;
} }


final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID); final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);


if (vertex == null) { if (vertex == null) {
return false; if (acknowledgedTasks.contains(executionAttemptId)) {
return TaskAcknowledgeResult.DUPLICATE;
} else {
return TaskAcknowledgeResult.UNKNOWN;
}
} else {
acknowledgedTasks.add(executionAttemptId);
} }


if (null != checkpointedSubtaskState) { if (null != subtaskState) {


JobVertexID jobVertexID = vertex.getJobvertexId(); JobVertexID jobVertexID = vertex.getJobvertexId();
int subtaskIndex = vertex.getParallelSubtaskIndex(); int subtaskIndex = vertex.getParallelSubtaskIndex();
TaskState taskState = taskStates.get(jobVertexID); TaskState taskState = taskStates.get(jobVertexID);


if (null == taskState) { if (null == taskState) {
ChainedStateHandle<StreamStateHandle> nonPartitionedState = ChainedStateHandle<StreamStateHandle> nonPartitionedState =
checkpointedSubtaskState.getLegacyOperatorState(); subtaskState.getLegacyOperatorState();
ChainedStateHandle<OperatorStateHandle> partitioneableState = ChainedStateHandle<OperatorStateHandle> partitioneableState =
checkpointedSubtaskState.getManagedOperatorState(); subtaskState.getManagedOperatorState();
//TODO this should go away when we remove chained state, assigning state to operators directly instead //TODO this should go away when we remove chained state, assigning state to operators directly instead
int chainLength; int chainLength;
if (nonPartitionedState != null) { if (nonPartitionedState != null) {
Expand All @@ -276,17 +296,27 @@ public boolean acknowledgeTask(
} }


long duration = System.currentTimeMillis() - checkpointTimestamp; long duration = System.currentTimeMillis() - checkpointTimestamp;
checkpointedSubtaskState.setDuration(duration); subtaskState.setDuration(duration);


taskState.putState(subtaskIndex, checkpointedSubtaskState); taskState.putState(subtaskIndex, subtaskState);
} }


++numAcknowledgedTasks; ++numAcknowledgedTasks;


return true; return TaskAcknowledgeResult.SUCCESS;
} }
} }


/**
* Result of the {@link PendingCheckpoint#acknowledgedTasks} method.
*/
public enum TaskAcknowledgeResult {
SUCCESS, // successful acknowledge of the task
DUPLICATE, // acknowledge message is a duplicate
UNKNOWN, // unknown task acknowledged
DISCARDED // pending checkpoint has been discarded
}

// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Cancellation // Cancellation
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand Down Expand Up @@ -350,6 +380,7 @@ private void dispose(boolean releaseState) throws Exception {
} finally { } finally {
taskStates.clear(); taskStates.clear();
notYetAcknowledgedTasks.clear(); notYetAcknowledgedTasks.clear();
acknowledgedTasks.clear();
} }
} }
} }
Expand Down
Expand Up @@ -84,6 +84,7 @@
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq; import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -996,6 +997,112 @@ public void handleMessagesForNonExistingCheckpoints() {
} }
} }


/**
* Tests that late acknowledge checkpoint messages are properly cleaned up. Furthermore it tests
* that unknown checkpoint messages for the same job a are cleaned up as well. In contrast
* checkpointing messages from other jobs should not be touched. A late acknowledge
* message is an acknowledge message which arrives after the checkpoint has been declined.
*
* @throws Exception
*/
@Test
public void testStateCleanupForLateOrUnknownMessages() throws Exception {
final JobID jobId = new JobID();

final ExecutionAttemptID triggerAttemptId = new ExecutionAttemptID();
final ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptId);

final ExecutionAttemptID ackAttemptId1 = new ExecutionAttemptID();
final ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptId1);

final ExecutionAttemptID ackAttemptId2 = new ExecutionAttemptID();
final ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptId2);

final long timestamp = 1L;

CheckpointCoordinator coord = new CheckpointCoordinator(
jobId,
20000L,
20000L,
0L,
1,
ExternalizedCheckpointSettings.none(),
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] {triggerVertex, ackVertex1, ackVertex2},
new ExecutionVertex[0],
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
new DisabledCheckpointStatsTracker());

assertTrue(coord.triggerCheckpoint(timestamp, false));

assertEquals(1, coord.getNumberOfPendingCheckpoints());

PendingCheckpoint pendingCheckpoint = coord.getPendingCheckpoints().values().iterator().next();

long checkpointId = pendingCheckpoint.getCheckpointId();

CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);

SubtaskState triggerSubtaskState = mock(SubtaskState.class);

// acknowledge the first trigger vertex
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointMetaData, triggerSubtaskState));

SubtaskState unknownSubtaskState = mock(SubtaskState.class);

// receive an acknowledge message for an unknown vertex
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointMetaData, unknownSubtaskState));

// we should discard acknowledge messages from an unknown vertex belonging to our job
verify(unknownSubtaskState, times(1)).discardState();

SubtaskState differentJobSubtaskState = mock(SubtaskState.class);

// receive an acknowledge message from an unknown job
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointMetaData, differentJobSubtaskState));

// we should not interfere with different jobs
verify(differentJobSubtaskState, never()).discardState();

// duplicate acknowledge message for the trigger vertex
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointMetaData, triggerSubtaskState));

// duplicate acknowledge messages for a known vertex should not trigger discarding the state
verify(triggerSubtaskState, never()).discardState();

// let the checkpoint fail at the first ack vertex
coord.receiveDeclineMessage(new DeclineCheckpoint(jobId, ackAttemptId1, checkpointId));

assertTrue(pendingCheckpoint.isDiscarded());

// check that we've cleaned up the already acknowledged state
verify(triggerSubtaskState, times(1)).discardState();

SubtaskState ackSubtaskState = mock(SubtaskState.class);

// late acknowledge message from the second ack vertex
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointMetaData, ackSubtaskState));

// check that we also cleaned up this state
verify(ackSubtaskState, times(1)).discardState();

// receive an acknowledge message from an unknown job
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointMetaData, differentJobSubtaskState));

// we should not interfere with different jobs
verify(differentJobSubtaskState, never()).discardState();

SubtaskState unknownSubtaskState2 = mock(SubtaskState.class);

// receive an acknowledge message for an unknown vertex
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointMetaData, unknownSubtaskState2));

// we should discard acknowledge messages from an unknown vertex belonging to our job
verify(unknownSubtaskState2, times(1)).discardState();
}

@Test @Test
public void testPeriodicTriggering() { public void testPeriodicTriggering() {
try { try {
Expand Down

0 comments on commit 72b295b

Please sign in to comment.