diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 777617d7e640a..8b960cf997be6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -992,6 +992,24 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH = "/confstore"; + @Private + @Unstable + public static final String RM_SCHEDCONF_ZK_STORE_MAX_RETRY_ATTEMPTS = + YARN_PREFIX + "scheduler.configuration.zk-store.max-retry-attempts"; + + @Private + @Unstable + public static final int DEFAULT_RM_SCHEDCONF_ZK_STORE_MAX_RETRY_ATTEMPTS = 30; + + @Private + @Unstable + public static final String RM_SCHEDCONF_ZK_STORE_RETRY_DELAY_MS = + YARN_PREFIX + "scheduler.configuration.zk-store.retry-delay-ms"; + + @Private + @Unstable + public static final int DEFAULT_RM_SCHEDCONF_ZK_STORE_RETRY_DELAY_MS = 1000; + @Private @Unstable public static final String RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java index ad0a4a4c6293b..97f8bea9096a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java @@ -64,6 +64,12 @@ public class ZKConfigurationStore extends YarnConfigurationStore { private static final String CONF_VERSION_PATH = "CONF_VERSION"; private static final String NODEEXISTS_MSG = "Encountered NodeExists error." + " Skipping znode creation since another RM has already created it"; + + @VisibleForTesting + protected int maxRetryAttempts; + @VisibleForTesting + protected long retryDelayMs; + private String znodeParentPath; private String zkVersionPath; private String logsPath; @@ -85,6 +91,14 @@ public void initialize(Configuration config, Configuration schedConf, this.maxLogs = conf.getLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, YarnConfiguration.DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS); + + this.maxRetryAttempts = conf.getInt( + YarnConfiguration.RM_SCHEDCONF_ZK_STORE_MAX_RETRY_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_SCHEDCONF_ZK_STORE_MAX_RETRY_ATTEMPTS); + this.retryDelayMs = conf.getLong( + YarnConfiguration.RM_SCHEDCONF_ZK_STORE_RETRY_DELAY_MS, + YarnConfiguration.DEFAULT_RM_SCHEDCONF_ZK_STORE_RETRY_DELAY_MS); + this.zkManager = rmContext.getResourceManager().createAndStartZKManager(conf); this.zkAcl = ZKCuratorManager.getZKAcls(conf); @@ -202,26 +216,44 @@ public void confirmMutation(LogMutation pendingMutation, @Override public synchronized Configuration retrieve() { - byte[] serializedSchedConf; - try { - serializedSchedConf = getZkData(confStorePath); - } catch (Exception e) { - LOG.error("Failed to retrieve configuration from zookeeper store", e); - return null; - } - try { - Map map = - unsafeCast(deserializeObject(serializedSchedConf)); - Configuration c = new Configuration(false); - for (Map.Entry e : map.entrySet()) { - c.set(e.getKey(), e.getValue()); + Configuration config = null; + int attempts = 0; + + do { + try { + if (attempts > 0) { + long randomDelay = (long) (Math.random() * 1000); + long delay = retryDelayMs + randomDelay; + Thread.sleep(delay); + } + + byte[] serializedSchedConf = getZkData(confStorePath); + + if (serializedSchedConf == null || serializedSchedConf.length == 0) { + LOG.warn("Configuration data from ZooKeeper path " + confStorePath + + " is null or empty (attempt " + (attempts + 1) + ")"); + } else { + + Map map = + unsafeCast(deserializeObject(serializedSchedConf)); + Configuration c = new Configuration(false); + for (Map.Entry e : map.entrySet()) { + c.set(e.getKey(), e.getValue()); + } + config = c; + } + } catch (Exception e) { + LOG.warn("Failed to retrieve and deserialize configuration from " + confStorePath + + " (attempt " + (attempts + 1) + ")", e); } - return c; - } catch (Exception e) { - LOG.error("Exception while deserializing scheduler configuration " + - "from store", e); + attempts++; + } while (config == null && attempts < maxRetryAttempts); + + if (config == null) { + LOG.error("Failed to retrieve configuration from ZooKeeper path " + confStorePath + + " after " + attempts + " attempts"); } - return null; + return config; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java index 87a9e7d368fe9..ce9d256a9d809 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java @@ -418,6 +418,8 @@ public void testFailoverAfterRemoveQueue() throws Exception { @Timeout(value = 3) @SuppressWarnings("checkstyle:linelength") public void testDeserializationIsNotVulnerable() throws Exception { + conf.setInt(YarnConfiguration.RM_SCHEDCONF_ZK_STORE_MAX_RETRY_ATTEMPTS, 0); + conf.setInt(YarnConfiguration.RM_SCHEDCONF_ZK_STORE_RETRY_DELAY_MS, 0); confStore.initialize(conf, schedConf, rmContext); String confStorePath = getZkPath("CONF_STORE");