From be65857942fba7a8916be757dce78691db8a607d Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Tue, 18 Oct 2016 09:50:30 +0200 Subject: [PATCH 1/2] [FLINK-4715] Remove superfluous test --- .../runtime/taskmanager/TaskCancelTest.java | 258 ------------------ .../test/cancelling/CancelingTestBase.java | 54 ++-- 2 files changed, 35 insertions(+), 277 deletions(-) delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java deleted file mode 100644 index 64b25e15edc54..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.io.network.api.reader.RecordReader; -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; -import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.jobmanager.JobManager; -import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess; -import org.apache.flink.runtime.messages.JobManagerMessages.CurrentJobStatus; -import org.apache.flink.runtime.messages.JobManagerMessages.JobNotFound; -import org.apache.flink.runtime.testingUtils.TestingCluster; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; -import org.apache.flink.types.IntValue; - -import org.junit.Test; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; -import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure; -import static org.apache.flink.runtime.messages.JobManagerMessages.RequestJobStatus; -import static org.apache.flink.util.Preconditions.checkNotNull; - -public class TaskCancelTest { - - @Test - public void testCancelUnion() throws Exception { - // Test config - int numberOfSources = 8; - int sourceParallelism = 4; - - TestingCluster flink = null; - - try { - // Start a cluster for the given test config - final Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, sourceParallelism); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096); - config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048); - - flink = new TestingCluster(config, false); - flink.start(); - - // Setup - final JobGraph jobGraph = new JobGraph("Cancel Big Union"); - - JobVertex[] sources = new JobVertex[numberOfSources]; - SlotSharingGroup group = new SlotSharingGroup(); - - // Create multiple sources - for (int i = 0; i < sources.length; i++) { - sources[i] = new JobVertex("Source " + i); - sources[i].setInvokableClass(InfiniteSource.class); - sources[i].setParallelism(sourceParallelism); - sources[i].setSlotSharingGroup(group); - - jobGraph.addVertex(sources[i]); - group.addVertexToGroup(sources[i].getID()); - } - - // Union all sources - JobVertex union = new JobVertex("Union"); - union.setInvokableClass(AgnosticUnion.class); - union.setParallelism(sourceParallelism); - - jobGraph.addVertex(union); - - // Each source creates a separate result - for (JobVertex source : sources) { - union.connectNewDataSetAsInput( - source, - DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED); - } - - // run the job - flink.submitJobDetached(jobGraph); - - // Wait for the job to make some progress and then cancel - awaitRunning( - flink.getLeaderGateway(TestingUtils.TESTING_DURATION()), - jobGraph.getJobID(), - TestingUtils.TESTING_DURATION()); - - Thread.sleep(5000); - - cancelJob( - flink.getLeaderGateway(TestingUtils.TESTING_DURATION()), - jobGraph.getJobID(), - TestingUtils.TESTING_DURATION()); - - // Wait for the job to be cancelled - JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.CANCELED, - flink.getLeaderGateway(TestingUtils.TESTING_DURATION()), - TestingUtils.TESTING_DURATION()); - } - finally { - if (flink != null) { - flink.shutdown(); - } - } - } - - // --------------------------------------------------------------------------------------------- - - /** - * Requests the {@link JobManager} to cancel a running job. - * - * @param jobManager The JobManager actor. - * @param jobId The JobID of the job to cancel. - * @param timeout Duration in which the JobManager must have responded. - */ - public static void cancelJob(ActorGateway jobManager, JobID jobId, FiniteDuration timeout) - throws Exception { - - checkNotNull(jobManager); - checkNotNull(jobId); - checkNotNull(timeout); - - Future ask = jobManager.ask(new CancelJob(jobId), timeout); - - Object result = Await.result(ask, timeout); - - if (result instanceof CancellationSuccess) { - // Success - CancellationSuccess success = (CancellationSuccess) result; - - if (!success.jobID().equals(jobId)) { - throw new Exception("JobManager responded for wrong job ID. Request: " - + jobId + ", response: " + success.jobID() + "."); - } - } - else if (result instanceof CancellationFailure) { - // Failure - CancellationFailure failure = (CancellationFailure) result; - - throw new Exception("Failed to cancel job with ID " + failure.jobID() + ".", - failure.cause()); - } - else { - throw new Exception("Unexpected response to cancel request: " + result); - } - } - - public static void awaitRunning(ActorGateway jobManager, JobID jobId, FiniteDuration timeout) - throws Exception { - - checkNotNull(jobManager); - checkNotNull(jobId); - checkNotNull(timeout); - - while (true) { - Future ask = jobManager.ask( - new RequestJobStatus(jobId), - timeout); - - Object result = Await.result(ask, timeout); - - if (result instanceof CurrentJobStatus) { - // Success - CurrentJobStatus status = (CurrentJobStatus) result; - - if (!status.jobID().equals(jobId)) { - throw new Exception("JobManager responded for wrong job ID. Request: " - + jobId + ", response: " + status.jobID() + "."); - } - - if (status.status() == JobStatus.RUNNING) { - return; - } - else if (status.status().isGloballyTerminalState()) { - throw new Exception("JobStatus changed to " + status.status() - + " while waiting for job to start running."); - } - } - else if (result instanceof JobNotFound) { - // Not found - throw new Exception("Cannot find job with ID " + jobId + "."); - } - else { - throw new Exception("Unexpected response to cancel request: " + result); - } - } - - } - - // --------------------------------------------------------------------------------------------- - - public static class InfiniteSource extends AbstractInvokable { - - @Override - public void invoke() throws Exception { - RecordWriter writer = new RecordWriter<>(getEnvironment().getWriter(0)); - - final IntValue val = new IntValue(); - - try { - for (int i = 0; true; i++) { - if (Thread.interrupted()) { - return; - } - - val.setValue(i); - writer.emit(val); - } - } - finally { - writer.clearBuffers(); - } - } - } - - public static class AgnosticUnion extends AbstractInvokable { - - @Override - public void invoke() throws Exception { - UnionInputGate union = new UnionInputGate(getEnvironment().getAllInputGates()); - RecordReader reader = new RecordReader<>( - union, IntValue.class, getEnvironment().getTaskManagerInfo().getTmpDirectories()); - - //noinspection StatementWithEmptyBody - while (reader.next() != null) {} - } - } -} diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java index 8a08f15e051a3..8d8ee64133778 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java @@ -19,32 +19,36 @@ package org.apache.flink.test.cancelling; -import java.util.concurrent.TimeUnit; +import org.apache.flink.api.common.Plan; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.util.TestLogger; -import org.junit.Assert; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.Plan; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; -import static org.apache.flink.runtime.taskmanager.TaskCancelTest.awaitRunning; -import static org.apache.flink.runtime.taskmanager.TaskCancelTest.cancelJob; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; +import org.apache.flink.util.TestLogger; import org.apache.hadoop.fs.FileSystem; - import org.junit.After; +import org.junit.Assert; import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; +import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure; +import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess; + /** * */ @@ -115,17 +119,30 @@ public void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTime executor.submitJobDetached(jobGraph); // Wait for the job to make some progress and then cancel - awaitRunning( + JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING, executor.getLeaderGateway(TestingUtils.TESTING_DURATION()), - jobGraph.getJobID(), TestingUtils.TESTING_DURATION()); Thread.sleep(msecsTillCanceling); - cancelJob( - executor.getLeaderGateway(TestingUtils.TESTING_DURATION()), - jobGraph.getJobID(), - new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS)); + FiniteDuration timeout = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS); + + ActorGateway jobManager = executor.getLeaderGateway(TestingUtils.TESTING_DURATION()); + + Future ask = jobManager.ask(new CancelJob(jobGraph.getJobID()), timeout); + + Object result = Await.result(ask, timeout); + + if (result instanceof CancellationSuccess) { + // all good + } else if (result instanceof CancellationFailure) { + // Failure + CancellationFailure failure = (CancellationFailure) result; + throw new Exception("Failed to cancel job with ID " + failure.jobID() + ".", + failure.cause()); + } else { + throw new Exception("Unexpected response to cancel request: " + result); + } // Wait for the job to be cancelled JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.CANCELED, @@ -137,7 +154,6 @@ public void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTime e.printStackTrace(); Assert.fail(e.getMessage()); } - } private JobGraph getJobGraph(final Plan plan) throws Exception { From b47a7303f17cba0695bfb952999d41d6e3972529 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Tue, 18 Oct 2016 09:50:36 +0200 Subject: [PATCH 2/2] [FLINK-4715] Fail TaskManager with fatal error if task cancellation is stuck - Splits the cancellation up into two threads: * The `TaskCanceler` calls `cancel` on the invokable and `interrupt` on the executing Thread. It then exists. * The `TaskCancellationWatchDog` kicks in after the task cancellation timeout (current default: 30 secs) and periodically calls `interrupt` on the executing Thread. If the Thread does not terminate within the task cancellation timeout (new config value, default 3 mins), the task manager is notified about a fatal error, leading to termination of the JVM. - The new configuration is exposed via `ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS` (default: 3 mins) and the `ExecutionConfig` (similar to the cancellation interval). --- .../flink/api/common/ExecutionConfig.java | 45 ++- .../flink/configuration/ConfigConstants.java | 9 +- .../configuration/TaskManagerOptions.java | 64 ++++ .../flink/runtime/taskmanager/Task.java | 153 ++++++++-- ...skManagerProcessReapingFatalErrorTest.java | 40 +++ .../TaskManagerProcessReapingTest.java | 241 +-------------- .../TaskManagerProcessReapingTestBase.java | 275 ++++++++++++++++++ .../flink/runtime/taskmanager/TaskTest.java | 253 +++++++++++++++- 8 files changed, 796 insertions(+), 284 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 3daf9cf00633c..3cde5e76dcce7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -19,11 +19,10 @@ package org.apache.flink.api.common; import com.esotericsoftware.kryo.Serializer; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.util.Preconditions; - +import org.apache.flink.configuration.TaskManagerOptions; import java.io.Serializable; import java.util.Collections; @@ -32,6 +31,8 @@ import java.util.Map; import java.util.Objects; +import static org.apache.flink.util.Preconditions.checkArgument; + /** * A config to define the behavior of the program execution. It allows to define (among other * options) the following settings: @@ -135,6 +136,12 @@ public class ExecutionConfig implements Serializable, Archiveable 0, "The maximum parallelism must be greater than 0."); + checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0."); this.maxParallelism = maxParallelism; } @@ -326,6 +333,36 @@ public ExecutionConfig setTaskCancellationInterval(long interval) { return this; } + /** + * Returns the timeout (in milliseconds) after which an ongoing task + * cancellation leads to a fatal TaskManager error. + * + *

The value 0 means that the timeout is disabled. In + * this case a stuck cancellation will not lead to a fatal error. + */ + @PublicEvolving + public long getTaskCancellationTimeout() { + return this.taskCancellationTimeoutMillis; + } + + /** + * Sets the timeout (in milliseconds) after which an ongoing task cancellation + * is considered failed, leading to a fatal TaskManager error. + * + *

The cluster default is configured via {@link TaskManagerOptions#TASK_CANCELLATION_TIMEOUT}. + * + *

The value 0 disables the timeout. In this case a stuck + * cancellation will not lead to a fatal error. + * + * @param timeout The task cancellation timeout (in milliseconds). + */ + @PublicEvolving + public ExecutionConfig setTaskCancellationTimeout(long timeout) { + checkArgument(timeout >= 0, "Timeout needs to be >= 0."); + this.taskCancellationTimeoutMillis = timeout; + return this; + } + /** * Sets the restart strategy to be used for recovery. * diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 9061e87cd4fce..0561fd7d98571 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -273,9 +273,10 @@ public final class ConfigConstants { public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "taskmanager.refused-registration-pause"; /** - * Time interval between two successive task cancellation attempts in milliseconds. + * Deprecated. Please use {@link TaskManagerOptions#TASK_CANCELLATION_INTERVAL}. */ @PublicEvolving + @Deprecated public static final String TASK_CANCELLATION_INTERVAL_MILLIS = "task.cancellation-interval"; // --------------------------- Runtime Algorithms ------------------------------- @@ -948,7 +949,6 @@ public final class ConfigConstants { @Deprecated public static final String SAVEPOINT_FS_DIRECTORY_KEY = "savepoints.state.backend.fs.dir"; - // ------------------------------------------------------------------------ // Default Values // ------------------------------------------------------------------------ @@ -1086,8 +1086,9 @@ public final class ConfigConstants { public static final boolean DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE = false; /** - * The default interval (in milliseconds) to wait between consecutive task cancellation attempts (= 30000 msec). - * */ + * Deprecated. Please use {@link TaskManagerOptions#TASK_CANCELLATION_INTERVAL}. + */ + @Deprecated public static final long DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS = 30000; // ------------------------ Runtime Algorithms ------------------------ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java new file mode 100644 index 0000000000000..0f60a4d971f05 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The set of configuration options relating to TaskManager and Task settings. + */ +@PublicEvolving +public class TaskManagerOptions { + + // ------------------------------------------------------------------------ + // TaskManager Options + // ------------------------------------------------------------------------ + + // @TODO Migrate 'taskmanager.*' config options from ConfigConstants + + // ------------------------------------------------------------------------ + // Task Options + // ------------------------------------------------------------------------ + + /** + * Time interval in milliseconds between two successive task cancellation + * attempts. + */ + public static final ConfigOption TASK_CANCELLATION_INTERVAL = + key("task.cancellation.interval") + .defaultValue(30000L) + .withDeprecatedKeys("task.cancellation-interval"); + + /** + * Timeout in milliseconds after which a task cancellation times out and + * leads to a fatal TaskManager error. A value of 0 deactivates + * the watch dog. + */ + public static final ConfigOption TASK_CANCELLATION_TIMEOUT = + key("task.cancellation.timeout") + .defaultValue(180000L); + + // ------------------------------------------------------------------------ + + /** Not intended to be instantiated */ + private TaskManagerOptions() { + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index bd522bd929596..f09e88a37426f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -18,12 +18,13 @@ package org.apache.flink.runtime.taskmanager; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.cache.DistributedCache; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.blob.BlobKey; @@ -64,6 +65,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.IOException; import java.net.URL; import java.util.HashMap; @@ -75,6 +77,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -236,6 +239,9 @@ public class Task implements Runnable, TaskActions { /** Initialized from the Flink configuration. May also be set at the ExecutionConfig */ private long taskCancellationInterval; + /** Initialized from the Flink configuration. May also be set at the ExecutionConfig */ + private long taskCancellationTimeout; + /** *

IMPORTANT: This constructor may not start any work that would need to * be undone in the case of a failing task deployment.

@@ -270,9 +276,9 @@ public Task( this.serializedExecutionConfig = checkNotNull(tdd.getSerializedExecutionConfig()); this.taskStateHandles = tdd.getTaskStateHandles(); - this.taskCancellationInterval = jobConfiguration.getLong( - ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, - ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS); + Configuration taskConfig = tdd.getTaskConfiguration(); + this.taskCancellationInterval = taskConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL); + this.taskCancellationTimeout = taskConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT); this.memoryManager = checkNotNull(memManager); this.ioManager = checkNotNull(ioManager); @@ -401,6 +407,16 @@ public Thread getExecutingThread() { return executingThread; } + @VisibleForTesting + long getTaskCancellationInterval() { + return taskCancellationInterval; + } + + @VisibleForTesting + long getTaskCancellationTimeout() { + return taskCancellationTimeout; + } + // ------------------------------------------------------------------------ // Task Execution // ------------------------------------------------------------------------ @@ -498,6 +514,11 @@ else if (current == ExecutionState.CANCELING) { taskCancellationInterval = executionConfig.getTaskCancellationInterval(); } + if (executionConfig.getTaskCancellationTimeout() >= 0) { + // override task cancellation timeout from Flink config if set in ExecutionConfig + taskCancellationTimeout = executionConfig.getTaskCancellationTimeout(); + } + // now load the task's invokable code invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass); @@ -568,7 +589,6 @@ else if (current == ExecutionState.CANCELING) { taskStateHandles = null; } - // ---------------------------------------------------------------- // actual task core work // ---------------------------------------------------------------- @@ -876,12 +896,16 @@ else if (current == ExecutionState.RUNNING) { // because the canceling may block on user code, we cancel from a separate thread // we do not reuse the async call handler, because that one may be blocked, in which // case the canceling could not continue + + // The canceller calls cancel and interrupts the executing thread once Runnable canceler = new TaskCanceler( LOG, invokable, executingThread, taskNameWithSubtask, taskCancellationInterval, + taskCancellationTimeout, + taskManagerConnection, producedPartitions, inputGates); Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler, @@ -1171,16 +1195,30 @@ private static class TaskCanceler implements Runnable { private final AbstractInvokable invokable; private final Thread executer; private final String taskName; - private final long taskCancellationIntervalMillis; private final ResultPartition[] producedPartitions; private final SingleInputGate[] inputGates; + /** Interrupt interval. */ + private final long interruptInterval; + + /** Timeout after which a fatal error notification happens. */ + private final long interruptTimeout; + + /** TaskManager to notify about a timeout */ + private final TaskManagerConnection taskManager; + + /** Watch Dog thread */ + @Nullable + private final Thread watchDogThread; + public TaskCanceler( Logger logger, AbstractInvokable invokable, Thread executer, String taskName, - long cancelationInterval, + long cancellationInterval, + long cancellationTimeout, + TaskManagerConnection taskManager, ResultPartition[] producedPartitions, SingleInputGate[] inputGates) { @@ -1188,26 +1226,46 @@ public TaskCanceler( this.invokable = invokable; this.executer = executer; this.taskName = taskName; - this.taskCancellationIntervalMillis = cancelationInterval; + this.interruptInterval = cancellationInterval; + this.interruptTimeout = cancellationTimeout; + this.taskManager = taskManager; this.producedPartitions = producedPartitions; this.inputGates = inputGates; + + if (cancellationTimeout > 0) { + // The watch dog repeatedly interrupts the executor until + // the cancellation timeout kicks in (at which point the + // task manager is notified about a fatal error) or the + // executor has terminated. + this.watchDogThread = new Thread( + executer.getThreadGroup(), + new TaskCancelerWatchDog(), + "WatchDog for " + taskName + " cancellation"); + this.watchDogThread.setDaemon(true); + } else { + this.watchDogThread = null; + } } @Override public void run() { try { + if (watchDogThread != null) { + watchDogThread.start(); + } + // the user-defined cancel method may throw errors. // we need do continue despite that try { invokable.cancel(); - } - catch (Throwable t) { + } catch (Throwable t) { logger.error("Error while canceling the task", t); } // Early release of input and output buffer pools. We do this // in order to unblock async Threads, which produce/consume the - // intermediate streams outside of the main Task Thread. + // intermediate streams outside of the main Task Thread (like + // the Kafka consumer). // // Don't do this before cancelling the invokable. Otherwise we // will get misleading errors in the logs. @@ -1230,16 +1288,46 @@ public void run() { // interrupt the running thread initially executer.interrupt(); try { - executer.join(taskCancellationIntervalMillis); + executer.join(interruptInterval); } catch (InterruptedException e) { // we can ignore this } - // it is possible that the user code does not react immediately. for that - // reason, we spawn a separate thread that repeatedly interrupts the user code until - // it exits + if (watchDogThread != null) { + watchDogThread.interrupt(); + watchDogThread.join(); + } + } catch (Throwable t) { + logger.error("Error in the task canceler", t); + } + } + + /** + * Watchdog for the cancellation. If the task is stuck in cancellation, + * we notify the task manager about a fatal error. + */ + private class TaskCancelerWatchDog implements Runnable { + + @Override + public void run() { + long intervalNanos = TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS); + long timeoutNanos = TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS); + long deadline = System.nanoTime() + timeoutNanos; + + try { + // Initial wait before interrupting periodically + Thread.sleep(interruptInterval); + } catch (InterruptedException ignored) { + } + + // It is possible that the user code does not react to the task canceller. + // for that reason, we spawn this separate thread that repeatedly interrupts + // the user code until it exits. If the suer user code does not exit within + // the timeout, we notify the job manager about a fatal error. while (executer.isAlive()) { + long now = System.nanoTime(); + // build the stack trace of where the thread is stuck, for the log StringBuilder bld = new StringBuilder(); StackTraceElement[] stack = executer.getStackTrace(); @@ -1247,21 +1335,34 @@ public void run() { bld.append(e).append('\n'); } - logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}", - taskName, bld.toString()); + if (now >= deadline) { + long duration = TimeUnit.SECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS); + String msg = String.format("Task '%s' did not react to cancelling signal in " + + "the last %d seconds, but is stuck in method:\n %s", + taskName, + duration, + bld.toString()); - executer.interrupt(); - try { - executer.join(taskCancellationIntervalMillis); - } - catch (InterruptedException e) { - // we can ignore this + taskManager.notifyFatalError(msg, null); + + return; // done, don't forget to leave the loop + } else { + logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}", + taskName, bld.toString()); + + executer.interrupt(); + try { + long timeLeftNanos = Math.min(intervalNanos, deadline - now); + long timeLeftMillis = TimeUnit.MILLISECONDS.convert(timeLeftNanos, TimeUnit.NANOSECONDS); + + if (timeLeftMillis > 0) { + executer.join(timeLeftMillis); + } + } catch (InterruptedException ignored) { + } } } } - catch (Throwable t) { - logger.error("Error in the task canceler", t); - } } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java new file mode 100644 index 0000000000000..1f0e84d58bd61 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingFatalErrorTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +import akka.actor.ActorRef; +import org.apache.flink.runtime.messages.TaskManagerMessages; + +import static org.junit.Assert.assertTrue; + +/** + * Tests that the TaskManager process properly exits when the TaskManager actor dies. + */ +public class TaskManagerProcessReapingFatalErrorTest extends TaskManagerProcessReapingTestBase { + + @Override + void onTaskManagerProcessRunning(ActorRef taskManager) { + taskManager.tell(new TaskManagerMessages.FatalError("ouch", null), ActorRef.noSender()); + } + + @Override + void onTaskManagerProcessTerminated(String processOutput) { + assertTrue("Did not log expected message", processOutput.contains("ouch")); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java index 85d6ede183cde..8aed021f84bea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java @@ -19,248 +19,17 @@ package org.apache.flink.runtime.taskmanager; import akka.actor.ActorRef; -import akka.actor.ActorSystem; import akka.actor.PoisonPill; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.clusterframework.FlinkResourceManager; -import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.jobmanager.JobManager; -import org.apache.flink.runtime.jobmanager.MemoryArchivist; -import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; -import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import org.apache.flink.util.NetUtils; - -import org.junit.Test; - -import scala.Some; -import scala.Tuple2; -import scala.concurrent.duration.FiniteDuration; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.StringWriter; -import java.net.InetAddress; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath; -import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath; -import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive; - /** * Tests that the TaskManager process properly exits when the TaskManager actor dies. */ -public class TaskManagerProcessReapingTest { - - @Test - public void testReapProcessOnFailure() { - Process taskManagerProcess = null; - ActorSystem jmActorSystem = null; - - final StringWriter processOutput = new StringWriter(); - - try { - String javaCommand = getJavaCommandPath(); - - // check that we run this test only if the java command - // is available on this machine - if (javaCommand == null) { - System.out.println("---- Skipping TaskManagerProcessReapingTest : Could not find java executable ----"); - return; - } - - // create a logging file for the process - File tempLogFile = File.createTempFile("testlogconfig", "properties"); - tempLogFile.deleteOnExit(); - CommonTestUtils.printLog4jDebugConfig(tempLogFile); - - final InetAddress localhost = InetAddress.getByName("localhost"); - final int jobManagerPort = NetUtils.getAvailablePort(); - - // start a JobManager - Tuple2 localAddress = new Tuple2(localhost.getHostAddress(), jobManagerPort); - jmActorSystem = AkkaUtils.createActorSystem( - new Configuration(), new Some>(localAddress)); - - ActorRef jmActor = JobManager.startJobManagerActors( - new Configuration(), - jmActorSystem, - JobManager.class, - MemoryArchivist.class)._1; - - // start a ResourceManager - StandaloneLeaderRetrievalService standaloneLeaderRetrievalService = - new StandaloneLeaderRetrievalService(AkkaUtils.getAkkaURL(jmActorSystem, jmActor)); - - FlinkResourceManager.startResourceManagerActors( - new Configuration(), - jmActorSystem, - standaloneLeaderRetrievalService, - StandaloneResourceManager.class); - - final int taskManagerPort = NetUtils.getAvailablePort(); - - // start the task manager process - String[] command = new String[] { - javaCommand, - "-Dlog.level=DEBUG", - "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), - "-Xms256m", "-Xmx256m", - "-classpath", getCurrentClasspath(), - TaskManagerTestEntryPoint.class.getName(), - String.valueOf(jobManagerPort), String.valueOf(taskManagerPort) - }; - - ProcessBuilder bld = new ProcessBuilder(command); - taskManagerProcess = bld.start(); - new PipeForwarder(taskManagerProcess.getErrorStream(), processOutput); - - // grab the reference to the TaskManager. try multiple times, until the process - // is started and the TaskManager is up - String taskManagerActorName = String.format("akka.tcp://flink@%s/user/%s", - org.apache.flink.util.NetUtils.ipAddressAndPortToUrlString(localhost, taskManagerPort), - TaskManager.TASK_MANAGER_NAME()); - - ActorRef taskManagerRef = null; - Throwable lastError = null; - for (int i = 0; i < 40; i++) { - try { - taskManagerRef = TaskManager.getTaskManagerRemoteReference( - taskManagerActorName, jmActorSystem, new FiniteDuration(25, TimeUnit.SECONDS)); - break; - } - catch (Throwable t) { - // TaskManager probably not ready yet - lastError = t; - } - Thread.sleep(500); - } +public class TaskManagerProcessReapingTest extends TaskManagerProcessReapingTestBase { - assertTrue("TaskManager process died", isProcessAlive(taskManagerProcess)); - - if (taskManagerRef == null) { - if (lastError != null) { - lastError.printStackTrace(); - } - fail("TaskManager process did not launch the TaskManager properly. Failed to look up " - + taskManagerActorName); - } - - // kill the TaskManager actor - taskManagerRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); - - // wait for max 5 seconds for the process to terminate - { - long now = System.currentTimeMillis(); - long deadline = now + 10000; - - while (now < deadline && isProcessAlive(taskManagerProcess)) { - Thread.sleep(100); - now = System.currentTimeMillis(); - } - } - - assertFalse("TaskManager process did not terminate upon actor death", isProcessAlive(taskManagerProcess)); - - int returnCode = taskManagerProcess.exitValue(); - assertEquals("TaskManager died, but not because of the process reaper", - TaskManager.RUNTIME_FAILURE_RETURN_CODE(), returnCode); - } - catch (Exception e) { - e.printStackTrace(); - printProcessLog(processOutput.toString()); - fail(e.getMessage()); - } - catch (Error e) { - e.printStackTrace(); - printProcessLog(processOutput.toString()); - throw e; - } - finally { - if (taskManagerProcess != null) { - taskManagerProcess.destroy(); - } - if (jmActorSystem != null) { - jmActorSystem.shutdown(); - } - } + @Override + void onTaskManagerProcessRunning(ActorRef taskManager) { + // kill the TaskManager actor + taskManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); } - private static void printProcessLog(String log) { - System.out.println("-----------------------------------------"); - System.out.println(" BEGIN SPAWNED PROCESS LOG"); - System.out.println("-----------------------------------------"); - System.out.println(log); - System.out.println("-----------------------------------------"); - System.out.println(" END SPAWNED PROCESS LOG"); - System.out.println("-----------------------------------------"); - } - - // -------------------------------------------------------------------------------------------- - - public static class TaskManagerTestEntryPoint { - - public static void main(String[] args) { - try { - int jobManagerPort = Integer.parseInt(args[0]); - int taskManagerPort = Integer.parseInt(args[1]); - - Configuration cfg = new Configuration(); - cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); - cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort); - cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); - cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256); - - TaskManager.runTaskManager("localhost", ResourceID.generate(), taskManagerPort, cfg); - - // wait forever - Object lock = new Object(); - synchronized (lock) { - lock.wait(); - } - } - catch (Throwable t) { - System.exit(1); - } - } - } - - private static class PipeForwarder extends Thread { - - private final StringWriter target; - private final InputStream source; - - public PipeForwarder(InputStream source, StringWriter target) { - super("Pipe Forwarder"); - setDaemon(true); - - this.source = source; - this.target = target; - - start(); - } - - @Override - public void run() { - try { - int next; - while ((next = source.read()) != -1) { - target.write(next); - } - } - catch (IOException e) { - // terminate - } - } - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java new file mode 100644 index 0000000000000..c7913f7ebbed6 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.util.NetUtils; +import org.junit.Test; +import scala.Some; +import scala.Tuple2; +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.net.InetAddress; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath; +import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath; +import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests that the TaskManager process properly exits when the TaskManager actor dies. + */ +public abstract class TaskManagerProcessReapingTestBase { + + /** + * Called after the task manager has been started up. After calling this + * method, the test base checks that the process exits. + */ + abstract void onTaskManagerProcessRunning(ActorRef taskManager); + + /** + * Called after the task manager has successfully terminated. + */ + void onTaskManagerProcessTerminated(String processOutput) { + // Default does nothing + } + + @Test + public void testReapProcessOnFailure() { + Process taskManagerProcess = null; + ActorSystem jmActorSystem = null; + + final StringWriter processOutput = new StringWriter(); + + try { + String javaCommand = getJavaCommandPath(); + + // check that we run this test only if the java command + // is available on this machine + if (javaCommand == null) { + System.out.println("---- Skipping TaskManagerProcessReapingTest : Could not find java executable ----"); + return; + } + + // create a logging file for the process + File tempLogFile = File.createTempFile("testlogconfig", "properties"); + tempLogFile.deleteOnExit(); + CommonTestUtils.printLog4jDebugConfig(tempLogFile); + + final InetAddress localhost = InetAddress.getByName("localhost"); + final int jobManagerPort = NetUtils.getAvailablePort(); + + // start a JobManager + Tuple2 localAddress = new Tuple2(localhost.getHostAddress(), jobManagerPort); + jmActorSystem = AkkaUtils.createActorSystem( + new Configuration(), new Some>(localAddress)); + + ActorRef jmActor = JobManager.startJobManagerActors( + new Configuration(), + jmActorSystem, + JobManager.class, + MemoryArchivist.class)._1; + + // start a ResourceManager + StandaloneLeaderRetrievalService standaloneLeaderRetrievalService = + new StandaloneLeaderRetrievalService(AkkaUtils.getAkkaURL(jmActorSystem, jmActor)); + + FlinkResourceManager.startResourceManagerActors( + new Configuration(), + jmActorSystem, + standaloneLeaderRetrievalService, + StandaloneResourceManager.class); + + final int taskManagerPort = NetUtils.getAvailablePort(); + + // start the task manager process + String[] command = new String[] { + javaCommand, + "-Dlog.level=DEBUG", + "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), + "-Xms256m", "-Xmx256m", + "-classpath", getCurrentClasspath(), + TaskManagerTestEntryPoint.class.getName(), + String.valueOf(jobManagerPort), String.valueOf(taskManagerPort) + }; + + ProcessBuilder bld = new ProcessBuilder(command); + taskManagerProcess = bld.start(); + new PipeForwarder(taskManagerProcess.getErrorStream(), processOutput); + + // grab the reference to the TaskManager. try multiple times, until the process + // is started and the TaskManager is up + String taskManagerActorName = String.format("akka.tcp://flink@%s/user/%s", + org.apache.flink.util.NetUtils.ipAddressAndPortToUrlString(localhost, taskManagerPort), + TaskManager.TASK_MANAGER_NAME()); + + ActorRef taskManagerRef = null; + Throwable lastError = null; + for (int i = 0; i < 40; i++) { + try { + taskManagerRef = TaskManager.getTaskManagerRemoteReference( + taskManagerActorName, jmActorSystem, new FiniteDuration(25, TimeUnit.SECONDS)); + break; + } + catch (Throwable t) { + // TaskManager probably not ready yet + lastError = t; + } + Thread.sleep(500); + } + + assertTrue("TaskManager process died", isProcessAlive(taskManagerProcess)); + + if (taskManagerRef == null) { + if (lastError != null) { + lastError.printStackTrace(); + } + fail("TaskManager process did not launch the TaskManager properly. Failed to look up " + + taskManagerActorName); + } + + // kill the TaskManager actor + onTaskManagerProcessRunning(taskManagerRef); + + // wait for max 5 seconds for the process to terminate + { + long now = System.currentTimeMillis(); + long deadline = now + 10000; + + while (now < deadline && isProcessAlive(taskManagerProcess)) { + Thread.sleep(100); + now = System.currentTimeMillis(); + } + } + + assertFalse("TaskManager process did not terminate upon actor death", isProcessAlive(taskManagerProcess)); + + int returnCode = taskManagerProcess.exitValue(); + assertEquals("TaskManager died, but not because of the process reaper", + TaskManager.RUNTIME_FAILURE_RETURN_CODE(), returnCode); + + onTaskManagerProcessTerminated(processOutput.toString()); + } + catch (Exception e) { + e.printStackTrace(); + printProcessLog(processOutput.toString()); + fail(e.getMessage()); + } + catch (Error e) { + e.printStackTrace(); + printProcessLog(processOutput.toString()); + throw e; + } + finally { + if (taskManagerProcess != null) { + taskManagerProcess.destroy(); + } + if (jmActorSystem != null) { + jmActorSystem.shutdown(); + } + } + } + + private static void printProcessLog(String log) { + System.out.println("-----------------------------------------"); + System.out.println(" BEGIN SPAWNED PROCESS LOG"); + System.out.println("-----------------------------------------"); + System.out.println(log); + System.out.println("-----------------------------------------"); + System.out.println(" END SPAWNED PROCESS LOG"); + System.out.println("-----------------------------------------"); + } + + // -------------------------------------------------------------------------------------------- + + public static class TaskManagerTestEntryPoint { + + public static void main(String[] args) { + try { + int jobManagerPort = Integer.parseInt(args[0]); + int taskManagerPort = Integer.parseInt(args[1]); + + Configuration cfg = new Configuration(); + cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); + cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort); + cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); + cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256); + + TaskManager.runTaskManager("localhost", ResourceID.generate(), taskManagerPort, cfg); + + // wait forever + Object lock = new Object(); + synchronized (lock) { + lock.wait(); + } + } + catch (Throwable t) { + System.exit(1); + } + } + } + + private static class PipeForwarder extends Thread { + + private final StringWriter target; + private final InputStream source; + + public PipeForwarder(InputStream source, StringWriter target) { + super("Pipe Forwarder"); + setDaemon(true); + + this.source = source; + this.target = target; + + start(); + } + + @Override + public void run() { + try { + int next; + while ((next = source.read()) != -1) { + target.write(next); + } + } + catch (IOException e) { + // terminate + } + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 9a13cdeab07fb..c38d23a9c6b9c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; @@ -45,10 +46,12 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.messages.TaskMessages; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -87,11 +90,12 @@ * execution listener, which simply put the messages in a queue to be picked * up by the test and validated. */ -public class TaskTest { +public class TaskTest extends TestLogger { private static OneShotLatch awaitLatch; private static OneShotLatch triggerLatch; - + private static OneShotLatch cancelLatch; + private ActorGateway taskManagerGateway; private ActorGateway jobManagerGateway; private ActorGateway listenerGateway; @@ -117,6 +121,7 @@ public void createQueuesAndActors() { awaitLatch = new OneShotLatch(); triggerLatch = new OneShotLatch(); + cancelLatch = new OneShotLatch(); } @After @@ -565,6 +570,123 @@ public void testOnPartitionStateUpdate() throws Exception { verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId())); } + /** + * Tests that interrupt happens via watch dog if canceller is stuck in cancel. + * Task cancellation blocks the task canceller. Interrupt after cancel via + * cancellation watch dog. + */ + @Test + public void testWatchDogInterruptsTask() throws Exception { + Configuration config = new Configuration(); + config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), 5); + config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT.key(), 50); + + Task task = createTask(InvokableBlockingInCancel.class, config); + task.startTaskThread(); + + awaitLatch.await(); + + task.cancelExecution(); + task.getExecutingThread().join(); + + // No fatal error + for (Object msg : taskManagerMessages) { + assertFalse("Unexpected FatalError message", msg instanceof TaskManagerMessages.FatalError); + } + } + + /** + * The invoke() method holds a lock (trigger awaitLatch after acquisition) + * and cancel cannot complete because it also tries to acquire the same lock. + * This is resolved by the watch dog, no fatal error. + */ + @Test + public void testInterruptableSharedLockInInvokeAndCancel() throws Exception { + Configuration config = new Configuration(); + config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), 5); + config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT.key(), 50); + + Task task = createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config); + task.startTaskThread(); + + awaitLatch.await(); + + task.cancelExecution(); + task.getExecutingThread().join(); + + // No fatal error + for (Object msg : taskManagerMessages) { + assertFalse("Unexpected FatalError message", msg instanceof TaskManagerMessages.FatalError); + } + } + + /** + * The invoke() method blocks infinitely, but cancel() does not block. Only + * resolved by a fatal error. + */ + @Test + public void testFatalErrorAfterUninterruptibleInvoke() throws Exception { + Configuration config = new Configuration(); + config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), 5); + config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT.key(), 50); + + Task task = createTask(InvokableUninterruptibleBlockingInvoke.class, config); + + try { + task.startTaskThread(); + + awaitLatch.await(); + + task.cancelExecution(); + + for (int i = 0; i < 10; i++) { + Object msg = taskManagerMessages.poll(1, TimeUnit.SECONDS); + if (msg instanceof TaskManagerMessages.FatalError) { + return; // success + } + } + + fail("Did not receive expected task manager message"); + } finally { + // Interrupt again to clean up Thread + cancelLatch.trigger(); + task.getExecutingThread().interrupt(); + task.getExecutingThread().join(); + } + } + + /** + * Tests that the task configuration is respected and overwritten by the execution config. + */ + @Test + public void testTaskConfig() throws Exception { + long interval = 28218123; + long timeout = interval + 19292; + + Configuration config = new Configuration(); + config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), interval); + config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT.key(), timeout); + + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.setTaskCancellationInterval(interval + 1337); + executionConfig.setTaskCancellationTimeout(timeout - 1337); + + Task task = createTask(InvokableBlockingInInvoke.class, config, executionConfig); + + assertEquals(interval, task.getTaskCancellationInterval()); + assertEquals(timeout, task.getTaskCancellationTimeout()); + + task.startTaskThread(); + + awaitLatch.await(); + + assertEquals(executionConfig.getTaskCancellationInterval(), task.getTaskCancellationInterval()); + assertEquals(executionConfig.getTaskCancellationTimeout(), task.getTaskCancellationTimeout()); + + task.getExecutingThread().interrupt(); + task.getExecutingThread().join(); + } + // ------------------------------------------------------------------------ private void setInputGate(Task task, SingleInputGate inputGate) { @@ -597,13 +719,33 @@ private void setState(Task task, ExecutionState state) { } private Task createTask(Class invokable) { + return createTask(invokable, new Configuration(), new ExecutionConfig()); + } + + private Task createTask(Class invokable, Configuration config) { + LibraryCacheManager libCache = mock(LibraryCacheManager.class); + when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader()); + return createTask(invokable, libCache, config, new ExecutionConfig()); + } + + private Task createTask(Class invokable, Configuration config, ExecutionConfig execConfig) { LibraryCacheManager libCache = mock(LibraryCacheManager.class); when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader()); - return createTask(invokable, libCache); + return createTask(invokable, libCache, config, execConfig); + } + + private Task createTask( + Class invokable, + LibraryCacheManager libCache) { + + return createTask(invokable, libCache, new Configuration(), new ExecutionConfig()); } - private Task createTask(Class invokable, - LibraryCacheManager libCache) { + private Task createTask( + Class invokable, + LibraryCacheManager libCache, + Configuration config, + ExecutionConfig execConfig) { ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class); @@ -615,7 +757,17 @@ private Task createTask(Class invokable, when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) .thenReturn(mock(TaskKvStateRegistry.class)); - return createTask(invokable, libCache, network, consumableNotifier, partitionStateChecker, executor); + return createTask(invokable, libCache, network, consumableNotifier, partitionStateChecker, executor, config, execConfig); + } + + private Task createTask( + Class invokable, + LibraryCacheManager libCache, + NetworkEnvironment networkEnvironment, + ResultPartitionConsumableNotifier consumableNotifier, + PartitionStateChecker partitionStateChecker, + Executor executor) { + return createTask(invokable, libCache, networkEnvironment, consumableNotifier, partitionStateChecker, executor, new Configuration(), new ExecutionConfig()); } private Task createTask( @@ -624,9 +776,11 @@ private Task createTask( NetworkEnvironment networkEnvironment, ResultPartitionConsumableNotifier consumableNotifier, PartitionStateChecker partitionStateChecker, - Executor executor) { + Executor executor, + Configuration config, + ExecutionConfig execConfig) { - TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(invokable); + TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(invokable, config, execConfig); InputSplitProvider inputSplitProvider = new TaskInputSplitProvider( jobManagerGateway, @@ -655,19 +809,23 @@ private Task createTask( executor); } - private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class invokable) { - SerializedValue execConfig; + private TaskDeploymentDescriptor createTaskDeploymentDescriptor( + Class invokable, + Configuration taskConfig, + ExecutionConfig execConfig) { + + SerializedValue serializedExecConfig; try { - execConfig = new SerializedValue<>(new ExecutionConfig()); + serializedExecConfig = new SerializedValue<>(execConfig); } catch (IOException e) { throw new RuntimeException(e); } - + return new TaskDeploymentDescriptor( new JobID(), "Test Job", new JobVertexID(), new ExecutionAttemptID(), - execConfig, + serializedExecConfig, "Test Task", 1, 0, 1, 0, - new Configuration(), new Configuration(), + new Configuration(), taskConfig, invokable.getName(), Collections.emptyList(), Collections.emptyList(), @@ -869,4 +1027,71 @@ public void invoke() throws Exception { throw new CancelTaskException(); } } + + public static final class InvokableInterruptableSharedLockInInvokeAndCancel extends AbstractInvokable { + + private final Object lock = new Object(); + + @Override + public void invoke() throws Exception { + synchronized (lock) { + awaitLatch.trigger(); + wait(); + } + } + + @Override + public void cancel() throws Exception { + synchronized (lock) { + cancelLatch.trigger(); + } + } + } + + public static final class InvokableBlockingInCancel extends AbstractInvokable { + + @Override + public void invoke() throws Exception { + awaitLatch.trigger(); + + try { + cancelLatch.await(); + synchronized (this) { + wait(); + } + } catch (InterruptedException ignored) { + synchronized (this) { + notifyAll(); // notify all that are stuck in cancel + } + } + } + + @Override + public void cancel() throws Exception { + synchronized (this) { + cancelLatch.trigger(); + wait(); + } + } + } + + public static final class InvokableUninterruptibleBlockingInvoke extends AbstractInvokable { + + @Override + public void invoke() throws Exception { + while (!cancelLatch.isTriggered()) { + try { + synchronized (this) { + awaitLatch.trigger(); + wait(); + } + } catch (InterruptedException ignored) { + } + } + } + + @Override + public void cancel() throws Exception { + } + } }