From 9abb5e148138746a61b9be77c1daf5e5b925257a Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Sat, 12 Nov 2016 20:19:15 +0100 Subject: [PATCH 1/2] [FLINK-5057] [task] Pick cancellation timeout from task manager config --- .../java/org/apache/flink/runtime/taskmanager/Task.java | 5 +++-- .../java/org/apache/flink/runtime/taskmanager/TaskTest.java | 6 +++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index b960e68cb2b97..3254fc1fbe300 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -299,8 +299,9 @@ public Task( this.serializedExecutionConfig = jobInformation.getSerializedExecutionConfig(); this.taskStateHandles = taskStateHandles; - this.taskCancellationInterval = taskConfiguration.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL); - this.taskCancellationTimeout = taskConfiguration.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT); + Configuration tmConfig = taskManagerConfig.getConfiguration(); + this.taskCancellationInterval = tmConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL); + this.taskCancellationTimeout = tmConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT); this.memoryManager = Preconditions.checkNotNull(memManager); this.ioManager = Preconditions.checkNotNull(ioManager); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 5d26050cf6982..8177bf79d10b2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -781,7 +781,7 @@ private Task createTask( ResultPartitionConsumableNotifier consumableNotifier, PartitionStateChecker partitionStateChecker, Executor executor, - Configuration taskConfig, + Configuration taskManagerConfig, ExecutionConfig execConfig) throws IOException { JobID jobId = new JobID(); @@ -813,7 +813,7 @@ private Task createTask( 1, 1, invokable.getName(), - taskConfig); + new Configuration()); return new Task( jobInformation, @@ -834,7 +834,7 @@ private Task createTask( checkpointResponder, libCache, mock(FileCache.class), - new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")), + new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")), mock(TaskMetricGroup.class), consumableNotifier, partitionStateChecker, From f715b20b33fda65893d3f7e88d531ca9b956d3e1 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Mon, 14 Nov 2016 09:02:15 +0100 Subject: [PATCH 2/2] Fix TaskStopTest --- .../org/apache/flink/runtime/taskmanager/TaskStopTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java index 276e09020a357..d80dab3c66ed2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java @@ -64,6 +64,9 @@ public void doMocking(AbstractInvokable taskMock) throws Exception { TaskInfo taskInfoMock = mock(TaskInfo.class); when(taskInfoMock.getTaskNameWithSubtasks()).thenReturn("dummyName"); + TaskManagerRuntimeInfo tmRuntimeInfo = mock(TaskManagerRuntimeInfo.class); + when(tmRuntimeInfo.getConfiguration()).thenReturn(new Configuration()); + task = new Task( mock(JobInformation.class), new TaskInformation( @@ -89,7 +92,7 @@ public void doMocking(AbstractInvokable taskMock) throws Exception { mock(CheckpointResponder.class), mock(LibraryCacheManager.class), mock(FileCache.class), - mock(TaskManagerRuntimeInfo.class), + tmRuntimeInfo, mock(TaskMetricGroup.class), mock(ResultPartitionConsumableNotifier.class), mock(PartitionStateChecker.class),