From 1d1624ec8688c4bdc0fe89553163138b2a1b8531 Mon Sep 17 00:00:00 2001 From: Xin Gao Date: Wed, 7 Dec 2022 11:24:58 -0800 Subject: [PATCH 1/4] YARN-11391 Add yarn RM DNS support --- .../hadoop/net/MockDomainNameResolver.java | 5 + .../org/apache/hadoop/yarn/conf/HAUtil.java | 238 +++++++++++++++++- .../hadoop/yarn/conf/YarnConfiguration.java | 9 + .../hadoop/yarn/client/ClientRMProxy.java | 8 +- .../ConfiguredRMFailoverProxyProvider.java | 56 ++++- .../src/main/resources/yarn-default.xml | 9 + ...TestConfiguredRMFailoverProxyProvider.java | 232 +++++++++++++++++ 7 files changed, 540 insertions(+), 17 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestConfiguredRMFailoverProxyProvider.java diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/MockDomainNameResolver.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/MockDomainNameResolver.java index 10416e22116b6..4ae0892bc3c4a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/MockDomainNameResolver.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/MockDomainNameResolver.java @@ -95,4 +95,9 @@ public String[] getAllResolvedHostnameByDomainName( public void setAddressMap(Map addresses) { this.addrs = addresses; } + + @VisibleForTesting + public void setPtrMap(Map ptrMap) { + this.ptrMap = ptrMap; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java index c85e211bd31c9..a75c731ea43b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java @@ -19,11 +19,22 @@ package org.apache.hadoop.yarn.conf; import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.DomainNameResolver; +import org.apache.hadoop.net.DomainNameResolverFactory; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -39,8 +50,29 @@ public class HAUtil { public static final String BAD_CONFIG_MESSAGE_PREFIX = "Invalid configuration! "; + private final static List RM_ADDRESS_CONFIG_KEYS = Arrays.asList( + YarnConfiguration.RM_ADDRESS, + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.RM_ADMIN_ADDRESS, + YarnConfiguration.RM_WEBAPP_ADDRESS, + YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + ); + + private static DomainNameResolver dnr; + private static Collection originalRMIDs = null; + private HAUtil() { /* Hidden constructor */ } + public static DomainNameResolver getDnr() { + return dnr; + } + + public static void setDnrByConfiguration(Configuration conf) { + HAUtil.dnr = DomainNameResolverFactory.newInstance( + conf, YarnConfiguration.RESOLVE_RM_ADDRESS_KEY); + } + private static void throwBadConfigurationException(String msg) { throw new YarnRuntimeException(BAD_CONFIG_MESSAGE_PREFIX + msg); } @@ -118,6 +150,12 @@ public static void verifyAndSetConfiguration(Configuration conf) * Then set the RM-ids. */ private static void verifyAndSetRMHAIdsList(Configuration conf) { + boolean resolveNeeded = conf.getBoolean( + YarnConfiguration.RESOLVE_RM_ADDRESS_NEEDED_KEY, + YarnConfiguration.RESOLVE_RM_ADDRESS_NEEDED_DEFAULT); + if (resolveNeeded) { + getRMHAId(conf); + } Collection ids = conf.getTrimmedStringCollection(YarnConfiguration.RM_HA_IDS); if (ids.size() < 2) { @@ -227,6 +265,16 @@ public static Collection getRMHAIds(Configuration conf) { return conf.getStringCollection(YarnConfiguration.RM_HA_IDS); } + /** + * Instead of returning RM_HA_IDS in current configurations, it + * would return the originally preset one in case of DNS resolving + * @param conf Configuration. + * @return RM Ids from original xml file + */ + public static Collection getOriginalRMHAIds(Configuration conf) { + return originalRMIDs == null ? getRMHAIds(conf) : originalRMIDs; + } + /** * @param conf Configuration. Please use verifyAndSetRMHAId to check. * @return RM Id on success @@ -235,19 +283,10 @@ public static String getRMHAId(Configuration conf) { int found = 0; String currentRMId = conf.getTrimmed(YarnConfiguration.RM_HA_ID); if(currentRMId == null) { - for(String rmId : getRMHAIds(conf)) { - String key = addSuffix(YarnConfiguration.RM_ADDRESS, rmId); - String addr = conf.get(key); - if (addr == null) { - continue; - } - InetSocketAddress s; - try { - s = NetUtils.createSocketAddr(addr); - } catch (Exception e) { - LOG.warn("Exception in creating socket address " + addr, e); - continue; - } + Map idAddressPairs = getResolvedRMIdPairs(conf); + for (Map.Entry entry : idAddressPairs.entrySet()) { + String rmId = entry.getKey(); + InetSocketAddress s = entry.getValue(); if (!s.isUnresolved() && NetUtils.isLocalAddress(s.getAddress())) { currentRMId = rmId.trim(); found++; @@ -262,6 +301,179 @@ public static String getRMHAId(Configuration conf) { return currentRMId; } + /** + * This function resolves all RMIds with their address. For multi-A DNS records, + * it will resolve all of them, and generate a new Id for each of them. + * + * @param conf Configuration + * @return Map key as RMId, value as its address + */ + public static Map getResolvedRMIdPairs( + Configuration conf) { + boolean resolveNeeded = conf.getBoolean( + YarnConfiguration.RESOLVE_RM_ADDRESS_NEEDED_KEY, + YarnConfiguration.RESOLVE_RM_ADDRESS_NEEDED_DEFAULT); + boolean requireFQDN = conf.getBoolean( + YarnConfiguration.RESOLVE_RM_ADDRESS_TO_FQDN, + YarnConfiguration.RESOLVE_RM_ADDRESS_TO_FQDN_DEFAULT); + // In case client using DIFFERENT addresses for each service address + // need to categorize them first + Map, List> addressesConfigKeysMap = new HashMap<>(); + Collection rmIds = getOriginalRMHAIds(conf); + for (String configKey : RM_ADDRESS_CONFIG_KEYS) { + List addresses = new ArrayList<>(); + for (String rmId : rmIds) { + String keyToRead = addSuffix(configKey, rmId); + InetSocketAddress address = getInetSocketAddressFromString( + conf.get(keyToRead)); + if (address != null) { + addresses.add(address.getHostName()); + } + } + Collections.sort(addresses); + List configKeysOfTheseAddresses = addressesConfigKeysMap.get(addresses); + if (configKeysOfTheseAddresses == null) { + configKeysOfTheseAddresses = new ArrayList<>(); + addressesConfigKeysMap.put(addresses, configKeysOfTheseAddresses); + } + configKeysOfTheseAddresses.add(configKey); + } + // We need to resolve and override by group (categorized by their input host) + // But since the function is called from "getRMHAId", + // this function would only return value which is corresponded to YarnConfiguration.RM_ADDRESS + Map ret = null; + for (List configKeys : addressesConfigKeysMap.values()) { + Map res = getResolvedIdPairs(conf, resolveNeeded, requireFQDN, getOriginalRMHAIds(conf), + configKeys.get(0), YarnConfiguration.RM_HA_IDS, configKeys); + if (configKeys.contains(YarnConfiguration.RM_ADDRESS)) { + ret = res; + } + } + return ret; + } + + private static Map getResolvedIdPairs( + Configuration conf, boolean resolveNeeded, boolean requireFQDN, Collection ids, + String configKey, String configKeyToReplace, List listOfConfigKeysToReplace) { + Map idAddressPairs = new HashMap<>(); + Map generatedIdToOriginalId = new HashMap<>(); + for (String id : ids) { + String key = addSuffix(configKey, id); + String addr = conf.get(key); // string with port + InetSocketAddress address = getInetSocketAddressFromString(addr); + if (address == null) { + continue; + } + if (resolveNeeded) { + if (dnr == null) { + setDnrByConfiguration(conf); + } + // If the address needs to be resolved, get all of the IP addresses + // from this address and pass them into the map + LOG.info("Multi-A domain name " + addr + + " will be resolved by " + dnr.getClass().getName()); + int port = address.getPort(); + String[] resolvedHostNames; + try { + resolvedHostNames = dnr.getAllResolvedHostnameByDomainName( + address.getHostName(), requireFQDN); + } catch (UnknownHostException e) { + LOG.warn("Exception in resolving socket address " + + address.getHostName(), e); + continue; + } + LOG.info("Resolved addresses for " + addr + + " is " + Arrays.toString(resolvedHostNames)); + if (resolvedHostNames == null || resolvedHostNames.length < 1) { + LOG.warn("Cannot resolve from address " + address.getHostName()); + } else { + // If multiple address resolved, corresponding id needs to be created + for (int i = 0; i < resolvedHostNames.length; i++) { + String generatedRMId = id + "_resolved_" + (i + 1); + idAddressPairs.put(generatedRMId, + new InetSocketAddress(resolvedHostNames[i], port)); + generatedIdToOriginalId.put(generatedRMId, id); + } + } + overrideIdsInConfiguration( + idAddressPairs, generatedIdToOriginalId, configKeyToReplace, + listOfConfigKeysToReplace, conf); + } else { + idAddressPairs.put(id, address); + } + } + return idAddressPairs; + } + + /** + * This function override all RMIds and their addresses by the input Map. + * + * @param idAddressPairs key as Id, value as its address + * @param generatedIdToOriginalId key as generated rmId from multi-A, + * value as its original input Id + * @param configKeyToReplace key to replace + * @param listOfConfigKeysToReplace list of keys to replace/add + * @param conf Configuration + */ + synchronized static void overrideIdsInConfiguration( + Map idAddressPairs, + Map generatedIdToOriginalId, + String configKeyToReplace, List listOfConfigKeysToReplace, + Configuration conf) { + Collection currentIds = getRMHAIds(conf); + Set resolvedIds = new HashSet<>(idAddressPairs.keySet()); + // override rm-ids + if (originalRMIDs == null) { + originalRMIDs = currentIds; + } + // if it is already resolved, we need to form a superset + resolvedIds.addAll((currentIds)); + resolvedIds.removeAll(generatedIdToOriginalId.values()); + conf.setStrings(configKeyToReplace, + resolvedIds.toArray(new String[0])); + // override/add address configuration entries for each rm-id + for (Map.Entry entry : idAddressPairs.entrySet()) { + String rmId = entry.getKey(); + String addr = entry.getValue().getHostName(); + String originalRMId = generatedIdToOriginalId.get(rmId); + if (originalRMId != null) { + // for each required configKeys, get its port and then set it back + for (String configKey : listOfConfigKeysToReplace) { + String keyToRead = addSuffix(configKey, originalRMId); + InetSocketAddress originalAddress = getInetSocketAddressFromString( + conf.get(keyToRead)); + if (originalAddress == null) { + LOG.warn("Missing configuration for key " + keyToRead); + continue; + } + int port = originalAddress.getPort(); + String keyToWrite = addSuffix(configKey, rmId); + conf.setSocketAddr(keyToWrite, new InetSocketAddress(addr, port)); + } + } + } + } + + /** + * Helper function to create InetsocketAddress from string address. + * + * @param addr string format of address accepted by NetUtils.createSocketAddr + * @return InetSocketAddress of input, would return null upon any kinds of invalid inout + */ + public static InetSocketAddress getInetSocketAddressFromString(String addr) { + if (addr == null) { + return null; + } + InetSocketAddress address; + try { + address = NetUtils.createSocketAddr(addr); + } catch (Exception e) { + LOG.warn("Exception in creating socket address " + addr, e); + return null; + } + return address; + } + @VisibleForTesting static String getNeedToSetValueMessage(String confKey) { return confKey + " needs to be set in an HA configuration."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 316a6421889bd..afa968b2abbf1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -895,6 +895,15 @@ public static boolean isAclEnabled(Configuration conf) { public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids"; public static final String RM_HA_ID = RM_HA_PREFIX + "id"; + /** YARN DNS resolving related configs */ + public static final String RESOLVE_RM_ADDRESS_NEEDED_KEY = RM_HA_PREFIX + "resolve-needed"; + public static final boolean RESOLVE_RM_ADDRESS_NEEDED_DEFAULT = false; + public static final String RESOLVE_RM_ADDRESS_KEY = RM_HA_PREFIX + "resolver.impl"; + public static final String RESOLVE_RM_ADDRESS_TO_FQDN = RM_HA_PREFIX + "resolver.useFQDN"; + public static final boolean RESOLVE_RM_ADDRESS_TO_FQDN_DEFAULT = true; + public static final String RM_ID_REFRESH_INTERVAL = RM_HA_PREFIX + "refresh-period-ms"; + public static final long RM_ID_REFRESH_INTERVAL_DEFAULT = -1; + /** Store the related configuration files in File System */ public static final String FS_BASED_RM_CONF_STORE = RM_PREFIX + "configuration.file-system-based-store"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java index 35b1906698b7f..52354fff65568 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java @@ -148,7 +148,13 @@ public static Text getTokenService(Configuration conf, String address, // Build a list of service addresses to form the service name ArrayList services = new ArrayList(); YarnConfiguration yarnConf = new YarnConfiguration(conf); - for (String rmId : HAUtil.getRMHAIds(conf)) { + boolean resolveNeeded = yarnConf.getBoolean( + YarnConfiguration.RESOLVE_RM_ADDRESS_NEEDED_KEY, + YarnConfiguration.RESOLVE_RM_ADDRESS_NEEDED_DEFAULT); + if (resolveNeeded) { + HAUtil.getResolvedRMIdPairs(yarnConf); + } + for (String rmId : HAUtil.getRMHAIds(yarnConf)) { // Set RM_ID to get the corresponding RM_ADDRESS yarnConf.set(YarnConfiguration.RM_HA_ID, rmId); services.add(SecurityUtil.buildTokenService( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java index 89c8753868efc..60fb4e5b7028d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -50,6 +51,9 @@ public class ConfiguredRMFailoverProxyProvider protected YarnConfiguration conf; protected String[] rmServiceIds; + private long rmIdsLastRefreshTime; + private long rmIdsRefreshInterval; + @Override public void init(Configuration configuration, RMProxy rmProxy, Class protocol) { @@ -57,9 +61,9 @@ public void init(Configuration configuration, RMProxy rmProxy, this.protocol = protocol; this.rmProxy.checkAllowedProtocols(this.protocol); this.conf = new YarnConfiguration(configuration); - Collection rmIds = HAUtil.getRMHAIds(conf); - this.rmServiceIds = rmIds.toArray(new String[rmIds.size()]); - conf.set(YarnConfiguration.RM_HA_ID, rmServiceIds[currentProxyIndex]); + this.rmIdsRefreshInterval = conf.getLong(YarnConfiguration.RM_ID_REFRESH_INTERVAL, + YarnConfiguration.RM_ID_REFRESH_INTERVAL_DEFAULT); + refreshRMIds(true); conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, @@ -84,6 +88,9 @@ protected T getProxyInternal() { @Override public synchronized ProxyInfo getProxy() { + if (shouldRefreshAddress()) { + refreshRMIds(false); + } String rmId = rmServiceIds[currentProxyIndex]; T current = proxies.get(rmId); if (current == null) { @@ -119,4 +126,47 @@ public synchronized void close() throws IOException { } } } + + private boolean shouldRefreshAddress() { + long currentTime = System.currentTimeMillis(); + return rmIdsRefreshInterval > 0 && + currentTime > rmIdsLastRefreshTime + rmIdsRefreshInterval; + } + + private synchronized void refreshRMIds(boolean forceRefresh) throws IllegalStateException { + if (!forceRefresh && !shouldRefreshAddress()) { + return; + } + String currentRMId = HAUtil.getRMHAId(conf); + InetSocketAddress currentAddress = null; + if (currentRMId != null) { + // explicitly refresh it if not done + currentAddress = HAUtil.getInetSocketAddressFromString(conf.get(HAUtil.addSuffix( + YarnConfiguration.RM_ADDRESS, currentRMId))); + HAUtil.getResolvedRMIdPairs(conf); + } + rmIdsLastRefreshTime = System.currentTimeMillis(); + Collection rmIds = HAUtil.getRMHAIds(conf); + if (rmIds == null || rmIds.isEmpty()) { + String message = "no instances configured."; + LOG.error(message); + throw new IllegalStateException(message); + } + proxies.clear(); + this.rmServiceIds = rmIds.toArray(new String[rmIds.size()]); + // After refresh, we should keep current rm pointing to the previous active one + if (currentRMId != null) { + for (int i = 0; i < rmServiceIds.length; i++) { + String confKey = HAUtil.addSuffix( + YarnConfiguration.RM_ADDRESS, rmServiceIds[i]); + InetSocketAddress thatAddress = + HAUtil.getInetSocketAddressFromString(conf.get(confKey)); + if (currentAddress.equals(thatAddress)) { + currentProxyIndex = i; + break; + } + } + } + conf.set(YarnConfiguration.RM_HA_ID, rmServiceIds[currentProxyIndex]); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 80672fb1cc8e4..cc20a2e3d1571 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -668,6 +668,15 @@ false + + + Determines what class to use to resolve resourcemanager address to specific machine + address(es). + + yarn.resourcemanager.ha.resolver.impl + org.apache.hadoop.net.DNSDomainNameResolver + + Enable automatic failover. By default, it is enabled only when HA is enabled diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestConfiguredRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestConfiguredRMFailoverProxyProvider.java new file mode 100644 index 0000000000000..95b8a0e1de607 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestConfiguredRMFailoverProxyProvider.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.net.MockDomainNameResolver; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; + +public class TestConfiguredRMFailoverProxyProvider { + private Configuration conf; + private final static List HOST_LIST = Arrays.asList( + "host01", + "host02", + "host03", + "host04" + ); + private final static String SCHEDULER_HOST_ADDRESS = "host11"; + private final static List IP_LIST = Arrays.asList( + new byte[] {10, 0, 0, 1}, + new byte[] {10, 0, 0, 2}, + new byte[] {10, 0, 0, 3}, + new byte[] {10, 0, 0, 4} + ); + private final static String MULTI_A_RM_ID = "rm"; + private final static String MULTI_A_RM_ADDRESS = "rm.com"; + private final static String MULTI_A_RM_SCHEDULER_ADDRESS = "rmscheduler.com"; + private final static long REFRESH_TIME_INTERVAL = 1000; // unit ms + + ConfiguredRMFailoverProxyProvider proxyProvider; + + @Before + public void testInit() { + conf = new Configuration(); + conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); + RMProxy rm1Mock = mock(RMProxy.class); + doNothing().when(rm1Mock).checkAllowedProtocols(ApplicationClientProtocol.class); + proxyProvider = new ConfiguredRMFailoverProxyProvider(); + proxyProvider.init(conf, rm1Mock, ApplicationClientProtocol.class); + } + + public Configuration getDNSConfig() { + Configuration dnsConf = new Configuration(); + dnsConf.set(YarnConfiguration.RESOLVE_RM_ADDRESS_KEY, MockDomainNameResolver.class.getName()); + dnsConf.set(YarnConfiguration.RM_HA_IDS, MULTI_A_RM_ID); + dnsConf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, MULTI_A_RM_ID), MULTI_A_RM_ADDRESS + + ":" + YarnConfiguration.DEFAULT_RM_PORT); + dnsConf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, MULTI_A_RM_ID), MULTI_A_RM_ADDRESS + + ":" + YarnConfiguration.DEFAULT_RM_ADMIN_PORT); + dnsConf.set(HAUtil.addSuffix(YarnConfiguration.RM_SCHEDULER_ADDRESS, MULTI_A_RM_ID), MULTI_A_RM_SCHEDULER_ADDRESS + + ":" + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + dnsConf.setBoolean(YarnConfiguration.RESOLVE_RM_ADDRESS_NEEDED_KEY, true); + dnsConf.setBoolean(YarnConfiguration.RESOLVE_RM_ADDRESS_TO_FQDN, true); + return dnsConf; + } + + /* + * Override the MockDomainNameResolver address mapping in HAUtil + * Source of mapping is from HOST_LIST and IP_LIST except for the exludeIdx one + */ + private void overrideDNSMapping(int excludeIdx) throws UnknownHostException { + // Mapping of domain names and IP addresses + Map addressMap = new HashMap<>(); + // Mapping from IP addresses to fqdns + Map ptrMap = new HashMap<>(); + int idx = 0; + InetAddress[] addresses = new InetAddress[HOST_LIST.size() - 1]; + InetAddress[] schedulerAddresses = new InetAddress[1]; + addressMap.put(MULTI_A_RM_ADDRESS, addresses); + addressMap.put(MULTI_A_RM_SCHEDULER_ADDRESS, schedulerAddresses); + // for addresses + for (int i = 0; i < HOST_LIST.size(); i++) { + if (i == excludeIdx) { + continue; + } + InetAddress address = InetAddress.getByAddress(IP_LIST.get(i)); + String host = HOST_LIST.get(i); + addresses[idx++] = address; + ptrMap.put(address, host); + } + // for schedulerAddress + InetAddress schedulerAddress = InetAddress.getByAddress(new byte[] {10, 1, 1, 1}); + schedulerAddresses[0] = schedulerAddress; + ptrMap.put(schedulerAddress, SCHEDULER_HOST_ADDRESS); + ((MockDomainNameResolver)HAUtil.getDnr()).setAddressMap(addressMap); + ((MockDomainNameResolver)HAUtil.getDnr()).setPtrMap(ptrMap); + } + + @Test(expected = IllegalStateException.class) + public void testInitWithNoInstances() { + Configuration conf = new Configuration();; + RMProxy rm1Mock = mock(RMProxy.class); + doNothing().when(rm1Mock).checkAllowedProtocols(ApplicationClientProtocol.class); + ConfiguredRMFailoverProxyProvider proxyProviderWithNoInstances = new ConfiguredRMFailoverProxyProvider(); + proxyProviderWithNoInstances.init(conf, rm1Mock, ApplicationClientProtocol.class); + } + + @Test + public void testGetProxy() { + FailoverProxyProvider.ProxyInfo proxy = proxyProvider.getProxy(); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3,rm4"); + RMProxy rm1Mock = mock(RMProxy.class); + doNothing().when(rm1Mock).checkAllowedProtocols(ApplicationClientProtocol.class); + ConfiguredRMFailoverProxyProvider proxyProviderWithNoInstances = new ConfiguredRMFailoverProxyProvider(); + proxyProviderWithNoInstances.init(conf, rm1Mock, ApplicationClientProtocol.class); + assertEquals(proxyProviderWithNoInstances.rmServiceIds[0], "rm1"); + FailoverProxyProvider.ProxyInfo current = proxyProviderWithNoInstances.getProxy(); + assertEquals(current.proxyInfo, "rm1"); + } + + @Test + public void testInitWithMultiARecordRM() throws UnknownHostException { + Configuration conf = getDNSConfig(); + RMProxy rm1Mock = mock(RMProxy.class); + doNothing().when(rm1Mock).checkAllowedProtocols(ApplicationClientProtocol.class); + ConfiguredRMFailoverProxyProvider proxyProviderWithmultiA = + new ConfiguredRMFailoverProxyProvider(); + HAUtil.setDnrByConfiguration(conf); + overrideDNSMapping(1); + proxyProviderWithmultiA.init(conf, rm1Mock, ApplicationClientProtocol.class); + assertEquals(HOST_LIST.size() - 1, proxyProviderWithmultiA.rmServiceIds.length); + for (String rmId : proxyProviderWithmultiA.rmServiceIds) { + assertTrue(rmId.startsWith("rm_resolved_")); + } + } + + @Test + public void testResolveDifferentMultiARecordRM() throws UnknownHostException { + Configuration conf = getDNSConfig(); + RMProxy rm1Mock = mock(RMProxy.class); + doNothing().when(rm1Mock).checkAllowedProtocols(ApplicationClientProtocol.class); + ConfiguredRMFailoverProxyProvider proxyProviderWithmultiA = + new ConfiguredRMFailoverProxyProvider(); + HAUtil.setDnrByConfiguration(conf); + overrideDNSMapping(1); + proxyProviderWithmultiA.init(conf, rm1Mock, ApplicationClientProtocol.class); + List rmAddresses = getRMAddresses(proxyProviderWithmultiA.conf, + proxyProviderWithmultiA.rmServiceIds, + YarnConfiguration.RM_ADDRESS); + List rmAdminAddresses = getRMAddresses(proxyProviderWithmultiA.conf, proxyProviderWithmultiA.rmServiceIds, + YarnConfiguration.RM_ADMIN_ADDRESS); + List rmSchedulerAddresses = getRMAddresses(proxyProviderWithmultiA.conf, proxyProviderWithmultiA.rmServiceIds, + YarnConfiguration.RM_SCHEDULER_ADDRESS); + assertEquals(HOST_LIST.size() - 1, rmAddresses.size()); + assertEquals(HOST_LIST.size() - 1, rmAdminAddresses.size()); + assertEquals(1, rmSchedulerAddresses.size()); + assertEquals(SCHEDULER_HOST_ADDRESS + ":8030", rmSchedulerAddresses.get(0)); + } + + @Test + public void testRefreshWithMultiARecordRM() throws UnknownHostException, InterruptedException { + Configuration conf = getDNSConfig(); + conf.setLong(YarnConfiguration.RM_ID_REFRESH_INTERVAL, REFRESH_TIME_INTERVAL); + RMProxy rm1Mock = mock(RMProxy.class); + doNothing().when(rm1Mock).checkAllowedProtocols(ApplicationClientProtocol.class); + ConfiguredRMFailoverProxyProvider proxyProviderWithmultiA = + new ConfiguredRMFailoverProxyProvider(); + HAUtil.setDnrByConfiguration(conf); + overrideDNSMapping(0); + proxyProviderWithmultiA.init(conf, rm1Mock, ApplicationClientProtocol.class); + InetSocketAddress oldActiveRMAddress = + conf.getSocketAddr(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, HAUtil.getRMHAId(conf)), + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); + List oldRMAddresses = getRMAddresses(conf, proxyProviderWithmultiA.rmServiceIds, YarnConfiguration.RM_ADDRESS); + for (String address : oldRMAddresses) { + assertTrue(!address.equals(HOST_LIST.get(0))); + } + // refresh + overrideDNSMapping(2); + Thread.sleep(REFRESH_TIME_INTERVAL * 2); + InetSocketAddress newActiveRMAddress = + conf.getSocketAddr(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, HAUtil.getRMHAId(conf)), + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); + List newRMAddresses = getRMAddresses(conf, proxyProviderWithmultiA.rmServiceIds, YarnConfiguration.RM_ADDRESS); + assertEquals(HOST_LIST.size() - 1, newRMAddresses.size()); + // active RM should remain same, even without failover + assertEquals(oldActiveRMAddress, newActiveRMAddress); + for (String address : newRMAddresses) { + assertTrue(!address.equals(HOST_LIST.get(2))); + } + } + + private List getRMAddresses(Configuration conf, String[] rmServiceIds, + String configKey) { + List addresses = new ArrayList<>(); + for (String rmId : rmServiceIds) { + String address = conf.get(HAUtil.addSuffix(configKey, rmId)); + if (address != null) { + addresses.add(address); + } + } + return addresses; + } +} From 609df5d69d4b91c3b9eef73d6de14e26b06a4539 Mon Sep 17 00:00:00 2001 From: Xin Gao Date: Wed, 7 Dec 2022 21:23:14 -0800 Subject: [PATCH 2/4] YARN-11391 Fix Jenkins reported issue --- .../org/apache/hadoop/yarn/conf/HAUtil.java | 5 +- .../ConfiguredRMFailoverProxyProvider.java | 1 - .../src/main/resources/yarn-default.xml | 34 ++++++++ ...TestConfiguredRMFailoverProxyProvider.java | 80 +++++++++---------- 4 files changed, 73 insertions(+), 47 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java index a75c731ea43b8..9869d6f9e0f3c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java @@ -267,7 +267,7 @@ public static Collection getRMHAIds(Configuration conf) { /** * Instead of returning RM_HA_IDS in current configurations, it - * would return the originally preset one in case of DNS resolving + * would return the originally preset one in case of DNS resolving. * @param conf Configuration. * @return RM Ids from original xml file */ @@ -343,7 +343,8 @@ public static Map getResolvedRMIdPairs( // this function would only return value which is corresponded to YarnConfiguration.RM_ADDRESS Map ret = null; for (List configKeys : addressesConfigKeysMap.values()) { - Map res = getResolvedIdPairs(conf, resolveNeeded, requireFQDN, getOriginalRMHAIds(conf), + Map res = getResolvedIdPairs( + conf, resolveNeeded, requireFQDN, getOriginalRMHAIds(conf), configKeys.get(0), YarnConfiguration.RM_HA_IDS, configKeys); if (configKeys.contains(YarnConfiguration.RM_ADDRESS)) { ret = res; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java index 60fb4e5b7028d..90bce9c006258 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java @@ -21,7 +21,6 @@ import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index cc20a2e3d1571..56d61e779c6a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -677,6 +677,40 @@ org.apache.hadoop.net.DNSDomainNameResolver + + + Determines if the given resourcemanager address is a domain name which needs to + be resolved (using the resolver configured by yarn.resourcemanager.ha.resolver.impl). + This adds a transparency layer in the client so physical server address + can change without changing the client. + + yarn.resourcemanager.ha.resolve-needed + false + + + + + The amount of milliseconds between DNS address re-resolving. + By default, this parameter is set to -1 which disabled the DNS auto-refresh functionality. + Enabling the auto-refresh would help long running sessions (including node managers) able to + find the replaced resource managers (e.g. during node replacement/migration) without downtime. + + yarn.resourcemanager.ha.refresh-period-ms + -1 + + + + + Determines whether the resolved result is fully qualified domain name instead + of pure IP address(es). + In secure environment, this has to be enabled since Kerberos is using fqdn + in machine's principal therefore accessing servers by IP won't be recognized + by the KDC. + + yarn.resourcemanager.ha.resolver.useFQDN + true + + Enable automatic failover. By default, it is enabled only when HA is enabled diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestConfiguredRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestConfiguredRMFailoverProxyProvider.java index 95b8a0e1de607..cd92261923201 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestConfiguredRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestConfiguredRMFailoverProxyProvider.java @@ -43,7 +43,6 @@ import static org.mockito.Mockito.mock; public class TestConfiguredRMFailoverProxyProvider { - private Configuration conf; private final static List HOST_LIST = Arrays.asList( "host01", "host02", @@ -62,28 +61,16 @@ public class TestConfiguredRMFailoverProxyProvider { private final static String MULTI_A_RM_SCHEDULER_ADDRESS = "rmscheduler.com"; private final static long REFRESH_TIME_INTERVAL = 1000; // unit ms - ConfiguredRMFailoverProxyProvider proxyProvider; - - @Before - public void testInit() { - conf = new Configuration(); - conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); - RMProxy rm1Mock = mock(RMProxy.class); - doNothing().when(rm1Mock).checkAllowedProtocols(ApplicationClientProtocol.class); - proxyProvider = new ConfiguredRMFailoverProxyProvider(); - proxyProvider.init(conf, rm1Mock, ApplicationClientProtocol.class); - } - public Configuration getDNSConfig() { Configuration dnsConf = new Configuration(); dnsConf.set(YarnConfiguration.RESOLVE_RM_ADDRESS_KEY, MockDomainNameResolver.class.getName()); dnsConf.set(YarnConfiguration.RM_HA_IDS, MULTI_A_RM_ID); - dnsConf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, MULTI_A_RM_ID), MULTI_A_RM_ADDRESS + - ":" + YarnConfiguration.DEFAULT_RM_PORT); - dnsConf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, MULTI_A_RM_ID), MULTI_A_RM_ADDRESS + - ":" + YarnConfiguration.DEFAULT_RM_ADMIN_PORT); - dnsConf.set(HAUtil.addSuffix(YarnConfiguration.RM_SCHEDULER_ADDRESS, MULTI_A_RM_ID), MULTI_A_RM_SCHEDULER_ADDRESS + - ":" + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + dnsConf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, MULTI_A_RM_ID), + MULTI_A_RM_ADDRESS + ":" + YarnConfiguration.DEFAULT_RM_PORT); + dnsConf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, MULTI_A_RM_ID), + MULTI_A_RM_ADDRESS + ":" + YarnConfiguration.DEFAULT_RM_ADMIN_PORT); + dnsConf.set(HAUtil.addSuffix(YarnConfiguration.RM_SCHEDULER_ADDRESS, MULTI_A_RM_ID), + MULTI_A_RM_SCHEDULER_ADDRESS + ":" + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); dnsConf.setBoolean(YarnConfiguration.RESOLVE_RM_ADDRESS_NEEDED_KEY, true); dnsConf.setBoolean(YarnConfiguration.RESOLVE_RM_ADDRESS_TO_FQDN, true); return dnsConf; @@ -121,39 +108,44 @@ private void overrideDNSMapping(int excludeIdx) throws UnknownHostException { ((MockDomainNameResolver)HAUtil.getDnr()).setPtrMap(ptrMap); } - @Test(expected = IllegalStateException.class) - public void testInitWithNoInstances() { - Configuration conf = new Configuration();; + @SuppressWarnings("unchecked") // mock generics + private void initProxyProvider( + Configuration conf, + ConfiguredRMFailoverProxyProvider proxyProvider) { RMProxy rm1Mock = mock(RMProxy.class); doNothing().when(rm1Mock).checkAllowedProtocols(ApplicationClientProtocol.class); - ConfiguredRMFailoverProxyProvider proxyProviderWithNoInstances = new ConfiguredRMFailoverProxyProvider(); - proxyProviderWithNoInstances.init(conf, rm1Mock, ApplicationClientProtocol.class); + proxyProvider.init(conf, rm1Mock, ApplicationClientProtocol.class); + } + + @Test(expected = IllegalStateException.class) + public void testInitWithNoInstances() { + Configuration conf = new Configuration(); + ConfiguredRMFailoverProxyProvider proxyProviderWithNoInstances = + new ConfiguredRMFailoverProxyProvider(); + initProxyProvider(conf, proxyProviderWithNoInstances); } @Test public void testGetProxy() { - FailoverProxyProvider.ProxyInfo proxy = proxyProvider.getProxy(); Configuration conf = new Configuration(); conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3,rm4"); - RMProxy rm1Mock = mock(RMProxy.class); - doNothing().when(rm1Mock).checkAllowedProtocols(ApplicationClientProtocol.class); - ConfiguredRMFailoverProxyProvider proxyProviderWithNoInstances = new ConfiguredRMFailoverProxyProvider(); - proxyProviderWithNoInstances.init(conf, rm1Mock, ApplicationClientProtocol.class); - assertEquals(proxyProviderWithNoInstances.rmServiceIds[0], "rm1"); - FailoverProxyProvider.ProxyInfo current = proxyProviderWithNoInstances.getProxy(); + ConfiguredRMFailoverProxyProvider proxyProvider = + new ConfiguredRMFailoverProxyProvider(); + initProxyProvider(conf, proxyProvider); + assertEquals(proxyProvider.rmServiceIds[0], "rm1"); + FailoverProxyProvider.ProxyInfo current = + proxyProvider.getProxy(); assertEquals(current.proxyInfo, "rm1"); } @Test public void testInitWithMultiARecordRM() throws UnknownHostException { Configuration conf = getDNSConfig(); - RMProxy rm1Mock = mock(RMProxy.class); - doNothing().when(rm1Mock).checkAllowedProtocols(ApplicationClientProtocol.class); ConfiguredRMFailoverProxyProvider proxyProviderWithmultiA = new ConfiguredRMFailoverProxyProvider(); HAUtil.setDnrByConfiguration(conf); overrideDNSMapping(1); - proxyProviderWithmultiA.init(conf, rm1Mock, ApplicationClientProtocol.class); + initProxyProvider(conf, proxyProviderWithmultiA); assertEquals(HOST_LIST.size() - 1, proxyProviderWithmultiA.rmServiceIds.length); for (String rmId : proxyProviderWithmultiA.rmServiceIds) { assertTrue(rmId.startsWith("rm_resolved_")); @@ -163,19 +155,19 @@ public void testInitWithMultiARecordRM() throws UnknownHostException { @Test public void testResolveDifferentMultiARecordRM() throws UnknownHostException { Configuration conf = getDNSConfig(); - RMProxy rm1Mock = mock(RMProxy.class); - doNothing().when(rm1Mock).checkAllowedProtocols(ApplicationClientProtocol.class); ConfiguredRMFailoverProxyProvider proxyProviderWithmultiA = new ConfiguredRMFailoverProxyProvider(); HAUtil.setDnrByConfiguration(conf); overrideDNSMapping(1); - proxyProviderWithmultiA.init(conf, rm1Mock, ApplicationClientProtocol.class); + initProxyProvider(conf, proxyProviderWithmultiA); List rmAddresses = getRMAddresses(proxyProviderWithmultiA.conf, proxyProviderWithmultiA.rmServiceIds, YarnConfiguration.RM_ADDRESS); - List rmAdminAddresses = getRMAddresses(proxyProviderWithmultiA.conf, proxyProviderWithmultiA.rmServiceIds, + List rmAdminAddresses = getRMAddresses(proxyProviderWithmultiA.conf, + proxyProviderWithmultiA.rmServiceIds, YarnConfiguration.RM_ADMIN_ADDRESS); - List rmSchedulerAddresses = getRMAddresses(proxyProviderWithmultiA.conf, proxyProviderWithmultiA.rmServiceIds, + List rmSchedulerAddresses = getRMAddresses(proxyProviderWithmultiA.conf, + proxyProviderWithmultiA.rmServiceIds, YarnConfiguration.RM_SCHEDULER_ADDRESS); assertEquals(HOST_LIST.size() - 1, rmAddresses.size()); assertEquals(HOST_LIST.size() - 1, rmAdminAddresses.size()); @@ -187,18 +179,17 @@ public void testResolveDifferentMultiARecordRM() throws UnknownHostException { public void testRefreshWithMultiARecordRM() throws UnknownHostException, InterruptedException { Configuration conf = getDNSConfig(); conf.setLong(YarnConfiguration.RM_ID_REFRESH_INTERVAL, REFRESH_TIME_INTERVAL); - RMProxy rm1Mock = mock(RMProxy.class); - doNothing().when(rm1Mock).checkAllowedProtocols(ApplicationClientProtocol.class); ConfiguredRMFailoverProxyProvider proxyProviderWithmultiA = new ConfiguredRMFailoverProxyProvider(); HAUtil.setDnrByConfiguration(conf); overrideDNSMapping(0); - proxyProviderWithmultiA.init(conf, rm1Mock, ApplicationClientProtocol.class); + initProxyProvider(conf, proxyProviderWithmultiA); InetSocketAddress oldActiveRMAddress = conf.getSocketAddr(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, HAUtil.getRMHAId(conf)), YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT); - List oldRMAddresses = getRMAddresses(conf, proxyProviderWithmultiA.rmServiceIds, YarnConfiguration.RM_ADDRESS); + List oldRMAddresses = getRMAddresses( + conf, proxyProviderWithmultiA.rmServiceIds, YarnConfiguration.RM_ADDRESS); for (String address : oldRMAddresses) { assertTrue(!address.equals(HOST_LIST.get(0))); } @@ -209,7 +200,8 @@ public void testRefreshWithMultiARecordRM() throws UnknownHostException, Interru conf.getSocketAddr(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, HAUtil.getRMHAId(conf)), YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT); - List newRMAddresses = getRMAddresses(conf, proxyProviderWithmultiA.rmServiceIds, YarnConfiguration.RM_ADDRESS); + List newRMAddresses = getRMAddresses( + conf, proxyProviderWithmultiA.rmServiceIds, YarnConfiguration.RM_ADDRESS); assertEquals(HOST_LIST.size() - 1, newRMAddresses.size()); // active RM should remain same, even without failover assertEquals(oldActiveRMAddress, newActiveRMAddress); From d8c801d66e457c3870b0e5b8896dee2b2bdd84c0 Mon Sep 17 00:00:00 2001 From: Xin Gao Date: Mon, 12 Dec 2022 18:36:58 -0800 Subject: [PATCH 3/4] YARN-11391 Improve logging format --- .../java/org/apache/hadoop/yarn/conf/HAUtil.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java index 9869d6f9e0f3c..bb819814e724c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java @@ -371,22 +371,19 @@ private static Map getResolvedIdPairs( } // If the address needs to be resolved, get all of the IP addresses // from this address and pass them into the map - LOG.info("Multi-A domain name " + addr + - " will be resolved by " + dnr.getClass().getName()); + LOG.info("Multi-A domain name {} will be resolved by {}" + addr, dnr.getClass().getName()); int port = address.getPort(); String[] resolvedHostNames; try { resolvedHostNames = dnr.getAllResolvedHostnameByDomainName( address.getHostName(), requireFQDN); } catch (UnknownHostException e) { - LOG.warn("Exception in resolving socket address " - + address.getHostName(), e); + LOG.warn("Exception in resolving socket address {}", address.getHostName(), e); continue; } - LOG.info("Resolved addresses for " + addr + - " is " + Arrays.toString(resolvedHostNames)); + LOG.info("Resolved addresses for {} is {}", addr, Arrays.toString(resolvedHostNames)); if (resolvedHostNames == null || resolvedHostNames.length < 1) { - LOG.warn("Cannot resolve from address " + address.getHostName()); + LOG.warn("Cannot resolve from address {}", address.getHostName()); } else { // If multiple address resolved, corresponding id needs to be created for (int i = 0; i < resolvedHostNames.length; i++) { @@ -444,7 +441,7 @@ synchronized static void overrideIdsInConfiguration( InetSocketAddress originalAddress = getInetSocketAddressFromString( conf.get(keyToRead)); if (originalAddress == null) { - LOG.warn("Missing configuration for key " + keyToRead); + LOG.warn("Missing configuration for key {}", keyToRead); continue; } int port = originalAddress.getPort(); @@ -469,7 +466,7 @@ public static InetSocketAddress getInetSocketAddressFromString(String addr) { try { address = NetUtils.createSocketAddr(addr); } catch (Exception e) { - LOG.warn("Exception in creating socket address " + addr, e); + LOG.warn("Exception in creating socket address {}", addr, e); return null; } return address; From 2b60df3746e48b242b618cd9fbc368b639dfb564 Mon Sep 17 00:00:00 2001 From: Xin Gao Date: Tue, 17 Jan 2023 10:07:33 -0800 Subject: [PATCH 4/4] Fix code style issues --- .../java/org/apache/hadoop/yarn/conf/YarnConfiguration.java | 2 +- .../yarn/client/TestConfiguredRMFailoverProxyProvider.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index afa968b2abbf1..dcfca715d5f45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -895,7 +895,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids"; public static final String RM_HA_ID = RM_HA_PREFIX + "id"; - /** YARN DNS resolving related configs */ + /** YARN DNS resolving related configs. */ public static final String RESOLVE_RM_ADDRESS_NEEDED_KEY = RM_HA_PREFIX + "resolve-needed"; public static final boolean RESOLVE_RM_ADDRESS_NEEDED_DEFAULT = false; public static final String RESOLVE_RM_ADDRESS_KEY = RM_HA_PREFIX + "resolver.impl"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestConfiguredRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestConfiguredRMFailoverProxyProvider.java index cd92261923201..ed307e4171335 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestConfiguredRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestConfiguredRMFailoverProxyProvider.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; -import org.junit.Before; import org.junit.Test; import org.apache.hadoop.conf.Configuration;