From f7024439ead5e3848c705659bfe221b8ce50f154 Mon Sep 17 00:00:00 2001 From: "shuai.xus" Date: Wed, 10 Jan 2018 15:43:20 +0800 Subject: [PATCH 1/2] [FLINK-8399] [runtime] use independent configurations for the different timeouts in slot manager --- .../configuration/ResourceManagerOptions.java | 21 +++++++++++++++++++ .../slotmanager/SlotManagerConfiguration.java | 20 +++++++----------- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java index e2d96bb031313..9d6ab6dae567d 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java @@ -58,6 +58,27 @@ public class ResourceManagerOptions { .defaultValue(600) .withDeprecatedKeys("yarn.heap-cutoff-min"); + /** + * The timeout for requesting slot to a task manager, in milliseconds. + */ + public static final ConfigOption TASK_MANAGER_REQUEST_TIMEOUT = ConfigOptions + .key("slotmanager.taskmanager.request-timeout") + .defaultValue(30000); + + /** + * The timeout for a slot request to be discarded, in milliseconds. + */ + public static final ConfigOption SLOT_REQUEST_TIMEOUT = ConfigOptions + .key("slotmanager.slot.request-timeout") + .defaultValue(600000); + + /** + * The timeout for an idle task manager to be released, in milliseconds. + */ + public static final ConfigOption TASK_MANAGER_TIMEOUT = ConfigOptions + .key("slotmanager.taskmanager.timeout") + .defaultValue(30000); + /** * Prefix for passing custom environment variables to Flink's master process. * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java index 75cad07c7acdb..09f566700a539 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java @@ -19,11 +19,10 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.util.ConfigurationException; import org.apache.flink.util.Preconditions; -import scala.concurrent.duration.Duration; public class SlotManagerConfiguration { @@ -53,16 +52,13 @@ public Time getTaskManagerTimeout() { } public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException { - final String strTimeout = configuration.getString(AkkaOptions.ASK_TIMEOUT); - final Time timeout; + final Time taskManagerRequestTimeout = Time.milliseconds( + configuration.getInteger(ResourceManagerOptions.TASK_MANAGER_REQUEST_TIMEOUT)); + final Time slotRequestTimeout = Time.milliseconds( + configuration.getInteger(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT)); + final Time taskManagerTimeout = Time.milliseconds( + configuration.getInteger(ResourceManagerOptions.TASK_MANAGER_TIMEOUT)); - try { - timeout = Time.milliseconds(Duration.apply(strTimeout).toMillis()); - } catch (NumberFormatException e) { - throw new ConfigurationException("Could not parse the resource manager's timeout " + - "value " + AkkaOptions.ASK_TIMEOUT + '.', e); - } - - return new SlotManagerConfiguration(timeout, timeout, timeout); + return new SlotManagerConfiguration(taskManagerRequestTimeout, slotRequestTimeout, taskManagerTimeout); } } From 0a6b70140a257dec62b43c31750c7da979cd5c50 Mon Sep 17 00:00:00 2001 From: "shuai.xus" Date: Mon, 15 Jan 2018 14:20:29 +0800 Subject: [PATCH 2/2] refine the config names --- .../configuration/ResourceManagerOptions.java | 21 +++++++++++-------- .../slotmanager/SlotManagerConfiguration.java | 6 +++--- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java index 9d6ab6dae567d..2f2ba4e1aaf2a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java @@ -61,23 +61,26 @@ public class ResourceManagerOptions { /** * The timeout for requesting slot to a task manager, in milliseconds. */ - public static final ConfigOption TASK_MANAGER_REQUEST_TIMEOUT = ConfigOptions - .key("slotmanager.taskmanager.request-timeout") - .defaultValue(30000); + public static final ConfigOption TASK_MANAGER_REQUEST_TIMEOUT = ConfigOptions + .key("slotmanager.rpc-timeout") + .defaultValue(30000L) + .withDescription("The timeout for rpc request with task manager."); /** * The timeout for a slot request to be discarded, in milliseconds. */ - public static final ConfigOption SLOT_REQUEST_TIMEOUT = ConfigOptions - .key("slotmanager.slot.request-timeout") - .defaultValue(600000); + public static final ConfigOption SLOT_REQUEST_TIMEOUT = ConfigOptions + .key("slotmanager.request-timeout") + .defaultValue(600000L) + .withDescription("The timeout for a slot request to be discarded."); /** * The timeout for an idle task manager to be released, in milliseconds. */ - public static final ConfigOption TASK_MANAGER_TIMEOUT = ConfigOptions - .key("slotmanager.taskmanager.timeout") - .defaultValue(30000); + public static final ConfigOption TASK_MANAGER_TIMEOUT = ConfigOptions + .key("slotmanager.taskmanager-timeout") + .defaultValue(30000L) + .withDescription("The timeout for an idle task manager to be released."); /** * Prefix for passing custom environment variables to Flink's master process. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java index 09f566700a539..dc37d551f28a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java @@ -53,11 +53,11 @@ public Time getTaskManagerTimeout() { public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException { final Time taskManagerRequestTimeout = Time.milliseconds( - configuration.getInteger(ResourceManagerOptions.TASK_MANAGER_REQUEST_TIMEOUT)); + configuration.getLong(ResourceManagerOptions.TASK_MANAGER_REQUEST_TIMEOUT)); final Time slotRequestTimeout = Time.milliseconds( - configuration.getInteger(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT)); + configuration.getLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT)); final Time taskManagerTimeout = Time.milliseconds( - configuration.getInteger(ResourceManagerOptions.TASK_MANAGER_TIMEOUT)); + configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT)); return new SlotManagerConfiguration(taskManagerRequestTimeout, slotRequestTimeout, taskManagerTimeout); }