Skip to content

Commit

Permalink
[FLINK-4715] Fail TaskManager with fatal error if task cancellation i…
Browse files Browse the repository at this point in the history
…s stuck

- Splits the cancellation up into two threads:
  * The `TaskCanceler` calls `cancel` on the invokable and `interrupt`
    on the executing Thread. It then exists.
  * The `TaskCancellationWatchDog` kicks in after the task cancellation
    timeout (current default: 30 secs) and periodically calls `interrupt`
    on the executing Thread. If the Thread does not terminate within
    the task cancellation timeout (new config value, default 3 mins), the task
    manager is notified about a fatal error, leading to termination of the JVM.
- The new configuration is exposed via `ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS`
(default: 3 mins) and the `ExecutionConfig` (similar to the cancellation interval).

Backported with slight adjustments from the master branch.
  • Loading branch information
uce committed Oct 27, 2016
1 parent 529534f commit cc6655b
Show file tree
Hide file tree
Showing 8 changed files with 748 additions and 281 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
package org.apache.flink.api.common;

import com.esotericsoftware.kryo.Serializer;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.util.Preconditions;


import java.io.Serializable;
import java.util.Collections;
Expand All @@ -32,6 +30,8 @@
import java.util.Map;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
* A config to define the behavior of the program execution. It allows to define (among other
* options) the following settings:
Expand Down Expand Up @@ -116,6 +116,12 @@ public class ExecutionConfig implements Serializable {

private long taskCancellationIntervalMillis = -1;

/**
* Timeout after which an ongoing task cancellation will lead to a fatal
* TaskManager error, usually killing the JVM.
*/
private long taskCancellationTimeoutMillis = -1;

// ------------------------------- User code values --------------------------------------------

private GlobalJobParameters globalJobParameters;
Expand Down Expand Up @@ -219,7 +225,7 @@ public int getParallelism() {
* @param parallelism The parallelism to use
*/
public ExecutionConfig setParallelism(int parallelism) {
Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
"The parallelism of an operator must be at least 1.");

this.parallelism = parallelism;
Expand All @@ -244,6 +250,38 @@ public ExecutionConfig setTaskCancellationInterval(long interval) {
return this;
}

/**
* Returns the timeout (in milliseconds) after which an ongoing task
* cancellation leads to a fatal TaskManager error.
*
* <p>The value <code>0</code> disables the timeout. In this case a stuck
* cancellation will not lead to a fatal error.
*/
@PublicEvolving
public long getTaskCancellationTimeout() {
return this.taskCancellationTimeoutMillis;
}

/**
* Sets the timeout (in milliseconds) after which an ongoing task cancellation
* is considered failed, leading to a fatal TaskManager error.
*
* <p>By default, this is deactivated.
*
* <p>The cluster default is configured via {@link org.apache.flink.configuration.ConfigConstants#TASK_CANCELLATION_TIMEOUT_MILLIS}.
*
* <p>The value <code>0</code> disables the timeout. In this case a stuck
* cancellation will not lead to a fatal error.
*
* @param timeout The task cancellation timeout (in milliseconds).
*/
@PublicEvolving
public ExecutionConfig setTaskCancellationTimeout(long timeout) {
checkArgument(timeout >= 0, "Timeout needs to be >= 0.");
this.taskCancellationTimeoutMillis = timeout;
return this;
}

/**
* Sets the restart strategy to be used for recovery.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,13 @@ public final class ConfigConstants {
@PublicEvolving
public static final String TASK_CANCELLATION_INTERVAL_MILLIS = "task.cancellation-interval";

/**
* Timeout in milliseconds after which a task cancellation times out and
* leads to a fatal TaskManager error.
*/
@PublicEvolving
public static final String TASK_CANCELLATION_TIMEOUT_MILLIS = "task.cancellation.timeout";

// --------------------------- Runtime Algorithms -------------------------------

/**
Expand Down Expand Up @@ -859,6 +866,13 @@ public final class ConfigConstants {
* */
public static final long DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS = 30000;

/**
* Default timeout in milliseconds after which a task cancellation times out
* and leads to a fatal TaskManager error. This has been backported from 1.2 and
* deactivated by default.
*/
public static final long DEFAULT_TASK_CANCELLATION_TIMEOUT_MILLIS = 0; // deactivated

// ------------------------ Runtime Algorithms ------------------------

/**
Expand Down
Loading

0 comments on commit cc6655b

Please sign in to comment.