From a87b2779dea9339df3272501bc3db599c4d8e065 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Sat, 12 Nov 2016 20:19:15 +0100 Subject: [PATCH 1/2] [FLINK-5057] [task] Pick cancellation timeout from task manager config --- .../org/apache/flink/runtime/taskmanager/Task.java | 5 +++-- .../apache/flink/runtime/taskmanager/TaskTest.java | 13 +++++++------ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 56aea1b5c574c..514a8d27e9089 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -287,11 +287,12 @@ public Task( this.nameOfInvokableClass = taskInformation.getInvokableClassName(); this.operatorState = operatorState; - this.taskCancellationInterval = taskConfiguration.getLong( + Configuration tmConfig = taskManagerConfig.getConfiguration(); + this.taskCancellationInterval = tmConfig.getLong( ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS, ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS); - this.taskCancellationTimeout = taskConfiguration.getLong( + this.taskCancellationTimeout = tmConfig.getLong( ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS, ConfigConstants.DEFAULT_TASK_CANCELLATION_TIMEOUT_MILLIS); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index b5056ed3e60e6..7bf73bce92f67 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -752,10 +752,11 @@ private Task createTask(Class invokable, return createTask(invokable, libCache, networkEnvironment, new Configuration(), new ExecutionConfig()); } - private Task createTask(Class invokable, - LibraryCacheManager libCache, - NetworkEnvironment networkEnvironment, - Configuration taskConfig, + private Task createTask( + Class invokable, + LibraryCacheManager libCache, + NetworkEnvironment networkEnvironment, + Configuration taskManagerConfig, ExecutionConfig execConfig) throws IOException { JobID jobId = new JobID(); @@ -777,7 +778,7 @@ private Task createTask(Class invokable, "Test Task", 1, invokable.getName(), - taskConfig); + new Configuration()); return new Task( jobInformation, @@ -798,7 +799,7 @@ private Task createTask(Class invokable, new FiniteDuration(60, TimeUnit.SECONDS), libCache, mock(FileCache.class), - new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")), + new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")), mock(TaskMetricGroup.class)); } From cb8c777c98d54ae448255759e77614a25a431857 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Mon, 14 Nov 2016 09:02:15 +0100 Subject: [PATCH 2/2] Fix TaskStopTest --- .../org/apache/flink/runtime/taskmanager/TaskStopTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java index 8c41a9b73582d..028a49a0e4ccf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java @@ -47,6 +47,7 @@ import java.util.Collections; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @RunWith(PowerMockRunner.class) @PrepareForTest({ TaskDeploymentDescriptor.class, JobID.class, FiniteDuration.class }) @@ -54,6 +55,8 @@ public class TaskStopTest { private Task task; public void doMocking(AbstractInvokable taskMock) throws Exception { + TaskManagerRuntimeInfo tmRuntimeInfo = mock(TaskManagerRuntimeInfo.class); + when(tmRuntimeInfo.getConfiguration()).thenReturn(new Configuration()); task = new Task( mock(JobInformation.class), @@ -79,7 +82,7 @@ public void doMocking(AbstractInvokable taskMock) throws Exception { mock(FiniteDuration.class), mock(LibraryCacheManager.class), mock(FileCache.class), - mock(TaskManagerRuntimeInfo.class), + tmRuntimeInfo, mock(TaskMetricGroup.class)); Field f = task.getClass().getDeclaredField("invokable"); @@ -91,7 +94,7 @@ public void doMocking(AbstractInvokable taskMock) throws Exception { f2.set(task, ExecutionState.RUNNING); } - @Test(timeout = 10000) + @Test(timeout = 20000) public void testStopExecution() throws Exception { StoppableTestTask taskMock = new StoppableTestTask(); doMocking(taskMock);