Skip to content

Commit

Permalink
[FLINK-4985] [checkpointing] Report canceled / declined checkpoints t…
Browse files Browse the repository at this point in the history
…o the Checkpoint Coordinator
  • Loading branch information
StephanEwen committed Nov 8, 2016
1 parent 0a79dd5 commit 48a4813
Show file tree
Hide file tree
Showing 31 changed files with 643 additions and 316 deletions.
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
Expand Down Expand Up @@ -278,10 +279,10 @@ public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, String targe

if (result.isSuccess()) {
return result.getPendingCheckpoint().getCompletionFuture();
} else {
}
else {
Throwable cause = new Exception("Failed to trigger savepoint: " + result.getFailureReason().message());
Future<CompletedCheckpoint> failed = FlinkCompletableFuture.completedExceptionally(cause);
return failed;
return FlinkCompletableFuture.completedExceptionally(cause);
}
}

Expand All @@ -299,6 +300,7 @@ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) throws Exce
return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess();
}

@VisibleForTesting
CheckpointTriggerResult triggerCheckpoint(
long timestamp,
CheckpointProperties props,
Expand Down Expand Up @@ -397,7 +399,7 @@ CheckpointTriggerResult triggerCheckpoint(

// we lock with a special lock to make sure that trigger requests do not overtake each other.
// this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
// may issue blocking operations. Using a different lock than teh coordinator-wide lock,
// may issue blocking operations. Using a different lock than the coordinator-wide lock,
// we avoid blocking the processing of 'acknowledge/decline' messages during that time.
synchronized (triggerLock) {
final long checkpointID;
Expand Down Expand Up @@ -525,81 +527,74 @@ else if (!props.forceCheckpoint()) {
}

/**
* Receives a {@link DeclineCheckpoint} message and returns whether the
* message was associated with a pending checkpoint.
* Receives a {@link DeclineCheckpoint} message for a pending checkpoint.
*
* @param message Checkpoint decline from the task manager
*
* @return Flag indicating whether the declined checkpoint was associated
* with a pending checkpoint.
*/
public boolean receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
public void receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
if (shutdown || message == null) {
return false;
return;
}
if (!job.equals(message.getJob())) {
LOG.error("Received DeclineCheckpoint message for wrong job: {}", message);
return false;
throw new IllegalArgumentException("Received DeclineCheckpoint message for job " +
message.getJob() + " while this coordinator handles job " + job);
}

final long checkpointId = message.getCheckpointId();
final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");

PendingCheckpoint checkpoint;

// Flag indicating whether the ack message was for a known pending
// checkpoint.
boolean isPendingCheckpoint;

synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
// get races and invalid error log messages
if (shutdown) {
return false;
return;
}

checkpoint = pendingCheckpoints.get(checkpointId);

if (checkpoint != null && !checkpoint.isDiscarded()) {
isPendingCheckpoint = true;

LOG.info("Discarding checkpoint " + checkpointId
+ " because of checkpoint decline from task " + message.getTaskExecutionId());
LOG.info("Discarding checkpoint {} because of checkpoint decline from task {} : {}",
checkpointId, message.getTaskExecutionId(), reason);

pendingCheckpoints.remove(checkpointId);
checkpoint.abortDeclined();
rememberRecentCheckpointId(checkpointId);

boolean haveMoreRecentPending = false;
// we don't have to schedule another "dissolving" checkpoint any more because the
// cancellation barriers take care of breaking downstream alignments
// we only need to make sure that suspended queued requests are resumed

boolean haveMoreRecentPending = false;
for (PendingCheckpoint p : pendingCheckpoints.values()) {
if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
if (!p.isDiscarded() && p.getCheckpointId() >= checkpoint.getCheckpointId()) {
haveMoreRecentPending = true;
break;
}
}
if (!haveMoreRecentPending && !triggerRequestQueued) {
LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
triggerCheckpoint(System.currentTimeMillis(), checkpoint.getProps(), checkpoint.getTargetDirectory(), checkpoint.isPeriodic());
} else if (!haveMoreRecentPending) {
LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);

if (!haveMoreRecentPending) {
triggerQueuedRequests();
}
} else if (checkpoint != null) {
}
else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded but non-removed checkpoint " + checkpointId);
} else {
// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
}
else if (LOG.isDebugEnabled()) {
if (recentPendingCheckpoints.contains(checkpointId)) {
isPendingCheckpoint = true;
LOG.info("Received another decline checkpoint message for now expired checkpoint attempt " + checkpointId);
// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
LOG.debug("Received another decline message for now expired checkpoint attempt {} : {}",
checkpointId, reason);
} else {
isPendingCheckpoint = false;
// message is for an unknown checkpoint. might be so old that we don't even remember it any more
LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} : {}",
checkpointId, reason);
}
}
}

