From 9215d38f3695a735f0fe6869713ff6fad20f231f Mon Sep 17 00:00:00 2001 From: yanghua Date: Fri, 18 May 2018 14:36:19 +0800 Subject: [PATCH 1/2] [FLINK-9326] TaskManagerOptions.NUM_TASK_SLOTS does not work for local/embedded mode --- .../api/environment/LocalStreamEnvironment.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java index 8295e3c54d0a8..b9d320f6b930d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java @@ -103,9 +103,16 @@ public JobExecutionResult execute(String jobName) throws Exception { configuration.setInteger(RestOptions.PORT, 0); } + int numSlotsPerTaskManager; + if (configuration.contains(TaskManagerOptions.NUM_TASK_SLOTS)) { + numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); + } else { + numSlotsPerTaskManager = jobGraph.getMaximumParallelism(); + } + MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() .setConfiguration(configuration) - .setNumSlotsPerTaskManager(jobGraph.getMaximumParallelism()) + .setNumSlotsPerTaskManager(numSlotsPerTaskManager) .build(); if (LOG.isInfoEnabled()) { From de9dea83c45798a5c879fecfd26b4bf9b073e9e7 Mon Sep 17 00:00:00 2001 From: yanghua Date: Sat, 26 May 2018 14:18:50 +0800 Subject: [PATCH 2/2] refactor code --- .../streaming/api/environment/LocalStreamEnvironment.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java index b9d320f6b930d..6eec705ca4e62 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java @@ -103,12 +103,7 @@ public JobExecutionResult execute(String jobName) throws Exception { configuration.setInteger(RestOptions.PORT, 0); } - int numSlotsPerTaskManager; - if (configuration.contains(TaskManagerOptions.NUM_TASK_SLOTS)) { - numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); - } else { - numSlotsPerTaskManager = jobGraph.getMaximumParallelism(); - } + int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism()); MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() .setConfiguration(configuration)