2 changes: 1 addition & 1 deletion flink-end-to-end-tests/test-scripts/common.sh
@@ -346,7 +346,7 @@ function check_logs_for_exceptions {
| grep -v "java.io.InvalidClassException: org.apache.flink.formats.avro.typeutils.AvroSerializer" \
| grep -v "Caused by: java.lang.Exception: JobManager is shutting down" \
| grep -v "java.lang.Exception: Artificial failure" \
| grep -v "org.apache.flink.runtime.checkpoint.decline" \
| grep -v "org.apache.flink.runtime.checkpoint.CheckpointException" \
| grep -v "org.elasticsearch.ElasticsearchException" \
| grep -v "Elasticsearch exception" \
| grep -ic "exception" || true)

@@ -101,7 +101,8 @@ public void testJobManagerJMXMetricAccess() throws Exception {
5,
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true,
false),
false,
0),
null));

ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();

@@ -21,7 +21,6 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -33,6 +32,7 @@
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
@@ -185,15 +185,13 @@ public class CheckpointCoordinator {

private boolean isPreferCheckpointForRecovery;

private final CheckpointFailureManager failureManager;

// --------------------------------------------------------------------------------------------

public CheckpointCoordinator(
JobID job,
long baseInterval,
long checkpointTimeout,
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpointAttempts,
CheckpointRetentionPolicy retentionPolicy,
CheckpointCoordinatorConfiguration chkConfig,
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
ExecutionVertex[] tasksToCommitTo,
@@ -202,31 +200,29 @@ public CheckpointCoordinator(
StateBackend checkpointStateBackend,
Executor executor,
SharedStateRegistryFactory sharedStateRegistryFactory,
boolean isPreferCheckpointForRecovery) {
CheckpointFailureManager failureManager) {

// sanity checks
checkNotNull(checkpointStateBackend);
checkArgument(baseInterval > 0, "Checkpoint base interval must be larger than zero");
checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero");
checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");

// max "in between duration" can be one year - this is to prevent numeric overflows
long minPauseBetweenCheckpoints = chkConfig.getMinPauseBetweenCheckpoints();
if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 1_000;
}

// it does not make sense to schedule checkpoints more often then the desired
// time between checkpoints
long baseInterval = chkConfig.getCheckpointInterval();
if (baseInterval < minPauseBetweenCheckpoints) {
baseInterval = minPauseBetweenCheckpoints;
}

this.job = checkNotNull(job);
this.baseInterval = baseInterval;
this.checkpointTimeout = checkpointTimeout;
this.checkpointTimeout = chkConfig.getCheckpointTimeout();
this.minPauseBetweenCheckpointsNanos = minPauseBetweenCheckpoints * 1_000_000;
this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
this.maxConcurrentCheckpointAttempts = chkConfig.getMaxConcurrentCheckpoints();
this.tasksToTrigger = checkNotNull(tasksToTrigger);
this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
@@ -236,7 +232,8 @@ public CheckpointCoordinator(
this.executor = checkNotNull(executor);
this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
this.isPreferCheckpointForRecovery = isPreferCheckpointForRecovery;
this.isPreferCheckpointForRecovery = chkConfig.isPreferCheckpointForRecovery();
this.failureManager = checkNotNull(failureManager);

this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
this.masterHooks = new HashMap<>();
@@ -249,7 +246,7 @@ public CheckpointCoordinator(
this.timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);

this.checkpointProperties = CheckpointProperties.forCheckpoint(retentionPolicy);
this.checkpointProperties = CheckpointProperties.forCheckpoint(chkConfig.getCheckpointRetentionPolicy());

try {
this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job);
@@ -342,7 +339,7 @@ public void shutdown(JobStatus jobStatus) throws Exception {

// clear and discard all pending checkpoints
for (PendingCheckpoint pending : pendingCheckpoints.values()) {
pending.abort(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
failPendingCheckpoint(pending, CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
}
pendingCheckpoints.clear();

@@ -439,6 +436,10 @@ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false);
return true;
} catch (CheckpointException e) {
long latestGeneratedCheckpointId = getCheckpointIdCounter().get();

Contributor:

Can you explain why we actually need to get the latest id again from the counter, rather than just remember it in a field when we do checkpointID = checkpointIdCounter.getAndIncrement(); in triggerCheckpoint(...) and checkpointIdCounter.setCount(nextCheckpointId); in restore(...)?

Contributor Author:

I think reading the latest generated checkpoint id from the counter is the simplest and most direct way. It sidesteps two potential problems:

  • Concurrency: although a dedicated field would carry no real risk at the moment, relying on the CheckpointIDCounter means there is nothing to worry about at all.
  • Introducing a class-level field brings a risk of misuse when other logic is added later.
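
A minimal sketch of the two variants weighed in this thread (the cached-field variant is hypothetical and not part of this PR):

	// Hypothetical alternative raised above: remember the id in a field when it is generated.
	// private long latestGeneratedCheckpointId;
	// latestGeneratedCheckpointId = checkpointIdCounter.getAndIncrement();  // in triggerCheckpoint(...)
	// latestGeneratedCheckpointId = nextCheckpointId;                       // next to checkpointIdCounter.setCount(nextCheckpointId) in restore(...)

	// Variant taken in this PR: read the latest id back from the shared CheckpointIDCounter on
	// failure, so no extra field has to be kept in sync.
	long latestGeneratedCheckpointId = getCheckpointIdCounter().get();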

// here we can not get the failed pending checkpoint's id,
// so we pass the negative latest generated checkpoint id as a special flag
failureManager.handleCheckpointException(e, -1 * latestGeneratedCheckpointId);
return false;
}
}
@@ -459,7 +460,7 @@ public PendingCheckpoint triggerCheckpoint(
synchronized (lock) {
// abort if the coordinator has been shutdown in the meantime
if (shutdown) {
throw new CheckpointException(CheckpointFailureReason.COORDINATOR_SHUTDOWN);
throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
}

// Don't allow periodic checkpoint if scheduling has been disabled
@@ -599,7 +600,7 @@ public PendingCheckpoint triggerCheckpoint(
if (!checkpoint.isDiscarded()) {
LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);

checkpoint.abort(CheckpointFailureReason.CHECKPOINT_EXPIRED);
failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
pendingCheckpoints.remove(checkpointID);
rememberRecentCheckpointId(checkpointID);

@@ -614,7 +615,7 @@ public PendingCheckpoint triggerCheckpoint(
// since we released the lock in the meantime, we need to re-check
// that the conditions still hold.
if (shutdown) {
throw new CheckpointException(CheckpointFailureReason.COORDINATOR_SHUTDOWN);
throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
}
else if (!props.forceCheckpoint()) {
if (triggerRequestQueued) {
@@ -699,7 +700,7 @@ else if (!props.forceCheckpoint()) {
checkpointID, job, numUnsuccessful, t);

if (!checkpoint.isDiscarded()) {
checkpoint.abort(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t);
failPendingCheckpoint(checkpoint, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t);
}

try {
@@ -891,11 +892,12 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) thro
try {
try {
completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
}
catch (Exception e1) {
// abort the current pending checkpoint if we fails to finalize the pending checkpoint.
if (!pendingCheckpoint.isDiscarded()) {
pendingCheckpoint.abort(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
}

throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.',
@@ -1002,7 +1004,7 @@ private void dropSubsumedCheckpoints(long checkpointId) {
// remove all pending checkpoints that are lesser than the current completed checkpoint
if (p.getCheckpointId() < checkpointId && p.canBeSubsumed()) {
rememberRecentCheckpointId(p.getCheckpointId());
p.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED);
failPendingCheckpoint(p, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
entries.remove();
}
}
@@ -1275,7 +1277,7 @@ public void stopCheckpointScheduler() {
public void abortPendingCheckpoints(CheckpointException exception) {
synchronized (lock) {
for (PendingCheckpoint p : pendingCheckpoints.values()) {
p.abort(exception.getCheckpointFailureReason());
failPendingCheckpoint(p, exception.getCheckpointFailureReason());
}

pendingCheckpoints.clear();
@@ -1329,10 +1331,13 @@ private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Th

LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause);

if (cause == null || cause instanceof CheckpointDeclineException) {
pendingCheckpoint.abort(CheckpointFailureReason.CHECKPOINT_DECLINED, cause);
if (cause == null) {

Contributor:

Is there any harm in just rewriting this as

		if (cause == null) {
			failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.CHECKPOINT_DECLINED);
		} else if (cause instanceof CheckpointException) {
			CheckpointException exception = (CheckpointException) cause;
			failPendingCheckpoint(pendingCheckpoint, exception.getCheckpointFailureReason(), cause);
		} else {
			failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.JOB_FAILURE, cause);
		}

?

Contributor Author:

If all CheckpointException objects come from the Flink code, this style is OK.

failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.CHECKPOINT_DECLINED);
} else if (cause instanceof CheckpointException) {
CheckpointException exception = (CheckpointException) cause;
failPendingCheckpoint(pendingCheckpoint, exception.getCheckpointFailureReason(), cause);
} else {
pendingCheckpoint.abort(CheckpointFailureReason.JOB_FAILURE, cause);
failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.JOB_FAILURE, cause);

Member:

I noticed that you did not touch the AbstractStreamOperator part. With your PR, if an operator fails to complete its snapshot in #snapshotState(long, long, CheckpointOptions, CheckpointStreamFactory), it only declines that checkpoint and surfaces the failure as an exception. In other words, CheckpointFailureManager only processes the failed checkpoint in this line, which means continuousFailureCounter in CheckpointFailureManager is not increased. Is this reasonable?

Contributor Author:

Actually, CheckpointFailureManager can do more in the future, but not yet. This is an intermediate step of the overall three PRs. In this PR the most important thing is to stay compatible with setFailOnCheckpointingErrors; otherwise it would change behavior for many users. We did consider counting more failure reasons, but that would require more changes and make this PR more complex. So your thought is right, just not for now.

The purpose of this PR is to introduce the CheckpointFailureManager and finish the residual refactoring work from the first PR #7571.

}

rememberRecentCheckpointId(checkpointId);
@@ -1384,4 +1389,21 @@ public void run() {
});
}
}

private void failPendingCheckpoint(
final PendingCheckpoint pendingCheckpoint,
final CheckpointFailureReason reason,
final Throwable cause) {

CheckpointException exception = new CheckpointException(reason, cause);
pendingCheckpoint.abort(reason, cause);
failureManager.handleCheckpointException(exception, pendingCheckpoint.getCheckpointId());
}

private void failPendingCheckpoint(
final PendingCheckpoint pendingCheckpoint,
final CheckpointFailureReason reason) {

failPendingCheckpoint(pendingCheckpoint, reason, null);
}
}
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.util.FlinkRuntimeException;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* The checkpoint failure manager which centrally manages checkpoint failure processing logic.
*/
public class CheckpointFailureManager {

private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE;

private final int tolerableCpFailureNumber;
private final FailJobCallback failureCallback;
private final AtomicInteger continuousFailureCounter;
private final Set<Long> countedCheckpointIds;

public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failureCallback) {
checkArgument(tolerableCpFailureNumber >= 0,
"The tolerable checkpoint failure number is illegal, " +
"it must be greater than or equal to 0 .");
this.tolerableCpFailureNumber = tolerableCpFailureNumber;
this.continuousFailureCounter = new AtomicInteger(0);
this.failureCallback = checkNotNull(failureCallback);
this.countedCheckpointIds = ConcurrentHashMap.newKeySet();
}

/**
* Handle checkpoint exception with a handler callback.
*
* @param exception the checkpoint exception.
* @param checkpointId the failed checkpoint id used to count the continuous failure number based on

Contributor:

What about mentioning the special meaning of negative id values?

Contributor Author:

OK, will give more description.

Contributor Author:

I have added the description of the checkpoint id parameter, and created an issue (FLINK-12514) to track the future refactoring of the counting mechanism based on ordered checkpoint ids.

Contributor:

Ok, great. After all those changes I might do a final pass over all changes later but so far I think the outcome should be that we can merge soon.

* checkpoint id sequence. In the trigger phase, the failure may happen before a checkpoint
* id has been generated; in that case the negated latest generated checkpoint id is passed
* in as a special flag.
*/
public void handleCheckpointException(CheckpointException exception, long checkpointId) {

Contributor Author:

Hi @StefanRRichter, I have refactored the counting mechanism based on your suggestion so that it follows the checkpoint id sequence. While implementing it, I hit a problem: a CheckpointException thrown in the trigger phase may not come with a checkpoint id.

Currently, the method triggerCheckpoint has two outcomes:

  • It returns a pending checkpoint (so we can get the checkpoint id).
  • It throws a CheckpointException (whether a checkpoint id exists depends on where in the method the exception is thrown).

So I cannot get the checkpoint id here.

My thought is that we could inject the checkpoint id into the CheckpointException (though the semantics look a bit strange), and use a default value (-1) when we cannot. Then, if CheckpointFailureManager receives -1 (meaning no checkpoint id was generated in the trigger phase), it would ignore this case. Actually, this does not seem to be a case we want to tolerate anyway.

What do you think?

Contributor:

@yanghua I just had a quick look at the implementation of the counting and it looks pretty complicated now, and I think there is even a bug: e.g. if the latest checkpoint completes and clears the map, and then an older checkpoint that was still ongoing fails, it will add to the count. I think that proper counting is maybe a task in itself, and in order to move quickly with this task I suggest that in this step:

  • we implement your initial idea for the simpler counting;
  • but already include the checkpoint id in the reporting, so we can later refine the counting based on order;
  • we create an issue for that refinement so that it is not forgotten and can be discussed.

Overall, this is an implementation detail that should not stop us from progressing fast at this point.

About your problem of getting the checkpoint id in some cases: my thought is that we cannot even include it in the exception, because some exceptions can happen before an id was assigned. However, if it happens in the trigger phase, we know that it happened for the very latest checkpoint. If we wanted, we could virtually consider such triggers as checkpoints that fall between ids for our ordering.

Contributor Author:

@StefanRRichter Thanks for your suggestion. Actually, I had not yet checked and verified the new counting logic; I was implementing it when I found it was blocked by the checkpoint id issue. Regarding the checkpoint id, I agree with you that adding it to the checkpoint exception is not a good idea.

So I will roll the current implementation back to the old one and create an issue to refactor the counting mechanism.

Contributor Author:

@StefanRRichter Still the old question: if I include the checkpoint id in this method (your second suggestion), what argument should be passed in here?

Contributor:

Well, what information do we need? You can say this is the checkpoint after the latest checkpoint id and before the next checkpoint id. So somehow it needs reporting of the current latest id, and also that it was not a "true" checkpoint id. Not saying this is the best way to encode it, but potentially (-latestGeneratedCheckpointId) would be a hacky way to provide all the info: the latest checkpoint, and negative because it is a virtual successor to that id but never a true part of the sequence of checkpoint ids.
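
A minimal sketch of how a consumer of that convention could decode the flag (illustrative only, not code from this PR):

	// A non-negative id is a real id from the checkpoint id sequence; a negative id means the
	// failure happened in the trigger phase, before a new id was assigned, and its absolute
	// value is the latest id generated so far.
	boolean failedBeforeIdAssignment = checkpointId < 0;
	long latestKnownCheckpointId = Math.abs(checkpointId);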

Contributor:

You should add the checkpoint id as a parameter, with an explanation in the docs. Especially after this discussion it should be clear that it justifies mentioning.

Contributor:

I still see one important issue in this method that we should address: receiving multiple error reports for the same checkpoint id will continue to increase the count. For example, if the DFS goes away and suddenly all tasks report a problem for the same ongoing checkpoint, this would drive up the count, right? One checkpoint id should only be able to increase the count once. A special case is probably failing triggers, which currently report negative checkpoint ids.

Contributor Author:

Good catch! You are right, I will consider and fix it.

Contributor Author:

I have added a check when counting. Since the failure reasons of the special case (negative checkpoint ids) are all ignored anyway, we do not need to take special care of it.

if (tolerableCpFailureNumber == UNLIMITED_TOLERABLE_FAILURE_NUMBER) {
return;
}

CheckpointFailureReason reason = exception.getCheckpointFailureReason();
switch (reason) {
case PERIODIC_SCHEDULER_SHUTDOWN:
case ALREADY_QUEUED:
case TOO_MANY_CONCURRENT_CHECKPOINTS:
case MINIMUM_TIME_BETWEEN_CHECKPOINTS:
case NOT_ALL_REQUIRED_TASKS_RUNNING:
case CHECKPOINT_SUBSUMED:
case CHECKPOINT_COORDINATOR_SUSPEND:
case CHECKPOINT_COORDINATOR_SHUTDOWN:
case JOB_FAILURE:
case JOB_FAILOVER_REGION:
//for compatibility purposes with user job behavior
case CHECKPOINT_DECLINED_TASK_NOT_READY:
case CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING:
case CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED:
case CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER:
case CHECKPOINT_DECLINED_SUBSUMED:
case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM:

case EXCEPTION:
case CHECKPOINT_EXPIRED:
case TASK_CHECKPOINT_FAILURE:
case TRIGGER_CHECKPOINT_FAILURE:
case FINALIZE_CHECKPOINT_FAILURE:
//ignore
break;

case CHECKPOINT_DECLINED:
//we should make sure one checkpoint is only counted once
if (countedCheckpointIds.add(checkpointId)) {
continuousFailureCounter.incrementAndGet();
}

break;

default:
throw new FlinkRuntimeException("Unknown checkpoint failure reason : " + reason.name());
}

if (continuousFailureCounter.get() > tolerableCpFailureNumber) {

Contributor Author:

This is the reason I did not use equality here. So if I accept your suggestion, this should change to continuousFailureCounter.get() > tolerableCpFailureNumber || continuousFailureCounter.get() < 0, where continuousFailureCounter.get() < 0 means overflow, right?

Contributor:

Exactly. I would even propose that if tolerableCpFailureNumber == Integer.MAX_VALUE we consider this as "unlimited failures allowed" and don't even query the current count, e.g.
if (tolerableCpFailureNumber != Integer.MAX_VALUE && continuousFailureCounter.get() > tolerableCpFailureNumber)

Contributor Author:

Accepted! Have updated the code.

clearCount();
failureCallback.failJob();
}
}

/**
* Handle checkpoint success.
*
* @param checkpointId the successful checkpoint id used to reset the continuous failure count
* based on the checkpoint id sequence.
*/
public void handleCheckpointSuccess(long checkpointId) {
clearCount();
}
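
As a usage sketch of the two methods above (hypothetical values; the lambda merely stands in for a FailJobCallback):

	CheckpointFailureManager manager = new CheckpointFailureManager(
		2,                                      // tolerate at most 2 continuous failures
		() -> System.out.println("fail job"));  // illustrative FailJobCallback

	CheckpointException declined =
		new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED);

	manager.handleCheckpointException(declined, 7L);  // counted: first report for checkpoint 7
	manager.handleCheckpointException(declined, 7L);  // ignored: id 7 is already in countedCheckpointIds
	manager.handleCheckpointException(declined, 8L);  // counted: 2 continuous failures, still tolerated
	manager.handleCheckpointException(declined, 9L);  // counted: 3 > 2, the callback fails the job and the count is cleared
	manager.handleCheckpointSuccess(10L);             // a completed checkpoint also clears the count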

private void clearCount() {
continuousFailureCounter.set(0);

Contributor:

I think either we don't need an atomic integer as the failure counter or this is broken for corner cases, e.g. missing a count. Maybe the methods should always be invoked under the lock of the checkpoint coordinator anyway. Also, I think it might make sense to track the errors by checkpoint id. For example, how do you count if cp-2 fails before a parallel cp-1 is successful? Wouldn't this reset the count to 0 even though the latest checkpoint failed?

Contributor Author:

Checkpoints can be executed in parallel. I think the "continuousFailureCounter" should just track the invocation order, not the checkpoint sequence number. What do you think?

Contributor:

I think this topic was not explicitly discussed in the design doc, but I feel like it might make sense to have the failure count somehow correlated with the checkpoint sequence. My reason is the following line of thought: why should an old checkpoint that finally succeeds after a while clear the failure count that was increased by newer checkpoints that already failed, for example if the failure is persistent and will continue to happen for all future checkpoints?

Contributor Author:

Good question. Actually, I think our focus is different. As the variable name continuousFailureCounter suggests, I am more concerned with "continuous" in the parallel scenario, while you are more concerned with a "failure counter" based on the checkpoint sequence (trigger order).

Tolerating successive checkpoint failures usually serves two purposes:

  • Users don't want the last successful checkpoint to fall too far behind the current time (if checkpoints cannot succeed from now on);
  • In many cases, continuous failures are caused by exceptions in a third-party system the job interacts with. For short-lived exceptions we are not inclined to restart the job; for long-lasting exceptions we are willing to restart it.

The first purpose aligns more with your view, while the second is more about behavior at execution time. In your example, if the old checkpoint succeeds (possibly indicating that the third-party system is back to normal), it is also acceptable to reset the counter to reduce the risk of a restart. My point is that the cost and risk of a job restart are very high, and judging by runtime continuity makes us less aggressive. What do you think about this?

But I admit that your point of view is easier for users to understand.

Contributor (@StefanRRichter, May 9, 2019):

About your second point: if an old checkpoint succeeds and everything is back to normal, that would also mean that the next newer checkpoint will very likely succeed and reset the counter anyway. So this scenario is (kind of) subsumed by the strategy I proposed: if the failure cause is gone, the next checkpoint newer than the last failing one will also clear the count. And if users are concerned about restart cost, they can set a higher tolerance. You can also argue the opposite way: it is expensive to wait a long time before restarting if the failure was persistent and you have to replay a lot of past events.

Contributor Author:

After thinking deeply, I agree with you. Following the checkpoint's sequence is a better choice. Accept.

countedCheckpointIds.clear();
}
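
As a rough illustration of the sequence-aware refinement discussed in the thread above (a hypothetical sketch in the direction of FLINK-12514, not part of this PR), the idea is to clear the count only when a checkpoint newer than the last counted failure completes:

	// Hypothetical sketch: remember the highest checkpoint id that was counted as a failure
	// (e.g. updated with Math::max when a failure is counted) and only reset on a newer success.
	private final AtomicLong highestCountedFailureId = new AtomicLong(-1L);   // java.util.concurrent.atomic.AtomicLong

	public void handleCheckpointSuccess(long checkpointId) {
		if (checkpointId > highestCountedFailureId.get()) {
			clearCount();
		}
	}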

/**
* A callback interface about how to fail a job.
*/
public interface FailJobCallback {

void failJob();

}

}
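
For context, a hedged sketch of how a caller might wire the callback when constructing the failure manager (names such as executionGraph and failGlobal are illustrative; the actual wiring is outside this file and may differ):

	CheckpointFailureManager failureManager = new CheckpointFailureManager(
		tolerableCheckpointFailureNumber,        // e.g. the new "0" passed through the test change above
		new CheckpointFailureManager.FailJobCallback() {
			@Override
			public void failJob() {
				// Illustration only: fail the whole job once the tolerance is exceeded.
				executionGraph.failGlobal(
					new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."));
			}
		});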