return isPendingCheckpoint;
}

/**
Expand Down Expand Up @@ -643,9 +638,7 @@ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E
if (checkpoint != null && !checkpoint.isDiscarded()) {
isPendingCheckpoint = true;

if (checkpoint.acknowledgeTask(
message.getTaskExecutionId(),
message.getSubtaskState())) {
if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) {
if (checkpoint.isFullyAcknowledged()) {
completed = checkpoint.finalizeCheckpoint();

Expand All @@ -672,8 +665,8 @@ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E
}
} else {
// checkpoint did not accept message
LOG.error("Received duplicate or invalid acknowledge message for checkpoint " + checkpointId
+ " , task " + message.getTaskExecutionId());
LOG.error("Received duplicate or invalid acknowledge message for checkpoint {} , task {}",
checkpointId, message.getTaskExecutionId());
}
}
else if (checkpoint != null) {
Expand Down
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint.decline;

/**
* Base class of all exceptions that indicate a declined checkpoint.
*/
public abstract class CheckpointDeclineException extends Exception {

private static final long serialVersionUID = 1L;

public CheckpointDeclineException(String message) {
super(message);
}

public CheckpointDeclineException(String message, Throwable cause) {
super(message, cause);
}
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint.decline;

/**
* Exception indicating that a checkpoint was declined because a cancellation
* barrier was received.
*/
public final class CheckpointDeclineOnCancellationBarrierException extends CheckpointDeclineException {

private static final long serialVersionUID = 1L;

public CheckpointDeclineOnCancellationBarrierException() {
super("Task received cancellation from one of its inputs");
}
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint.decline;

/**
* Exception indicating that a checkpoint was declined because a newer checkpoint
* barrier was received on an input before the pending checkpoint's barrier.
*/
public final class CheckpointDeclineSubsumedException extends CheckpointDeclineException {

private static final long serialVersionUID = 1L;

public CheckpointDeclineSubsumedException(long newCheckpointId) {
super("Checkpoint was canceled because a barrier from newer checkpoint " + newCheckpointId + " was received.");
}
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint.decline;

/**
* Exception indicating that a checkpoint was declined because a task does not support
* checkpointing.
*/
public final class CheckpointDeclineTaskNotCheckpointingException extends CheckpointDeclineException {

private static final long serialVersionUID = 1L;

public CheckpointDeclineTaskNotCheckpointingException(String taskName) {
super("Task '" + taskName + "'does not support checkpointing");
}
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint.decline;

/**
* Exception indicating that a checkpoint was declined because a task was not
* ready to perform a checkpoint.
*/
public final class CheckpointDeclineTaskNotReadyException extends CheckpointDeclineException {

private static final long serialVersionUID = 1L;

public CheckpointDeclineTaskNotReadyException(String taskName) {
super("Task " + taskName + " was not running");
}
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint.decline;

/**
* Exception indicating that a checkpoint was declined because one of the input
* stream reached its end before the alignment was complete.
*/
public final class InputEndOfStreamException extends CheckpointDeclineException {

private static final long serialVersionUID = 1L;

public InputEndOfStreamException() {
super("Checkpoint was declined because one input stream is finished");
}
}
Expand Up @@ -174,9 +174,16 @@ public interface Environment {
* @param checkpointMetaData the meta data for this checkpoint
* @param subtaskState All state handles for the checkpointed state
*/
void acknowledgeCheckpoint(
CheckpointMetaData checkpointMetaData,
SubtaskState subtaskState);
void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData, SubtaskState subtaskState);

/**
* Declines a checkpoint. This tells the checkpoint coordinator that this task will
* not be able to successfully complete a certain checkpoint.
*
* @param checkpointId The ID of the declined checkpoint.
* @param cause An optional reason why the checkpoint was declined.
*/
void declineCheckpoint(long checkpointId, Throwable cause);

/**
* Marks task execution failed for an external reason (a reason other than the task code itself
Expand Down

0 comments on commit 48a4813

Please sign in to comment.